本文共 4516 字,大约阅读时间需要 15 分钟。
这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。
在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。
1.封装一个消息对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public class Message implements Serializable{ private static final long serialVersionUID = 1L; private String titile; private String info; public Message(String titile,String info){ this .titile=titile; this .info=info; } public String getTitile() { return titile; } public void setTitile(String titile) { this .titile = titile; } public String getInfo() { return info; } public void setInfo(String info) { this .info = info; } } |
2.为这个消息对象提供序列化方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | public class MessageUtil { //convert To String public static String convertToString(Object obj,String charset) throws IOException{ ByteArrayOutputStream bo = new ByteArrayOutputStream(); ObjectOutputStream oo = new ObjectOutputStream(bo); oo.writeObject(obj); String str = bo.toString(charset); bo.close(); oo.close(); return str; } //convert To Message public static Object convertToMessage( byte [] bytes) throws Exception{ ByteArrayInputStream in = new ByteArrayInputStream(bytes); ObjectInputStream sIn = new ObjectInputStream(in); return sIn.readObject(); } } |
3.从Jedis连接池中获取连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | public class RedisUtil { /** * Jedis connection pool * @Title: config */ public static JedisPool getJedisPool(){ ResourceBundle bundle=ResourceBundle.getBundle( "redis" ); String host=bundle.getString( "host" ); int port=Integer.valueOf(bundle.getString( "port" )); int timeout=Integer.valueOf(bundle.getString( "timeout" )); // String password=bundle.getString("password"); JedisPoolConfig config= new JedisPoolConfig(); config.setMaxActive(Integer.valueOf(bundle.getString( "maxActive" ))); config.setMaxWait(Integer.valueOf(bundle.getString( "maxWait" ))); config.setTestOnBorrow(Boolean.valueOf(bundle.getString( "testOnBorrow" ))); config.setTestOnReturn(Boolean.valueOf(bundle.getString( "testOnReturn" ))); JedisPool pool= new JedisPool(config, host, port, timeout); return pool; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 | public class Producer { private Jedis jedis; private JedisPool pool; public Producer(){ pool=RedisUtil.getJedisPool(); jedis = pool.getResource(); } public void provide(String channel,Message message) throws IOException{ String str1=MessageUtil.convertToString(channel, "UTF-8" ); String str2=MessageUtil.convertToString(message, "UTF-8" ); jedis.publish(str1, str2); } //close the connection public void close() throws IOException { //将Jedis对象归还给连接池,关闭连接 pool.returnResource(jedis); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | public class Consumer { private Jedis jedis; private JedisPool pool; public Consumer(){ pool=RedisUtil.getJedisPool(); jedis = pool.getResource(); } public void consum(String channel) throws IOException{ JedisPubSub jedisPubSub = new JedisPubSub() { // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { System.out.println( "Channel:" +channel); System.out.println( "Message:" +message.toString()); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { System.out.println( "onSubscribe:" +channel); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println( "onUnsubscribe:" +channel); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { // System.out.println(pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { System.out.println(pattern + "=" + channel + "=" + message); } }; jedis.subscribe(jedisPubSub, channel); } //close the connection public void close() throws IOException { //将Jedis对象归还给连接池 pool.returnResource(jedis); } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 | public static void main(String[] args){ Message msg= new Message( "hello!" , "this is the first message!" ); Producer producer= new Producer(); Consumer consumer= new Consumer(); try { producer.provide( "chn1" ,msg); consumer.consum( "chn1" ); } catch (IOException e) { e.printStackTrace(); } } |
转载地址:http://mqvdl.baihongyu.com/