破解php网站后台密码,有个网站可以接单做设计的,济南建设网站企业,用iis搭建网站RabbitMQ中的各模式及其用法 工作队列模式一、生产者代码1、封装工具类2、编写代码3、发送消息效果 二、消费者代码1、编写代码2、运行效果 发布订阅模式一、生产者代码二、消费者代码1、消费者1号2、消费者2号 三、运行效果四、小结 路由模式一、生产者代码二、消费者代码1、消… RabbitMQ中的各模式及其用法 工作队列模式一、生产者代码1、封装工具类2、编写代码3、发送消息效果 二、消费者代码1、编写代码2、运行效果 发布订阅模式一、生产者代码二、消费者代码1、消费者1号2、消费者2号 三、运行效果四、小结 路由模式一、生产者代码二、消费者代码1、消费者1号2、消费者2号 三、运行结果1、绑定关系2、消费消息 主题模式一、生产者代码二、消费者代码1、消费者1号2、消费者2号三、运行效果 总结 工作队列模式
一、生产者代码
新建一个module在module下创建属于自己的包并且创建一个名为“work”的子包以及工具类包“util”。结构如图所示 在pom文件中添加图中所示依赖
dependenciesdependencygroupIdcom.rabbitmq/groupIdartifactIdamqp-client/artifactIdversion5.20.0/version/dependency/dependencies此时准备工作基本完成。
1、封装工具类
修改rabbitMQ地址替换为自己的。
package com.xxx.rabbitmq.util;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** ClassName: ConnectionUtil* Package: com.xxx.rabbitmq.util* Author: * CreateDate: * Version: V1.0.0* Description:*/public class ConnectionUtil {public static final String HOST_ADDRESS 192.168.xxx.xxx;public static Connection getConnection() throws Exception {// 定义连接工厂ConnectionFactory factory new ConnectionFactory();// 设置服务地址factory.setHost(HOST_ADDRESS);// 端口factory.setPort(5672);//设置账号信息用户名、密码、vhostfactory.setVirtualHost(/);factory.setUsername(guest);factory.setPassword(123456);// 通过工程获取连接Connection connection factory.newConnection();return connection;}public static void main(String[] args) throws Exception {Connection con ConnectionUtil.getConnection();// amqp://guest192.168.xxx.xxx:5672/System.out.println(con);con.close();}}
2、编写代码
新建生产者类Producer
package com.xxx.rabbitmq.work;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** ClassName: Producer* Package: com.xxx.rabbitmq.work* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Producer {public static final String QUEUE_NAME work_queue;public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);for (int i 1; i 10; i) {String body ihello rabbitmq~~~;channel.basicPublish(,QUEUE_NAME,null,body.getBytes());}channel.close();connection.close();}
}
3、发送消息效果 二、消费者代码
1、编写代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同代码完全一样。 Consumer1:
package com.xxx.rabbitmq.work;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer1* Package: com.xxx.rabbitmq.work* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer1 {static final String QUEUE_NAME work_queue;public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(Consumer1 bodynew String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
Consumer2:
package com.xxx.rabbitmq.work;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer2* Package: com.xxx.rabbitmq.work* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer2 {static final String QUEUE_NAME work_queue;public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(Consumer2 bodynew String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
** 注意:** 运行的时候先启动两个消费端程序然后再启动生产者端程序。 如果已经运行过生产者程序则手动把work_queue队列删掉。
2、运行效果
最终两个消费端程序竞争结果如下 这样就完成了工作队列模式的演示。
发布订阅模式
一、生产者代码
还是在上面的module内新建一个名为fanout的子包在包内创建Producer类
package com.xxx.rabbitmq.fanout;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** ClassName: Producer* Package: com.xxx.rabbitmq.fanout* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Producer {public static void main(String[] args) throws Exception {// 1、获取连接Connection connection ConnectionUtil.getConnection();// 2、创建频道Channel channel connection.createChannel();// 参数1. exchange交换机名称// 参数2. type交换机类型// DIRECT(direct)定向// FANOUT(fanout)扇形广播发送消息到每一个与之绑定队列。// TOPIC(topic)通配符的方式// HEADERS(headers)参数匹配// 参数3. durable是否持久化// 参数4. autoDelete自动删除// 参数5. internal内部使用。一般false// 参数6. arguments其它参数String exchangeName test_fanout;// 3、创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);// 4、创建队列String queue1Name test_fanout_queue1;String queue2Name test_fanout_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 5、绑定队列和交换机// 参数1. queue队列名称// 参数2. exchange交换机名称// 参数3. routingKey路由键绑定规则// 如果交换机的类型为fanoutroutingKey设置为channel.queueBind(queue1Name,exchangeName,);channel.queueBind(queue2Name,exchangeName,);String body 日志信息张三调用了findAll方法...日志级别info...;// 6、发送消息channel.basicPublish(exchangeName,,null,body.getBytes());// 7、释放资源channel.close();connection.close();}
}二、消费者代码
1、消费者1号
package com.xxx.rabbitmq.fanout;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer1* Package: com.xxx.rabbitmq.fanout* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue1Name test_fanout_queue1;channel.queueDeclare(queue1Name,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(队列 1 消费者 1 将日志信息打印到控制台.....);}};channel.basicConsume(queue1Name,true,consumer);}
}
2、消费者2号
package com.xxx.rabbitmq.fanout;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer2* Package: com.xxx.rabbitmq.fanout* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue2Name test_fanout_queue2;channel.queueDeclare(queue2Name,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(队列 2 消费者 2 将日志信息打印到控制台.....);}};channel.basicConsume(queue2Name,true,consumer);}
}
三、运行效果
先启动消费者然后再运行生产者程序发送消息
四、小结
交换机和队列的绑定关系如下图所示 交换机需要与队列进行绑定绑定之后一个消息可以被多个消费者都收到。 发布订阅模式与工作队列模式的区别
工作队列模式本质上是绑定默认交换机发布订阅模式绑定指定交换机监听同一个队列的消费端程序彼此之间是竞争关系绑定同一个交换机的多个队列在发布订阅模式下消息是广播的每个队列都能接收到消息
路由模式
一、生产者代码
新建子包routing并新建Producer类
package com.xxx.rabbitmq.routing;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** ClassName: Producer* Package: com.xxx.rabbitmq.routing* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Producer {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String exchangeName test_direct;// 创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);// 创建队列String queue1Name test_direct_queue1;String queue2Name test_direct_queue2;// 声明创建队列channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 队列绑定交换机// 队列1绑定errorchannel.queueBind(queue1Name,exchangeName,error);// 队列2绑定info error warningchannel.queueBind(queue2Name,exchangeName,info);channel.queueBind(queue2Name,exchangeName,error);channel.queueBind(queue2Name,exchangeName,warning);String message 日志信息张三调用了delete方法.错误了,日志级别error;// 发送消息channel.basicPublish(exchangeName,error,null,message.getBytes());System.out.println(message);// 释放资源channel.close();connection.close();}
}
二、消费者代码
1、消费者1号
package com.xxx.rabbitmq.routing;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer1* Package: com.xxx.rabbitmq.routing* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue1Name test_direct_queue1;channel.queueDeclare(queue1Name,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(Consumer1 将日志信息打印到控制台.....);}};channel.basicConsume(queue1Name,true,consumer);}
}
2、消费者2号
package com.xxx.rabbitmq.routing;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer2* Package: com.xxx.rabbitmq.routing* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String queue2Name test_direct_queue2;channel.queueDeclare(queue2Name,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));System.out.println(Consumer2 将日志信息存储到数据库.....);}};channel.basicConsume(queue2Name,true,consumer);}}
三、运行结果
1、绑定关系 2、消费消息 主题模式
一、生产者代码
新建子包topic新建生产者类Producer
package com.xxx.rabbitmq.topic;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;/*** ClassName: Producer* Package: com.xxx.rabbitmq.topic* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Producer {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String exchangeName test_topic;channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);String queue1Name test_topic_queue1;String queue2Name test_topic_queue2;channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);// 绑定队列和交换机// 参数1. queue队列名称// 参数2. exchange交换机名称// 参数3. routingKey路由键,绑定规则// 如果交换机的类型为fanout ,routingKey设置为// routing key 常用格式系统的名称.日志的级别。// 需求 所有error级别的日志存入数据库,所有order系统的日志存入数据库channel.queueBind(queue1Name,exchangeName,#.error);channel.queueBind(queue1Name,exchangeName,order.*);channel.queueBind(queue2Name,exchangeName,*.*);// 分别发送消息到队列order.info、goods.info、goods.errorString body [所在系统order][日志级别info][日志内容订单生成保存成功];channel.basicPublish(exchangeName,order.info,null,body.getBytes());body [所在系统goods][日志级别info][日志内容商品发布成功];channel.basicPublish(exchangeName,goods.info,null,body.getBytes());body [所在系统goods][日志级别error][日志内容商品发布失败];channel.basicPublish(exchangeName,goods.error,null,body.getBytes());channel.close();connection.close();}
}
二、消费者代码
1、消费者1号
消费者1监听队列1
package com.xxx.rabbitmq.topic;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer1* Package: com.xxx.rabbitmq.topic* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer1 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String QUEUE_NAME test_topic_queue1;channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
2、消费者2号
消费者2监听队列2
package com.xxx.rabbitmq.topic;import com.xxx.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;import java.io.IOException;/*** ClassName: Consumer2* Package: com.xxx.rabbitmq.topic* Author: * CreateDate: * Version: V1.0.0* Description:*/public class Consumer2 {public static void main(String[] args) throws Exception {Connection connection ConnectionUtil.getConnection();Channel channel connection.createChannel();String QUEUE_NAME test_topic_queue2;channel.queueDeclare(QUEUE_NAME,true,false,false,null);Consumer consumer new DefaultConsumer(channel){Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(bodynew String(body));}};channel.basicConsume(QUEUE_NAME,true,consumer);}
}
三、运行效果
队列1 队列2 至此就完成了RabbitMQ各模式的使用演示。
总结
在选择使用什么模式时需要对应业务需求结合需求选择合适的模式。