本文主要介绍如何使用rabbitmq进行简单的消息入队,出队操作。
首先,我们下载官方的.net客户端软件,链接:http://www.rabbitmq.com/dotnet.html
下载完成后,将bin目录下的这两个DLL文件复制到我们示例项目中,并添加引用:
RabbitMQ.Client.dll //基于WCF的发布订阅消息的功能类
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类
接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下:
首先是ProducerMQ:
Uri uri = new Uri("amqp://192.168.10.81:5672/");//服务器地址 string exchange = "ex1";//路由 string exchangeType = "direct";//交换模式 //direct类型的Exchange路由会把消息路由到 //那些binding key与routing key完全匹配的Queue中。 string routingKey = "m1";//路由关键字 bool persistMode = true;//是否对消息队列持久化保存 ConnectionFactory cf = new ConnectionFactory(); cf.UserName = "username"; cf.Password = "password"; cf.VirtualHost = "/";//vhost虚拟主机 cf.RequestedHeartbeat = 0;//heatbeat在客户端和 //服务端之间检测tcp链接是否正常。 cf.Endpoint = new AmqpTcpEndpoint(uri);//创建一个连接到具体总结点的连接 using (IConnection conn = cf.CreateConnection())//创建并返回一个新连接到具体节点的通道 { using (IModel ch = conn.CreateModel()) { if (exchangeType != null) { ch.ExchangeDeclare(Exchange, exchangeType);//声明一个路由 ch.QueueDeclare("q1", true, false, false, null);//声明一个队列 ch.QueueBind("q1", "ex1", "m1");//将一个队列和一个路由绑定起来,并制定路由关键字 } //构造消息实体对象并发布到消息队列上 RabbitMQ.Client.Content.IMapMessageBuilder b = new MapMessageBuilder(ch); IDictionary target = b.Headers; target["header"] = Header; IDictionary targetBody = b.Body; targetBody["body"] = Body;//这个才是具体的发送内容 if (persistMode) { ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2;//设定传输模式 } //写入 ch.BasicPublish(Exchange, RountingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody()); } }
然后是CustmerMq:
string result=""; string serverAddress = "amqp://192.168.10.81:5672/"; ConnectionFactory cf = new ConnectionFactory(); cf.Uri = serverAddress; cf.UserName = "xiaobo"; cf.Password = "xiaobo"; cf.VirtualHost = "/"; cf.RequestedHeartbeat = 0; using (IConnection conn = cf.CreateConnection()) { using (IModel ch = conn.CreateModel()) { //普通使用方式BasicGet //noAck = true,不需要回复,接收到消息后,queue上的消息就会清除 //noAck = false,需要回复,接收到消息后,queue上的消息不会被清除, //直到调用channel.basicAck(deliveryTag, false); //queue上的消息才会被清除 而且, //在当前连接断开以前,其它客户端将不能收到此queue上的消息 BasicGetResult res = ch.BasicGet(QueenName, false/*noAck*/); if (res != null) { ch.BasicAck(res.DeliveryTag, false); result= System.Text.UTF8Encoding.UTF8.GetString(res.Body); } else { result= "无内容!!"; } ch.Close(); } conn.Close(); } return result;