1 brief introduction
RabbitMQ There are thousands of users , Is one of the most popular open source message agents .
1.1 AMQP What is it?
AMQP( Advanced message queue protocol ) It's a network protocol . It supports qualified client applications (application) And message middleware agents (messaging middleware broker) Communicate with each other .
1.2 What is the message queue
MQ Its full name is Message Queue, Message queue . It's an application to application communication method . The application reads and writes messages in and out of the queue ( Data for applications ) To communication , Instead of having a dedicated connection to link them .
2 install
adopt docker Installation
First , Get into RabbitMQ Official website http://www.rabbitmq.com/download.html
then , find Docker image And enter
Find the version you need to install , -management It means that there is a management interface , It can be accessed by browser .
next , Take it docker install , I installed it here 3-management:
docker run -d --hostname my-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
Last , Take a look at :http://localhost:15672/ , user name / password : guest/guest
3 Use
3.1 “ Hello World!”
RabbitMQ It's the message broker : It receives and forwards messages . You can think of it as a post office : When you put your email in your mailbox , You can make sure that the postman, Mr. or Ms. finally delivers the mail to the recipient .
In the following illustration ,“ P” It's our producer ,“ C” It's our consumers . The middle box is a queue
Producer code :
using RabbitMQ.Client; //1. Use namespaces
using System;
using System.Text;
namespace Example.RabbitMQ.HelloWorld.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection()) //2. Create a connection to the server
using (var channel = connection.CreateModel()) //3. Create a channel
{
channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null); //4. Declare the queue to send to
string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.HelloWorld", basicProperties: null, body: body);//5. Publish the message to the queue
Console.WriteLine(" Send a message :{0}", message);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
Consumer code : using namespace std , Create a server connection , Create channels , Declaration queues are consistent with producer code , Added to deliver messages in the queue to us . Because it will send us messages asynchronously , So we offer callbacks . This is it. EventingBasicConsumer.Received What the event handler does .
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.HelloWorld.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Example.RabbitMQ.HelloWorld", durable: false, exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine(" Waiting for news .");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" receive messages :{0}", message);
};
channel.BasicConsume(queue: "Example.RabbitMQ.HelloWorld", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
Let's look at the output :
The sender :
Send a message :Hello World!
Press [enter] to exit.
The receiver :
Waiting for news .
Press [enter] to exit.
receive messages :Hello World!
3.2 Work queue
Work queue ( Also known as task queue ) The main idea is to avoid performing resource intensive tasks immediately , Then you have to wait for it to finish . contrary , We arranged to finish the task later . We encapsulate the task as a message and send it to the queue . The work is running in the background and constantly fetching tasks from the queue and then executing . When you run multiple work processes , The tasks in the task queue will be shared and executed by the worker process .
Producer code :
using RabbitMQ.Client;
using System;
using System.Text;
namespace Example.RabbitMQ.WorkQueues.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "", routingKey: "Example.RabbitMQ.WorkQueues", basicProperties: properties, body: body);
Console.WriteLine(" Send a message :{0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return args.Length > 0 ? string.Join(" ", args) : "Hello World!";
}
}
}
Consumer code :
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
namespace Example.RabbitMQ.WorkQueues.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
Console.WriteLine(" Waiting for news .");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" receive messages :{0}", message);
int dots = message.Split('.').Length - 1;
Thread.Sleep(dots * 1000);
Console.WriteLine(" Completion of reception ");
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "Example.RabbitMQ.WorkQueues", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
Cyclic scheduling
The advantage of using task queue is that it's easy to work in parallel . If we have a lot of backlog , We can solve the problem just by adding more workers , Make the scalability of the system easier .
Let's look at the output :
The sender :
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 1
Send a message : news 1
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 2
Send a message : news 2
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 3
Send a message : news 3
Press [enter] to exit.
\bin\Debug\net5.0>Example.RabbitMQ.WorkQueues.Producer.ConsoleApp news 4
Send a message : news 4
Press [enter] to exit.
The receiver 1:
Waiting for news .
Press [enter] to exit.
receive messages : news 1
Completion of reception
receive messages : news 3
Completion of reception
The receiver 2:
Waiting for news .
Press [enter] to exit.
receive messages : news 2
Completion of reception
receive messages : news 4
Completion of reception
By default ,RabbitMQ Each message will be sent in order to the next consumer . On average, , Every consumer receives the same number of messages . This way of distributing messages is called a loop . Try with three or more workers .
Message confirmation
To make sure that messages never get lost ,RabbitMQ Support message confirmation . The consumer sends back a confirmation (acknowledgement), Informing RabbitMQ Has been received , Processed specific messages , also RabbitMQ You are free to delete it .
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
Use this code , We can make sure , Even if you use CTRL + C Kill the staff , And no information will be lost . Soon after the worker died , All unacknowledged messages will be resend .
Message persistence
We've learned how to make sure that even if the consumer dies , The mission will not be lost . however , If RabbitMQ The server stops , Our mission will still be lost .
When RabbitMQ When exiting or collapsing , Unless you tell me not to , Otherwise it will forget about queues and messages . Make sure messages are not lost , Two things need to be done : We need to mark both queues and messages as persistent .
First , We need to make sure that the queue will be in RabbitMQ The node continues to exist after restart . So , We need to make it permanent :
channel.QueueDeclare(queue: "Example.RabbitMQ.WorkQueues", durable: true, exclusive: false, autoDelete: false, arguments: null);
Last , We need to mark messages as persistent - By way of IBasicProperties.SetPersistent Set to true.
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
Fair dispatch
We can BasicQos Methods and prefetchCount = 1 Settings are used together . This tells RabbitMQ Don't give workers more than one message at a time . let me put it another way , Before processing and confirming the last message , Don't send new messages to staff . Instead, assign it to the next working program that is not busy .
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
3.3 Release / subscribe
In the last tutorial , We created a work queue . The assumption behind the work queue is , Each task is delivered to just one worker . In this part , We're going to do something totally different - We deliver the message to multiple consumers . This pattern is called “ Release / subscribe ”.
Producer code :
using RabbitMQ.Client;
using System;
using System.Text;
namespace Example.RabbitMQ.PublishSubscribe.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "", basicProperties: null, body: body);
Console.WriteLine(" Send a message :{0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return args.Length > 0 ? string.Join(" ", args) : "info: Hello World!";
}
}
}
The producer code doesn't look very different from the previous tutorial . The most important change is that we now want to publish news to Example.RabbitMQ.PublishSubscribe exchanger , It's not a nameless message exchanger . There are several types of exchange :direct,topic,headers and fanout, Here we use fanout Exchange type .
Consumer code :
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.PublishSubscribe.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.PublishSubscribe", type: ExchangeType.Fanout);
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.PublishSubscribe", routingKey: "");
Console.WriteLine(" Waiting for news .");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" receive messages :{0}", message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
If there is no queue bound to the switch , The message will be lost , But it's OK for us . If no consumer is listening , We can safely discard the message .
3.4 route
In the last tutorial , We created a release / subscribe . We can broadcast messages to many receivers . In this tutorial , We're going to add features to it - Assign message categories to specific subscribers .
Producer code :
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
namespace Example.RabbitMQ.Routing.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Example.RabbitMQ.Routing", routingKey: severity, basicProperties: null, body: body);
Console.WriteLine(" Send a message :'{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
Consumer code :
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.Routing.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Routing", type: "direct");
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var severity in args)
{
channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Routing", routingKey: severity);
}
Console.WriteLine(" Waiting for news .");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" receive messages :'{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}
3.5 topic of conversation
In the last tutorial , We improved the messaging system . Instead of using a fan out switch that can only perform virtual broadcasting , We use direct switches , And the possibility of selectively receiving messages .
Although we have improved our system with direct exchange , But it still has limitations - It cannot route based on multiple conditions .
*( asterisk ) Instead of a word .
#( Hash ) Can replace zero or more words .
Producer code :
using RabbitMQ.Client;
using System;
using System.Linq;
using System.Text;
namespace Example.RabbitMQ.Topics.Producer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
var routingKey = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "Example.RabbitMQ.Topics", routingKey: routingKey, basicProperties: null, body: body);
Console.WriteLine(" Send a message :'{0}':'{1}'", routingKey, message);
}
}
}
}
Consumer code :
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace Example.RabbitMQ.Topics.Consumer.ConsoleApp
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "Example.RabbitMQ.Topics", type: "topic");
var queueName = channel.QueueDeclare().QueueName;
if (args.Length < 1)
{
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach (var bindingKey in args)
{
channel.QueueBind(queue: queueName, exchange: "Example.RabbitMQ.Topics", routingKey: bindingKey);
}
Console.WriteLine(" Waiting for news . To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine(" receive messages :'{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
}