前言

Redis5新增了一个Stream的数据类型,这个类型作为消息队列来使用时弥补了ListPub/Sub的不足并且提供了更强大的功能,比如ack机制以及消费者组等概念,在有轻量消息队列使用需求时,使用这个新类型那是再好不过了。对于这个类型,在这里就不赘述了,想了解的话可以看一下这篇文章,在这里,我们就具体来讲一下在SpringBoot中的实践与踩坑。

注意,SpringBoot版本需要大于2.2(即spring-data-redis需要大于2.2)。

新API和类

在开始正文之前,我们先简单了解一下在2.2引入的和Stream操作相关的方法和类。

消息和消息ID的对象

消息(或者称为记录)和消息ID在Spring-Data-Redis中使用RecordRecordId来表示。

一个Record包含三部分内容:

  • stream表示这个消息要发往那个Stream,也就是Stream的key
  • recordId表示这个消息的ID,一般Redis服务器自动生成,也可以指定
  • value表示消息内容

SpringBoot为我们提供了五种消息类型的抽象:MapRecordObjectRecordByteRecordByteBufferRecordStringRecord,以及一个消息ID类型:RecordId

这里另外说一下:其实除开ObjectRecord,其他几个Record都是通过继承MapRecord扩展而来的。StringRecord中的消息内容也并非仅仅是一个字符串,而是一个键值都为字符串类型的MapByteRecordByteBufferRecord同理)。而ObjectRecord最后也会使用HashMapper转换成MapRecord。为什么最后都是操作Map类型?这是因为Stream中的内容是以多个key-value这种键值对的形式存储的。

那么我们怎样去创建一个消息对象呢?

一般来说我们使用前两个消息类型比较多,所以Spring-Data-Redis很贴心的在Record这个顶级接口中提供了两个静态方法用于直接构造MapRecordObjectRecord

1
2
3
4
5
6
7
8
static <S, K, V> MapRecord<S, K, V> of(Map<K, V> map)
Assert.notNull(map, "Map must not be null!");
return StreamRecords.mapBacked(map);
}
static <S, V> ObjectRecord<S, V> of(V value) {
Assert.notNull(value, "Value must not be null!");
return StreamRecords.objectBacked(value);
}

我们可以看到,这两个方法实际上是调用了StreamRecords中提供的静态方法来创建,StreamRecords这个类提供了下面这些方法用于创建五种Record

1
2
3
4
5
6
ByteRecord rawBytes(Map<byte[], byte[]> raw) 
ByteBufferRecord rawBuffer(Map<ByteBuffer, ByteBuffer> raw)
StringRecord string(Map<String, String> raw)
<S, K, V> MapRecord<S, K, V> mapBacked(Map<K, V> map)
<S, V> ObjectRecord<S, V> objectBacked(V value)
RecordBuilder<?> newRecord() // 通过builder方式创建

当然,我们还可以通过使用某个具体的Record类型的create静态方法来创建,下面是几个示例:

1
2
3
4
5
6
7
8
9
10
String streamKey = "channel:stream:key1";//stream key
MailInfo mailInfo = new MailInfo("554205726@qq.com", "sendmail");//定义一个Object类型的消息内容
Map<String, String> map = new HashMap<String, String>() {{
put("receiver", "534619360@qq.com");
}}; //定义一个Map类型的消息内容
Record.of(mailInfo).withStreamKey(streamKey);
Record.of(map).withStreamKey(streamKey).withId(RecordId.of("123"));//指定id
StreamRecords.objectBacked(mailInfo).withStreamKey(streamKey);
StreamRecords.mapBacked(map).withStreamKey(streamKey).withId(RecordId.autoGenerate());//指定id
ObjectRecord.create(streamKey, mailInfo); //使用ObjectRecord的create静态方法创建

如果我们不通过withId方法显示调用去指定id,那么默认的情况下就是使用RecordId.autoGenerate()自动生成。还有一个需要注意的地方就是在使用StreamRecords的方法来构建Record时一定要记住用withStreamKey方法来指定Stream Key

不管是消息或是消息ID,这些类基本都提供了扁平化的api来构造,使用起来还是很简单的。那么在构造了一个Record后怎么将其持久化到Redis的Stream类型中呢?

向Stream添加消息(Record)

使用RedisTemplate操作Stream

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String streamKey = "channel:stream:key1";//stream key
MailInfo mailInfo = new MailInfo("554205726@qq.com", "sendmail");

ObjectRecord<String, MailInfo> record = ObjectRecord.create(streamKey, mailInfo);

RecordId id = record.getId(); //构造Record时使用的RecordId
RecordId recordId = redisTemplate.opsForStream().add(record); //返回的RecordId

id.getTimestamp(); //null
id.getSequence(); //null
id.shouldBeAutoGenerated(); //true

recordId.getTimestamp(); //not null
recordId.getSequence(); //not null
recordId.shouldBeAutoGenerated(); //false

使用add方法来添加记录,该方法执行成功后返回添加的记录的id信息。注意下面的结果,这里有两个问题需要注意:

  1. 为什么我们构造Record时使用的RecordId和添加记录返回的RecordId不同?

    这个问题很好理解,因为构造Record时不指定id时虽然是自动生成,但是这个自动生成并不是在构造时就自动生成好了的,而是在执行Redis命令持久化时Redis服务器来自动生成的,所以前者在获取时间戳和序号的时返回null

  2. 为什么添加记录返回的RecordId调用shouldBeAutoGenerated方法返回false呢,不是自动生成了吗?

    其实也很好理解,因为在持久化一条Stream的记录时,我们可以指定id,也可以选择让Redis来自动生成,那么这也就导致add方法执行成功获取到Redis返回的id信息后在构造RecordId并不知道返回的这个id是我们之前指定的还是Redis自动生成的,所以说前者返回true,后者返回false并不难理解。

说到这里,其实你去看一下Record构造时默认自动生成id是如何做到的就很好理解了。在这里稍微提一下:在构造Record时默认使用RecordId.autoGenerate()作为RecordId,而这个方法返回了一个匿名对象,这个匿名对象重写了上面那三个方法,前两个方法重写直接返回null,后者也就是shouldBeAutoGenerated方法返回true

实现消息队列

在基本了解了SpringBoot2.2新增的几个Stream操作api和相关类之后,也就到了我们Stream实现消息队列的实践部分了。为了方便,下面我会以发送邮件为例来讲一下如何使用Strean来实现消息队列。

为了方便后续的讲解,先构造一个简单的邮件信息类作为我们的消息内容:

MailInfo.java
1
2
3
4
5
6
7
8
9
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MailInfo {

private String receiver;

private String description;
}

构造StreamMessageListenerContainer

在使用Pub/Sub模式时,我们都是先创建一个RedisMessageListenerContainer容器,向这个容器注册监听器然后在onMessage方法中处理业务逻辑即可。那么使用Stream类型的话有没有提供一个类似的容器呢?答案是肯定的。在SpringBoot2.2提供了StreamMessageListenerContainer这个Stream类型专有的消息监听容器,而唯一的实现也就是DefaultStreamMessageListenerContainer

StreamMessageListenerContainer的构造函数相比RedisMessageListenerContainer多了一个StreamMessageListenerContainerOptions,这个对象是使用builder方式来创建的:

1
2
3
4
5
6
7
8
9
StreamMessageListenerContainerOptions<String, ObjectRecord<String, MailInfo>> options =
StreamMessageListenerContainerOptions.builder()
.batchSize(100) //一批次拉取的最大count数
.executor(executor) //线程池
.pollTimeout(Duration.ZERO) //阻塞式轮询
.targetType(MailInfo.class) //目标类型(消息内容的类型)
.build();
StreamMessageListenerContainer<String, ObjectRecord<String, MailInfo>> container =
StreamMessageListenerContainer.create(redisConnectionFactory, options);

在构造StreamMessageListenerContainerOptions时最关键的就是targetTypeobjectMapper以及设置序列化器这几个方法,这些参数的设置会直接影响到后续接收到消息后能否反序列化为java对象!由于这部分内容涉及源码过多,在后面一部分我们再针对这几个方法进行详细的探查。

在构造完StreamMessageListenerContainer之后,现在该怎么注册消息监听器呢?我们接着往下看。

注册StreamListener

在Pub/Sub模式中我们使用addMessageListener(MessageListener, Topic)方法添加一个MessageListener到指定的Topic,那么在使用Stream消息的监听容器时,我们是使用receive方法。

Spring-data-redis提供了三个方法用于注册StreamListener

1
2
3
4
5
6
7
8
9
default Subscription receive(StreamOffset<K> streamOffset, StreamListener<K, V> listener) {
return register(StreamReadRequest.builder(streamOffset).build(), listener);
}
default Subscription receive(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) {
return register(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(false).build(), listener);
}
default Subscription receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) {
return register(StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(true).build(), listener);
}

如果你发现你哪里receive方法和我的不太一样?那么你使用的版本应该是2.2,这个版本的问题比较多,比如上面这个地方,使用第二个方法注册StreamListener,在消息被消费之后会自动ack,因为ConsumerStreamReadRequestBuilderautoAck属性默认就是true(除非使用第一个方法指定StreamReadRequest),这个问题在2.3修复了,感兴趣可以去看看这部分源码,修补提交在这里

我个人最初就是使用的SpringBoot2.2.2,使用过程中发现问题真的有点很多。比如还有一处序列化器泛型类型错误导致StreamMessageListenerContainerOptions构造混乱的问题【修补提交】,所以说如果你想尝鲜,那么强烈建议使用SpringBoot最新发布的版本。

Consumer和StreamOffset

可以看到receive方法另外还需要ConsumerStreamOffset两个参数,

Consumer

1
Consumer.from("group name", "consumer name")

Consumer表示消费者组中的某个消费者,这个东西只会在消费者组模式中用到。我们一般通过上面这种方式来创建,第一个参数表示消费者组,第二个参数表示消费者。

StreamOffset

StreamOffset用于表示在某个Stream上的偏移量,它包含两部分内容,一个是stream的key,另一个是ReadOffset用于表示读取偏移量。前者应该不需要过多的解释,那么ReadOffset这个读取偏移量是干嘛用的呢?

要搞清楚ReadOffset,我们首先要知道Stream中偏移量的含义,在Stream中偏移量既可以表示消费记录时的偏移量,又可以表示消费者组在Stream上的偏移量。还记得Redis中我们怎么读取Stream中的记录吗?

通过xread命令也就是非消费者组模式直接读取,或者使用xreadgroup命令在消费者组中命令一个消费者去消费一条记录,这个时候,我们可以通过0>$分别表示第一条记录、最后一次未被消费的记录和最新一条记录,这也就是ReadOffset的用途之一:用于表示直接读取或消费者组中消费者读取记录时的偏移量

那么还有另外的用途吗?

当然了,还记得怎样创建消费者组吗?一般我们使用xgroup create命令创建一个消费者组时可以选择从Stream的第一条消息开始,或者Stream的中间某个记录开始,又或者从Stream的最新一条记录开始。也就分别代表了0$。这也就是ReadOffset的用途之二:用于表示创建消费者组时该消费者组在Stream上的偏移量

理解ReadOffset最快最简单的方法就是在Redis-cli中用Redis命令操作一番。这其中还有一些值得注意的问题,比如创建消费者组时不能使用>表示最后一次未被消费的记录;比如0表示从第一条开始并且包括第一条;$表示从最新一条开始但并不是指当前Stream的最后一条记录,所以使用$时最新一条也就是表示下一个xadd添加的那一条记录,所以说$在非消费者组模式的阻塞读取下才有意义!

实现StreamListener

同样的用Pub/Sub来类比,在Pub/Sub模式下我们实现的消息监听器是一般是MessageListener或者使用MessageListenerAdapter反射调用处理方法,在Strean消息队列的实现中必然也需要一个监听器用于处理真正的业务逻辑,这个类目前只有一个,也就是StreamListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class StreamMessageListener implements StreamListener<String, ObjectRecord<String, MailInfo>> {

private final Logger logger = LoggerFactory.getLogger(StreamMessageListener.class);
private final StringRedisTemplate stringRedisTemplate;

public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public void onMessage(ObjectRecord<String, MailInfo> message) {
RecordId id = message.getId();
MailInfo messageValue = message.getValue();
logger.info("消费stream:{}中的信息:{}, 消息id:{}", message.getStream(), messageValue, id);
// 发邮件...
stringRedisTemplate.opsForStream().acknowledge(MAIL_GROUP, message); //手动ack
}
}

StreamListenerMessageListener差不多,只需要实现onMessage方法,只不过多了个泛型参数罢了。在实现消息监听器后也就可以使用receive方法进行注册了:

1
2
3
container.receive(Consumer.from(MAIL_GROUP, "consumer-1"),
StreamOffset.create(MAIL_CHANNEL, ReadOffset.lastConsumed()),
new StreamMessageListener(stringRedisTemplate));

注册完成之后启动StreamMessageListenerContainer容器:

1
container.start();

完整代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
@Component
public class StreamConsumerRunner implements ApplicationRunner, DisposableBean {

public static final String MAIL_CHANNEL = "channel:stream:mail";
public static final String MAIL_GROUP = "group:mail";

private final ThreadPoolTaskExecutor executor;
private final RedisConnectionFactory redisConnectionFactory;
private final StringRedisTemplate stringRedisTemplate;

private StreamMessageListenerContainer<String, ObjectRecord<String, MailInfo>> container;

public StreamConsumerRunner(ThreadPoolTaskExecutor executor, RedisConnectionFactory redisConnectionFactory, StringRedisTemplate stringRedisTemplate) {
this.executor = executor;
this.redisConnectionFactory = redisConnectionFactory;
this.stringRedisTemplate = stringRedisTemplate;
}


@Override
public void run(ApplicationArguments args) {
StreamMessageListenerContainerOptions<String, ObjectRecord<String, MailInfo>> options =
StreamMessageListenerContainerOptions.builder()
.batchSize(10)
.executor(executor)
.pollTimeout(Duration.ZERO)
.targetType(MailInfo.class)
.build();

StreamMessageListenerContainer<String, ObjectRecord<String, MailInfo>> container =
StreamMessageListenerContainer.create(redisConnectionFactory, options);

prepareChannelAndGroup(stringRedisTemplate.opsForStream(), MAIL_CHANNEL, MAIL_GROUP);

container.receive(Consumer.from(MAIL_GROUP, "consumer-1"),
StreamOffset.create(MAIL_CHANNEL, ReadOffset.lastConsumed()),
new StreamMessageListener(stringRedisTemplate));

this.container = container;
this.container.start();
}

private void prepareChannelAndGroup(StreamOperations<String, ?, ?> ops, String channel, String group) {
String status = "OK";
try {
StreamInfo.XInfoGroups groups = ops.groups(channel);
if (groups.stream().noneMatch(xInfoGroup -> group.equals(xInfoGroup.groupName()))) {
status = ops.createGroup(channel, group);
}
} catch (Exception exception) {
RecordId initialRecord = ops.add(ObjectRecord.create(channel, "Initial Record"));
Assert.notNull(initialRecord, "Cannot initialize stream with key '" + channel + "'");
status = ops.createGroup(channel, ReadOffset.from(initialRecord), group);
} finally {
Assert.isTrue("OK".equals(status), "Cannot create group with name '" + group + "'");
}
}

@Override
public void destroy() {
this.container.stop();
}

public static class StreamMessageListener implements StreamListener<String, ObjectRecord<String, MailInfo>> {

private final Logger logger = LoggerFactory.getLogger(StreamMessageListener.class);
private final StringRedisTemplate stringRedisTemplate;

public StreamMessageListener(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public void onMessage(ObjectRecord<String, MailInfo> message) {
RecordId id = message.getId();
MailInfo messageValue = message.getValue();

logger.info("消费stream:{}中的信息:{}, 消息id:{}", message.getStream(), messageValue, id);

stringRedisTemplate.opsForStream().acknowledge(MAIL_GROUP, message);
}
}
}

注意prepareChannelAndGroup方法,在初始化容器时,如果key对应的stream或者group不存在时会抛出异常,所以我们需要提前检查并且初始化。

测试

添加一个测试接口:

1
2
3
4
5
6
7
@GetMapping("/sendMail")
public ResponseEntity<RecordId> sendMail(String receiver, String description) {
MailInfo mailInfo = new MailInfo(receiver, description);
ObjectRecord<String, MailInfo> record = Record.of(mailInfo).withStreamKey(channel);
RecordId recordId = redisTemplate.opsForStream().add(record);
return ResponseEntity.ok(recordId);
}

访问进行测试:

1
2020-06-28 19:26:17.870  INFO 21900 --- [         task-1] reamConsumerRunner$StreamMessageListener : 消费stream:channel:stream:mail中的信息:MailInfo(receiver=534619360@qq.com, description=发送邮件), 消息id:1593343576237-0

控制台输出日志,如果在redis-cli中使用xpending命令检查ack信息会发现也是0,因为我们虽然使用receive方法注册,但是在onMessage中手动提交了确认,当然,你也可以使用receiveAutoAck方法添加。

实践中踩到的坑

自动ack和泛型类型错误

这两个问题在前面已经提到了并且在2.3已经修复,这里不多说。还是那句话,如果想尝鲜,那么强烈推荐使用SpringBoot最新发布的版本。

RedisTemplate序列化器使用错误导致容器无法反序列化

RedisTemplate的hashvalue的序列化器最初使用的json序列化器,导致容器监听到新消息反序列化时抛出异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
java.lang.IllegalArgumentException: Value must not be null!
at org.springframework.util.Assert.notNull(Assert.java:198)
at org.springframework.data.redis.connection.stream.Record.of(Record.java:81)
at org.springframework.data.redis.connection.stream.MapRecord.toObjectRecord(MapRecord.java:147)
at org.springframework.data.redis.core.StreamObjectMapper.toObjectRecord(StreamObjectMapper.java:132)
at org.springframework.data.redis.core.StreamObjectMapper.map(StreamObjectMapper.java:158)
at org.springframework.data.redis.core.StreamOperations.read(StreamOperations.java:458)
at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$2(DefaultStreamMessageListenerContainer.java:232)
at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:138)
at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

这是为什么呢?我们知道Redis中Stream中存的是键值对并且DefaultStreamOperations中操作的都是byte[],也就是说我们虽然添加的是ObjectRecord,但是会先转换成MapRecord,然后再被转换成ByteRecord,最后进行序列化。来看一下add方法:

1
2
3
4
5
6
public RecordId add(Record<K, ?> record) {
Assert.notNull(record, "Record must not be null");
MapRecord<K, HK, HV> input = StreamObjectMapper.toMapRecord(this, record); //转换成MapRecord
ByteRecord binaryRecord = input.serialize(keySerializer(), hashKeySerializer(), hashValueSerializer()); //再使用序列化器转换成ByteRecord
return execute(connection -> connection.xAdd(binaryRecord), true);
}

通过上面这个方法我们可以发现stream序列化时和其他类型不一样,我们在使用json序列化一个对象时都是直接进行的,而这里分了两步并且序列化器是用于第二部转换,那么ObjectRecord是怎么转换成MapRecord的呢?点进StreamObjectMapper.toMapRecord方法可以看到其实是通过ObjectRecord#toMapRecord方法完成的,这个方法需要一个HashMapper用于将对象的属性/属性值映射构造成Map类型,你会发现opsForStream方法重载了一个默认无参的方法,而这个方法默认使用的是ObjectHashMapper,在我们构造StreamMessageListenerContainerOptionsBuilder时调用targetType时默认使用的也是ObjectHashMapper。而这个ObjectHashMapper会将对象中的属性和属性值转换成byte[]形式,所以在第一步之后这个MapRecord中的值的类型已经是byte[]了,那么也就导致第二步在使用json序列化器转换为ByteRecord时出现这种情况:objectMapper.writeValueAsBytes(byte[]),这是一个测试实例:

1
2
3
4
5
6
7
8
@Test
void test() throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
byte[] value = "534619360@qq.com".getBytes();
byte[] bytes = mapper.writeValueAsBytes(value);
System.out.println(new String(bytes));
//输出"NTM0NjE5MzYwQHFxLmNvbQ=="
}

反应到stream中值就变成了\“NTM0NjE5MzYwQHFxLmNvbQ==\“(引号需要转义)。为了便于理解,我们可以使用设置使用json序列化器的RedisTemplate进行add断点debug测试看一下转换后的两个Record中的内容:

1
2
3
4
5
6
@Test
void test() {
stringRedisTemplate.setHashValueSerializer(RedisSerializer.json());
MailInfo mailInfo = new MailInfo("534619360@qq.com", "send mail");
stringRedisTemplate.opsForStream().add(Record.of(mailInfo).withStreamKey(channel));
}

运行测试并断点查看MapRecordByteRecord

使用Redis Desktop Manager查看值:

测试结束终端抛出上面提到的异常。这个问题解决办法就是使用String序列化器也就是使用StringRedisTemplate,因为这个序列化器不能序列化byte[]类型的对象,使用这个序列化器在序列化时如果已经是byte[],那么就会直接返回原byte[]

更具体的细节可以跟着add方法debug一遍。

ReadOffset使用错误导致group中消费者消费失败

异常:

1
RedisCommandExecutionException: ERR The $ ID is meaningless in the context of XREADGROUP: you want to read the history of this consumer by specifying a proper ID, or use the > ID to get new messages. The $ ID would just return an empty result set.

上面StreamOffset中也提到了,这涉及到0>$的使用场景和范围,如果出现这个异常,很有可能你在消费者组模式下设置消费者读取的offset时使用了ReadOffset.latest(),而这个对应着$,也就是最新一条记录。如果不明白那么你可能对这三个标识符的使用还不是很理解,最好的解决办法就是使用redis命令先完整的操作一遍。

stream或者group不存在导致启动抛出异常

同样在上面提到了,在构造StreamMessageListenerContainer时需要stream和group存在才可以。解决方法就是提前检查并初始化,上面已给出代码。

总结

其实在实践之初,我在网上也搜了很多相关的资料,但是发现这些资料基本都是使用redis-cli进行命令上的操作,并没有SpringBoot中实现。这次实践可谓是艰辛,由于目前该支持的迭代次数比较少,不乏一些bug或者小问题(2.3已经比较稳定),并且只有lettuce提供了stream类型的操作实现,而lettuce本身又有些小毛病,这些因素结合在一起也就导致这个过程花费了我整整两天时间,而这篇博客又花了整整一下午的时间才算完成,其中可能有些内容因为涉及东西比较多只能粗略提一下,并且语言组织上不太好可能不好去理解。话说回来,这篇文章也算是我自己实践后的一个个人总结吧,这个过程其实学到的东西还是很多的,也不枉费花了这么多时间。如果你发现文章有什么地方有问题或者有什么地方不理解,也欢迎在评论区留言一起交流~