RocketMQ源码深入剖析1 RocketMQ介绍RocketMQ 是阿里巴巴集团基于高可用分布式集群技术,自主研发的云正式商用的专业消息中间件,既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性,是阿里巴巴双 11 使用的核心产品。 1.1 RocketMQ版本发展如果想要了解RocketMQ的历史,则需了解阿里巴巴中间件团队中的历史。 2011年,Linkin(领英:全球知名的职场社交平台)推出Kafka消息引擎,阿里巴巴中间件团队在研究了Kafka的整体机制和架构设计之后,基于Kafka(Scala语言编写)的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题,由开源社区killme2008维护。课程重点不在此版本,具体见:https://github.com/killme2008/Metamorphosis 2012年,阿里巴巴发现MetaQ原本基于Kafka的架构在阿里巴巴如此庞大的体系下很难进行水平扩展,于是对MetaQ进行了架构重组升级,开发出了MetaQ 2.0,同年阿里把Meta2.0从阿里内部开源出来,取名RocketMQ,为了命名上的规范以及版本上的延续,对外称为RocketMQ3.0。因为RocketMQ3只是RocketMQ的一个过渡版本,课程重点也不在此。 2016年11月28日,阿里巴巴宣布将开源分布式消息中间件RocketMQ捐赠给Apache,成为Apache 孵化项目。在孵化期间,RocketMQ完成编码规约、分支模型、持续交付、发布规约等方面的产品规范化,同时RocketMQ3也升级为RocketMQ4。现在RocketMQ主要维护的是4.x的版本,也是大家使用得最多的版本,所以本书重点将围绕此版本进行详细的讲解,项目地址:https://github.com/apache/rocketmq/ 2015年,阿里基于RocketMQ开发了阿里云上的Aliware MQ,Aliware MQ(Message Queue)是RocketMQ的商业版本,是阿里云商用的专业消息中间件,是企业级互联网架构的核心产品,基于高可用分布式集群技术,搭建了包括发布订阅、消息轨迹、资源统计、定时(延时)、监控报警等一套完整的消息云服务。因为Aliware MQ是商业版本,课程也不对此产品进行讲述,产品地址:https://www.aliyun.com/product/rocketmq 2021年,伴随众多企业全面上云以及云原生的兴起,RocketMQ也在github上发布5.0版本。目前来说还只是一个预览版,不过RocketMQ5的改动非常大,同时也明确了版本定位,RocketMQ 5.0定义为云原生的消息、事件、流的超融合平台。本课程也将会根据目前所发布的版本进行针对性的讲述。 1.1.1 RocketMQ4.X版本更新概要
1.2 为什么要学RocketMQ源码
1.3 RocketMQ源码中的技术亮点
2 RocketMQ核心组件NameServer 命名服务,更新和路由发现 broker服务。NameServer的作用是为消息生产者、消息消费者提供关于主题 Topic 的路由信息,NameServer除了要存储路由的基础信息,还要能够管理 Broker节点,包括路由注册、路由删除等功能。 Producer/Consumer java版本的MQ客户端实现,包括生产者和消费者。 Broker 它能接收producer和consumer的请求,并调用store层服务对消息进行处理。HA服务的基本单元,支持同步双写,异步双写等模式。 Store 存储层实现,同时包括了索引服务,高可用HA服务实现。 Netty Remoting Server/Netty Remoting Client 基于netty的底层通信实现,所有服务间的交互都基于此模块。也区分服务端和客户端 3 RocketMQ源码下载及安装3.1 下载地址官方下载地址:http://rocketmq.apache.org/dowloading/releases/ 本课程使用的是4.8.0的版本 3.2 环境要求
3.3 使用IntelliJ IDEA导入安装源码1)使用IDEA导入已经下载且已经解压后的代码 2)下载且已经解压后的代码导入后执行Maven命令install:
使用Maven验证下没问题 3.4 配置与运行RocketMQ3.4.1 启动NameServerRocketMQ启动必须先启动NameServer,启动类是namesrv/目录下的NamesrvStartup类,不过在运行这个类之前必须要配置环境变量ROCKETMQ_HOME,变量值为RocketMQ的运行主目录。 RocketMQ的运行主目录一般使用新建的方式,同时在运行主目录中创建conf、logs、store三个文件夹,然后从源码目录中distribution目录下的中将broker.conf、logback_broker.xml、logback_namesrv.xml复制到conf目录中。 最后运行namesrv/目录下的NamesrvStartup类的main方法,NameServer启动成功! 3.4.2 启动Broker在broker模块找到broker模块,同时找到启动类BrokerStartup.java 源码启动前需要修改配置文件broker.conf (修改RocketMQ的消息存储路径) 配置环境变量,同时启动时需要加入参数(指定启动的配置文件) broker启动成功后的控制台如下: 3.5 控制台安装及部署3.5.1 环境要求运行前确保:已经有jdk1.8,Maven(打包需要安装Maven3.2.x) 3.5.2 下载老版本地址下载:https://codeload.github.com/apache/rocketmq-externals/zip/master 新版本地址:https://github.com/apache/rocketmq-dashboard 解压后如图(以下使用的是老版本,新版本参考老版本即可) 3.5.3 配置后端管理界面是一个Java工程,独立部署,同时也需要根据不同的环境进行相关的配置。 控制台端口及服务地址配置: 下载完成之后,进入‘\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-px4G7knH-1689127643104)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image002.jpg)] 进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成运行jar包。 编译成功之后,cmd命令进入‘target’文件夹,执行‘java [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NJ9Fo6ls-1689127643104)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image006.jpg)] 浏览器中输入‘127.0.0.1:8089’,成功后即可进行管理端查看。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1oB35ZLA-1689127643105)(file:///C:\Users\ADMINI~1\AppData\Local\Temp\msohtmlclip1\01\clip_image010.jpg)] 4 RocketMQ的核心三流程整体模块如下: RocketMQ的源码是非常的多,我们没有必要把RocketMQ所有的源码都读完,所以我们把核心、重点的源码进行解读,RocketMQ核心流程如下:
5 NameServer源码分析5.1 NameServer整体流程NameServer是整个RocketMQ的“大脑”,它是RocketMQ的服务注册中心,所以RocketMQ需要先启动NameServer再启动Rocket中的Broker。
5.2 NameServer启动流程从源码的启动可知,NameServer单独启动。 流程图如下: 5.2.1 加载KV配置核心解读NamesrvController类中createNamesrvController() 在源码中发现还有一个p的参数,直接在启动个参数中送入 -p 就可以打印这个NameServer的所有的参数信息(不过NameServer会自动终止),说明这个-p是一个测试参数。 正常启动时,也可以在启动日志中一定可以找到所有的参数: 5.2.2 构建NRS通讯接收路由、心跳信息5.2.3 定时任务剔除超时Broker核心控制器会启动定时任务: 每隔10s扫描一次Broker,移除不活跃的Broker。 Broker每隔30s向NameServer发送一个心跳包,心跳包包含BrokerId,Broker地址,Broker名称,Broker所属集群名称、Broker关联的FilterServer列表。但是如果Broker宕机,NameServer无法收到心跳包,此时NameServer如何来剔除这些失效的Broker呢?NameServer会每隔10s扫描brokerLiveTable状态表,如果BrokerLive的lastUpdateTimestamp的时间戳距当前时间超过120s,则认为Broker失效,移除该Broker,关闭与Broker连接,同时更新topicQueueTable、brokerAddrTable、brokerLiveTable、filterServerTable。 路由剔除机制中,Borker每隔30S向NameServer发送一次心跳,而NameServer是每隔10S扫描确定有没有不可用的主机(120S没心跳),那么问题就来了!这种设计是存在问题的,就是NameServer中认为可用的Broker,实际上已经宕机了,那么,某一时间段,从NameServer中读到的路由中包含了不可用的主机,会导致消息的生产/消费异常,不过不用担心,在生产和消费端有故障规避策略及重试机制可以解决以上问题(原理后续源码解读)。这个设计符合RocketMQ的设计理念:整体设计追求简单与性能,同时这样设计NameServer是可以做到无状态化的,可以随意的部署多台,其代码也非常简单,非常轻量。 RocketMQ有两个触发点来删除路由信息:
5.3 NameServer设计亮点5.3.1 读写锁RouteInfoManager类中有一个读写锁的设计 消息发送时客户端会从NameServer获取路由信息,同时Broker会定时更新NameServer的路由信息,所以路由表会有非常频繁的以下操作: 1、 生产者发送消息时需要频繁的获取。对表进行读。 RouteInfoManager类 2、 Broker定时(30s)会更新一个路由表。对表进行写。 RouteInfoManager类 因为Broker每隔30s向NameServer发送一个心跳包,这个操作每次都会更新Broker的状态,但同时生产者发送消息时也需要Broker的状态,要进行频繁的读取操作。所以这个地方就有一个矛盾,Broker的状态会被经常性的更新,同时也会被更加频繁的读取。这里如何提高并发,尤其是生产者进行消息发送时的并发,所以这里使用了读写锁机制(针对读多写少的场景)。 synchronized和ReentrantLock基本都是排他锁,排他锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。 5.3.2 存储基于内存NameServer存储以下信息: topicQueueTable:Topic消息队列路由信息,消息发送时根据路由表进行负载均衡 brokerAddrTable:Broker基础信息,包括brokerName、所属集群名称、主备Broker地址 clusterAddrTable:Broker集群信息,存储集群中所有Broker名称 brokerLiveTable:Broker状态信息,NameServer每次收到心跳包是会替换该信息 filterServerTable:Broker上的FilterServer列表,用于类模式消息过滤。 NameServer的实现基于内存,NameServer并不会持久化路由信息,持久化的重任是交给Broker来完成。这样设计可以提高NameServer的处理能力。 5.3.3 NameServer无状态化
项目实战部署分析: 假设一个RocketMQ集群部署在两个机房,每个机房都有一些NameServer、Broker和客户端节点,当两个机房的链路中断时,所有的NameServer都可以提供服务,客户端只能在本机房的NameServer中找到本机房的Broker。 RocetMQ集群中,NameSever之间是不需要互相通信的,所以网络分区对NameSever本身的可用性是没有影响的,如果NameSever检测到与Broker的连接中断了,NameServer会认为这个Broker不再能提供服务,NameServer会立即把这个Broker从路由信息中移除掉,避免客户端连接到一个不可用的Broker上去。 |
原文地址:https://blog.csdn.net/starlight_520/article/details/131675353
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:https://www.msipo.com/article-150.html 如若内容造成侵权/违法违规/事实不符,请联系MSIPO邮箱:3448751423@qq.com进行投诉反馈,一经查实,立即删除!
Copyright © 2024, msipo.com