博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Redis笔记(七)Java实现Redis消息队列
阅读量:6904 次
发布时间:2019-06-27

本文共 4516 字,大约阅读时间需要 15 分钟。

这里我使用Redis的发布、订阅功能实现简单的消息队列,基本的命令有publish、subscribe等。

在Jedis中,有对应的java方法,但是只能发布字符串消息。为了传输对象,需要将对象进行序列化,并封装成字符串进行处理。

使用Redis实现消息队列

1.封装一个消息对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public 
class 
Message 
implements 
Serializable{
 
private 
static 
final 
long 
serialVersionUID = 1L;
 
private 
String titile;
private 
String info;
 
public 
Message(String titile,String info){
this
.titile=titile;
this
.info=info;
}
 
public 
String getTitile() {
return 
titile;
}
public 
void 
setTitile(String titile) {
this
.titile = titile;
}
public 
String getInfo() {
return 
info;
}
public 
void 
setInfo(String info) {
this
.info = info;
}
}

  

2.为这个消息对象提供序列化方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public 
class 
MessageUtil {
 
//convert To String
public 
static 
String convertToString(Object obj,String charset) 
throws 
IOException{
 
ByteArrayOutputStream bo = 
new 
ByteArrayOutputStream();
ObjectOutputStream oo = 
new 
ObjectOutputStream(bo);
oo.writeObject(obj);
String str = bo.toString(charset);
bo.close();
oo.close();
return 
str;
}
 
//convert To Message
public 
static 
Object convertToMessage(
byte
[] bytes) 
throws 
Exception{
ByteArrayInputStream in = 
new 
ByteArrayInputStream(bytes);
ObjectInputStream sIn = 
new 
ObjectInputStream(in);
return 
sIn.readObject();
 
}
}

  

3.从Jedis连接池中获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public 
class 
RedisUtil {
 
/**
* Jedis connection pool
* @Title: config
*/
public 
static 
JedisPool getJedisPool(){
ResourceBundle bundle=ResourceBundle.getBundle(
"redis"
);
String host=bundle.getString(
"host"
);
int 
port=Integer.valueOf(bundle.getString(
"port"
));
int 
timeout=Integer.valueOf(bundle.getString(
"timeout"
));
//  String password=bundle.getString("password");
 
JedisPoolConfig config=
new 
JedisPoolConfig();
config.setMaxActive(Integer.valueOf(bundle.getString(
"maxActive"
)));
config.setMaxWait(Integer.valueOf(bundle.getString(
"maxWait"
)));
config.setTestOnBorrow(Boolean.valueOf(bundle.getString(
"testOnBorrow"
)));
config.setTestOnReturn(Boolean.valueOf(bundle.getString(
"testOnReturn"
)));
 
JedisPool pool=
new 
JedisPool(config, host, port, timeout);
 
return 
pool;
}
}

  

4.创建Provider类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public 
class 
Producer {
 
private 
Jedis jedis;
private 
JedisPool pool;
 
public 
Producer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public 
void 
provide(String channel,Message message) 
throws 
IOException{
String str1=MessageUtil.convertToString(channel,
"UTF-8"
);
String str2=MessageUtil.convertToString(message,
"UTF-8"
);
jedis.publish(str1, str2);
}
 
//close the connection
public 
void 
close() 
throws 
IOException {
//将Jedis对象归还给连接池,关闭连接
pool.returnResource(jedis);
}
}

  

5.创建Consumer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public 
class 
Consumer {
 
private 
Jedis jedis;
private 
JedisPool pool;
 
public 
Consumer(){
pool=RedisUtil.getJedisPool();
jedis = pool.getResource();
}
 
 
public 
void 
consum(String channel) 
throws 
IOException{
JedisPubSub jedisPubSub = 
new 
JedisPubSub() {
// 取得订阅的消息后的处理
public 
void 
onMessage(String channel, String message) {
System.out.println(
"Channel:"
+channel);
System.out.println(
"Message:"
+message.toString());
}
 
// 初始化订阅时候的处理
public 
void 
onSubscribe(String channel, 
int 
subscribedChannels) {
System.out.println(
"onSubscribe:"
+channel);
}
 
// 取消订阅时候的处理
public 
void 
onUnsubscribe(String channel, 
int 
subscribedChannels) {
System.out.println(
"onUnsubscribe:"
+channel);
}
 
// 初始化按表达式的方式订阅时候的处理
public 
void 
onPSubscribe(String pattern, 
int 
subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取消按表达式的方式订阅时候的处理
public 
void 
onPUnsubscribe(String pattern, 
int 
subscribedChannels) {
// System.out.println(pattern + "=" + subscribedChannels);
}
 
// 取得按表达式的方式订阅的消息后的处理
public 
void 
onPMessage(String pattern, String channel, String message) {
System.out.println(pattern + 
"=" 
+ channel + 
"=" 
+ message);
}
};
 
jedis.subscribe(jedisPubSub, channel);
}
 
//close the connection
public 
void 
close() 
throws 
IOException {
//将Jedis对象归还给连接池
pool.returnResource(jedis);
}
}

  

6.测试方法

1
2
3
4
5
6
7
8
9
10
11
12
13
public 
static 
void 
main(String[] args){
 
Message msg=
new 
Message(
"hello!"
"this is the first message!"
);
 
Producer producer=
new 
Producer();
Consumer consumer=
new 
Consumer();
try 
{
producer.provide(
"chn1"
,msg);
consumer.consum(
"chn1"
);
catch 
(IOException e) {
e.printStackTrace();
}
}

  

 

转载地址:http://mqvdl.baihongyu.com/

你可能感兴趣的文章
python bootstrap 文件上传_文件上传控件bootstrap-fileinput与Python交互
查看>>
mysql分片备份不一致问题_光大银行分布式实战:国内最大缴费平台的数据库架构转型...
查看>>
java 参数对象一起封装成json_0基础掌握Django框架(29)HttpResponse对象
查看>>
mysql导入没有选择字段_[MySQL]load data local infile向MySQL数据库中导入数据时,无法导入和字段不分离问题。...
查看>>
php对mysql最匹配输出_php – 在十进制列中查找MySQL中最接近的匹配项
查看>>
thinkphp5 mysql缓存_thinkphp+redis实现秒杀,缓存等功能
查看>>
usb一转多 树莓派zero_windows下一根数据线玩转树莓派zero (w)
查看>>
mysql写保护_84个MySQL性能优化的首选技巧
查看>>
mysql onlibe all_MySQL--Online DDL
查看>>
mysql qps 计算_请问MYSQL数据库QPS,TPS采用哪种计算方式?
查看>>
python to_excel参数解释_pandas中read_excel() 和 to_excel()各参数详解
查看>>
python列表输出奇数_Python程序在列表中打印奇数
查看>>
mysql下存在的风险_MySQL部分表复制配置下存在的运维风险、原因及一种方案
查看>>
在python中的占位符中、请你选出不属于占位符的选项_在信息组织和存储中,最基本的单位是( )。...
查看>>
java date 转换sql date_如何将java.util.Date转换为java.sql.Date?
查看>>
java 编写cgi_编写CGI小结(Java)
查看>>
java内存泄露 垃圾回收_Java中内存泄露及垃圾回收机制
查看>>
rife java_Java世界的ruby on rails — rife (转)
查看>>
java yang模型_java 深入理解jvm内存模型 jvm学习笔记
查看>>
java 任务栏_java 如何将当前程序隐藏到任务栏(类似windows上的其他程序)
查看>>