使用Redis Stream来做消息队列和在Asp.Net Core中的实现

乔达摩 2021-09-15 07:40:06
redis 队列 使用 stream 消息


Redis - Wikipedia

写在前面

我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的服务,kafka和RabbitMQ等;

奈何这兄弟一直不给力;

虽然 Redis 的Pub/Sub 是实现了发布/订阅的,但这家伙最坑的是:丢数据

由于Pub/Sub 只是简单的实现了发布订阅模式,简单的沟通起生产者和消费者,当接收生产者的数据后并立即推送或者说转发给订阅消费者,并不会做任何的持久化、存储操作。由此:

  1. ​ 消费者(客户端)掉线;
  2. ​ 消费者未订阅(所以使用的时候一定记得先订阅再生产);
  3. ​ 服务端宕机;
  4. ​ 消费者消费不过来,消息堆积(生产数据受数据缓冲区限制);

以上情况都会导致生产数据的丢失,基于上坑,据我所知大家很少使用Pub/Sub ;

不过官方的哨兵集群通信的时候就是用的Pub/Sub;

然后,各路大佬结合队列、阻塞等等实现了各种各样的方案,主要是使用:BLPOP+LPUSH 的实现

这里就不一一展开了,有兴趣请看叶老板文章

可能是各种实现都会带来各种的问题,redis的官方也看到了社区的挣扎。终于,到了Redis5.0,官方带来了消息队列的实现:Stream

Redis Stream介绍

简单来说Redis Stream 就是想用Redis 做消息队列的最佳推荐;

XADD--发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再发一条
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19
"1631628890025-0"

其中的'*'表示让 Redis 自动生成唯一的消息 ID,格式是 「时间戳-自增序号」

XREAD--订阅消息

订阅消息

XREAD COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"

'0-0' 表示从开头读取

如果需继续拉取下一条,需传入上一条消息的id

阻塞等待消息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id ‘1631628890025-0’ 后的消息

50000 阻塞时间(毫秒) ‘0’ 表示无限期阻塞

从到这里就可以看出 Pub/Sub多端订阅的最大优点,Stream也是支持的。有的同学很快就发现问题了,这里多端订阅后,没有消息确认ACK机制。

没错,因为现在所有的消费者都是订阅共同的消息,多端订阅,如果某个客户端ACK某条消息后,其他端消费不了,就实现不了多端消费了。

由此,引出 分组:GROUP

GROUP--订阅分组消息(多端订阅)

同样先发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19
"1631629084083-0"

XGROUP CREATE 创建分组

创建分组1

XGROUP CREATE stream1 group1 0-0
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0
OK

‘0-0’ 表示从开头读取

'>' 表示读取最新,未被消费过的消息

XREADGROUP--分组读取

分组 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >

consumer1 消费者名称, redis服务器会记住第一次使用的消费者名称;


127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >
(nil)

同样

‘0-0’ 表示从开头读取

'>' 表示读取最新,未被消费过的消息 (可以看到命令执行第二遍已经读不到新消息了)

分组 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19

可以看到可以读到同样的消息,多端订阅没有问题;

当然分组也支持阻塞读取:

#和XREAD一样
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
#分组阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 >

‘0’ 表示无限期阻塞,单位(毫秒)

XPENDING--待处理消息

消息使用XREADGROUP 读取后会进入待处理条目列表(PEL);

我们看看:

 XPENDING stream1 group2
127.0.0.1:6379> XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
2) "4"

表示:

  1. (integer) 4 //表示当前消费者组的待处理消息的数量
  2. "1631628884174-0" //消息最大id
  3. "1631629084083-0" //最小id
      1. "consumer1" // 消费者名称
      2. "4" //消费者待处理消息数量

XACK--删除已处理消息(消息确认机制)

我们已经知道group2待处理消息有4条,我们从头读取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
4) 1) "1631629084083-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"

假设最后一条消息 ‘1631629084083-0’ 我已处理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1

再看:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
2) 1) 1) "1631628884174-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
2) 1) "1631628890025-0"
2) 1) "name"
2) "zhangshan"
3) "age"
4) "19"
3) 1) "1631629080208-0"
2) 1) "name"
2) "hei"
3) "age"
4) "18"
127.0.0.1:6379> XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
2) "3"

可以清楚看到goroup2 待处理消息剩下3条;

这时 Redis 已经把这条消息标记为「处理完成」不再追踪;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";

发布:

csRedis.XAdd(_keyStream, "*", ("name", "message1"));

订阅:

static async Task CsRedisStreamConsumer()
{
Console.WriteLine("CsRedis StreamConsumer start!");
var csRedis = new CSRedis.CSRedisClient(_connstr);
csRedis.XAdd(_keyStream, "*", ("name", "message1"));
try
{
csRedis.XGroupCreate(_keyStream, _nameGrourp);
}
catch { }
(string key, (string id, string[] items)[] data)[] product;
(string Pid, string Platform, string Time) data = (null, null, null);
while (true)
{
try
{
product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
{
Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");
product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
{
Console.WriteLine($" {value}");
});
//csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
}
}
catch (Exception)
{
//throw;
}
}
}

CSRedisCore

动画2

这里的超时报错可通过修改连接参数:syncTimeout 解决

CSRedisCore支持阻塞读取;

StackExchange.Redis

发布:

db.StreamAdd(_keyStream, "name", "message1", "*");

订阅:

static async Task StackExchangeRedisStreamConsumer()
{
Console.WriteLine("StackExchangeRedis StreamConsumer start!");
var redis = ConnectionMultiplexer.Connect(_connstr);
var db = redis.GetDatabase();
try
{
///初始化方式1
//db.StreamAdd(_keyStream, "name", "message1", "*");
//db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);
//方式2
db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
}
catch { }
StreamEntry[] data = null;
while (true)
{
data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);
if (data?.Length > 0 == true)
{
Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");
data.FirstOrDefault().Values.ToList().ForEach(c =>
{
Console.WriteLine($" {c.Name}:{c.Value}");
});
db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
}
}
}

动画

StackExchange.Redis 有点比较坑的是不存在阻塞读取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

QA

Q:Stream是否支持AOF、RDB持久化?

A:支持,其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

Q:Stream是否还是会丢数据?若是,何种情况下?;

A:会;1、AOF是定时写盘的,如果数据还在内存中时redis服务宕机就会;2、主从切换时(从库还未同步完成主库发来的数据,就被提成主库)

总结

技术中有的时候没有“银弹”,只有更适合的技术,汝之蜜糖彼之砒霜;

很多时候的技术选型都是个比较麻烦的东西,对选型人的要求很高;你可能不是只需要熟悉其中的一种路线,而是要踩过各种各样的坑,再根据当前受限的环境,选择比较适合目前需求/团队的;

回到Stream上,我认为目前Stream能满足挺大部分队列需求;

特别是“在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的更专业的mq,比如kafka和RabbitMQ的时候”

当然,最终决定需要用更专业的mq与否的,还是需求;

引用

http://www.redis.cn/

https://database.51cto.com/art/202104/659208.htm

https://github.com/2881099/csredis/

https://stackexchange.github.io/StackExchange.Redis/Streams.html

版权声明
本文为[乔达摩]所创,转载请带上原文链接,感谢
https://www.cnblogs.com/xiaxiaolu/p/15270334.html

  1. 国内一线互联网公司面试题汇总,2021年大厂Java岗面试必问,
  2. 啃完吃透保你涨薪5K,熬夜整理小米Java面试题,
  3. 和字节跳动大佬的技术面谈,Redis成神之路电子版教程已问世,
  4. Le terme professionnel le plus complet de l'histoire des micro - services interview 50 questions, Byte Jumping Java post Classic interview vrai problème,
  5. After using mybatisplus, I haven't written SQL for a long time
  6. [springboot2 starts from 0] how to write a springboot application?
  7. Huawei cloud guassdb (for redis) released a new version, and the two core features were officially unveiled
  8. 和字節跳動大佬的技術面談,Redis成神之路電子版教程已問世,
  9. 啃完吃透保你漲薪5K,熬夜整理小米Java面試題,
  10. Avec l'interview technique du gigolo, le tutoriel électronique redis est sorti.
  11. Après avoir mangé, assurez - vous d'augmenter votre salaire de 5K et de rester debout tard pour trier les questions d'entrevue Java de millet.
  12. Résumé des questions d'entrevue pour les entreprises Internet nationales de première ligne, qui doivent être posées lors de l'entrevue d'emploi Java de la grande usine en 2021,
  13. Le tri des crachats de sang, la force de l'équipe Tencent pour créer le tutoriel d'introduction au printemps,
  14. Java and scala concurrency Fundamentals
  15. Analysis of java thread source code based on Hotspot
  16. 國內一線互聯網公司面試題匯總,2021年大廠Java崗面試必問,
  17. Introduction au module de contrôle de Connexion MySQL
  18. 大厂高级测试面试题,Java面试基础技能罗列,
  19. Comprendre l'architecture sous - jacente d'InnoDB en exécutant une instruction
  20. Chargeur de classe 1 Tomcat
  21. 小白也能看懂的dubbo3应用级服务发现详解
  22. SpringBoot异步使用@Async原理及线程池配置
  23. Questions d'entrevue de test avancé de Dachang, liste des compétences de base de l'entrevue Java,
  24. SpringBoot异步使用@Async原理及線程池配置
  25. Springboot utilise asynchrone le principe @ async et la configuration du pool de threads
  26. Détails de la découverte du Service d'application Dubbo 3 que Xiaobai peut également comprendre
  27. Springboot utilise asynchrone le principe @ async et la configuration du pool de threads
  28. 如何强大且优雅的搞定Linux文件系统,算法题 JVM,
  29. 太牛了,阿里P7架构师带你看透maven的来龙去脉,
  30. Oracle central et Oracle décentralisé
  31. java JavaBean
  32. Java wrapper type
  33. Java super keyword
  34. Java static keyword
  35. Java this keyword
  36. Java interface
  37. 太牛了,阿裏P7架構師帶你看透maven的來龍去脈,
  38. C'est génial, l'architecte Ali p7 vous montre à travers Maven.
  39. Comment traiter le système de fichiers Linux avec puissance et élégance, algorithme JVM,
  40. Java + SSM Social Insurance Pension System for Computer Graduation Design
  41. Usage of Java scanner
  42. Java inheritance
  43. Java method review
  44. java JVM
  45. Java Basics
  46. Java file operation object IO stream
  47. Java console reads multi character input and output
  48. Java simple array sorting
  49. In addition to MySQL master-slave, you have another choice, Galera
  50. Configuration standard dockerfile et docker-composer.yml
  51. 字节大神强推千页PDF学习笔记,2021Java开发学习路线,
  52. 字节大牛耗时八个月又一力作,靠这份Java知识点PDF成功跳槽,
  53. 字节大牛教你手撕Java学习,最新大厂程序员进阶宝典,
  54. Comment l'automne est - il beau?Ces 24 ensembles de modèles d'automne et d'hiver sont grands, minces et vieillissants
  55. 字節大牛教你手撕Java學習,最新大廠程序員進階寶典,
  56. 字節大牛耗時八個月又一力作,靠這份Java知識點PDF成功跳槽,
  57. Byte Bull vous apprend à déchiqueter Java à la main, le dernier dictionnaire avancé des programmeurs de grandes usines,
  58. Byte Bull a pris huit mois à travailler dur et a réussi à changer d'emploi avec ce PDF Java Knowledge point.
  59. Byte God Push 1000 pages PDF Learning notes, 2021 Java Development Learning route,
  60. Five minutes to understand MySQL index push down