如果还没有MQ环境,可以参考上一篇的博客:
https://www.cnblogs.com/weskynet/p/14877932.html
接下来开始.net core操作Rabbitmq有关的内容。我打算使用比较简单的单机的direct直连模式,来演示一下有关操作,基本套路差不多。
首先,我在我的package包项目上面,添加对RabbitMQ.Client的引用:
"User": "wesky", // 用户名
"Password": "wesky123", // 密码
"ExchangeName": "WeskyExchange", // 设定一个Exchange名称,
"Durable": true // 是否启用持久化
然后,在实体类项目下,新建实体类MqConfigInfo,用于把读取的配置信息赋值到该实体类下:
public string Host { get; set; }
public int Port { get; set; }
public string User { get; set; }
public string Password { get; set; }
public string ExchangeName { get; set; }
public bool Durable { get; set; }
View Code
在刚刚新建的RabbitMQ类库项目下面,引用该实体类库项目,以及APppSettings项目。然后新建一个类,叫做ReadMqConfigHelper,以及它的interface接口,并且提供一个方法,叫ReadMqConfig,用来进行读取配置信息使用:
public class ReadMqConfigHelper:IReadMqConfigHelper
private readonly ILogger<ReadMqConfigHelper> _logger;
public ReadMqConfigHelper(ILogger<ReadMqConfigHelper> logger)
_logger = logger;
public List<MqConfigInfo> ReadMqConfig()
List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 读取MQ配置信息
if (config.Any())
return config;
_logger.LogError($"获取MQ配置信息失败:没有可用数据集");
return null;
catch (Exception ex)
_logger.LogError($"获取MQ配置信息失败:{ex.Message}");
return null;
View Code
接着,新建类MqConnectionHelper以及接口IMqConnectionHelper,用于做MQ连接、创建生产者和消费者等有关操作:
private readonly ILogger<MqConnectionHelper> _logger;
public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
_logger = logger;
_connectionReceiveFactory = new IConnectionFactory[_costomerCount];
_connectionReceive = new IConnection[_costomerCount];
_modelReceive = new IModel[_costomerCount];
_basicConsumer = new EventingBasicConsumer[_costomerCount];
备注:使用数组的部分,是给消费端用的。目前生产者只设置了一个,消费者可能存在多个。
当然,有条件的还可以上RabbitMQ集群进行处理,会更好玩一点。
private static IConnectionFactory _connectionSendFactory; //RabbitMQ工厂 发送端
private static IConnectionFactory[] _connectionReceiveFactory; //RabbitMQ工厂 接收端
private static IConnection _connectionSend; //连接 发送端
private static IConnection[] _connectionReceive; //连接 消费端
public static List<MqConfigInfo> _mqConfig; // 配置信息
private static IModel _modelSend; //通道 发送端
private static IModel[] _modelReceive; //通道 消费端
private static EventingBasicConsumer[] _basicConsumer; // 事件
/* 设置两个routingKey 和 队列名称,用来做测试使用*/
public static int _costomerCount = 2;
public static string[] _routingKey = new string[] {"WeskyNet001","WeskyNet002" };
public static string[] _queueName = new string[] { "Queue001", "Queue002" };
/// <summary>
/// 生产者初始化连接配置
/// </summary>
public void SendFactoryConnectionInit()
_connectionSendFactory = new ConnectionFactory
HostName = _mqConfig.FirstOrDefault().Host,
Port = _mqConfig.FirstOrDefault().Port,
UserName = _mqConfig.FirstOrDefault().User,
Password = _mqConfig.FirstOrDefault().Password
/// <summary>
/// 生产者连接
/// </summary>
public void SendFactoryConnection()
if (null != _connectionSend && _connectionSend.IsOpen)
return; // 已有连接
_connectionSend = _connectionSendFactory.CreateConnection(); // 创建生产者连接
if (null != _modelSend && _modelSend.IsOpen)
return; // 已有通道
_modelSend = _connectionSend.CreateModel(); // 创建生产者通道
_modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); // 定义交换机名称和类型(direct)
/// <summary>
/// 消费者初始化连接配置
/// </summary>
public void ReceiveFactoryConnectionInit()
var factories = new ConnectionFactory
HostName = _mqConfig.FirstOrDefault().Host,
Port = _mqConfig.FirstOrDefault().Port,
UserName = _mqConfig.FirstOrDefault().User,
Password = _mqConfig.FirstOrDefault().Password
for (int i = 0; i < _costomerCount; i++)
_connectionReceiveFactory[i] = factories; // 给每个消费者绑定一个连接工厂
/// <summary>
/// 消费者连接
/// </summary>
/// <param name="consumeIndex"></param>
/// <param name="exchangeName"></param>
/// <param name="routeKey"></param>
/// <param name="queueName"></param>
public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
_logger.LogInformation($"开始连接RabbitMQ消费者:{routeKey}");
if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
return;
_connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); // 创建消费者连接
if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
return;
_modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel(); // 创建消费者通道
_basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]);
_modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); // 定义交换机名称和类型 与生产者保持一致
_modelReceive[consumeIndex].QueueDeclare(
queue: queueName, //消息队列名称
durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此处配置在文件中,默认全局持久化(true),也可以自定义更改
exclusive: false,
autoDelete: false,
arguments: null
); // 定义消费者队列
_modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); // 队列绑定给指定的交换机
_modelReceive[consumeIndex].BasicQos(0, 1, false); // 设置消费者每次只接收一条消息
StartListener((model, ea) =>
byte[] message = ea.Body.ToArray(); // 接收到的消息
string msg = Encoding.UTF8.GetString(message);
_logger.LogInformation($"队列{queueName}接收到消息:{msg}");
Thread.Sleep(2000);
_modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
}, queueName, consumeIndex);
/// <summary>
/// 消费者接收消息的确认机制
/// </summary>
/// <param name="basicDeliverEventArgs"></param>
/// <param name="queueName"></param>
/// <param name="consumeIndex"></param>
private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex)
_basicConsumer[consumeIndex].Received += basicDeliverEventArgs;
_modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 设置手动确认。
/// <summary>
/// 消息发布
/// </summary>
/// <param name="message"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
public static void PublishExchange(string message, string exchangeName, string routingKey = "")
byte[] body = Encoding.UTF8.GetBytes(message);
_modelSend.BasicPublish(exchangeName, routingKey, null, body);
View Code
现在,我把整个Wsk.Core.RabbitMQ项目进行添加到依赖注入:
搭建RabbitMQ简单通用的直连方法如果还没有MQ环境,可以参考上一篇的博客:https://www.cnblogs.com/weskynet/p/14877932.html接下来开始.netcore操作Rabbitmq有关的内容。我打算使用比较简单的单机的direct直连模式,来演示一下有关操作,基本套路差不多。首先,我在我的package包项目上面,添加对RabbitMQ.Cl...
1. 首先我要保证的是项目在启动之初,就要同时启动队列,并进行消费
2. 队列的各种配置与RabbitMQ连接对象必须保证单例且必须全局注入
3. 支持扩展也就是多个消费者,共同消费一个队列。
4. 把方法独立出来,利于以后的扩展和业务增加
5. 写入数据库中,保存数据(由于是子线程运行rabbitmq,所以没办法直接得到主线程的容器进行注入因为这个我纠结了两个小时,具体方法看代码)
带有.NET Core API的基本RabbitMQ
基本Web API .NET CORE,具有使用RabbitMQ的队列实现
要实现此项目,必须通过docker运行RabbitMQ服务器。 跑步
docker run -d --hostname rabbitserver --name rabbitmq-server -p 15672:15672 -p 5672:5672 rabbitmq:3-management
RabbitMQ服务器
要查看服务器管理,您应该使用在docker image上配置的端口转到本地主机服务器
http://localhost:15672/
运行API项目将请求发送到RabbitMQ的服务器,您可以使用任何请求提交工具。
" id " : 0 ,
" itemName " : " " ,
" price " :
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</N
微信公众号:趣编程ACE关注可了解更多的.NET日常实战开发技巧,如需源码 请公众号后台留言 源码;[如果觉得本公众号对您有帮助,欢迎关注].Net中RabbitMQ的使用超清观看视频哦~官网链接RabbitMQ代码演示-详细见代码注释,操作看上文视频生产者代码1usingRabbitMQ.Client;
2usingSystem.Text;
5//...
本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下
首先下载安装包,我都环境是win7 64位:
去官网下载 otp_win64_19.0.exe 和rabbitmq-server-3.6.3.exe安装好
然后开始编程了:
(1)创建生产者类:
class Program
private static void Main()
//建立RabbitMQ连接和通道
var connectionFactory = new ConnectionFactory
HostName = 12
1. Redis消息队列
Redis消息队列是基于Redis的发布/订阅模式实现的。发布者将消息发送到指定的频道,订阅者从频道中订阅消息。Redis的发布/订阅模式是一个简单的模型,适用于一些简单的场景,例如实时消息推送等。但是,它不支持消息的持久化,也不支持消息的路由和过滤,所以在一些复杂的场景下可能会有限制。
2. RabbitMQ消息队列
RabbitMQ消息队列是一个完整的消息中间件,它支持多种消息传输协议和多种编程语言,包括AMQP、STOMP、MQTT等。它提供了许多高级特性,例如消息持久化、消息路由、消息过滤、消息确认等。RabbitMQ还支持集群部署和负载均衡,可以保证高可靠性和高可扩展性。但是,RabbitMQ的实现比Redis复杂,需要更多的配置和管理。
总的来说,Redis消息队列适用于一些简单的场景,而RabbitMQ消息队列适用于更复杂的场景,需要更高级的特性和更完善的管理。