logo头像

Always believe youself.

redis实现队列,延时队列

image

消息队列

redis中实现消息队列的几种方案。

  1. 基于List的 LPUSH+BRPOP 的实现

  2. PUB/SUB,订阅/发布模式

  3. 基于Sorted-Set的实现

  4. 基于Stream类型的实现

基于List

lpop/rpop 操作出队列 和 lpush /rpush 操作入队列

  • 队列为空是lpop 和 rpop 会一直空轮训,消耗资源。 可以引入阻塞读 blpop /brpop (b 代表blocking) 阻塞读在队列没有数据时进入休眠状态。一旦有了数据则会立即醒过来,消息延迟几乎为零。
  • 空闲连接问题,若线程一直阻塞在哪里,客户端的连接就成了空闲连接,长时间会主动断开连接,减少资源占用,blpop 和 brpop 会抛出异常。所以需要在客户端消费端进行捕获异常,进行重试。
  • 做消费者确认ack 比较麻烦,不能保证消费消息是否成功处理。通常需要维护一个 pending列表,保证消息处理确认。
  • 不能重复消费,一旦消费就会被删除
  • 不支持分组消费

订阅和发布模式

SUBSCRIBE,用于订阅信道; PUBLISH,向信道发送消息; UNSUBSCRIBE,取消订阅

此模式允许生产者只生产一次消息,由中间件负责将消息复制到多个消息队列,每个消息队列由对应的消费组消费。

优点

  • 多个消费者
  • 多信道订阅,消费者可以订阅多个信道
  • 消息即时发送

缺点

  • 消息丢失,不能寻回
  • 消费者接收时间会不一致
  • 消费端消息挤压后,会强制断开,导致消息意外丢失。

可见 Pub/Sub 模式不适合做消息存储,消息积压类的业务,而是擅长处理广播,即时通讯,即时反馈的业务。

基于 Sorted-set 的实现

Sortes Set (有序列表) 类似 java 的 sortedSet 和 Hashmap 的结合体,一方面是 set 保证唯一性,另一方面他可以给每个value 赋予一个 score 代表这个 value 的排序权重,内部实现 “跳跃表”。

  • 可以自定义消息 ID
  • 不允许重复消息,因为是集合

基于 Stream 类型的实现

image

Stream为redis 5.0后新增的数据结构。支持多播的可持久化消息队列,实现借鉴了Kafka设计。

延时消息队列

可以使用 zset这个命令,用设置好的时间戳作为score进行排序,使用 zadd score1 value1 ….命令就可以一直往内存中生产消息。再利用 zrangebysocre 查询符合条件的所有待处理的任务,通过循环执行队列任务即可。也可以通过 zrangebyscore key min max withscores limit 0 1 查询最早的一条任务,来进行消费

image

Redis延时队列优势

优势:

  • Redis zset支持高性能的 score 排序。

  • Redis是在内存上进行操作的,速度非常快。

  • Redis可以搭建集群,当消息很多时候,我们可以用集群来提高消息处理的速度,提高可用性。

  • Redis具有持久化机制,当出现故障的时候,可以通过AOF和RDB方式来对数据进行恢复,保证了数据的可靠性

劣势

  • 使用 Redis 实现的延时消息队列也存在数据持久化, 消息可靠性的问题

  • 没有重试机制

  • 没有 ACK 机制

RabbitMQ发消息之前要创建 Exchange,再创建 Queue,还要将 Queue 和 Exchange 通过某种规则绑定起来,发消息的时候要指定 routingkey,还要控制头部信息