一、什么是MQ
1、什么是MQ
MQ(message queue),本质是个队列,FIFO先入先出。只不过队列中放的是message,是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游解耦的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。
2、为什么使用MQ
当一个系统只能够承受一万次点击的时候,这时候来了两万次点击,服务器就会宕机(采取方式是限制第一万次以上的访问)。而现在使用MQ来做缓冲,可以取消这个限制,把一秒内的访问分散成一段时间来处理(排队),缺点是,排队就要时间,所以收到访问结果会慢,但是这比不能访问要好。
以电商应用为例,用户下单,通过订单系统调用库存系统、物流系统、支付系统,如果耦合调用,当任何一个子系统出现了故障,都会造成下单操作异常。而当转变为基于消息队列的方式后(在调用之间加上队列),当一个子系统出现故障需要几分钟修复,在这几分钟内,用户的下单操作依然可以正常进行(对这个子系统要处理的内存会被缓存在消息队列中),等待故障的子系统修复好后就从缓存中获取去处理。这样用户基本感受不到子系统出现故障(因为可以正常下单操作),提升系统的可用性
异步调用服务时,例如A调用B,B需要花费很长时间去执行,但是A需要知道B什么时候执行完成。以前的方式,是通过A过一段时间去调用B的查询api查询,或者A提供一个callback api,当B执行完就调用这个api来通知A。现在使用mq,A调用B后,只需要监听mq转发的消息。当B完成后,会发送一条信息给MQ,MQ会将此信息转发给A服务。这样A服务能及时得到异步处理成功的消息
简而言之,就是通过一个第三方,来接受B的完成信息,然后转发给A
3、MQ分类
优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较
低的概率丢失数据
缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。
为大数据而生的消息中间件。
优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界KafkaManager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消
息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,
但是一台代理宕机后,就会产生消息乱序,社区更新较慢;
阿里巴巴的开源产品,用Java语言实现
优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分
布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅
读源码,定制自己公司的 MQ
缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ
核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码
2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最
主流的消息中间件之一。
优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高
缺点:商业版需要收费,学习成本较高
4、MQ选择
大型公司建议用,如果有日志采集功能,首选kafka
为金融互联网领域而生。适用于对于可靠性要求很高的场景,在稳定上
性能好时效性微秒级,社区活跃度高,如果数据量不大,中小型公司建议选择
二、RabbitMQ入门
2.1、概念
1、RabbitMQ
RabbitMQ是一个消息中间件:接收并转发消息。
2、核心概念
- 生产者:产生数据发生消息到程序是生产者
- 交换机:一方面接收来自生产者的消息,另一方面将消息推送到队列中。对于消息的处理由交换机类型决定
- 队列:是一种内部使用的数据结构。本质是一个大的消息缓冲区
- 消费者:等待接收消息的程序
3、安装
systemctl enable rabbitmq-server
systemctl start rabbitmq-server
systemctl status rabbitmq-server
systemctl stop rabbitmq-server
systemctl restart rabbitmq-server
rabbitmq-plugins enable rabbitmq_management
访问 http://ip地址:15672/
创建账号和密码
rabbitmqctl add_user admin 123
设置用户角色
rabbitmqctl set_user_tags admin administrator
设置用户权限
set_permissions [-p ]
rabbitmqctl set_permissions -p “/” admin “." ".” “.*”
用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
查看当前用户和角色
rabbitmqctl list_users
2.2、简单队列模式
Java代码
public class Producer {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.180.100");
factory.setUsername("adminx");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "hello world";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
public class Consumer {
public static final String QUEUE_NAME="hello";
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.180.100");
factory.setUsername("adminx");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
} ;
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断" + consumerTag);
};
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}

2.3、工作队列模式
工作队列(又称任务队列)的主要思想是避免立即执行大量任务 ,导致不得不等待它完成而耗费时间 。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
当生产者发送大量消息到队列时,接收消息的消费者变成多个工作线程,一条消息只会被一个工作线程所处理,rabbitmq采用轮询的方式将消息平均发送给工作线程,线程在处理完某条信息才会接收到下一个信息
将创建信道的代码抽取出来
package com.xqh.rabbitmq.two;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitMqUtils {
public static Channel getChannel() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.180.100");
factory.setUsername("adminx");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
工作线程1和2
package com.xqh.rabbitmq.two;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
public class Worker01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = (comsumerTag,message)->{
String receivedMessage = new String(message.getBody());
System.out.println("接收到消息:"+receivedMessage);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消订阅消息接口回调逻辑");
};
System.out.println("c1消费者启动等待消费...");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
可以利用idea的配置,勾选允许多个实例允许
修改输出为出 c2启动等待消费,然后再运行一次,就可以得到第二个工作线程
启动一个发送线程
package com.xqh.rabbitmq.two;
import com.rabbitmq.client.Channel;
import java.util.Scanner;
public class Task01 {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完成:"+message);
}
}
}
从控制台接收要发送的消息
测试结果
rabbitmq将大量信息采用轮询方式分配给工作线程。例如,当输入发送信息aa,假设线程1接收到信息,那么再输入发送信息bb,就会分配给线程2处理,以此类推。你一条我一条,轮询分配
一个消息只能处理一次,不能处理多次
2.4、消息应答
1、消息应答
rabbitMQ在发送一条信息给消费者,一旦传递给消费者就会立马把信息删除,假如此时消费者并没有处理完这条信息就挂掉了,那么这条信息就会丢失 。为了保证消息在发送过程不丢失,rabbitMQ引入消息应答机制,消费者在接收到消息并且处理该消息之后,需要告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了
即rabbitmq在消费者应答之后再进行消息删除
2、自动应答
- 消息发送后立即被认为已经传送成功,自动给出应答。 这种模式需要在高吞吐量和数据传输安全性方面做权
衡。(优点是可以处理高吞吐量的信息,缺点是传输安全性不好)
因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,这样就使得信息的安全传输得不到保证。
虽然这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,但是有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死。
- 所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。
自动应答是以接收到消息为准,然而接收到消息后还需要处理,但是消费者一接收到信息就会立马给出应答。
3、手动应答(推荐)
A.Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
B.Channel.basicNack (用于否定确认),C.Channel.basicReject (用于否定确认):不处理该消息了直接拒绝,可以将其丢弃了
可以批量应答并且减少网络拥堵。消息在手动应答时是不丢失,如果出现消费者失去连接会自动放回队列中重新消费、
批量应答。
multiple的true和false代表不同意思
例如channel上有传输信息5,6,7,8,(队列先进先出)当前要传输的是8
如果是true:那么信道上5~8的这些还未应答的消息都会被确认收到消息应答
如果是false:那么只应答当前的8的消息,5,6,7这三个消息依然不会收到消息应答
启动一个生产者线程
package com.xqh.rabbitmq.three;
import com.rabbitmq.client.Channel;
import com.xqh.rabbitmq.two.RabbitMqUtils;
import java.util.Scanner;
public class Task2 {
public static final String Task_QUEUQ_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(Task_QUEUQ_NAME,false,false,false,null);
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String message = sc.next();
channel.basicPublish("",Task_QUEUQ_NAME,null,message.getBytes("UTF-8"));
System.out.println("生产者发出消息:"+message);
}
}
}
启动消费者线程1
package com.xqh.rabbitmq.three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xqh.rabbitmq.two.RabbitMqUtils;
public class Work03 {
public static final String Task_QUEUQ_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c1等待接收信息处理时间较短");
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag,message)->{
SleepUtils.sleep(1);
System.out.println("接收到的信息:"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
}
}
启动消费者线程2
package com.xqh.rabbitmq.three;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xqh.rabbitmq.two.RabbitMqUtils;
public class Work04 {
public static final String Task_QUEUQ_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
System.out.println("c2等待接收信息处理时间较长");
boolean autoAck = false;
DeliverCallback deliverCallback = (consumerTag,message)->{
SleepUtils.sleep(30);
System.out.println("接收到的信息:"+new String(message.getBody(),"UTF-8"));
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
CancelCallback cancelCallback = (consumerTag)->{
System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
};
channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
}
}
模拟场景:
- 当生产者发送信息aa , 消费者线程1接收到信息aa
- 当生产者发送信息bb,消费者线程2接收到信息bb (很慢)
- 当生产者发送信息cc,消费者线程1接收到信息cc
- 当生产者发送信息dd,在消费者线程2处理信息dd的过程,将线程2关闭,此时,信息会自动重新入列,此时发现消费者线程1可以接受信息,于是把信息发送给消费者线程1,----线程1接收到信息dd
说明,当消费者失去连接时,传给他的信息并不会丢失,mq会将它重新入列排队,此时其他消费者可以处理会重新分配给其他消费者。
4、消息自动重新入列
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。
2.5、持久化
1、概念
如何保障当RabbitMQ服务停掉以后,生产者发来的消息不会丢失呢?默认情况下,一旦rabbitmq退出或由于某种原因崩溃时,它会忽视队列和消息—造成消息丢失。要确保消息不丢失,我们需要将队列和消息都标记为持久化
2、队列实现持久化
之前我们创建的队列都是非持久化的,一旦rabbitmq重启,队列就会被删除。要实现持久化,需要在队列声明时将参数durable设为true
boolean durable = true;
channel.queueDeclare(Task_QUEUQ_NAME,durable,false,false,null);
3、消息实现持久化
channel.basicPublish("",Task_QUEUQ_NAME, MessageProp