网站建设所需要的东西,ps做网站边框,10个奇怪又有趣的网站,国外的wordpress主题RabbitMQ高级特性
消息可靠性投递 Consumer ACK 消费端限流 TTL 死信队列 延迟队列 日志与监控 消息可靠性分析与追踪 管理
消息可靠性投递
在使用 RabbitMQ 的时候#xff0c;作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制…RabbitMQ高级特性
消息可靠性投递 Consumer ACK 消费端限流 TTL 死信队列 延迟队列 日志与监控 消息可靠性分析与追踪 管理
消息可靠性投递
在使用 RabbitMQ 的时候作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。 confirm 确认模式 return 退回模式
rabbitmq 整个消息投递的路径为 producer—rabbitmq broker—exchange—queue—consumer 消息从 producer 到 exchange 则会返回一个 confirmCallback 。 消息从 exchange–queue 投递失败则会返回一个 returnCallback 。 我们将利用这两个 callback 控制消息的可靠性投递
设置ConnectionFactory的publisher-confirms“true” 开启 确认模式。使用rabbitTemplate.setConfirmCallback设置回调函数。当消息发送到exchange后回调confirm方法。在方法中判断ack如果为true则发送成功如果为false则发送失败需要处理。设置ConnectionFactory的publisher-returns“true” 开启 退回模式。使用rabbitTemplate.setReturnCallback设置退回函数当消息从exchange路由到queue失败后如果设置了rabbitTemplate.setMandatory(true)参数则会将消息退回给producer。并执行回调函数returnedMessage。 在RabbitMQ中也提供了事务机制但是性能较差此处不做讲解。使用channel下列方法完成事务控制 txSelect(), 用于将当前channel设置成transaction模式txCommit()用于提交事务txRollback(),用于回滚事务pom.xml
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.itheima/groupIdartifactIdrabbitmq-producer-spring/artifactIdversion1.0-SNAPSHOT/versiondependenciesdependencygroupIdorg.springframework/groupIdartifactIdspring-context/artifactIdversion5.1.7.RELEASE/version/dependencydependencygroupIdorg.springframework.amqp/groupIdartifactIdspring-rabbit/artifactIdversion2.1.8.RELEASE/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.12/version/dependencydependencygroupIdorg.springframework/groupIdartifactIdspring-test/artifactIdversion5.1.7.RELEASE/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.8.0/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/plugin/plugins/build
/projectrabbitmq.hostlocalhost
rabbitmq.port5672
rabbitmq.usernameguest
rabbitmq.passwordguest
rabbitmq.virtual-host/?xml version1.0 encodingUTF-8?
beans xmlnshttp://www.springframework.org/schema/beansxmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexmlns:contexthttp://www.springframework.org/schema/contextxmlns:rabbithttp://www.springframework.org/schema/rabbitxsi:schemaLocationhttp://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttps://www.springframework.org/schema/context/spring-context.xsdhttp://www.springframework.org/schema/rabbithttp://www.springframework.org/schema/rabbit/spring-rabbit.xsd!--加载配置文件--context:property-placeholder locationclasspath:rabbitmq.properties/!-- 定义rabbitmq connectionFactory --rabbit:connection-factory idconnectionFactory host${rabbitmq.host}port${rabbitmq.port}username${rabbitmq.username}password${rabbitmq.password}virtual-host${rabbitmq.virtual-host}publisher-confirmstruepublisher-returnstrue/!--定义管理交换机、队列--rabbit:admin connection-factoryconnectionFactory/!--定义rabbitTemplate对象操作可以在代码中方便发送消息--rabbit:template idrabbitTemplate connection-factoryconnectionFactory/!--消息可靠性投递生产端--rabbit:queue idtest_queue_confirm nametest_queue_confirm/rabbit:queuerabbit:direct-exchange nametest_exchange_confirmrabbit:bindingsrabbit:binding queuetest_queue_confirm keyconfirm/rabbit:binding/rabbit:bindings/rabbit:direct-exchange
/beanspackage com.itheima;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;RunWith(SpringJUnit4ClassRunner.class)
ContextConfiguration(locations classpath:spring-rabbitmq-producer.xml)
public class
ProducerTest {Autowiredprivate RabbitTemplate rabbitTemplate;/*** 确认模式* 步骤* 1. 确认模式开启ConnectionFactory中开启publisher-confirmstrue* 2. 在rabbitTemplate定义ConfirmCallBack回调函数*/Testpublic void testConfirm() throws InterruptedException {//2. 定义回调rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {/*** param correlationData 相关配置信息* param ack exchange交换机 是否成功收到了消息。true 成功false代表失败* param cause 失败原因*/Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println(confirm方法被执行了....);if (ack) {//接收成功System.out.println(接收成功消息 cause);} else {//接收失败System.out.println(接收失败消息 cause);//做一些处理让消息再次发送。}}});//3. 发送消息rabbitTemplate.convertAndSend(test_exchange_confirm, confirm, message confirm....);Thread.sleep(200);}/*** 回退模式当消息发送给Exchange后Exchange路由到Queue是啊比是 才会执行ReturnCallBack* 步骤* 开启回退模式* 设置ReturnCallBack publisher-returnstrue* 设置Exchange处理消息的模式* 1 消息没有路由到Queue,则丢弃消息 默认* 2 消息没有路由到Queue返回消息发送方ReturnCallBack*/Testpublic void testReturn() throws InterruptedException {/// 设置交换机处理失败的模式 失败执行回调rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {/**** param message 发送消息对象* param replyCode 失败的错误码* param replyText 错误信息* param exchange 交换机* param routingKey 路由键*/Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println(return 执行了);/*** (Body:message confirm.... MessageProperties [headers{}, contentTypetext/plain,* contentEncodingUTF-8, contentLength0,* receivedDeliveryModePERSISTENT, priority0, deliveryTag0])* 312* NO_ROUTE* test_exchange_confirm* confirm111*/System.out.println(message);System.out.println(replyCode);System.out.println(replyText);System.out.println(exchange);System.out.println(routingKey);//处理}});//3. 发送消息rabbitTemplate.convertAndSend(test_exchange_confirm, confirm111, message confirm....);Thread.sleep(200);}}Consumer Ack
ack指Acknowledge确认。 表示消费端收到消息后的确认方式。 有三种确认方式 自动确认acknowledge“none” 手动确认acknowledge“manual” 根据异常情况确认acknowledge“auto”这种方式使用麻烦不作讲解
其中自动确认是指当消息一旦被Consumer接收到则自动确认收到并将相应 message 从 RabbitMQ 的消息缓存中移除。但是在实际业务处理中很可能消息接收到业务处理出现异常那么该消息就会丢失。如果设置了手动确认方式则需要在业务处理成功后调用channel.basicAck()手动签收如果出现异常则调用channel.basicNack()方法让其自动重新发送消息。
在rabbit:listener-container标签中设置acknowledge属性设置ack方式 none自动确认manual手动确认如果在消费端没有出现异常则调用channel.basicAck(deliveryTag,false);方法确认签收消息如果出现异常则在catch中调用 basicNack或 basicReject拒绝消息让MQ重新发送消息。
package com.itheima.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** Consumer ACK 机制* 1 默认设置手动签收 acknowledgemanual* 2 让监听器类ChannelAwareMessageListener接口 不要实现这个MessageListener* 3 如果消息成功处理 则调用channeld的basicAck签收* 4 如果消息处理失败则调用channel的basicNack拒绝签收broker重新发送consumer*/Component
public class AckListener implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try{System.out.println(new String(message.getBody()));// 处理业务System.out.println(处理业务逻辑);//int i 3/0; //出错// 手动签收channel.basicAck(deliveryTag, true);}catch (Exception e) {/*** 拒绝签收 第三个参数 requeue重回队列 如果设置为true 则消息重回到queue broker会重新发送该消息给消费端*/channel.basicNack(deliveryTag,true,true);//channel.basicReject(deliveryTag,true);}}
}package com.itheima.test;import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;RunWith(SpringJUnit4ClassRunner.class)
ContextConfiguration(locations classpath:spring-rabbitmq-consumer.xml)
public class ConsumerTest {Testpublic void test(){while (true) {}}
}!--加载配置文件--
context:property-placeholder locationclasspath:rabbitmq.properties/!-- 定义rabbitmq connectionFactory --
rabbit:connection-factory idconnectionFactory host${rabbitmq.host}port${rabbitmq.port}username${rabbitmq.username}password${rabbitmq.password}virtual-host${rabbitmq.virtual-host}/!--包扫描-- !--定义监听器--
context:component-scan base-packagecom.itheima.listener /
!--定义监听器--
rabbit:listener-container connection-factoryconnectionFactory acknowledgemanualrabbit:listener refackListener queue-namestest_queue_confirm/rabbit:listener
/rabbit:listener-container持久化 exchange要持久化 queue要持久化 message要持久化 生产方确认Confirm 消费方确认Ack Broker高可用
1.3 消费端限流 package com.itheima.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;/*** Consumer 限流机制* 1 确保ack机制为手动确认* 2 listener-container配置熟悉* perfetch1 表示消费端每次从mq拉取一条消息来消费直到手动确认消费完毕后才会继续拉取下一条消息*/Component
public class QosListener implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {Thread.sleep(1000);/// 获取消息System.out.println(new String(message.getBody()));// 处理逻辑//签收 肯应应答处理完消息之后提醒RabbitMQ可以删除当前队列deliveryTag当前队列中选中的消息multiple是否批量应答channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}
}!--定义监听器--rabbit:listener-container connection-factoryconnectionFactory acknowledgemanual prefetch1
!-- rabbit:listener refackListener queue-namestest_queue_confirm/rabbit:listener--rabbit:listener refqosListener queue-namestest_queue_confirm/rabbit:listener/rabbit:listener-container测试 Testpublic void test(){while (true) {}}
}生产者发送消息 Testpublic void testSend() throws InterruptedException {for (int i 0; i 10 ; i) {//3. 发送消息rabbitTemplate.convertAndSend(test_exchange_confirm, confirm, message confirm....);//Thread.sleep(200);}}在rabbit:listener-container 中配置 prefetch属性设置消费端一次拉取多少消息
消费端的确认模式一定为手动确认。acknowledge“manual”
1.4 TTL
TTL 全称 Time To Live存活时间/过期时间。
当消息到达存活时间后还没有被消费会被自动清除。
RabbitMQ可以对消息设置过期时间也可以对整个队列Queue设置过期时间。
/**** ttl:过期时间* 1 队列统一过期* 2 消息单独过期** 如果设置消息的过期时间也设置了队列的过期时间它以时间短的为准* 队列过期后会将队列所有消息全部移除* 消息过期后只有消息在队列顶端才会去判断其是否过期(移除掉)*/
Test
public void testTLL() throws InterruptedException {队列统一过期//for (int i 0; i 10 ; i) {// //3. 发送消息// rabbitTemplate.convertAndSend(test_exchange_ttl, ttl.hehe, message tll....);// //Thread.sleep(200);//}///消息后处理对象设置一些消息的参数信息MessagePostProcessor messagePostProcessor new MessagePostProcessor() {Overridepublic Message postProcessMessage(Message message) throws AmqpException {//1 设置message的信息message.getMessageProperties().setExpiration(5000);//消息的过期时间//2 返回该信息return message;}};///消息单独过期//rabbitTemplate.convertAndSend(test_exchange_ttl, ttl.hehe, message tll...., messagePostProcessor);for (int i 0; i 10; i) {if(i5){rabbitTemplate.convertAndSend(test_exchange_ttl, ttl.hehe, message tll...., messagePostProcessor);}else{//不过期消息rabbitTemplate.convertAndSend(test_exchange_ttl, ttl.hehe, message tll....);}}}!--ttl--rabbit:queue nametest_queue_ttl idtest_queue_ttl!--设置queue的参数--rabbit:queue-arguments!--x-message-ttl指队列的过期时间--entry keyx-message-ttl value100000 value-typejava.lang.Integer/entry/rabbit:queue-arguments/rabbit:queue!--声明交换机 绑定--rabbit:topic-exchange nametest_exchange_ttlrabbit:bindingsrabbit:binding patternttl.# queuetest_queue_ttl/rabbit:binding/rabbit:bindings/rabbit:topic-exchange设置队列过期时间使用参数x-message-ttl单位ms(毫秒)会对整个队列消息统一过期。
设置消息过期时间使用参数expiration。单位ms(毫秒)当该消息在队列头部时消费时会单独判断这一消息是否过期。
如果两者都进行了设置以时间短的为准。
1.5 死信队列
死信队列英文缩写DLX 。Dead Letter Exchange死信交换机当消息成为Dead message后可以被重新发送到另一个交换机这个交换机就是DLX。
消息成为死信的三种情况 队列消息长度到达限制 消费者拒接消费消息basicNack/basicReject,并且不把消息重新放入原目标队列,requeuefalse 原队列存在消息过期设置消息到达超时时间未被消费
队列绑定死信交换机
给队列设置参数 x-dead-letter-exchange 设置交换机的名称 和 x-dead-letter-routing-key /*** 发送测试死信消息* 1 过期时间* 2 长度限制* 3 消息拒收*/Testpublic void testDlx(){// 测试过期时间死信消息//rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.hehe,我是一条消息我会死吗?);//测试长度限制后消息死信//for (int i 0; i 20; i) {// rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.hehe,我是一条消息我会死吗?);//}//3 测试消息拒收rabbitTemplate.convertAndSend(test_exchange_dlx,test.dlx.hehe,我是一条消息我会死吗?);}!--1 声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)--rabbit:queue nametest_queue_dlx idtest_queue_dlx!--3 正常队列绑定死信交换机--rabbit:queue-arguments!--x-dead-letter-exchange:死信交换机名称--entry keyx-dead-letter-exchange valueexchange_dlx/!--x-dead-letter-routing-key发送给死信交换机的rooutingkey--entry keyx-dead-letter-routing-key valuedlx.hehe/!--设置队列的过期时间 ttl--entry keyx-message-ttl value10000 value-typejava.lang.Integer/entry!--设置队列的长度现在 max-length--entry keyx-max-length value10 value-typejava.lang.Integer/entry/rabbit:queue-arguments/rabbit:queuerabbit:topic-exchange nametest_exchange_dlxrabbit:bindingsrabbit:binding patterntest.dlx.# queuetest_queue_dlx/rabbit:binding/rabbit:bindings/rabbit:topic-exchange!--2 声明死信队列queue_dlx和死信交换机exchange_dlx--rabbit:queue namequeue_dlx idqueue_dlx/rabbit:queuerabbit:topic-exchange nameexchange_dlxrabbit:bindingsrabbit:binding patterndlx.# queuequeue_dlx/rabbit:binding/rabbit:bindings/rabbit:topic-exchange消费者
package com.itheima.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;Component
public class DlxListener implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try{System.out.println(new String(message.getBody()));// 处理业务System.out.println(处理业务逻辑);int i 3/0; //出错// 手动签收channel.basicAck(deliveryTag, true);}catch (Exception e) {/*** 拒绝签收 第三个参数 requeue重回队列 如果设置为true 则消息重回到queue broker会重新发送该消息给消费端*/System.out.println(出现异常拒绝接受);//拒绝签收不重回队列 requeue falsechannel.basicNack(deliveryTag,true,false);//channel.basicReject(deliveryTag,true);}}
}!--定义监听器--rabbit:listener-container connection-factoryconnectionFactory acknowledgemanual prefetch1rabbit:listener refdlxListener queue-namestest_queue_dlx/rabbit:listener/rabbit:listener-container测试 Testpublic void test(){while (true) {}}
}死信交换机和死信队列和普通的没有区别 当消息成为死信后如果该队列绑定了死信交换机则消息会被死信交换机重新路由到死信队列 消息成为死信的三种情况 队列消息长度到达限制 消费者拒接消费消息并且不重回队列 原队列存在消息过期设置消息到达超时时间未被消费
1.6 延迟队列
延迟队列即消息进入队列后不会立即被消费只有到达指定时间后才会被消费。
需求 下单后30分钟未支付取消订单回滚库存。 新用户注册成功7天后发送短信问候。
实现方式 3. 定时器
延迟队列 很可惜在RabbitMQ中并未提供延迟队列功能。
但是可以使用TTL死信队列 组合实现延迟队列的效果。 !--延迟队列:1 定义正常交换机(order_exchange)和队列(order_queue)2 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)3 绑定设置正常队列过期时间为30分钟--!--1 定义正常交换机(order_exchange)和队列(order_queue)--rabbit:queue idorder_queue nameorder_queue!--3 绑定设置正常队列过期时间为30分钟--rabbit:queue-arguments!--x-dead-letter-exchange:死信交换机名称--entry keyx-dead-letter-exchange valueorder_exchange_dlx/!--x-dead-letter-routing-key发送给死信交换机的rooutingkey--entry keyx-dead-letter-routing-key valuedlx.order.cancel/!--设置队列的过期时间 ttl--entry keyx-message-ttl value10000 value-typejava.lang.Integer/entry/rabbit:queue-arguments/rabbit:queuerabbit:topic-exchange nameorder_exchangerabbit:bindingsrabbit:binding patternorder.# queueorder_queue/rabbit:binding/rabbit:bindings/rabbit:topic-exchange!-- 2 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)--rabbit:queue idorder_queue_dlx nameorder_queue_dlx/rabbit:queuerabbit:topic-exchange nameorder_exchange_dlxrabbit:bindingsrabbit:binding patterndlx.order.# queueorder_queue_dlx/rabbit:binding/rabbit:bindings/rabbit:topic-exchange/*** 延迟队列*/Testpublic void testDelay() throws InterruptedException {// 发送订单消息 将来是在订单系统中下单后发送消息rabbitTemplate.convertAndSend(order_exchange,order.msg,订单信息id1,time2023年3月9日03:27:28);//打印倒计时10秒for (int i 10; i 0 ; i--) {System.out.println(i...);Thread.sleep(1000);}}消费者
package com.itheima.listener;import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;Component
public class OrderListener implements ChannelAwareMessageListener {Overridepublic void onMessage(Message message, Channel channel) throws Exception {long deliveryTag message.getMessageProperties().getDeliveryTag();try{System.out.println(new String(message.getBody()));// 处理业务System.out.println(处理业务逻辑);System.out.println(根据订单id查询其状态);System.out.println(判断状态是否为支付成功);System.out.println(取消订单,回滚库存...);// 手动签收channel.basicAck(deliveryTag, true);}catch (Exception e) {/*** 拒绝签收 第三个参数 requeue重回队列 如果设置为true 则消息重回到queue broker会重新发送该消息给消费端*/System.out.println(出现异常拒绝接受);//拒绝签收不重回队列 requeue falsechannel.basicNack(deliveryTag,true,false);//channel.basicReject(deliveryTag,true);}}
}测试 Testpublic void test(){while (true) {}}
}延迟队列 指消息进入队列后可以被延迟一定时间再进行消费。 RabbitMQ没有提供延迟队列功能但是可以使用 TTL DLX 来实现延迟队列效果。
1.7 日志与监控
1.7.1 RabbitMQ日志
RabbitMQ默认日志存放路径 /var/log/rabbitmq/rabbitxxx.log
日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认账户guest的创建以及权限配置等等。
1.7.3 rabbitmqctl管理和监控
查看队列
rabbitmqctl list_queues
查看exchanges
rabbitmqctl list_exchanges
查看用户
rabbitmqctl list_users
查看连接
rabbitmqctl list_connections
查看消费者信息
rabbitmqctl list_consumers
查看环境变量
rabbitmqctl environment
查看未被确认的队列
rabbitmqctl list_queues name messages_unacknowledged
查看单个队列的内存使用
rabbitmqctl list_queues name memory
查看准备就绪的队列
rabbitmqctl list_queues name messages_ready
1.8 消息追踪
在使用任何消息中间件的过程中难免会出现某条消息异常丢失的情况。对于RabbitMQ而言可能是因为生产者或消费者与RabbitMQ断开了连接而它们与RabbitMQ又采用了不同的确认机制也有可能是因为交换器与队列之间不同的转发策略甚至是交换器并没有与任何队列进行绑定生产者又不感知或者没有采取相应的措施另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
1.8 消息追踪-Firehose
firehose的机制是将生产者投递给rabbitmq的消息rabbitmq投递给消费者的消息按照指定的格式发送到默认的exchange上。这个默认的exchange的名称为amq.rabbitmq.trace它是一个topic类型的exchange。发送到这个exchange上的消息的routing key为 publish.exchangename 和 deliver.queuename。其中exchangename和queuename为实际exchange和queue的名称分别对应生产者投递到exchange的消息和消费者从queue上获取的消息。
注意打开 trace 会影响消息写入功能适当打开后请关闭。 rabbitmqctl trace_on开启Firehose命令
rabbitmqctl trace_off关闭Firehose命令
1.8 消息追踪-rabbitmq_tracing
rabbitmq_tracing和Firehose在实现上如出一辙只不过rabbitmq_tracing的方式比Firehose多了一层GUI的包装更容易使用和管理。
启用插件rabbitmq-plugins enable rabbitmq_tracing
RabbitMQ应用问题
消息可靠性保障 消息补偿机制消息幂等性保障 乐观锁解决方案
2.2 消息幂等性保障
幂等性指一次和多次请求某一个资源对于资源本身应该具有同样的结果。也就是说其任意多次执行对资源本身所产生的影响均与一次执行的影响相同。
在MQ中指消费多条相同的消息得到与消费该消息一次相同的结果。 3.RabbitMQ集群搭建
摘要实际生产应用中都会采用消息队列的集群方案如果选择RabbitMQ那么有必要了解下它的集群方案原理
一般来说如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试环境上使用其单实例部署就可以了但是出于MQ中间件本身的可靠性、并发性、吞吐量和消息堆积能力等问题的考虑在生产环境上一般都会考虑使用RabbitMQ的集群方案。
3.1 集群方案的原理
RabbitMQ这款消息队列中间件产品本身是基于Erlang编写Erlang语言天生具备分布式特性通过同步Erlang集群各节点的magic cookie来实现。因此RabbitMQ天然支持Clustering。这使得RabbitMQ本身不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据。集群是保证可靠性的一种方式同时可以通过水平扩展以达到增加消息吞吐量能力的目的。
3.2 单机多实例部署
由于某些因素的限制有时候你不得不在一台机器上去搭建一个rabbitmq集群这个有点类似zookeeper的单机版。真实生成环境还是要配成多机集群的。有关怎么配置多机集群的可以参考其他的资料这里主要论述如何在单机中配置多个rabbitmq实例。
主要参考官方文档https://www.rabbitmq.com/clustering.html
首先确保RabbitMQ运行没有问题 shell [rootsuper ~]# rabbitmqctl status Status of node rabbitsuper … [{pid,10232}, {running_applications, [{rabbitmq_management,“RabbitMQ Management Console”,“3.6.5”}, {rabbitmq_web_dispatch,“RabbitMQ Web Dispatcher”,“3.6.5”}, {webmachine,“webmachine”,“1.10.3”}, {mochiweb,“MochiMedia Web Server”,“2.13.1”}, {rabbitmq_management_agent,“RabbitMQ Management Agent”,“3.6.5”}, {rabbit,“RabbitMQ”,“3.6.5”}, {os_mon,“CPO CXC 138 46”,“2.4”}, {syntax_tools,“Syntax tools”,“1.7”}, {inets,“INETS CXC 138 49”,“6.2”}, {amqp_client,“RabbitMQ AMQP Client”,“3.6.5”}, {rabbit_common,[],“3.6.5”}, {ssl,“Erlang/OTP SSL application”,“7.3”}, {public_key,“Public key infrastructure”,“1.1.1”}, {asn1,“The Erlang ASN1 compiler version 4.0.2”,“4.0.2”}, {ranch,“Socket acceptor pool for TCP protocols.”,“1.2.1”}, {mnesia,“MNESIA CXC 138 12”,“4.13.3”}, {compiler,“ERTS CXC 138 10”,“6.0.3”}, {crypto,“CRYPTO”,“3.6.3”}, {xmerl,“XML parser”,“1.3.10”}, {sasl,“SASL CXC 138 11”,“2.7”}, {stdlib,“ERTS CXC 138 10”,“2.8”}, {kernel,“ERTS CXC 138 10”,“4.2”}]}, {os,{unix,linux}}, {erlang_version, “Erlang/OTP 18 [erts-7.3] [source] [64-bit] [async-threads:64] [hipe] [kernel-poll:true]\n”}, {memory, [{total,56066752}, {connection_readers,0}, {connection_writers,0}, {connection_channels,0}, {connection_other,2680}, {queue_procs,268248}, {queue_slave_procs,0}, {plugins,1131936}, {other_proc,18144280}, {mnesia,125304}, {mgmt_db,921312}, {msg_index,69440}, {other_ets,1413664}, {binary,755736}, {code,27824046}, {atom,1000601}, {other_system,4409505}]}, {alarms,[]}, {listeners,[{clustering,25672,“::”},{amqp,5672,“::”}]}, {vm_memory_high_watermark,0.4}, {vm_memory_limit,411294105}, {disk_free_limit,50000000}, {disk_free,13270233088}, {file_descriptors, [{total_limit,924},{total_used,6},{sockets_limit,829},{sockets_used,0}]}, {processes,[{limit,1048576},{used,262}]}, {run_queue,0}, {uptime,43651}, {kernel,{net_ticktime,60}}] 停止rabbitmq服务shell
[rootsuper sbin]# service rabbitmq-server stop
Stopping rabbitmq-server: rabbitmq-server.
启动第一个节点
[rootsuper sbin]# RABBITMQ_NODE_PORT5673 RABBITMQ_NODENAMErabbit1 rabbitmq-server startRabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.## ## Licensed under the MPL. See http://www.rabbitmq.com/## ############ Logs: /var/log/rabbitmq/rabbit1.log###### ## /var/log/rabbitmq/rabbit1-sasl.log##########Starting broker...completed with 6 plugins.启动第二个节点 web管理插件端口占用,所以还要指定其web插件占用的端口号。 [rootsuper ~]# RABBITMQ_NODE_PORT5674 RABBITMQ_SERVER_START_ARGS-rabbitmq_management listener [{port,15674}] RABBITMQ_NODENAMErabbit2 rabbitmq-server startRabbitMQ 3.6.5. Copyright (C) 2007-2016 Pivotal Software, Inc.## ## Licensed under the MPL. See http://www.rabbitmq.com/## ############ Logs: /var/log/rabbitmq/rabbit2.log###### ## /var/log/rabbitmq/rabbit2-sasl.log##########Starting broker...completed with 6 plugins.
结束命令
rabbitmqctl -n rabbit1 stop
rabbitmqctl -n rabbit2 stoprabbit1操作作为主节点shell
[rootsuper ~]# rabbitmqctl -n rabbit1 stop_app
Stopping node rabbit1super ...
[rootsuper ~]# rabbitmqctl -n rabbit1 reset
Resetting node rabbit1super ...
[rootsuper ~]# rabbitmqctl -n rabbit1 start_app
Starting node rabbit1super ...
[rootsuper ~]# rabbit2操作为从节点
[rootsuper ~]# rabbitmqctl -n rabbit2 stop_app
Stopping node rabbit2super ...
[rootsuper ~]# rabbitmqctl -n rabbit2 reset
Resetting node rabbit2super ...
[rootsuper ~]# rabbitmqctl -n rabbit2 join_cluster rabbit1super ###内是主机名换成自己的
Clustering node rabbit2super with rabbit1super ...
[rootsuper ~]# rabbitmqctl -n rabbit2 start_app
Starting node rabbit2super ...
查看集群状态
[rootsuper ~]# rabbitmqctl cluster_status -n rabbit1
Cluster status of node rabbit1super ...
[{nodes,[{disc,[rabbit1super,rabbit2super]}]},{running_nodes,[rabbit2super,rabbit1super]},{cluster_name,rabbit1super},{partitions,[]},{alarms,[{rabbit2super,[]},{rabbit1super,[]}]}]web监控 3.3 集群管理
rabbitmqctl join_cluster {cluster_node} [–ram] 将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。
rabbitmqctl cluster_status 显示集群的状态。
rabbitmqctl change_cluster_node_type {disc|ram} 修改集群节点的类型。在这个命令执行前需要停止RabbitMQ应用。
rabbitmqctl forget_cluster_node [–offline] 将节点从集群中删除允许离线执行。
rabbitmqctl update_cluster_nodes {clusternode}
在集群中的节点应用启动前咨询clusternode节点的最新信息并更新相应的集群信息。这个和join_cluster不同它不加入集群。考虑这样一种情况节点A和节点B都在集群中当节点A离线了节点C又和节点B组成了一个集群然后节点B又离开了集群当A醒来的时候它会尝试联系节点B但是这样会失败因为节点B已经不在集群中了。
rabbitmqctl cancel_sync_queue [-p vhost] {queue} 取消队列queue同步镜像的操作。
rabbitmqctl set_cluster_name {name} 设置集群名称。集群名称在客户端连接时会通报给客户端。Federation和Shovel插件也会有用到集群名称的地方。集群名称默认是集群中第一个节点的名称通过这个命令可以重新设置。
3.4 RabbitMQ镜像集群配置 上面已经完成RabbitMQ默认集群模式但并不保证队列的高可用性尽管交换机、绑定这些可以复制到集群里的任何一个节点但是队列内容不会复制。虽然该模式解决一项目组节点压力但队列节点宕机直接导致该队列无法应用只能等待重启所以要想在队列节点宕机或故障也能正常应用就要复制队列内容到集群里的每个节点必须要创建镜像队列。 镜像队列是基于普通的集群模式的然后再添加一些策略所以你还是得先配置普通集群然后才能设置镜像队列我们就以上面的集群接着做。 设置的镜像队列可以通过开启的网页的管理端Admin-Policies也可以通过命令。 rabbitmqctl set_policy my_ha “^” ‘{“ha-mode”:“all”}’ Name:策略名称Pattern匹配的规则如果是匹配所有的队列是^.Definition:使用ha-mode模式中的all也就是同步所有匹配的队列。问号链接帮助文档。 3.5 负载均衡-HAProxy
HAProxy提供高可用性、负载均衡以及基于TCP和HTTP应用的代理支持虚拟主机它是免费、快速并且可靠的一种解决方案,包括TwitterRedditStackOverflowGitHub在内的多家知名互联网公司在使用。HAProxy实现了一种事件驱动、单一进程模型此模型支持非常大的并发连接数。
3.5.1 安装HAProxy
//下载依赖包
yum install gcc vim wget
//上传haproxy源码包
//解压
tar -zxvf haproxy-1.6.5.tar.gz -C /usr/local
//进入目录、进行编译、安装
cd /usr/local/haproxy-1.6.5
make TARGETlinux31 PREFIX/usr/local/haproxy
make install PREFIX/usr/local/haproxy
mkdir /etc/haproxy
//赋权
groupadd -r -g 149 haproxy
useradd -g haproxy -r -s /sbin/nologin -u 149 haproxy
//创建haproxy配置文件
vim /etc/haproxy/haproxy.cfg3.5.2 配置HAProxy
配置文件路径/etc/haproxy/haproxy.cfg
#logging options
globallog 127.0.0.1 local0 infomaxconn 5120chroot /usr/local/haproxyuid 99gid 99daemonquietnbproc 20pidfile /var/run/haproxy.piddefaultslog globalmode tcpoption tcplogoption dontlognullretries 3option redispatchmaxconn 2000contimeout 5sclitimeout 60ssrvtimeout 15s
#front-end IP for consumers and producterslisten rabbitmq_clusterbind 0.0.0.0:5672mode tcp#balance url_param userid#balance url_param session_id check_post 64#balance hdr(User-Agent)#balance hdr(host)#balance hdr(Host) use_domain_only#balance rdp-cookie#balance leastconn#balance source //ipbalance roundrobinserver node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2listen statsbind 172.16.98.133:8100mode httpoption httplogstats enablestats uri /rabbitmq-statsstats refresh 5s启动HAproxy负载
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
//查看haproxy进程状态
ps -ef | grep haproxy访问如下地址对mq节点进行监控
http://175.24.181.110:8100/rabbitmq-stats代码中访问mq集群地址则变为访问haproxy地址:5672