请选择 进入手机版 | 继续访问电脑版
MSIPO技术圈 首页 IT技术 查看内容

RabbitMQ的使用详解

2023-07-13

一、什么是MQ

1、什么是MQ

MQ(message queue),本质是个队列,FIFO先入先出。只不过队列中放的是message,是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游解耦的消息通信服务。使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

2、为什么使用MQ

  • 流量消峰

当一个系统只能够承受一万次点击的时候,这时候来了两万次点击,服务器就会宕机(采取方式是限制第一万次以上的访问)。而现在使用MQ来做缓冲,可以取消这个限制,把一秒内的访问分散成一段时间来处理(排队),缺点是,排队就要时间,所以收到访问结果会慢,但是这比不能访问要好。

  • 应用解耦

以电商应用为例,用户下单,通过订单系统调用库存系统、物流系统、支付系统,如果耦合调用,当任何一个子系统出现了故障,都会造成下单操作异常。而当转变为基于消息队列的方式后(在调用之间加上队列),当一个子系统出现故障需要几分钟修复,在这几分钟内,用户的下单操作依然可以正常进行(对这个子系统要处理的内存会被缓存在消息队列中),等待故障的子系统修复好后就从缓存中获取去处理。这样用户基本感受不到子系统出现故障(因为可以正常下单操作),提升系统的可用性

  • 异步处理

异步调用服务时,例如A调用B,B需要花费很长时间去执行,但是A需要知道B什么时候执行完成。以前的方式,是通过A过一段时间去调用B的查询api查询,或者A提供一个callback api,当B执行完就调用这个api来通知A。现在使用mq,A调用B后,只需要监听mq转发的消息。当B完成后,会发送一条信息给MQ,MQ会将此信息转发给A服务。这样A服务能及时得到异步处理成功的消息

简而言之,就是通过一个第三方,来接受B的完成信息,然后转发给A

3、MQ分类

  • ActiveMQ

优点:单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较

低的概率丢失数据

缺点:官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用。

  • Kafka

为大数据而生的消息中间件。

优点: 性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方Kafka Web 管理界KafkaManager;在日志领域比较成熟,被多家公司和多个开源项目使用;功能支持:功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

缺点:Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消

息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,

但是一台代理宕机后,就会产生消息乱序,社区更新较慢;

  • RocketMQ

阿里巴巴的开源产品,用Java语言实现

优点:单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分

布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅

读源码,定制自己公司的 MQ

缺点:支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在 MQ

核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码

  • RabbitMQ

2007 年发布,是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最

主流的消息中间件之一。

优点:由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高

缺点:商业版需要收费,学习成本较高

4、MQ选择

  • Kafka

大型公司建议用,如果有日志采集功能,首选kafka

  • RocketMQ

为金融互联网领域而生。适用于对于可靠性要求很高的场景,在稳定上

  • RabbitMQ

性能好时效性微秒级,社区活跃度高,如果数据量不大,中小型公司建议选择

二、RabbitMQ入门

2.1、概念

1、RabbitMQ

RabbitMQ是一个消息中间件:接收并转发消息。

2、核心概念

  • 生产者:产生数据发生消息到程序是生产者
  • 交换机:一方面接收来自生产者的消息,另一方面将消息推送到队列中。对于消息的处理由交换机类型决定
  • 队列:是一种内部使用的数据结构。本质是一个大的消息缓冲区
  • 消费者:等待接收消息的程序

3、安装

# 设置rabbitmq服务开机自启动
systemctl enable rabbitmq-server

#启动
systemctl start rabbitmq-server

#查看状态
systemctl status rabbitmq-server

# 关闭rabbitmq服务
systemctl stop rabbitmq-server

# 重启rabbitmq服务
systemctl restart rabbitmq-server


  • 开启web管理插件

rabbitmq-plugins enable rabbitmq_management

访问 http://ip地址:15672/

创建账号和密码

rabbitmqctl add_user admin 123

设置用户角色

rabbitmqctl set_user_tags admin administrator

设置用户权限

set_permissions [-p ]

rabbitmqctl set_permissions -p “/” admin “." ".” “.*”

用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限

查看当前用户和角色

rabbitmqctl list_users

2.2、简单队列模式

Java代码

  • 消息生产者
/**
 * 生产者--发送信息
 */
public class Producer {
    //队列名称
    public static final String QUEUE_NAME="hello";

    public static void main(String[] args) throws Exception {
        //创建连接工厂,设置连接属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.180.100");
        factory.setUsername("adminx");
        factory.setPassword("123456");
        //从连接工厂中获取连接
        Connection connection = factory.newConnection();
        //在连接中创建信道,mq中的所有操作都是在信道中完成
        Channel channel = connection.createChannel();
        /**
         * 生成一个队列,各参数含义:
         * 1、队列名称 2、队列里面消息是否持久化(持久化就是存入磁盘),true是持久化,false则是存入内存
         * 3、是否进行消息共享(多个消费者消费),ture是进行共享,false是只能一个消费者消费
         * 4、是否自动删除(最后一个消费者断开连接以后,该队列是否自动删除)false是不自动删除
         * 5、其他参数 null
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        //发送的消息内容
        String message = "hello world";

        /**
         * 通过信道发送信息
         * 1、发送到哪个交换机,
         * 2、路由的key值是哪个,本次是队列的名称
         * 3、其他参数信息
         * 4、发送信息的消息体
         */

        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println("消息发送完毕");

    }
}

  • 消息消费者
/**
 * 消费者:接收信息
 */
public class Consumer {
    //队列名称,和生产者发送时的队列名称一致
    public static final String QUEUE_NAME="hello";
    //接收消息
    public static void main(String[] args) throws Exception{
        //创建连接工厂,设置连接属性
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.180.100");
        factory.setUsername("adminx");
        factory.setPassword("123456");
        //从工厂中获得连接
        Connection connection = factory.newConnection();
        //在连接中创建信道,接收信息是在信道中发生
        Channel channel = connection.createChannel();


        /**
         * RabbitMQ推送给消费者回调接口,在该接口中用于编写如何对消息进行处理
         *consumerTag,消费者注册到mq之后,mq会生成一个该消费者的唯一标识
         * message,推送过来的信息
         */
        DeliverCallback deliverCallback = (consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        } ;

        /**
         * mq取消该消费者对信道中队列的订阅时,调用的回调接口
         * 当我们在mq管理界面手动删除该队列时,就会调用该接口
         */
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断" + consumerTag);
        };


        /**
         * 接收信息
         * 1、消费哪个队列
         * 2、消费成功是否要自动应答,true--自动挡
         * 3、信息推送给消费者的回调
         * 4、消费者取消消费者的回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }

}

在这里插入图片描述

2.3、工作队列模式

工作队列(又称任务队列)的主要思想是避免立即执行大量任务 ,导致不得不等待它完成而耗费时间 。我们把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

当生产者发送大量消息到队列时,接收消息的消费者变成多个工作线程,一条消息只会被一个工作线程所处理,rabbitmq采用轮询的方式将消息平均发送给工作线程,线程在处理完某条信息才会接收到下一个信息

  • 模拟两个工作线程

将创建信道的代码抽取出来

package com.xqh.rabbitmq.two;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 将创建连接工程以及创建信道的操作抽取出来
 */
public class RabbitMqUtils {
    //静态方法,可以直接通过类名访问
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.180.100");
        factory.setUsername("adminx");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }


}

工作线程1和2

package com.xqh.rabbitmq.two;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Worker01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (comsumerTag,message)->{
            String receivedMessage = new String(message.getBody());
            System.out.println("接收到消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消订阅消息接口回调逻辑");
        };

        System.out.println("c1消费者启动等待消费...");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

可以利用idea的配置,勾选允许多个实例允许

修改输出为出 c2启动等待消费,然后再运行一次,就可以得到第二个工作线程

启动一个发送线程

package com.xqh.rabbitmq.two;

import com.rabbitmq.client.Channel;

import java.util.Scanner;

/**
 * 发送线程
 */
public class Task01 {
    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()){
            String message = sc.next();
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完成:"+message);
        }


    }
}

从控制台接收要发送的消息

测试结果

rabbitmq将大量信息采用轮询方式分配给工作线程。例如,当输入发送信息aa,假设线程1接收到信息,那么再输入发送信息bb,就会分配给线程2处理,以此类推。你一条我一条,轮询分配

一个消息只能处理一次,不能处理多次

2.4、消息应答

1、消息应答

rabbitMQ在发送一条信息给消费者,一旦传递给消费者就会立马把信息删除,假如此时消费者并没有处理完这条信息就挂掉了,那么这条信息就会丢失 。为了保证消息在发送过程不丢失,rabbitMQ引入消息应答机制,消费者在接收到消息并且处理该消息之后,需要告诉rabbitmq它已经处理了,rabbitmq可以把消息删除了

即rabbitmq在消费者应答之后再进行消息删除

2、自动应答

  • 消息发送后立即被认为已经传送成功,自动给出应答。 这种模式需要在高吞吐量和数据传输安全性方面做权

衡。(优点是可以处理高吞吐量的信息,缺点是传输安全性不好)

因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,这样就使得信息的安全传输得不到保证。

虽然这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,但是有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,使得内存耗尽,最终这些消费者线程被操作系统杀死。

  • 所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

自动应答是以接收到消息为准,然而接收到消息后还需要处理,但是消费者一接收到信息就会立马给出应答。

3、手动应答(推荐)

  • 手动应答的三个方法

A.Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了

B.Channel.basicNack (用于否定确认),C.Channel.basicReject (用于否定确认):不处理该消息了直接拒绝,可以将其丢弃了

  • 手动应答的好处

可以批量应答并且减少网络拥堵。消息在手动应答时是不丢失,如果出现消费者失去连接会自动放回队列中重新消费、

  • Multiple的解释

批量应答。

multiple的true和false代表不同意思

例如channel上有传输信息5,6,7,8,(队列先进先出)当前要传输的是8

如果是true:那么信道上5~8的这些还未应答的消息都会被确认收到消息应答

如果是false:那么只应答当前的8的消息,5,6,7这三个消息依然不会收到消息应答

  • 代码实现

启动一个生产者线程

package com.xqh.rabbitmq.three;

import com.rabbitmq.client.Channel;
import com.xqh.rabbitmq.two.RabbitMqUtils;

import java.util.Scanner;

/**
 *生产者
 */
public class Task2 {
    public static final String Task_QUEUQ_NAME = "ack_queue";

    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        channel.queueDeclare(Task_QUEUQ_NAME,false,false,false,null);
        //从控制台扫描要发送的信息
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()){
            String message = sc.next();
            channel.basicPublish("",Task_QUEUQ_NAME,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息:"+message);

        }

    }

}

启动消费者线程1

package com.xqh.rabbitmq.three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xqh.rabbitmq.two.RabbitMqUtils;

/**
 * 消息消费者
 */
public class Work03 {
    public static final String Task_QUEUQ_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c1等待接收信息处理时间较短");
        //采用手动应答
        boolean autoAck = false;
        DeliverCallback deliverCallback = (consumerTag,message)->{
            //沉睡一秒
            SleepUtils.sleep(1);
            System.out.println("接收到的信息:"+new String(message.getBody(),"UTF-8"));
            /**
             * 手动应答 channel.basicAck()
             * 参数:
             * 1、消息的标记 tag
             * 2、是否批量应答信道中的消息
             */

            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };



        channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

启动消费者线程2

package com.xqh.rabbitmq.three;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xqh.rabbitmq.two.RabbitMqUtils;

/**
 * 消息消费者
 */
public class Work04 {
    public static final String Task_QUEUQ_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("c2等待接收信息处理时间较长");
        //采用手动应答
        boolean autoAck = false;
        DeliverCallback deliverCallback = (consumerTag,message)->{
            //沉睡一秒
            SleepUtils.sleep(30);
            System.out.println("接收到的信息:"+new String(message.getBody(),"UTF-8"));
            /**
             * 手动应答 channel.basicAck()
             * 参数:
             * 1、消息的标记 tag
             * 2、是否批量应答信道中的消息
             */

            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);

        };

        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };



        channel.basicConsume(Task_QUEUQ_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

模拟场景:

  • 当生产者发送信息aa , 消费者线程1接收到信息aa
  • 当生产者发送信息bb,消费者线程2接收到信息bb (很慢)
  • 当生产者发送信息cc,消费者线程1接收到信息cc
  • 当生产者发送信息dd,在消费者线程2处理信息dd的过程,将线程2关闭,此时,信息会自动重新入列,此时发现消费者线程1可以接受信息,于是把信息发送给消费者线程1,----线程1接收到信息dd

说明,当消费者失去连接时,传给他的信息并不会丢失,mq会将它重新入列排队,此时其他消费者可以处理会重新分配给其他消费者。

4、消息自动重新入列

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

2.5、持久化

1、概念

如何保障当RabbitMQ服务停掉以后,生产者发来的消息不会丢失呢?默认情况下,一旦rabbitmq退出或由于某种原因崩溃时,它会忽视队列和消息—造成消息丢失。要确保消息不丢失,我们需要将队列和消息都标记为持久化

2、队列实现持久化

之前我们创建的队列都是非持久化的,一旦rabbitmq重启,队列就会被删除。要实现持久化,需要在队列声明时将参数durable设为true

  boolean durable = true;
  channel.queueDeclare(Task_QUEUQ_NAME,durable,false,false,null);

3、消息实现持久化

   //设置消息持久化(消息保存到磁盘)MessageProperties.PERSISTENT_TEXT_PLAIN
            channel.basicPublish("",Task_QUEUQ_NAME, MessageProp

相关阅读

手机版|MSIPO技术圈 皖ICP备19022944号-2

Copyright © 2023, msipo.com

返回顶部