×

rocketmq

rocketmq(RocketMQ(二)——基本概念)

admin admin 发表于2024-03-03 01:52:32 浏览23 评论0

抢沙发发表评论

这篇文章给大家聊聊关于rocketmq,以及RocketMQ(二)——基本概念对应的知识点,希望对各位有所帮助,不要忘了收藏本站哦。

本文目录

RocketMQ(二)——基本概念

Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。 topic:message 1:n message:topic 1:1

一个生产者可以同时发送多种Topic消息;而一个消费者只对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic消息。 producer:topic 1:n consumer:topic 1:1

Topic是消息的一级分类,Tag是消息的二级分类

存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的 分区(Partition) 。 一个Topic的Queue中的消息只能被一个消费者组中的一个消费者消费。一个Queue中的消息不允许同一个消费者组中的多个消费者同时消费

分片(sharding) 分片不同于分区。在RocketMQ中,分片指的是存放相应Topic的Broker。每个分片中会创建出相应数量的分区,即Queue,每个Queue的大小是相同的。

RocketMQ中每个消息拥有唯一的MessageID,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageID有两个:在生产者send()消息时会自动生成一个MessageID(msgId),当消息到达Broker后也会自动生成一个MessageId(offsetMsgId)。 msgId、offsetMsgId与key都称为消息标识。

聊聊RocketMQ(一)

大家好,我是BugKing,不知不觉正式工作快满2年了,在工作之前没有用过消息队列中间件,在这想分享下我这两年对RocketMQ的使用以及一些思考,因为内容比较多,会用好几期来分享。 先简单来聊下我在日常开发中,哪些问题适合使用RocketMQ来解决,因为我是搞IM的,所以下面我都会以IM的角度来分享。

在我负责的IM系统中,经常会遇到业务方群发几十万消息的场景,那面临这么多的请求,如何避免请求压垮我们的IM聊天系统呢?我们的系统应该是在自身能力范围内尽可能多地处理请求,那我们就可以使用消息队列来达到流量控制和保护后端服务的目的。 加入RocketMQ后,整个业务方发送消息的流程变成: 1、业务方调用rpc框架如dubbo接口发送消息后,直接将消息内容放入RocketMQ; 2、发消息后端服务从RocketMQ中获取消息内容,完成后续发消息流量,投递给前端。

这种设计既有优点也有缺点

那想在同一个topic下的某种消息进行流量控制限速呢?有没有什么好的办法? 我的做法是根据某种类型消息的标识,通过令牌桶算法(单机限流),根据你预估的处理能力,为这种消息单独设置一个线程池,线程池队列长度可以设置大些,用这个线程池也单独处理这种消息,这样也不会让其他类型的消息堆积在MQ。

IM系统也需要解决的核心问题时,如何利用有限的服务器资源,尽可能多地处理大量发送消息。在一个正常的IM系统中,一个完整的消息发送包含了很多操作,当你发出去一条消息后可会有这些操作: 1、消息入库 2、消息投递前端 3、用户不在线需要发送离线push。 4、用户这条消息被风控了需要发送风控提示。 5、消息需要统计数据,包含每天发送量,push量等等。 6、.... 如果没有任何优化,正常的处理流程时:消息投递后,依次调用上述流程,然后结束。 对于这几个步骤来说,决定消息是否发送成功,实际上只有消息入库这个步骤,只要消息入库了,用户就一定能看到消息,就算当时没有投递给前端,后续用户拉历史消息也能把消息拉出来,但是为了判断用户在不在线,需不需要发离线push,依赖消息投递前端的结果,所以当消息入库、消息投递前端后,就可以马上结束流程,然后把消息体放入rokcetMQ中,由消息队列异步执行后续的操作。

rocketMQ的另一个作用,就是实现系统之间的解耦。 我们知道订单时在电商系统中比较核心的,当有一个新订单时: 1、支付系统发起支付流程 2、风控需要审核 3、IM系统发送一些卡片消息(比如确认收货地址) 4、统计系统需要统计数据 5、..... 这些订单下油的系统都需要实时获得订单数据。随着业务的发展,订单的下游可能在不断增加,负责订单的程序员不得不花费大量的精力,应对不断变化的下游系统,不停地调试订单系统与下游系统的接口。任何一个接口变更,订单系统就需要修改并上线,这是不能接受的。几乎所有的电商都会选择消息队列来解决类型的系统耦合的问题。这时候引入rocketMQ憨,订单系统在有一个新订单时,发送一条消息到rocketMQ的topic中,所有下游系统都订阅topic,这样每个下游可以根据订单消息来做相应的处理。

RocketMQ用的消息模式时发布 - 订阅模型。在发布 - 订阅模型中,消息的发送方称为发布者,接收方称为订阅者,服务端存放的消息的容器称为主题(Topic)。传统的队列模式和这种模型最大的区别就是,一份消息数据能不能被消费多次对的问题。因为在传统的队列模型中,任何一条消息都只能被一个消费者收到。

RocketMQ是发布-订阅模型,但是RocketMQ也有队列的概念,那队列的作用是什么呢?

我们都知道RocketMQ中有ack机制,确保消息不会在传递过程中由于网络或服务器故障而丢失,在消费端如果收到消息并完成了业务逻辑后,会给MQ回一个消费成功的确认,代表一条消息被成功消费,否则会给消费者重新发送消息,直到成功ack。这个确认机制保证了消息传递的可靠性,但是也带来了一个问题,为了确保消息的有序性,在某一笑消息被成功消息前,下一条消息是不能被消费的,否则违背了有序性这个原则,也就是每个Topic在任意时刻,最多只能有一个消费者在进行消费,这样消费端总体的消费性能就不能通过水平扩展消费者数量来提升,所以RocketMQ引入了队列来解决这个问题。来看下面这个图:

RocketMQ的每个Topic都包含多个队列,通过多个队列来实现多实例并行生产和消费。rocketMQ只在队列上保证消息的有序性,Topic层面是无法保证消息严格顺序的。每个消费组都有主题中一份完整的消息,不同消费组之间消费进度不受对方影响, 一条消息被消费组1消费过,也会给消费组2消费。 每一个消费组中包含多个消费者,同一个消费组内的消费者是竞争关系,比如一个消费组内的一条消息被消费者1消费了,就不会再给同组的其他消费者消费。

在一个Topic下的消息消费过程中,消息需要被不同的组进行多次消费,所以每个消费组在每个队列都维护一个消费位置,在这个位置之前的消息都是被消费过的,之后的消息都是没有被消费过。

** 需要注意的是Topic和消费组的关系、消费组和消费者的关系,消费组和队列数没有关系,不是有多少消费者就有多少队列,队列数可以根据数据量和消费速度合理配置**

可以按照某个唯一标识,比如IM中,根据消息发送方用户id,通过一致性哈希算法,计算出队列ID,指定队列ID发送,这样可以保证相同的用户发的消息总被发送到同一个队列上,可以确保严格顺序。

时间不早了~下期再见。

RocketMQ 简介

RocketMQ在阿里内部叫做Metaq(最早名为Metamorphosis,中文意思 变形记 ,是作家卡夫卡的中篇小说代表作,可见是为了致敬Kafka)。

RocketMQ是Metaq3.0之后的开源版本。

Metaq在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。并平稳支撑了历年双十一大促(万亿级的消息),在性能、稳定性、可靠性等方面表现出色,在整个阿里技术体系和大中台战略中发挥着举足轻重的作用。

Metaq最终源于Kafka,早起借鉴了Kafka很多优秀的设计。但是由于Kafka是Scale语言编写而阿里系主要使用Java,且无法满足阿里的电商、金融业务场景,所以誓嘉(花名)团队用Java重新造轮子,并做了大量的改造和优化。

在此之前,淘宝有一款消息中间件名为 Notify ,目前已经逐步被Metaq所取代。

第一代的Notify主要使用了推模型,解决了事务消息;第二代的MetaQ主要使用了拉模型,解决了顺序消息和海量堆积的问题。相比起Kafka使用的Scale语言编写,RabbitMQ 使用Erlang语言编写,基于Java的RocketMQ开源后更容易被广泛的研究,以及其他大厂定制开发。

执行流程:

RocketMQ 消息订阅有两种模式,一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送;另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQ Server拉取。但在具体实现时, Push和Pull模式本质都是采用消费端主动拉取的方式 ,即 Consumer 轮询从 Broker 拉取消息。

优点:就是实时性高。

缺点:在于消费端的处理能力有限,当瞬间推送很多消息给消费端时,容易造成消费端的消息积压,严重时会压垮客户端。

Push 与 Pull 区别:

Push 方式里,Consumer 把长轮询的动作封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

Pull 方式里,取消息的过程需要用户自己主动调用,首先通过打算消费的 Topic 拿到 MessageQueue 的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

RocketMQ 使用长轮询机制来模拟 Push 效果,算是兼顾了二者的优点。

RocketMQ第五讲

broker是RocketMQ的核心,核心工作就是接收生成这的消息,进行存储。同时,收到消费者的请求后,从磁盘读取内容,把结果返回给消费者。 消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件; CommitLog文件中保存了消息的全量内容。不同的Topic的消息,在CommitLog都是顺序存放的。就是来一个消息,不管Topic是什么,直接追加的CommitLog中。 broker启动了一个专门的线程来构建索引,把CommitLog中的消息,构建了两种类型的索引。ConsumerQueue和Index。正常消费的时候,是根据Topic来消费,会用到ConsumerQueue索引。 也可根据返回的offsetMsgId,解析出ip,端口和CommitLog中的物理消息偏移量,直接去CommitLog中取数据。 引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。 其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同样consumequeue文件采取定长设计,每一个条目共20个字节,分别为8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M。 IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。 按照Message Key查询消息的时候,会用到这个索引文件。 IndexFile索引文件为用户提供通过“按照Message Key查询消息”的消息索引查询服务,IndexFile文件的存储位置是: {fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W 4+2000W 20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。 其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4 500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20 2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。 “按照Message Key查询消息”的方式,RocketMQ的具体做法是,主要通过Broker端的QueryMessageProcessor业务处理器来查询,读取消息的过程就是用topic和key找到IndexFile索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。 RocketMQ中有两个核心模块,remoting模块和store模块。remoting模块在NameServer,Produce,Consumer和Broker都用到。store只在Broker中用到,包含了存储文件操作的API,对消息实体的操作是通过DefaultMessageStore进行操作。 属性和方法很多,就不往这里放了。 文件存储实现类,包括多个内部类 · 对于文件夹下的一个文件 上面介绍了broker的核心业务流程和架构,关键接口和类,启动流程。最后介绍一下broker的线程模型,只有知道了线程模型,才能大概知道前面介绍的那些事如何协同工作的,对broker才能有一个立体的认识。 RocketMQ的RPC通信采用Netty组件作为底层通信库,同样也遵循了Reactor多线程模型,同时又在这之上做了一些扩展和优化。关于Reactor线程模型,可以看看我之前写的这篇文档: Reactor线程模型 上面的框图中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多线程模型。一个 Reactor 主线程(eventLoopGroupBoss,即为上面的1)负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置),然后监听真正的网络数据。拿到网络数据后,再丢给Worker线程池(eventLoopGroupSelector,即为上面的“N”,源码中默认设置为3),在真正执行业务逻辑之前需要进行SSL验证、编解码、空闲检查、网络连接管理,这些工作交给defaultEventExecutorGroup(即为上面的“M1”,源码中默认设置为8)去做。而处理业务操作放在业务线程池中执行,根据 RomotingCommand 的业务请求码code去processorTable这个本地缓存变量中找到对应的 processor,然后封装成task任务后,提交给对应的业务processor处理线程池来执行(sendMessageExecutor,以发送消息为例,即为上面的 “M2”)。 上面的图和这段画是从官方文档抄过来的,但是文字和图对应的不是很好,画的也不够详细,但是主要流程是这个样子。以后有时间了,我重新安装自己的理解,画一张更详细的图。 AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,应该是守护线程 FileWatchService: NettyEventExecutor: NettyNIOBoss_:一个 NettyServerNIOSelector_:默认为三个 NSScheduledThread:定时任务线程 ServerHouseKeepingService:守护线程 ThreadDeathWatch-2-1:守护线程,Netty用,已经废弃 RemotingExecutorThread(1-8):工作线程池,没有共用NettyServerNIOSelector_,直接初始化8个线程 AsyncAppender-Worker-Thread-0:异步打印日志,logback使用,共九个: RocketmqBrokerAppender_inner RocketmqFilterAppender_inner RocketmqProtectionAppender_inner RocketmqRemotingAppender_inner RocketmqRebalanceLockAppender_inner RocketmqStoreAppender_inner RocketmqStoreErrorAppender_inner RocketmqWaterMarkAppender_inner RocketmqTransactionAppender_inner SendMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_MESSAGE PullMessageThread_:remotingServer.registerProcessor(RequestCode.PULL_MESSAGE ProcessReplyMessageThread_:remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE QueryMessageThread_:remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE AdminBrokerThread_:remotingServer.registerDefaultProcessor ClientManageThread_:remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT HeartbeatThread_:remotingServer.registerProcessor(RequestCode.HEART_BEAT EndTransactionThread_:remotingServer.registerProcessor(RequestCode.END_TRANSACTION ConsumerManageThread_:remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP,RequestCode.UPDATE_CONSUMER_OFFSET,RequestCode.QUERY_CONSUMER_OFFSET brokerOutApi_thread_:BrokerController.registerBrokerAll(true, false, true); ================================================================== BrokerControllerScheduledThread:=》 BrokerController.this.getBrokerStats().record(); BrokerController.this.consumerOffsetManager.persist(); BrokerController.this.consumerFilterManager.persist(); BrokerController.this.protectBroker(); BrokerController.this.printWaterMark(); log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); BrokerController.this.printMasterAndSlaveDiff(); BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); BrokerFastFailureScheduledThread:=》 FilterServerManagerScheduledThread:=》 FilterServerManager.this.createFilterServer(); ClientHousekeepingScheduledThread:=》 ClientHousekeepingService.this.scanExceptionChannel(); PullRequestHoldService FileWatchService AllocateMappedFileService AcceptSocketService BrokerStatsThread1

RocketMQ概念篇

白话系列文章讲述RocketMQ。因为是白话,尽量通过比较直白的方式来介绍RocketMQ,所以涉及到详细的技术细节可能表述的不是那么严谨。但是不用担心,后续会有专门的文章详细介绍技术细节。 这篇文章介绍的是RocketMQ基本概念,分为介绍和提问两部分,如果对概念很清楚了就不用了,闲暇无事可以看看提问。 类似介绍概念的文章网上比较多,希望这篇文章提问式的阅读会让大家对概念能有更清晰的认识。Message Queue 消息队列 ,既然是队列,就要实现 数据结构中队列 的基本特征,比如先进先出,入队、出队操作等。 RocketMQ就是把内存中使用的那个队列,变成一个独立的、大家都可以用的队列系统。一个业务事件,是整个MQ领域最核心的概念,无论是生产还是消费都是针对Topic进行操作。 如果MQ是个大的队列,只有一个队列可以用太浪费了吧,来分一分分一分,分解成很多个小的独立的队列。 RocketMQ变成一个管理队列的系统 ,而分解下来的若干个 小的队列通过什么来区分呢 ? 就是通过topic。 比如我的业务定义topic:tp_im_event。你的业务定义topic:tp_cargo_event,那就是两个小队列了,我的业务用我的队列,你的项目用你的队列。 Topic就是队列的名字 。 提问 : 如果不小心定义了相同的Topic名字,上线后会发生什么? 申请Topic好麻烦,所有业务都用一个Topic好了,这样会有什么问题? Topic名字起的越酷炫越好?既然Topic是队列的名字,那么queue就表示真实操作的队列了。一开始的时候一个Topic就对应一个queue,多好,一个是名字、一个是现实。可是用着用着就悲催了,为啥?消息操作太多了,全都怼在一个小队列上。为了提高效率,咋整??RocketMQ是这样做的,一个Topic绑定的是一组queue,这样每个queue分摊部分压力,性能就上去了。 读队列 个数:可以用来读取数据的队列个数 写队列 个数:可以用来写入数据的队列个数 queue :真实存储数据用的队列。 提问 : 我申请了一个Topic,读队列设置2,写队列设置4有什么问题么? 我申请了一个Topic,读队列设置4,写队列设置2有什么问题么? 既然增加队列数可以提升性能,我申请8848个队列的Topic是不是可以达到性能的巅峰?好了,说完了队列,我们再来说一说队列存储的内容是什么? 存储的是消息!Message!尽量小,别发个文件啊什么的大东西,后面真心扛不住(超过特定大小还会报错)一个queue里都是消息,如何对这些消息进行归类呢?为了进一步细化消息,有了Tag的概念。可以通过Tag对相同消息进行归类,这样用户就可以只订阅一部分的消息了(只订阅部分Tag) 比如:有一个Topic叫做‘发货’,下游消费者希望可以根据货源进行不同的处理,可以通过‘tag=北京’以及‘tag=上海’来区分不同的发货源。下游消费者,可以单独订阅‘上海’的货物,或者‘tag=上海|江苏|浙江’来订阅这三个地区的货物,还可以‘tag=*’来订阅全国的货物。发送了某个消息,但是希望在后台很方便的搜索到,就要通过key了。可以根据key搜索到所有相关的Message。可以认为RocketMQ内部维护了一个非常大的HashMap,key就是这个key,value就是Message,如果出现Hash冲突就用链表来报错对应关系。 提问 : 每次申请Topic好烦啊,索性申请个叫tp_all的topic算了,然后内部用tag来区分岂不是美滋滋,这样很好吧? 我是生产者,我可以任意修改发送的消息体? 一个topic里面有什么tag我又不知道,索性消费所有消息,内部判断是不是我要的消息内容不就好?生产者:针对某一个Topic制造数据,把数据塞到queue里。 简单点: 发消息的 管理消息的时候,我们肯定会遇见这个问题,某个消息谁发的?RocketMQ把发送者的身份抽象成了Producer Group,就是[ 发送组 ]。 简单点:这个东西命名成项目名就行, 相同Producer Group保持相同业务行为 提问 : 我的项目要发送10个Topic,定义相同的Producer Group可以么? 有一个Topic,可以多个Producer Group一起生产么? 2台机器有相同的Producer Group,机器1发送tp1、 机器2发送tp2这样有问题么? 一个Topic有Producer Group:‘test_group’ 两个项目都用了,但是A项目发送的tag叫A,B项目发送的消息Tag是B,请问有问题么?? 消费者:把queue里面的消息拿出来用 消费行为:如何处理通过 Topic+Tag定位的 消息 重点!重点!重点! 来了,直接翻译是‘消费组’ 一个RocketMQ集群是如何区分 消费者是谁 的呢?就是通过消费组, 相同消费组的机器,MQ认为消费行为是一致的 。业务上一定要保证相同消费组有相同的消费行为。对于不同的消费组名字,RocketMQ就认为是个不同消费者了。如果修改了消费组的名字,那就是新的消费者,就会按照新的消费组的消费进度处理消费。     消息那么多,项目都重启无数次了,RocketMQ是如何记录消息消费到什么地方了呢?     也是通过消费组,RocketMQ内部会维护一个关系,记录Consumer Group和消费进度之间的联系。所以,如果把Consumer Group的名字改掉是可能重新消费之前的所有数据的(视初始消费位置而定) 提问 : 两个服务,服务A和服务B,消费相同集群的 相同Topic ,既然服务不一样,那么就算是定义了 相同的consumer group 也无所谓吧? 常见问题: 消费组名字命名的不合理,上线后悄悄改回来行不行? 不小心用了别人的消费组名,悄悄改回来重新上线也没什么问题吧? 常见问题: 一个服务有消费组A消费3个Topic,有一次上线,希望消费4个Topic。对于新消费的消息希望可以灰度验证一段时间。请问有问题么? 消息队列主要的功能是模块结偶,同步转异步和削峰,必然会出现生产非常快但是消费慢这种事情,比如生产的速度是100000/s但是消费速度是1/s,这个时候就叫做消息积压或者消费延迟(Delay)。理论上RockeMQ对于这种场景有比较好的适应能力,原理大致这样:正常的生产消费都是操作内存数据,所以比较快。但是如果积压非常多,内存明显扛不住了,则降级为生产消费的是磁盘数据,直接操作磁盘。磁盘肯定比内存的速度慢很多啦。 这个时候整个集群的处理能力就拉低了。所以最好生产和消费能力不要相差太多,即便相差很多,积压也应该在有限的时间内处理完毕。 目前比较容易出现消息积压的情况有: 1.新消费组上线(消费历史消息) 2.消费能力弱 3.生产洪峰(比如for循环发消息,job发消息) 由于RocketMQ开源版本没有多租户隔离,所以公共集群使用的过程中会有相互影响发生,鉴于此大家在上线前还是要合理评估自己的系统能力。 提问 : 消费延迟太多了,业务上接受丢弃一部分消息,如何操作呢? 消息的处理线程太少了,想加大处理线程怎么办? 自己搞个线程池处理消息是不是很赞? 这个概念比较尴尬。上面说的Producer Group和Consumer Group都是逻辑概念。如果需要连接 多集群 ,就需要物理上进行区分(Instance Name)。 一个Instance Name对应一个连接,默认的值是本机ip@进程号。连接多集群的时候务必修改这个值。 提问 : 要向两个RocketMQ集群生产数据,只需要设置不同的Producer Group即可? 要从两个RocketMQ集群消费数据,只需要设置不同的Consumer Group即可?

RocketMQ架构分析

RocketMQ是阿里巴巴捐赠给appache的MQ开源组件,从架构上我们分析一下。 kafka是依靠Zookeeper进行集群选举的,在rocketMQ的同样位置上是NameServer,这个Nameserver仅仅是注册服务,没有选举能力。每个broker都和NameServer进行连接,通过心跳维持状态。 producer和consumer定时到Nameserver拉取broker信息,并且和自己所消费的broker建立连接。这就和微服务的体系一模一样了。 那么rocketMQ的集群选举怎么实现的呢,通过集成了Dledge实现,Dledge是个jar包,实现了raft算法。 如图,topic可在多个broker上形成分片,producer可写数据到不通的分片,分片信息也可以由不同的group进行消费。 如下介绍存储,rocketMQ可配置主备,形成主备复制。 ***隐藏网址*** 对于保存的数据,每天会删除数据;如果磁盘满,超过设置阈值,则不允许写入数据。 RocketMQ的设计确保了消息的并发处理能力,但是有时候,消息是有状态的,即有顺序,RocketMQ怎么实现呢? 发送到临时缓存,到达延迟时间后由delay service路由给topic。 如果消费返回了consumer_later,则如上述延迟消息一样,会延迟一段时间,进入死信队列,消费死信队列,重新处理。 如果业务规模小,不会改源码,就选用RabbitMQ;如果业务规模大,不允许丢消息,追求效率高,用RocketMQ;如果业务规模大,运行少量丢消息,吞吐量大,用Kafka;如果用于大数据,毫无疑问选kafka。

rocketmq配置

borker配置说明文档 #broker所属的集群名字 brokerClusterName=rocketmq-cluster #broker名字,同个集群中的每个broker应当具有它自己独有的名字 brokerName=broker-a #设置主broker和从broker  其中0 表示 主机,》0 表示 从机 brokerId=0 #nameServer地址(地址为ip:端口),多个地址之间用分号分割 namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 defaultTopicQueueNums=4 #是否允许 Broker 自动创建Topic,测试时可以开启,实用时关闭 autoCreateTopicEnable=true #是否允许 Broker 自动创建订阅组,测试时可以开启,实用时关闭 #在pull形式消费时若设置了falsename会报subscription group not exist,且收不到消息,在push形式消费时没有影响 autoCreateSubscriptionGroup=true #Broker 对外服务的监听端口 listenPort=10911 #haService中使用 haListenPort=10912 #主要用于slave同步master fastListenPort=10909 #定时删除文件时间点,默认凌晨 4点 deleteWhen=04 #文件保留最长时间,默认 48 小时 fileReservedTime=120 #commitLog每个文件的大小默认1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每个文件默认存30W条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #强制删除文件时间间隔(单位毫秒) #destroyMapedFileIntervalForcibly=120000 #定期检查Hanged文件间隔时间(单位毫秒) #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间,磁盘空间使用率不能超过88% diskMaxUsedSpaceRatio=88 #存储总路径 storePathRootDir=/usr/local/rocketmq/store #commitLog 存储路径 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消费队列存储路径存储路径 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #消息索引存储路径 storePathIndex=/usr/local/rocketmq/store/index #异常退出产生的文件存储路径 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 文件存储路径 abortFile=/usr/local/rocketmq/store/abort #限制的消息大小 maxMessageSize=65536 #Commitlog每次刷盘最少页数,每页4kb flushCommitLogLeastPages=4 #ConsumeQueue每次刷盘最页数,每页4kb #flushConsumeQueueLeastPages=2 #刷盘时间间隔(单位毫秒),此间隔时间优先级高于上面两个参数,即当时间间隔超过之后直接进行刷盘,不考虑页数问题 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 (1) ASYNC_MASTER 异步复制Master (2) SYNC_MASTER 同步双写Master (3) SLAVE brokerRole=ASYNC_MASTER #刷盘方式 (1) ASYNC_FLUSH 异步刷盘  (2)SYNC_FLUSH 同步刷盘 flushDiskType=ASYNC_FLUSH #是否开启事务check过程,消息体量大的时候可以不开启,默认为关闭状态 checkTransactionMessageEnable=false #发消息线程池数量(如果不做配置,个数为16+(核*线程)*4) #sendMessageThreadPoolNums=128 #拉消息线程池数量(如果不做配置,个数为16+(核*线程)*4) #pullMessageThreadPoolNums=12参考资源链接 ***隐藏网址***

rocketmq的RocketMQTemplate不能@Autowired,报以下错误

类没有加载,从头找

  1. jar包有没有导进来(maven项目的话查看是否配置了rocketmq包)

  2. 加载到spring的配置中是否配置了RocketMQTemplate

  3. @Autowired注解驱动是否加载

RocketMQ的消息重试

对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。 对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。 无序消息的重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。 消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下: 如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。 注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。 2)配置方式 消费失败后,重试配置方式集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置 (三种方式任选一种): 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐) 返回 Null 抛出异常 消费失败后,不重试配置方式 集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,此后这条消息将不会再重试。 自定义消息最大重试次数 消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略: 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。 注意: 消息最大重试次数的设置对相同 Group ID 下的所有 Consumer 实例有效。 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置。 获取消息重试次数 消费者收到消息后,可按照如下方式获取消息的重试次数:

rocketmq怎么看是订阅模式还是广播模式

在左下角有一个查看模式,点击查看就可以看是订阅模式还是广播模式了。rocketmq是一款分布式、队列模型的消息中间件,是阿里巴巴集团自主研发的专业消息中间件。

如果你还想了解更多这方面的信息,记得收藏关注本站。