0%

redis---队列

消息队列

注意, 如果对消息的可靠性有极致追求, 则不建议使用redis作为队列

直接用redis的list结构来实现, 生产者 rpush, 消费者 lpop

问题1: 空队列

如果队列为空, 消费者还一直空轮训, 会拉高客户端cpu, 以及redis的QPS.

解决

  1. sleep()解决, 但是会导致消息的延迟增大
  2. blpop 阻塞读. 一旦数据准备好, 就会立刻醒来, 消息延迟几乎为0

问题2: 空连接自动断开

使用 blpop 时, 若消息一直没有准备好, redis的客户端连接会变成闲置连接. 闲置时间过久, 服务器会自动断开连接, 这时blpop就会抛出异常

解决

捕获异常, 并重试

延时队列

带有延时功能的消息队列
场景案例: 用户下单30分钟后未付款, 自动关闭订单

实现

使用zset, score为到期处理的时间, value为序列化的消息. 多线程轮询zset获取到期任务进行处理. 多线程是可用性保障, 为了防止一个线程挂了还有其他的线程可以处理. 因为多线程抢占, 所以还要考虑一个并发性问题, 保证消息不能被处理多次, 这里采用zrem, 因为一个值只能被删除一次.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package redis;

import java.lang.reflect.Type;
import java.util.Set;
import java.util.UUID;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;

import redis.clients.jedis.Jedis;

/**
* @Author wenxuan.hao
* @create 2020-02-13 22:03
*/
public class RedisDelayingQueue<T> {
static class TaskItem<T> {
public String id;
public T msg;
}

// fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference
private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType();

private Jedis jedis;
private String queueKey;

public RedisDelayingQueue(Jedis jedis, String queueKey) {
this.jedis = jedis;
this.queueKey = queueKey;
}

public void delay(T msg) {
TaskItem<T> task = new TaskItem<T>();
task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid
task.msg = msg;
String s = JSON.toJSONString(task); // fastjson 序列化
jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试
}

public void loop() {
while (!Thread.interrupted()) {
// 只取一条
Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);
// 没取到
if (values.isEmpty()) {
try {
Thread.sleep(500); // 歇会继续
} catch (InterruptedException e) {
break;
}
continue;
}
// 取到了
String s = values.iterator().next();
if (jedis.zrem(queueKey, s) > 0) { // 防止一条数据被处理多次, 因为只能成功删除一次
TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化
this.handleMsg(task.msg);
}
}
}

public void handleMsg(T msg) {
System.out.println(msg);
}

public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo");

Thread producer = new Thread() {

public void run() {
for (int i = 0; i < 10; i++) {
queue.delay("codehole" + i);
}
}

};
Thread consumer = new Thread() {

public void run() {
queue.loop();
}

};


producer.start();
consumer.start();

try {
producer.join();
Thread.sleep(6000);
consumer.interrupt();
consumer.join();
} catch (InterruptedException e) {
}
}
}