RabbitMQ: an excellent message queue framework for .NET


1 Introduction

RabbitMQ has thousands of users and is one of the most popular open source message brokers.

1.1 What is AMQP?

AMQP (Advanced Message Queuing Protocol) is a network protocol. It enables conforming client applications to communicate with conforming messaging middleware brokers.

1.2 What is a message queue?

MQ stands for Message Queue. It is a method of application-to-application communication: applications communicate by writing messages (application data) to a queue and reading them from it, instead of holding a dedicated connection to each other.
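
To make the idea concrete without a broker, here is a minimal in-process sketch. It is not RabbitMQ; it only assumes the standard BlockingCollection<T> class, and the names InProcessQueueSketch and Relay are made up for this illustration. The producer and the consumer share nothing but the queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical illustration only: an in-memory queue standing in for a broker.
public static class InProcessQueueSketch
{
    // Sends every message through a shared queue and returns what the consumer read.
    public static List<string> Relay(IEnumerable<string> messages)
    {
        var received = new List<string>();
        using (var queue = new BlockingCollection<string>())
        {
            // Consumer: blocks until a message arrives and stops once the
            // queue has been marked complete and drained.
            var consumer = Task.Run(() =>
            {
                foreach (var message in queue.GetConsumingEnumerable())
                {
                    received.Add(message);
                }
            });

            // Producer: only knows about the queue, never about the consumer.
            foreach (var message in messages)
            {
                queue.Add(message);
            }
            queue.CompleteAdding();
            consumer.Wait();
        }
        return received;
    }
}
```

Calling InProcessQueueSketch.Relay(new[] { "a", "b" }) returns the two messages in the order they were written. A real broker adds what this sketch lacks: the two sides can live in different processes or machines and can run at different times.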

2 Installation

Installing with Docker

First, go to the RabbitMQ official website: http://www.rabbitmq.com/download.html

Then find the Docker image link and open it.
Pick the version you want to install. The -management suffix means the image includes the management interface, which can be accessed from a browser.

Next, install it with docker. Here I installed 3-management:

docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Finally, open http://localhost:15672/ and log in with the username/password guest/guest.

3 Usage

3.1 “Hello World!”

RabbitMQ is a message broker: it accepts and forwards messages. You can think of it as a post office: when you put your mail in a mailbox, you can be sure that the letter carrier will eventually deliver it to the recipient.
In this example, “P” is our producer and “C” is our consumer. Between them sits a queue: P -> [queue] -> C.

Producer code:

using RabbitMQ.Client; // 1. Import the namespace
using System;
using System.Text;

namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection()) // 2. Create a connection to the server
            using (var channel = connection.CreateModel()) // 3. Create a channel
            {
                // 4. Declare the queue to send to
                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);
                string message = "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                // 5. Publish the message to the queue
                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);
                Console.WriteLine(" Send a message :{0}", message);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

Consumer code: importing the namespaces, creating the server connection, creating the channel and declaring the queue are the same as in the producer code. What is new is that we ask the server to deliver the messages in the queue to us. Because it pushes messages to us asynchronously, we provide a callback. That is what the EventingBasicConsumer.Received event handler does.

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare the same queue as the producer, so it exists whichever side starts first
                channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);
                Console.WriteLine(" Waiting for news .");
                var consumer = new EventingBasicConsumer(channel);
                // Callback invoked for every message the server delivers to us
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" receive messages :{0}", message);
                };
                // autoAck: true means a message counts as acknowledged as soon as it is delivered
                channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

Let's look at the output:

The sender:

 Send a message :Hello World!
 Press [enter] to exit.

The receiver:

 Waiting for news .
 Press [enter] to exit.
 receive messages :Hello World!

3.2 Work queues

The main idea behind a work queue (also known as a task queue) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead, we schedule the task to be done later. We encapsulate the task as a message and send it to a queue. A worker process running in the background keeps taking tasks from the queue and executing them. When you run multiple workers, the tasks in the queue are shared between them.

Producer code:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // durable: true so the queue survives a broker restart
                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
                // Mark the message itself as persistent
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body);
                Console.WriteLine(" Send a message :{0}", message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        // The task text comes from the command line; fall back to a default
        private static string GetMessage(string[] args)
        {
            return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
        }
    }
}

Consumer code:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
                // Fair dispatch: at most one unacknowledged message per worker
                channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                Console.WriteLine(" Waiting for news .");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" receive messages :{0}", message);
                    // Simulate work: one second per dot in the message
                    int dots = message.Split('.').Length - 1;
                    Thread.Sleep(dots * 1000);
                    Console.WriteLine(" Completion of reception ");
                    // Acknowledge manually once the work is done
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                // autoAck: false switches on manual acknowledgments
                channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

Round-robin dispatching

One advantage of a task queue is that work is easy to parallelize. If a backlog builds up, we can simply add more workers, which makes the system easy to scale.

Let's look at the output:

The sender:

\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 1
Send a message : news 1
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 2
Send a message : news 2
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 3
Send a message : news 3
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 4
Send a message : news 4
Press [enter] to exit.

Receiver 1:

 Waiting for news .
Press [enter] to exit.
receive messages : news 1
Completion of reception
receive messages : news 3
Completion of reception

Receiver 2:

 Waiting for news .
Press [enter] to exit.
receive messages : news 2
Completion of reception
receive messages : news 4
Completion of reception

By default, RabbitMQ sends each message to the next consumer in sequence. On average, every consumer receives the same number of messages. This way of distributing messages is called round-robin. Try it with three or more workers.

Message acknowledgment

To make sure a message is never lost, RabbitMQ supports message acknowledgments. The consumer sends back an acknowledgment (ack) to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
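
The distribution seen in the output above can be reproduced with a few lines of plain C#. This is only a sketch of the idea, not how the broker is implemented; RoundRobinSketch and Dispatch are names invented for the illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration of round-robin dispatching; no broker involved.
public static class RoundRobinSketch
{
    // Returns, for each consumer index, the messages that consumer would receive.
    public static List<List<string>> Dispatch(IReadOnlyList<string> messages, int consumerCount)
    {
        if (consumerCount < 1) throw new ArgumentOutOfRangeException(nameof(consumerCount));

        var perConsumer = new List<List<string>>();
        for (int i = 0; i < consumerCount; i++)
        {
            perConsumer.Add(new List<string>());
        }

        // The n-th message goes to consumer n modulo consumerCount.
        for (int n = 0; n < messages.Count; n++)
        {
            perConsumer[n % consumerCount].Add(messages[n]);
        }
        return perConsumer;
    }
}
```

With the four messages "news 1" to "news 4" and two consumers, the first consumer gets "news 1" and "news 3" and the second gets "news 2" and "news 4", the same split as in the receiver output.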

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

With this code (together with autoAck: false in BasicConsume), we can be sure that no message is lost even if you kill a worker with CTRL+C while it is processing a message. Soon after the worker dies, all unacknowledged messages are redelivered.

Message persistence

We have learned how to make sure that a task is not lost even if the consumer dies. However, our tasks will still be lost if the RabbitMQ server stops.

When RabbitMQ quits or crashes, it forgets queues and messages unless you tell it not to. Two things are required to make sure messages are not lost: we need to mark both the queue and the messages as durable.

First, we need to make sure the queue survives a restart of the RabbitMQ node. To do so, we declare it as durable:

channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);

Finally, we need to mark our messages as persistent by setting IBasicProperties.Persistent to true.
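
The bookkeeping behind this guarantee can be sketched in memory. The class below is a deliberately simplified model with a single consumer, not the broker's real implementation. AckSketchBroker and all its members are hypothetical names, and unlike RabbitMQ it simply appends requeued messages to the end of the queue.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-consumer model of acknowledgments and redelivery.
public sealed class AckSketchBroker
{
    private readonly Queue<string> ready = new Queue<string>();
    // Delivered but not yet acknowledged, ordered by delivery tag.
    private readonly SortedDictionary<ulong, string> unacked = new SortedDictionary<ulong, string>();
    private ulong nextTag = 1;

    public int ReadyCount => ready.Count;
    public int UnackedCount => unacked.Count;

    public void Publish(string message) => ready.Enqueue(message);

    // Hands out the next message and remembers it until it is acknowledged.
    public bool TryDeliver(out ulong deliveryTag, out string message)
    {
        if (ready.Count == 0)
        {
            deliveryTag = 0;
            message = null;
            return false;
        }
        message = ready.Dequeue();
        deliveryTag = nextTag++;
        unacked[deliveryTag] = message;
        return true;
    }

    // Ack: the message has been processed and can be forgotten.
    public void Ack(ulong deliveryTag) => unacked.Remove(deliveryTag);

    // The consumer died: everything it had not acknowledged becomes ready again.
    public void ConsumerDied()
    {
        foreach (var pair in unacked)
        {
            ready.Enqueue(pair.Value);
        }
        unacked.Clear();
    }
}
```

If two messages are delivered, only the first is acknowledged and ConsumerDied() is then called, the second message is back in the ready queue and will be delivered again. The acknowledged one is gone for good.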

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

Fair dispatch

We can use the BasicQos method with the prefetchCount = 1 setting. This tells RabbitMQ not to give a worker more than one message at a time. In other words, RabbitMQ does not dispatch a new message to a worker until that worker has processed and acknowledged the previous one. Instead, it dispatches the message to the next worker that is not busy.

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
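
To see why this matters, here is a small simulation. It is a sketch under simplifying assumptions: task durations are known up front, and ties between idle workers go to the lowest index, which is not necessarily what the broker does. FairDispatchSketch and Assign are made-up names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical simulation of dispatching with prefetchCount = 1.
public static class FairDispatchSketch
{
    // Returns the index of the worker that handles each task when a task is
    // only handed to a worker that has finished (acknowledged) its previous one.
    public static int[] Assign(IReadOnlyList<int> taskSeconds, int workerCount)
    {
        if (workerCount < 1) throw new ArgumentOutOfRangeException(nameof(workerCount));

        var busyUntil = new int[workerCount]; // time at which each worker becomes free
        var assignment = new int[taskSeconds.Count];

        for (int t = 0; t < taskSeconds.Count; t++)
        {
            // Pick the worker that becomes free first.
            int next = 0;
            for (int w = 1; w < workerCount; w++)
            {
                if (busyUntil[w] < busyUntil[next]) next = w;
            }
            assignment[t] = next;
            busyUntil[next] += taskSeconds[t];
        }
        return assignment;
    }
}
```

For tasks lasting 5, 1, 1 and 1 seconds and two workers, Assign returns 0, 1, 1, 1: the three short tasks all go to the second worker while the first is still busy. Plain round-robin would instead give the first worker the 5-second task and one of the short ones, leaving the second worker idle sooner.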

3.3 Publish/Subscribe

In the last tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we do something completely different: we deliver a message to multiple consumers. This pattern is called “publish/subscribe”.

Producer code:

using RabbitMQ.Client;
using System;
using System.Text;

namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                // Declare a fanout exchange instead of a queue
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);
                var message = GetMessage(args);
                var body = Encoding.UTF8.GetBytes(message);
                // Publish to the named exchange; the routing key is left empty
                channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body);
                Console.WriteLine(" Send a message :{0}", message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

        private static string GetMessage(string[] args)
        {
            return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
        }
    }
}

The producer code does not look very different from the previous tutorial. The most important change is that we now publish messages to the Example.RabbitMQ.PublishSubscribe exchange instead of the nameless (default) exchange. There are several exchange types: direct, topic, headers and fanout. Here we use the fanout type.

Consumer code:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);
                // Let the server create a temporary queue and tell us its generated name
                var queueName = channel.QueueDeclare().QueueName;
                // Bind that queue to the exchange so it receives a copy of every message
                channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "");
                Console.WriteLine(" Waiting for news .");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    byte[] body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine(" receive messages :{0}", message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

If no queue is bound to the exchange yet, the message is lost, but that is fine for us: if no consumer is listening, we can safely discard the message.

3.4 Routing

In the last tutorial we built a publish/subscribe system that broadcasts messages to many receivers. In this tutorial we add a feature to it: delivering only certain categories of messages to specific subscribers.

Producer code:

using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Example.RabbitMQ.Routing.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
                // First argument: severity, used as the routing key; remaining arguments: message text
                var severity = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body);
                Console.WriteLine(" Send a message :'{0}':'{1}'", severity, message);
            }
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

Consumer code:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
                var queueName = channel.QueueDeclare().QueueName;
                // Without at least one severity there is nothing to subscribe to
                if (args.Length < 1)
                {
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                // One binding per severity passed on the command line
                foreach (var severity in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity);
                }
                Console.WriteLine(" Waiting for news .");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" receive messages :'{0}':'{1}'", routingKey, message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}
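
What the direct exchange does with these bindings can be modelled in a few lines. The sketch below is an in-memory illustration only; DirectExchangeSketch, Bind and Route are hypothetical names, and the queue names in the usage note are invented.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of a direct exchange.
public sealed class DirectExchangeSketch
{
    // Binding key -> names of the queues bound with that key.
    private readonly Dictionary<string, HashSet<string>> bindings =
        new Dictionary<string, HashSet<string>>();

    public void Bind(string queue, string bindingKey)
    {
        if (!bindings.TryGetValue(bindingKey, out var queues))
        {
            queues = new HashSet<string>();
            bindings[bindingKey] = queues;
        }
        queues.Add(queue);
    }

    // Returns (sorted by name) the queues that would receive a message
    // published with this routing key: those whose binding key equals it.
    public List<string> Route(string routingKey)
    {
        var result = new List<string>();
        if (bindings.TryGetValue(routingKey, out var queues))
        {
            result.AddRange(queues);
        }
        result.Sort(StringComparer.Ordinal);
        return result;
    }
}
```

Suppose a queue "errors-only" is bound with "error" and a queue "all-logs" is bound with "info", "warning" and "error". Then Route("error") returns both queues, Route("info") returns only "all-logs", and Route("debug") returns an empty list, which corresponds to the message being dropped.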

3.5 Topics

In the last tutorial we improved our messaging system. Instead of a fanout exchange that can only do blind broadcasting, we used a direct exchange and gained the ability to receive messages selectively.

Although the direct exchange improved our system, it still has a limitation: it cannot route based on multiple criteria.

A topic exchange removes this limitation. Routing keys are lists of words separated by dots, and a binding key may contain two special characters:

* (asterisk) substitutes for exactly one word.
# (hash) substitutes for zero or more words.
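
The two wildcard rules can be expressed as a small matcher. This is a sketch written for this article, not the broker's own algorithm; TopicMatcher and Matches are hypothetical names, and the keys in the usage note are example values.

```csharp
using System;

// Hypothetical matcher for topic binding keys ('*' and '#' wildcards).
public static class TopicMatcher
{
    // True if a message with this routing key would match the binding key.
    public static bool Matches(string bindingKey, string routingKey)
    {
        return Match(bindingKey.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] pattern, int p, string[] words, int w)
    {
        // Pattern exhausted: it matches only if every word was consumed too.
        if (p == pattern.Length) return w == words.Length;

        if (pattern[p] == "#")
        {
            // '#' may absorb zero words, one word, two words, and so on.
            for (int next = w; next <= words.Length; next++)
            {
                if (Match(pattern, p + 1, words, next)) return true;
            }
            return false;
        }

        // '*' and literal words both need exactly one word to consume.
        if (w == words.Length) return false;
        if (pattern[p] == "*" || pattern[p] == words[w])
        {
            return Match(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

For example, the binding key "*.orange.*" matches "quick.orange.rabbit" but not "quick.orange.male.rabbit", because each * stands for exactly one word. The binding key "lazy.#" matches both "lazy" and "lazy.pink.rabbit", because # may stand for no words at all.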

Producer code:

using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;

namespace Example.RabbitMQ.Topics.Producer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
                // First argument: routing key; remaining arguments: message text
                var routingKey = (args.Length > 0) ? args[0] : "info";
                var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
                var body = Encoding.UTF8.GetBytes(message);
                channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body);
                Console.WriteLine(" Send a message :'{0}':'{1}'", routingKey, message);
            }
        }
    }
}

Consumer code:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;

namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
                var queueName = channel.QueueDeclare().QueueName;
                // Without at least one binding key there is nothing to subscribe to
                if (args.Length < 1)
                {
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                    Environment.ExitCode = 1;
                    return;
                }
                // One binding per key passed on the command line; keys may contain * and #
                foreach (var bindingKey in args)
                {
                    channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey);
                }
                Console.WriteLine(" Waiting for news . To exit press CTRL+C");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" receive messages :'{0}':'{1}'", routingKey, message);
                };
                channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
}

Copyright notice
This article was written by [Nan rongxiangru talks about programming]. Please include a link to the original when reposting. Thank you.
https://javamana.com/2021/01/20210114185512249w.html
