broker 相当于mysql服务器,virtual host相当于数据库(可以有多个数据库)
queue相当于表,消息相当于记录。
消息队列有三个核心要素: 消息生产者、消息队列、消息消费者;
生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)
消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)
代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;
连接(Connection):连接RabbitMQ服务器的TCP长连接;
信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道进行的;
虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual host创建exchange/queue等;(分类比较清晰、相互隔离)
交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用;
路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址;
绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);
队列(Queue):存储消息的缓存;
消息(Message):由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,字符串、user对象,json串等等)
Exchange(X) 可翻译成交换机/交换器/路由器
1、Fanout Exchange(扇形)
2、Direct Exchange(直连)
3、Topic Exchange(主题)
Fanout 扇形的,散开的; 扇形交换机
投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
根据路由键精确匹配(一模一样)进行路由消息队列;
通配符匹配,相当于模糊匹配;
#匹配多个单词,用来表示任意数量(零个或多个)单词
匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词:
beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx
beijing. == beijing.queue, beijing.xyz
发送时指定的路由键:lazy.orange.rabbit
** **
基于消息内容中的headers属性进行匹配;
绑定参考代码:
Map<String, Object> headerValues = new HashMap<>();
headerValues.put("type", "m");
headerValues.put("status", 1);
return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();
发送参考代码
MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader("type", "m");
messageProperties.setHeader("status", 1);
Message message = new Message(msg.getBytes(), messageProperties);
// void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
amqpTemplate.convertAndSend(RabbitConfig.EXCHANGE, null, message);
学习它的目的是:发消息时可以指定消息属性(MessageProperties)
过期消息也叫TTL消息,TTL:Time To Live
消息的过期时间有两种设置方式:(过期消息)
参考代码
MessageProperties messageProperties = new MessageProperties();messageProperties.setExpiration("15000");
// 设置过期时间,单位:毫秒
Message message = new Message(json.getBytes(), messageProperties);
//发送消息
Template.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_ROUTINGKEY, message);
System.out.println("发送完毕:" + new Date());
单条消息的过期时间决定了在没有任何消费者消费时,消息可以存活多久。
@Bean
public Queue directQueue() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000);
return new Queue(DIRECT_QUEUE, true, false, false, arguments);
}
队列的过期时间决定了在没有任何消费者的情况下,队列中的消息可以存活多久。
注意事项:
如果消息和对列都设置过期时间,则消息的TTL以两者之间较小的那个数值为准。
也有叫 死信交换机、死信邮箱等说法;
DLX: Dead-Letter-Exchange 死信交换器,死信邮箱
如下情况下一个消息会进入DLX(Dead Letter Exchange)死信交换机。
参考代码
MessageProperties messageProperties=new MessageProperties();
//设置此条消息的过期时间为10秒
messageProperties.setExpiration("10000");
参考代码
Map<String, Object> arguments =new HashMap<>();
//指定死信交换机,通过x-dead-letter-exchange 来设置
arguments.put("x-dead-letter-exchange",EXCHANGE_DLX);
//设置死信路由key,value 为死信交换机和死信队列绑定的key,要一模一样,因为死信交换机是直连交换机
arguments.put("x-dead-letter-routing-key",BINDING_DLX_KEY);
//队列的过期时间
arguments.put("x-message-ttl",10000);
return new Queue(QUEUE_NORMAL,true,false,false,arguments);
TTL: Time to Live的简称,过期时间
Map<String, Object> arguments = new HashMap<String, Object>();
//设置队列的最大长度 ,对头的消息会被挤出变成死信
arguments.put("x-max-length", 5);
从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列。
application.yml 启动手动确认
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
参考代码
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
*
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitConfig.QUEUE})
public void process(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
//对消息不确认, ack单词是 确认 的意思
// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
// deliveryTag:消息的一个数字标签
// multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
// requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列
try {
System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
//要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} catch (IOException e) {
e.printStackTrace();
}
}
开启手动确认模式,并拒绝消息,不重新投递,则进入死信队列
参考代码:
/**
* 监听正常的那个队列的名字,不是监听那个死信队列
* 我们从正常的队列接收消息,但是对消息不进行确认,并且不对消息进行重新投递,此时消息就进入死信队列
*
* channel 消息信道(是连接下的一个消息信道,一个连接下有多个消息信息,发消息/接消息都是通过信道完成的)
*/
@RabbitListener(queues = {RabbitConfig.QUEUE})
public void process(Message message, Channel channel) {
System.out.println("接收到的消息:" + message);
//对消息不确认, ack单词是 确认 的意思
// void basicNack(long deliveryTag, boolean multiple, boolean requeue)
// deliveryTag:消息的一个数字标签
// multiple:翻译成中文是多个的意思,如果是true表示对小于deliveryTag标签下的消息都进行Nack不确认,false表示只对当前deliveryTag标签的消息Nack
// requeue:如果是true表示消息被Nack后,重新发送到队列,如果是false,消息被Nack后,不会重新发送到队列
try {
System.out.println("deliveryTag = " + message.getMessageProperties().getDeliveryTag());
//要开启rabbitm消息消费的手动确认模式,然后才这么写代码;
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
e.printStackTrace();
}
}
场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;
每隔3秒扫描一次数据库,查询过期的订单然后进行处理;
优点:
简单,容易实现;
缺点:
1、存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;
2、性能较差,每次扫描数据库,如果订单量很大
1. 被动取消
当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭);
优点:
对服务器而言,压力小;
缺点:
1、用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;
2、用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;
DelayedQueue
无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素
优点:
实现简单,任务延迟低;
缺点:
服务重启、宕机,数据丢失;
只适合单机版,不适合集群;
订单量大,可能内存不足而发生异常; oom
1、RabbitMQ本身不支持延迟队列,可以使用TTL结合DLX的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。
代码:正常延迟
//问题? 如果先发送的消息,消息延迟时间长,会影响后面的 延迟时间段的消息的消费;
//解决:不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样
代码 延迟问题
选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,下载地址:http://www.rabbitmq.com/community-plugins.html
2、插件拷贝到 RabbitMQ 服务器plugins目录下
unzip rabbitmq_delayed_message_exchange-3.10.2.ez
如果unzip 没有安装,先安装一下
yum install unzip -y
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
开启插件;
./rabbitmq-plugins list
查询安装的所有插件;
重启rabbitmq使其生效;(此处也可以不重启)
消息发送后不会直接投递到队列,
而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);
延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;
Mnesia 是一个小型数据库,不适合于大量延迟消息的实现
解决了消息过期时间不一致出现的问题。
参考代码:
@Component
@Slf4j
public class RabbitConfig {
public static final String _EXCHANGE = "exchange:plugin";
public static final String _QUEUE = "queue.plugin";
public static final String _KEY = "plugin";
@Bean
public CustomExchange customExchange() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-delayed-type", "direct");
// CustomExchange(String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments)
return new CustomExchange(EXCHANGE, "x-delayed-message", true, false, arguments);
}
@Bean
public Queue queue() {
return QueueBuilder.durable(QUEUE).build();
}
@Bean
public Binding binding(CustomExchange customExchange, Queue queue) {
return BindingBuilder.bind(queue).to(customExchange).with(KEY).noargs();
}
}
发消息参考代码
MessageProperties messageProperties=new MessageProperties();
messageProperties.setHeader("x-delay",16000);String msg = "hello world";
Message message=new Message(msg.getBytes(),messageProperties);
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, "plugin", message);
log.info("发送完毕,发送时间为:{}",new Date());
阅读量:2056
点赞量:0
收藏量:0