rocketmq知识点
所属分类 rocketmq
浏览量 1541
RocketMQ 前身是 MetaQ, MeataQ3.0版本改名为RocketMQ,设计思路和Kafka类似
使用Java开发 , kafka 使用 scala
Producer 主Broker 从Broker Topic Consumer NameServer
Broker 上报信息到 NameServer,Consumer 从NameServer 拉取Broker和Topic的信息
Group ProducerGroup,ConsumerGroup
Tag 业务有关联的可以使用同一个Tag,比如订单消息队列,使用Topic_Order
Queue 对应kafka中的Partition,每个Queue内部是有序的,分为读和写两种队列,一般来说读写队列数量一致
NameServer 无状态
Kafka使用ZooKeeper保存Broker信息 ,Broker Leader选举
非顺序消息 一般直接采用轮训发送的方式进行发送。
顺序消息 根据某个Key比如常见的订单Id,用户Id,进行Hash,将同一类数据放在同一个队列中,保证顺序性。
Consumer 下线或者上线 Rebalance
定时拉取broker,topic的最新信息
每隔20s做重平衡
随机选取当前Topic的一个主Broker,这里要注意的是不是每次重平衡所有主Broker都会被选中
获取当前Broker,当前ConsumerGroup的所有机器ID
然后进行策略分配
由于重平衡是定时做的,可能会出现某个Queue同时被两个Consumer消费,会出现消息重复投递。
Kafka重平衡通过Consumer和Coordinator联系来完成的,
当Coordinator感知到消费组的变化,会在心跳过程中发送重平衡的信号,
然后由一个ConsumerLeader进行重平衡选择,然后再由Coordinator将结果通知给所有的消费者。
消费模型
推送(push)和拉取(poll)
MQPullConsumer MQPushConsumer
其实这两种模型都是客户端主动去拉消息
MQPullConsumer
每次拉取消息需要传入拉取消息的offset和每次拉取多少消息量,具体拉取哪里的消息,拉取多少由客户端控制。
MQPushConsumer
客户端主动拉取消息,但是消息进度是由服务端保存,Consumer会定时上报自己消费到哪里,所以Consumer下次消费的时候是可以找到上次消费的点,
一般来说使用PushConsumer不需要关心offset和拉取多少数据,直接使用即可。
集群消费和广播消费
集群消费
同一个GroupId都属于一个集群,一般来说一条消息只会被任意一个消费者处理。
广播消费
广播消息会被集群中所有消费者进行消费,
但是要注意一下因为广播消费的offset在服务端保存成本太高,
所以客户端每一次重启都会从最新消息消费,而不是上次保存的offset
网络模型
Kafka使用原生的socket ,RocketMQ使用Netty
使用Netty的原因
API使用简单,不需要关心过多的网络细节,更专注于中间件逻辑。
性能高。
成熟稳定,jdk nio的bug都被修复了。
网络线程模型
1+N(1个Acceptor线程,N个IO线程)
1+N+M(1个acceptor线程,N个IO线程,M个worker线程)
RocketMQ使用 1+N1+N2+M
1个acceptor线程
N1个IO线程
N2个线程用来做Shake-hand,SSL验证,编解码
M个线程用来做业务处理。
这样的好处将编解码,和SSL验证等一些可能耗时的操作放在了一个单独的线程池,不会占据业务线程和IO线程。
Kafka在Topic数量由64增长到256时,吞吐量下降了98.37%
RocketMQ在Topic数量由64增长到256时,吞吐量只下降了16%
kafka一个topic下面的所有消息都是以partition的方式分布式的存储在多个节点上。
每个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段。
Topic很多时 ,文件过多,会造成磁盘IO竞争非常激烈。 顺序变随机读写
这里指的都是普通硬盘,SSD上面多个文件并发写入和单个文件写入影响不大
RocketMQ四个目录
commitLog
消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。
单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;
当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。
消息主要是顺序写入日志文件,当文件满了,写入下一个文件
config
保存一些配置信息,包括一些Group,Topic以及Consumer消费offset等信息
consumeQueue
消息消费队列,引入的目的主要是提高消息消费的性能,
由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。
Consumer即可根据ConsumeQueue来查找待消费的消息。
其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。
consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构
index HOME \store\index\${fileName},文件名fileName是以创建时的时间戳命名的,
固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,
IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
CommitLog添加新的消息,有一个定时任务ReputService会不断的扫描新添加进来的CommitLog,
然后不断的去构建ConsumerQueue和Index
读取消息
Kafka中每个Partition都会是一个单独的文件,所以当消费某个消息的时候,会很好的出现顺序读,
OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取,将数据放入PageCache,所以Kafka的读取消息性能比较好。
RocketMQ读取流程
先读取ConsumerQueue中的offset对应CommitLog物理的offset
根据offset读取CommitLog
ConsumerQueue也是每个Queue一个单独的文件,并且其文件体积小,所以很容易利用PageCache提高性能。
而CommitLog,由于同一个Queue的连续消息在CommitLog其实是不连续的,所以会造成随机读,RocketMQ对此做了几个优化:
Mmap映射读取,Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销
使用DeadLine调度算法+SSD存储盘
由于Mmap映射受到内存限制,当不在Mmmap映射这部分数据的时候(也就是消息堆积过多),默认是内存的40%,会将请求发送到SLAVE,减缓Master的压力
集群模式
单Master
单Master多SLAVE master 宕机,写入不可用,读取依然可用,如果master磁盘损坏,可依赖slave的数据。
多Master 如果出现部分master宕机,那么这部分master上的消息都不可消费,也不可写,如果一个Topic的队列在多个Master上都有,那么可以保证没有宕机的那部分可以正常消费,写入。如果master的磁盘损坏会导致消息丢失。
多Master多Slave master宕机,只会出现在这部分master上的队列不可写入,但依然可读取 ,如果master磁盘损坏,可以依赖slave的数据。
存储和复制
刷盘 同步和异步
选择同步刷盘,刷盘超时返回FLUSH_DISK_TIMEOUT
Dleger
master出问题,写入不可用,除非恢复master,或者手动将slave切换成master
RocketMQ在最近的几个版本中推出 Dleger-RocketMQ,使用Raft协议复制CommitLog,并且自动进行选主
定时/延时消息
订单超时未支付自动关闭,因为在很多场景中下单之后库存就被锁定了,这里需要将其进行超时关闭。
需要一些延时的操作,比如一些兜底的逻辑,当做完某个逻辑之后,可以发送延时消息比如延时半个小时,进行兜底检查补偿。
在某个时间给用户发送消息,同样也可以使用延时消息。
延时消息是利用新建单独的Topic和Queue来实现
事务消息
使用步骤
调用sendMessageInTransaction发送事务消息
如果发送成功,则执行本地事务。
如果执行本地事务成功则发送commit,如果失败则发送rollback。
如果其中某个阶段比如commit发送失败,rocketMQ会进行定时从Broker回查,本地事务的状态。
Step1: 发送事务消息,这里也叫做halfMessage,会将Topic替换为HalfMessage的Topic。
Step2: 发送commit或者rollback,如果是commit这里会查询出之前的消息,然后将消息复原成原Topic,并且发送一个OpMessage用于记录当前消息可以删除。如果是rollback这里会直接发送一个OpMessage删除。
Step3: 在Broker有个处理事务消息的定时任务,定时对比halfMessage和OpMessage,如果有OpMessage且状态为删除,那么该条消息必定commit或者rollback,所以就可以删除这条消息。
Step4: 如果事务超时(默认是6s),还没有opMessage,那么很有可能commit信息丢了,这里会去反查我们的Producer本地事务状态。
Step5: 根据查询出来的信息做Step2。
上一篇
下一篇
drools简介及实例
redis单线程为什么还那么快
redis的VM机制
链式调用优缺点
CountDownLatch 与 CyclicBarrier
让自己更优秀的16条法则