前言
Redis5
新增了一个Stream
的数据类型,这个类型作为消息队列来使用时弥补了List
和Pub/Sub
的不足并且提供了更强大的功能,比如ack
机制以及消费者组等概念,在有轻量消息队列使用需求时,使用这个新类型那是再好不过了。对于这个类型,在这里就不赘述了,想了解的话可以看一下这篇文章,在这里,我们就具体来讲一下在SpringBoot
中的实践与踩坑。
注意,SpringBoot
版本需要大于2.2(即spring-data-redis
需要大于2.2)。
新API和类
在开始正文之前,我们先简单了解一下在2.2引入的和Stream操作相关的方法和类。
消息和消息ID的对象
消息(或者称为记录)和消息ID在Spring-Data-Redis中使用Record
和RecordId
来表示。
一个Record
包含三部分内容:
stream
表示这个消息要发往那个Stream,也就是Stream的keyrecordId
表示这个消息的ID,一般Redis服务器自动生成,也可以指定value
表示消息内容
SpringBoot为我们提供了五种消息类型的抽象:MapRecord
、ObjectRecord
、ByteRecord
、ByteBufferRecord
、StringRecord
,以及一个消息ID类型:RecordId
。
这里另外说一下:其实除开
ObjectRecord
,其他几个Record
都是通过继承MapRecord
扩展而来的。StringRecord
中的消息内容也并非仅仅是一个字符串,而是一个键值都为字符串类型的Map
(ByteRecord
、ByteBufferRecord
同理)。而ObjectRecord
最后也会使用HashMapper
转换成MapRecord
。为什么最后都是操作Map
类型?这是因为Stream中的内容是以多个key-value
这种键值对的形式存储的。
那么我们怎样去创建一个消息对象呢?
一般来说我们使用前两个消息类型比较多,所以Spring-Data-Redis很贴心的在Record
这个顶级接口中提供了两个静态方法用于直接构造MapRecord
和ObjectRecord
:
1 | static <S, K, V> MapRecord<S, K, V> of(Map<K, V> map) |
我们可以看到,这两个方法实际上是调用了StreamRecords
中提供的静态方法来创建,StreamRecords
这个类提供了下面这些方法用于创建五种Record
:
1 | ByteRecord rawBytes(Map<byte[], byte[]> raw) |
当然,我们还可以通过使用某个具体的Record
类型的create
静态方法来创建,下面是几个示例:
1 | String streamKey = "channel:stream:key1";//stream key |
如果我们不通过withId
方法显示调用去指定id
,那么默认的情况下就是使用RecordId.autoGenerate()
自动生成。还有一个需要注意的地方就是在使用StreamRecords
的方法来构建Record
时一定要记住用withStreamKey
方法来指定Stream Key
。
不管是消息或是消息ID,这些类基本都提供了扁平化的api来构造,使用起来还是很简单的。那么在构造了一个Record
后怎么将其持久化到Redis的Stream类型中呢?
向Stream添加消息(Record)
使用RedisTemplate
操作Stream
:
1 | String streamKey = "channel:stream:key1";//stream key |
使用add
方法来添加记录,该方法执行成功后返回添加的记录的id
信息。注意下面的结果,这里有两个问题需要注意:
为什么我们构造
Record
时使用的RecordId
和添加记录返回的RecordId
不同?这个问题很好理解,因为构造
Record
时不指定id
时虽然是自动生成,但是这个自动生成并不是在构造时就自动生成好了的,而是在执行Redis命令持久化时Redis服务器来自动生成的,所以前者在获取时间戳和序号的时返回null
。为什么添加记录返回的
RecordId
调用shouldBeAutoGenerated
方法返回false
呢,不是自动生成了吗?其实也很好理解,因为在持久化一条Stream的记录时,我们可以指定
id
,也可以选择让Redis来自动生成,那么这也就导致add
方法执行成功获取到Redis返回的id
信息后在构造RecordId
时并不知道返回的这个id
是我们之前指定的还是Redis自动生成的,所以说前者返回true
,后者返回false
并不难理解。
说到这里,其实你去看一下
Record
构造时默认自动生成id
是如何做到的就很好理解了。在这里稍微提一下:在构造Record
时默认使用RecordId.autoGenerate()
作为RecordId
,而这个方法返回了一个匿名对象,这个匿名对象重写了上面那三个方法,前两个方法重写直接返回null
,后者也就是shouldBeAutoGenerated
方法返回true
。
实现消息队列
在基本了解了SpringBoot2.2
新增的几个Stream操作api和相关类之后,也就到了我们Stream实现消息队列的实践部分了。为了方便,下面我会以发送邮件为例来讲一下如何使用Strean来实现消息队列。
为了方便后续的讲解,先构造一个简单的邮件信息类作为我们的消息内容:
1 |
|
构造StreamMessageListenerContainer
在使用Pub/Sub
模式时,我们都是先创建一个RedisMessageListenerContainer
容器,向这个容器注册监听器然后在onMessage
方法中处理业务逻辑即可。那么使用Stream
类型的话有没有提供一个类似的容器呢?答案是肯定的。在SpringBoot2.2
提供了StreamMessageListenerContainer
这个Stream类型专有的消息监听容器,而唯一的实现也就是DefaultStreamMessageListenerContainer
。
StreamMessageListenerContainer
的构造函数相比RedisMessageListenerContainer
多了一个StreamMessageListenerContainerOptions
,这个对象是使用builder
方式来创建的:
1 | StreamMessageListenerContainerOptions<String, ObjectRecord<String, MailInfo>> options = |
在构造StreamMessageListenerContainerOptions
时最关键的就是targetType
、objectMapper
以及设置序列化器这几个方法,这些参数的设置会直接影响到后续接收到消息后能否反序列化为java对象!由于这部分内容涉及源码过多,在后面一部分我们再针对这几个方法进行详细的探查。
在构造完StreamMessageListenerContainer
之后,现在该怎么注册消息监听器呢?我们接着往下看。
注册StreamListener
在Pub/Sub模式中我们使用addMessageListener(MessageListener, Topic)
方法添加一个MessageListener
到指定的Topic
,那么在使用Stream消息的监听容器时,我们是使用receive
方法。
Spring-data-redis
提供了三个方法用于注册StreamListener
:
1 | default Subscription receive(StreamOffset<K> streamOffset, StreamListener<K, V> listener) { |
如果你发现你哪里receive
方法和我的不太一样?那么你使用的版本应该是2.2,这个版本的问题比较多,比如上面这个地方,使用第二个方法注册StreamListener
,在消息被消费之后会自动ack,因为ConsumerStreamReadRequestBuilder
的autoAck
属性默认就是true
(除非使用第一个方法指定StreamReadRequest
),这个问题在2.3修复了,感兴趣可以去看看这部分源码,修补提交在这里。
我个人最初就是使用的SpringBoot2.2.2,使用过程中发现问题真的有点很多。比如还有一处序列化器泛型类型错误导致
StreamMessageListenerContainerOptions
构造混乱的问题【修补提交】,所以说如果你想尝鲜,那么强烈建议使用SpringBoot
最新发布的版本。
Consumer和StreamOffset
可以看到receive
方法另外还需要Consumer
和StreamOffset
两个参数,
Consumer
1 | Consumer.from("group name", "consumer name") |
Consumer
表示消费者组中的某个消费者,这个东西只会在消费者组模式中用到。我们一般通过上面这种方式来创建,第一个参数表示消费者组,第二个参数表示消费者。
StreamOffset
StreamOffset
用于表示在某个Stream上的偏移量,它包含两部分内容,一个是stream的key
,另一个是ReadOffset
用于表示读取偏移量。前者应该不需要过多的解释,那么ReadOffset
这个读取偏移量是干嘛用的呢?
要搞清楚ReadOffset
,我们首先要知道Stream中偏移量的含义,在Stream中偏移量既可以表示消费记录时的偏移量,又可以表示消费者组在Stream上的偏移量。还记得Redis中我们怎么读取Stream中的记录吗?
通过xread
命令也就是非消费者组模式直接读取,或者使用xreadgroup
命令在消费者组中命令一个消费者去消费一条记录,这个时候,我们可以通过0
、>
、$
分别表示第一条记录、最后一次未被消费的记录和最新一条记录,这也就是ReadOffset
的用途之一:用于表示直接读取或消费者组中消费者读取记录时的偏移量。
那么还有另外的用途吗?
当然了,还记得怎样创建消费者组吗?一般我们使用xgroup create
命令创建一个消费者组时可以选择从Stream的第一条消息开始,或者Stream的中间某个记录开始,又或者从Stream的最新一条记录开始。也就分别代表了0
、$
。这也就是ReadOffset
的用途之二:用于表示创建消费者组时该消费者组在Stream上的偏移量。
理解ReadOffset
最快最简单的方法就是在Redis-cli中用Redis命令操作一番。这其中还有一些值得注意的问题,比如创建消费者组时不能使用>
表示最后一次未被消费的记录;比如0
表示从第一条开始并且包括第一条;$
表示从最新一条开始但并不是指当前Stream的最后一条记录,所以使用$
时最新一条也就是表示下一个xadd
添加的那一条记录,所以说$
在非消费者组模式的阻塞读取下才有意义!
实现StreamListener
同样的用Pub/Sub来类比,在Pub/Sub模式下我们实现的消息监听器是一般是MessageListener
或者使用MessageListenerAdapter
反射调用处理方法,在Strean消息队列的实现中必然也需要一个监听器用于处理真正的业务逻辑,这个类目前只有一个,也就是StreamListener
:
1 | public class StreamMessageListener implements StreamListener<String, ObjectRecord<String, MailInfo>> { |
StreamListener
和MessageListener
差不多,只需要实现onMessage
方法,只不过多了个泛型参数罢了。在实现消息监听器后也就可以使用receive
方法进行注册了:
1 | container.receive(Consumer.from(MAIL_GROUP, "consumer-1"), |
注册完成之后启动StreamMessageListenerContainer
容器:
1 | container.start(); |
完整代码:
1 |
|
注意prepareChannelAndGroup
方法,在初始化容器时,如果key对应的stream或者group不存在时会抛出异常,所以我们需要提前检查并且初始化。
测试
添加一个测试接口:
1 | "/sendMail") ( |
访问进行测试:
1 | 2020-06-28 19:26:17.870 INFO 21900 --- [ task-1] reamConsumerRunner$StreamMessageListener : 消费stream:channel:stream:mail中的信息:MailInfo(receiver=534619360@qq.com, description=发送邮件), 消息id:1593343576237-0 |
控制台输出日志,如果在redis-cli中使用xpending
命令检查ack信息会发现也是0,因为我们虽然使用receive
方法注册,但是在onMessage
中手动提交了确认,当然,你也可以使用receiveAutoAck
方法添加。
实践中踩到的坑
自动ack和泛型类型错误
这两个问题在前面已经提到了并且在2.3已经修复,这里不多说。还是那句话,如果想尝鲜,那么强烈推荐使用SpringBoot最新发布的版本。
RedisTemplate序列化器使用错误导致容器无法反序列化
RedisTemplate
的hashvalue的序列化器最初使用的json序列化器,导致容器监听到新消息反序列化时抛出异常:
1 | java.lang.IllegalArgumentException: Value must not be null! |
这是为什么呢?我们知道Redis中Stream中存的是键值对并且DefaultStreamOperations
中操作的都是byte[]
,也就是说我们虽然添加的是ObjectRecord
,但是会先转换成MapRecord
,然后再被转换成ByteRecord
,最后进行序列化。来看一下add
方法:
1 | public RecordId add(Record<K, ?> record) { |
通过上面这个方法我们可以发现stream序列化时和其他类型不一样,我们在使用json序列化一个对象时都是直接进行的,而这里分了两步并且序列化器是用于第二部转换,那么ObjectRecord
是怎么转换成MapRecord
的呢?点进StreamObjectMapper.toMapRecord
方法可以看到其实是通过ObjectRecord#toMapRecord
方法完成的,这个方法需要一个HashMapper
用于将对象的属性/属性值映射构造成Map类型,你会发现opsForStream
方法重载了一个默认无参的方法,而这个方法默认使用的是ObjectHashMapper
,在我们构造StreamMessageListenerContainerOptionsBuilder
时调用targetType
时默认使用的也是ObjectHashMapper
。而这个ObjectHashMapper
会将对象中的属性和属性值转换成byte[]
形式,所以在第一步之后这个MapRecord
中的值的类型已经是byte[]
了,那么也就导致第二步在使用json序列化器转换为ByteRecord
时出现这种情况:objectMapper.writeValueAsBytes(byte[])
,这是一个测试实例:
1 |
|
反应到stream中值就变成了\“NTM0NjE5MzYwQHFxLmNvbQ==\“(引号需要转义)。为了便于理解,我们可以使用设置使用json序列化器的RedisTemplate
进行add
断点debug测试看一下转换后的两个Record
中的内容:
1 |
|
运行测试并断点查看MapRecord
和ByteRecord
:
使用Redis Desktop Manager查看值:
测试结束终端抛出上面提到的异常。这个问题解决办法就是使用String
序列化器也就是使用StringRedisTemplate
,因为这个序列化器不能序列化byte[]
类型的对象,使用这个序列化器在序列化时如果已经是byte[]
,那么就会直接返回原byte[]
:
更具体的细节可以跟着add
方法debug一遍。
ReadOffset使用错误导致group中消费者消费失败
异常:
1 | RedisCommandExecutionException: ERR The $ ID is meaningless in the context of XREADGROUP: you want to read the history of this consumer by specifying a proper ID, or use the > ID to get new messages. The $ ID would just return an empty result set. |
上面StreamOffset
中也提到了,这涉及到0
、>
、$
的使用场景和范围,如果出现这个异常,很有可能你在消费者组模式下设置消费者读取的offset时使用了ReadOffset.latest()
,而这个对应着$
,也就是最新一条记录。如果不明白那么你可能对这三个标识符的使用还不是很理解,最好的解决办法就是使用redis命令先完整的操作一遍。
stream或者group不存在导致启动抛出异常
同样在上面提到了,在构造StreamMessageListenerContainer
时需要stream和group存在才可以。解决方法就是提前检查并初始化,上面已给出代码。
总结
其实在实践之初,我在网上也搜了很多相关的资料,但是发现这些资料基本都是使用redis-cli进行命令上的操作,并没有SpringBoot
中实现。这次实践可谓是艰辛,由于目前该支持的迭代次数比较少,不乏一些bug或者小问题(2.3已经比较稳定),并且只有lettuce
提供了stream类型的操作实现,而lettuce
本身又有些小毛病,这些因素结合在一起也就导致这个过程花费了我整整两天时间,而这篇博客又花了整整一下午的时间才算完成,其中可能有些内容因为涉及东西比较多只能粗略提一下,并且语言组织上不太好可能不好去理解。话说回来,这篇文章也算是我自己实践后的一个个人总结吧,这个过程其实学到的东西还是很多的,也不枉费花了这么多时间。如果你发现文章有什么地方有问题或者有什么地方不理解,也欢迎在评论区留言一起交流~