消息队列
注意, 如果对消息的可靠性有极致追求, 则不建议使用redis作为队列
直接用redis的list结构来实现, 生产者 rpush, 消费者 lpop
问题1: 空队列 如果队列为空, 消费者还一直空轮训, 会拉高客户端cpu, 以及redis的QPS.
解决
sleep()解决, 但是会导致消息的延迟增大
blpop 阻塞读. 一旦数据准备好, 就会立刻醒来, 消息延迟几乎为0
问题2: 空连接自动断开 使用 blpop 时, 若消息一直没有准备好, redis的客户端连接会变成闲置连接. 闲置时间过久, 服务器会自动断开连接, 这时blpop就会抛出异常
解决 捕获异常, 并重试
延时队列 带有延时功能的消息队列 场景案例: 用户下单30分钟后未付款, 自动关闭订单
实现 使用zset, score为到期处理的时间, value为序列化的消息. 多线程轮询zset获取到期任务进行处理. 多线程是可用性保障, 为了防止一个线程挂了还有其他的线程可以处理. 因为多线程抢占, 所以还要考虑一个并发性问题, 保证消息不能被处理多次, 这里采用zrem, 因为一个值只能被删除一次.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 package redis; import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.TypeReference; import redis.clients.jedis.Jedis; /** * @Author wenxuan.hao * @create 2020-02-13 22:03 */ public class RedisDelayingQueue<T> { static class TaskItem<T> { public String id; public T msg; } // fastjson 序列化对象中存在 generic 类型时,需要使用 TypeReference private Type TaskType = new TypeReference<TaskItem<T>>(){}.getType(); private Jedis jedis; private String queueKey; public RedisDelayingQueue(Jedis jedis, String queueKey) { this.jedis = jedis; this.queueKey = queueKey; } public void delay(T msg) { TaskItem<T> task = new TaskItem<T>(); task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid task.msg = msg; String s = JSON.toJSONString(task); // fastjson 序列化 jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延时队列 ,5s 后再试 } public void loop() { while (!Thread.interrupted()) { // 只取一条 Set<String> values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1); // 没取到 if (values.isEmpty()) { try { Thread.sleep(500); // 歇会继续 } catch (InterruptedException e) { break; } continue; } // 取到了 String s = values.iterator().next(); if (jedis.zrem(queueKey, s) > 0) { // 防止一条数据被处理多次, 因为只能成功删除一次 TaskItem<T> task = JSON.parseObject(s, TaskType); // fastjson 反序列化 this.handleMsg(task.msg); } } } public void handleMsg(T msg) { System.out.println(msg); } public static void main(String[] args) { Jedis jedis = new Jedis(); RedisDelayingQueue<String> queue = new RedisDelayingQueue<>(jedis, "q-demo"); Thread producer = new Thread() { public void run() { for (int i = 0; i < 10; i++) { queue.delay("codehole" + i); } } }; Thread consumer = new Thread() { public void run() { queue.loop(); } }; producer.start(); consumer.start(); try { producer.join(); Thread.sleep(6000); consumer.interrupt(); consumer.join(); } catch (InterruptedException e) { } } }