博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ~消费者实时与消息服务器保持通话
阅读量:5730 次
发布时间:2019-06-18

本文共 3502 字,大约阅读时间需要 11 分钟。

这个文章主要介绍简单的消费者的实现,rabbitMQ实现的消费者可以对消息服务器进行实时监听,当有消息(生产者把消息推到服务器上之后),消费者可以自动去消费它,这通常是开启一个进程去维护这个对话,它与消息服务器保持一个TCP的长连接,整个这个过程于rabbitMQ为我们提供,程序开发人员只需要实现自己的回调方法即可.

简单的rabbitMQ消费者

///     /// 消息消费者    ///     public class RabbitMqSubscriber : Lind.DDD.Commons.DisposableBase    {        private readonly string exchangeName;        private readonly string queueName;        private readonly IConnection connection;        private readonly IModel channel;        private bool disposed;        ///         /// 从消息服务器拉到消息后触发        ///         public event EventHandler
MessageReceived; ///
/// Initializes a new instance of
RabbitMqMessageSubscriber
class. ///
///
///
///
public RabbitMqSubscriber(string uri, string queueName, string userName = "", string password = "") { this.exchangeName = exchangeName; this.queueName = queueName; var factory = new ConnectionFactory() { Uri = uri }; if (!string.IsNullOrWhiteSpace(userName)) factory.UserName = userName; if (!string.IsNullOrWhiteSpace(password)) factory.Password = password; this.connection = factory.CreateConnection(); this.channel = connection.CreateModel(); } public void Subscribe() { channel.QueueDeclare( queue: this.queueName, durable: false,//持久化 exclusive: false, //独占,只能被一个consumer使用 autoDelete: false,//自己删除,在最后一个consumer完成后删除它 arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var body = e.Body; var json = Encoding.UTF8.GetString(body); var message = JsonConvert.DeserializeObject(json, new JsonSerializerSettings { TypeNameHandling = TypeNameHandling.All }); this.OnMessageReceived(new MessageReceivedEventArgs(message)); channel.BasicAck(e.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); } private void OnMessageReceived(MessageReceivedEventArgs e) { this.MessageReceived?.Invoke(this, e); } protected override void Finalize(bool disposing) { if (disposing) { if (!disposed) { this.channel.Dispose(); this.connection.Dispose(); disposed = true; } } } }

简单调用

class Program    {        static void Main(string[] args)        {            var subscriber = new Lind.DDD.RabbitMq.RabbitMqSubscriber("amqp://localhost:5672", "zzl");            subscriber.MessageReceived += Subscriber_MessageReceived;            subscriber.Subscribe();            Console.ReadKey();        }        private static void Subscriber_MessageReceived(object sender, RabbitMq.MessageReceivedEventArgs e)        {            Console.WriteLine("消费者2->消费了一个消息{0}", e.Message);            Lind.DDD.Logger.LoggerFactory.Instance.Logger_Debug("消费者2->消费了一个消息{0}" + e.Message);            Thread.Sleep(2000);        }    }

实时拉消息

RabbitMQ消息模型

通过上面图我们可以更容易和清晰的去理解rabbitmq的工作流程.

转载于:https://www.cnblogs.com/lori/p/6477931.html

你可能感兴趣的文章
MSTP实现负载均衡
查看>>
HDFS元数据管理机制
查看>>
入门一班 20181113 php5 install php7 install
查看>>
gawk 初识
查看>>
linux命令大全
查看>>
学习jQueryUI
查看>>
toUtf-8转换
查看>>
直流稳压电源的分类
查看>>
【flash】解决flash遮挡其他dom元素的办法
查看>>
基于arduino+web的物联网demo,web和微信控制
查看>>
jQuery带次数带弹窗的大转盘抽奖代码(支持h5)
查看>>
OpenStack搭建私有云一:认证服务
查看>>
抖音新王牌:“多闪”怎么快速引流脚本爆粉软件?批量添加多闪群批量私信好友!...
查看>>
centos7 安装tomcat
查看>>
云谋略时代与云操作体系概述
查看>>
Vulkan Tutorial 03 理解Instance
查看>>
hadoop的HA实现,超详细(一)
查看>>
Linux 学习命令之修改日期时间
查看>>
solusvm xen ISO 安装 装 template
查看>>
Linux学习笔记<六>——用户登陆配置
查看>>