首页  

rocketmq要点     所属分类 rocketmq 浏览量 2276
http://rocketmq.apache.org/
 
RocketMQ广泛应用于交易、数据同步、缓存同步、IM通讯、流计算、IoT等场景。

使用轻量级的NameServer(多台),没有使用ZK(rocketmq没有选举) 
broker: master、slave模式(预先配置好)master挂了不能写,还可以继续读(可以从master和slave读) 

与kafka区别
一共一个commitLog(尽量顺序写,支持多个topic),topic对应于多个MessageQueue 
支持事物,两阶段提交 
producer:localTransaction + localTransaction查询 
broker:half queue + op queue 

应用解耦 流量消峰 消息分发

2007年Notify
2010年的Napoli
2011年升级后改为MetaQ
2012年RocketMQ Java语言开发

Notify主要使用推模型,解决事务消息。
MetaQ主要使用拉模型,解决顺序消息和海量堆积问题。

RocketMQ基于长轮询的拉取方式,兼有两者的优点。

RocketMQ:Java
Kafka:Scala
RabbitMQ:Erlang

Producer Consumer Broker NameServer
先启动NameServer,再启动Broker
消除单点故障,增加可靠性 提升吞吐量,部署多个NameServer和Broker,为每个Broker部署一个或多个Slave。

Topic  分成 Message Queue 类似分区 Partition

fileReservedTime = 48
在磁盘上保存消息的时长,单位是小时,自动删除超时的消息。

flushDiskType = ASYNC_FLUSH
刷盘策略,分为SYNC_FLUSH和ASYNC_FLUSH两种,分别代表同步刷盘和异步刷盘。

listenPort  = 10911
Broker监听端口


消费者两种类型

1、DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理。

2、DefaultMQPullConsumer,读取操作中的大部分功能由消费者自主控制。

Consumer GroupName 与 消息模式(MessageModel)配合使用

支持两种消息模式:Clustering和Broadcasting

Clustering模式
同一个ConsumerGroup(GroupName相同)里的每个Consumer只消费订阅消息的一部分内容

Broadcasting模式
同一个ConsumerGroup里的每个Consumer都能消费到所订阅Topic的全部消息


Push  Server端接收到消息后,主动把消息推送给Client端,实时性高。
Push方式主动推送缺点,
加大Server端工作量,影响Server的性能
Client处理能力各不相同,Client状态不受Server控制,如果Client不能及时处理Server推送过来的消息,会造成各种潜在问题。


Pull方式
Client端循环从Server端拉取消息,自己拉取到一定量消息后,处理后再接着取。

Pull方式缺点
循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源
Pull时间间隔太长,实时性低


长轮询方式

Broker端HOLD住客户端请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer。
长轮询的主动权还是掌握在Consumer手中,Broker即使有大量消息积压,也不会主动推送给Consumer。
长轮询方式的极限性,是在HOLD住Consumer请求的时候需要占用资源,适合用在客户端连接数可控的场景。

DefultMQPushConsumer的流量控制

分布式消息队列的协调者

集群 新的Producer或Consumer加入

NameServer维护集群元数据 ,协同执行。
可部署多个,相互独立,其他角色同时向多个NameServer上报状态信息,热备。
无状态,元数据信息不会持久化存储,由各个角色定时上报并存储到内存中。

集群状态存储结构

topicQueueTable,存储所有Topic信息,QueueData里存储着Broker名称、读写queue的数量、同步标识等。

BrokerAddrTable,一个Master Broker和多个Slave Broker的地址信息。

ClusterAddrTable,存储集信息,Cluster名称和BrokerName组成的集合。

BrokerLiveTable,Broker实时状态,包括上次更新状态的时间戳,定期检查这个时间戳,超时没有更新就认为Broker无效,将其从Broker列表里清除。

FileterServerTable,过滤服务器,一个Broker可以有一个或多个Filter Server。

为何不用Zookeeper

RocketMQ 不需要Master选举,只需要一个轻量级的元数据服务器。


底层通信机制基于Netty


消息队列的核心机制

消息存储与发送

分布式消息队列 高可靠性  数据持久化存储。

目前高性能磁盘,顺序写速度可以达到600MB/s,超过一般网卡的传输速度。

磁盘随机写的速度只有大概100KB/s。

服务器把本地磁盘文件内容发送给客户端,一般分为两个步骤:

1、read,读取本地文件内容
2、write,将读取的内容通过网络发送出去

4次数据复制
从磁盘复制数据到内核态内存
从内核态内存复制到用户态内存
从用户态内存复制到网络驱动的内核态内存
从网络驱动的内核态内存复制到网卡中进行传输。

消息存储结构 尽量保证顺序写

消息存储由ConsumerQueue和CommitLog配合完成,消
息真正的物理存储文件是CommitLog,
ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。
每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

CommitLog以物理文件的方式存放。

一个消息的存储长度是不固定的,RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读。
ConsumeQueue的内容也会持久化。

1、CommitLog顺序写,提高写入效率。
2、虽然是随机读,但是利用操作系统的pagecache机制,可以批量从磁盘读取,作为cache存到内存中,加速后续的读取速度。
3、为保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,大小有限,大部分的ConsumeQueue能够被全部读入内存

顺序消息:全局顺序、部分顺序

重复消费问题

解决消息重复的两种方法
第一种方法是保证消费逻辑的幂等性(多次调用和一次调用效果相同),
另外一种方法是维护一个已消费消息的记录,消费前查询这个消息是否被消费过。

消息优先级

RocketMQ先入先出队列,不支持消息级别或者Topic级别的优先级。

提高Consumer处理能力

1、提供消费并行度 同一个ConsumerGroup,增加Consumer实例
2、批量消费
3、延时检测,跳过非重要消息

发送一条消息的三个步骤
1、客户端发送请求到服务器
2、服务器处理该请求
3、服务器向客户端返回应答

提高发送速度方法
1、使用Oneway方式,写入客户端Socket缓冲区就返回,微秒级
2、增加Producer并发量,使用多个Producer同时发送,引入并发窗口,在窗口内消息可以并发地写入DirectMem,然后异步地将连续一段无空洞的数据刷入文件系统。


主从同步机制
Master和Slave之间的同步
1、元数据信息,采用基于Netty的command方式来同步消息
2、commitLog信息,直接基于Java NIO来实现。

上一篇     下一篇
一行代码摧毁jvm

unix/linux基本理念和准则

自旋锁要点及简单实例

Docker和K8S

docker安装和使用

docker设置镜像仓库