MQ
复习用
消息队列引入的好处
- 通过异步处理提高系统性能(减少响应所需时间)
- 削峰/限流
- 降低系统耦合性。
消息队列引入的问题
- 系统可用性降低:需要处理mq宕机问题
- 系统复杂度提高:需要处理消息重复、丢失、保序等问题
- 一致性问题:消息没有被正确消费的话,引入一致性问题
kafka特点
- 高吞吐量、低延迟。topic可以分为多个partition,消费者组的消费者并行对topic进行消费
- 可扩展性:支持热扩展
- 持久性:消息被持久化到磁盘,并支持数据备份
- 容错性:partition replica
- 高并发:支持数千个客户端同时进行读写
kafka架构
- Producer(生产者) : 产生消息的一方。
- Consumer(消费者) : 消费消息的一方。
- Consumer Group(消费者组):同一个组内不同消费者负责消费不同的partation,消费者组之间互不影响
- Broker(代理) : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
- Controller:通过 zk 从 Brokers 中选举出来管理整个 Broker 集群的 Broker,名为 Controller。Controller 通过定时任务,或者监听器模式获取 zk 信息,将 zk 的变动通过事件的方式发送给事件队列,队列就是一个LinkedBlockingQueue,事件消费者线程组通过消费消费事件,将相应的事件同步到各 Broker 节点。
每个Broker又包含了topic和partition
- Topic(主题) : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic 来消费消息。
- Partition(分区) : 一个 Topic 划分为 Partition ,分布在不同的 Broker 上,便于负载均衡以及并发消费。每个 Partition 以文件夹的形式存储在文件系统中。每个对应的 Partition 数据目录下存储
.index
,.log
,.timeindex
三种文件
partition实际上就可以看成是一个队列。
kafka partition的存储
一个 partition 包含多个 segment。每个segment文件,包含两部分,一个是 .log 文件(包含了发送的数据),另外一个是 .index 文件(记录.log文件的数据索引值,以便于加快数据的查询速度):
其中index文件的一行存储了log文件中的第n条消息在log文件中的偏移量,比如「3, 497」说的是「在log文件中的第3条消息,在log文件中偏移量为497」,而log文件中的「Message 368772」表示「在该partiton中是第368772条消息」:
Q:为什么.index不索引.log中的每条消息,而是1,3,6…:.index文件使用稀疏索引?A:.index会mmap映射到内存,提高访问速度,同时为了减少内存使用量而使用稀疏索引
Q:为什么一个partition要包含多组index+log:当log文件超过1G就会进行切分(文件命名为最后一条message在partition的偏移量)?A:便于删除过期文件,减少磁盘占用;根据局部性原理每次只需要使用一部分的消息,由于.index映射到内存,也只需要映射一小部分,减少内存占用
kafka partition 副本机制
kafka为partition引入replica(副本)实现故障转移,主副本叫做 leader(一个),从副本叫做 follower(多个)。follower 通过拉的方式从leader同步数据。与主副本已经同步的副本叫做in-sync-replicas (ISR) ,即 ISR 为当前可用的副本。
所有副本称为 AR(Assigned Replicas)
消费者和生产者都是从 Leader 读写数据,不与 Follower 交互。
kafka partition 副本机制中的HW,LEO,LSO
- LogStartOffset:每个副本都维护自己的LogStartOffset,指示副本中能读取到的最小偏移量,拉取同步或删除都可能推进LogStartOffset,以清除不再使用的旧偏移量
- LW(low watermark):AR中的最小的logStartOffset
- LEO(log end offset):每个副本都维护自己的LEO,指示下一个写入的偏移量,
- HW(high watermark):ISR中的最小LEO,即已同步到ISR中所有副本中的最大偏移量,消费者只能拉到HW之前的消息。若ISR所有副本都同步了,那么HW=LEO
- LSO(LastStableOffset):与事务有关,LSO为事务中的第一条消息的偏移量,若当前没有事务,则LSO=LEO
- 若消费者隔离级别为
read_commited
,则消费者只能读到LSO之前的消息 - 若消费者隔离级别为
read_uncommited
,则消费者可以读到HW之前的消息
- 若消费者隔离级别为
总结:LSO<=HW<=LEO
kafka和zookeeper
ZooKeeper 主要为 Kafka 提供元数据的管理的功能:
- 注册Broker:每个 Broker 在启动时,都会到 Zookeeper 上进行注册,将自己的 IP 地址和端口等信息记录到该节点中去。
- 注册Topic:同一个Topic 的消息会被分成多个分区并将其分布在多个 Broker 上,这些分区信息及与 Broker 的对应关系也都是由 Zookeeper 在维护
- 负载均衡: Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡
- 记录分区和消费者的关系:一个分区只能被消费者组里的一个消费者消费。
- 记录消费进度:定时地将消费者对分区的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费
kafka2.8之后引入基于raft协议的kafka,不再依赖zk
kafka节点还活着的两个条件
- 节点和zk保持心跳
- 如果是个从节点必须及时同步主节点
kafka 的组协调者、组领导者以及rebalance
- Group Coordinator是某个Broker,负责通过心跳机制监听消费者是否在线、接收
JoinGroup
包加入消费者、通知触发rebalance等操作。 - Group Leader是组内某个Consumer,负责rebalance的具体规则。
Q:将coordinator和leader职责分开的好处是什么?A:将分区分配的职责下发到消费者组,每个消费者组可以灵活指定不同的策略,而不用重启或变更broker
kafka Producer执行步骤
- 发送消息:producer生产消息,从zk找到partition的leader,向leader推送消息
- 副本同步:通知isr列表的follower从leader拉取消息并发送ack
- 同步完成:leader收到所有follower的ack,更新offset,然后向producer发送ack
producer可以配置broker发送ack的策略:
- 0:不等待broker发送ack
- 1:broker写到leader后ack
- all:等待broker同步到整个ISR后ack,如果当前ISR中有副本超时,不是无限等待,而是将该副本踢出ISR,然后ack
kafka为什么快
kafka为了处理海量消息,把所有的消息都写入速度低容量大的硬盘,但实际上,使用硬盘并没有带来过多的性能损失。
- 顺序读写,写到PageCache,等OS来刷盘
- 零拷贝
- 文件分段
- 批量发送
- 数据压缩
kafka 缺点
- 由于批量发送导致数据延迟接收
- partition内消息有序,topic内的消息不一定有序(单partition的情况下还是有序的)
- 依赖zk
kafka保证消费顺序
kafka只能保证单个分区中的消息之间有序:消息在被追加到 Partition 的时候都会分配一个特定的 offset。Kafka 通过 offset 来保证消息在分区内的顺序性。
因此为了保证几条消息的有序性,可以把这几条消息发送到同一个分区
kafka保证消息不丢失
- 发送者丢失:添加发送回调函数,失败的话重新发送即可
- 中间件丢失:leader宕机,新leader未完全同步。可以通过设置acks=all(所有isr都收到消息才响应)、min.insync.replicas=n(至少被写入n个副本才响应)
- 消费者丢失:关闭自动提交offset,等消费完消息后再手动提交offset。(可能出现消费完消息后,提交offset前宕机,出现重复消费)
kafka重复消费
重复消费的原因
- 消费者消费后没提交offset就宕机了,触发rebalance
- 消费时间过长,导致kafka认为服务假死,触发rebalance
解决办法:
- 如果消费消息指的是写数据库,那么可以加一个messageId唯一索引列,将本次数据库写与消息id绑定,从数据库的层面实现幂等消费
kafka消费重试
消费失败的话会进行重试,重试到一定次数后视为消费失败,消息可以指定发送到对应的DLQ(死信队列)中,继续消费后面的消息。然后手动启动一个消费者去处理DLQ内的消息。
kafka 零拷贝
具体参见《系统八股》中的零拷贝
RabbitMQ架构
RabbitMQ是AMQP协议的实现(RabbitMQ也支持其他协议,比如MQTT3,STOMP2等)
Producer,Consumer:消息由label和payload组成,label是消息头,由一系列属性组成,如routing-key、priority、delivery-mode等。rabbitmq会根据消息头把消息发给对应的consumer
Exchange交换器:producer不是将消息直接投递到队列,而是发送给exchange,exchange充当一个路由器的角色,决定将消息投递到哪些队列中。Exchange通过Binding Key与Queue绑定起来。
Queue:保存消息。多个消费者可以订阅同一个队列,队列里的消息会被round-robin平均分摊给这些消费者。RabbitMQ不支持同一个队列的广播消费。
Broker:一个Broker可以简单看作一台服务器,
Exchange Type:常用的有四种交换类型(还有两种是system和自定义):
- fanout:将消息投递到所有队列
- direct:将消息投递到RoutingKey与BingdingKey完全匹配的队列,常用于处理有优先级的任务,这样可以精确地给高优消息分配更多的处理资源
- topic:也是匹配RoutingKey和BindingKey,但是可以包含通配符。这两个key格式是使用
.
分割的字符串xx.xx.xx
,可以使用通配符*
匹配1个单词,#
匹配0到n单词。如com.#
可以匹配com.a.b
或com.a
,com.*
可以匹配com.a
- headers:性能差,基本不用
AMQP
三层协议
- Module:定义了客户端可调用的命令,客户端用这些命令实现业务逻辑
- Session:定义客户端和服务端之间的请求和响应,提供可靠同步机制和错误处理
- Transport:定义二进制流,提供帧处理、信道复用、错误检测、数据表示等
三大组件
- Exchange:Broker中将消息路由到队列的组件
- Queue:存储消息的结构,位于硬盘或者内存
- Binding:路由规则的一部分
RabbitMQ死信队列
RabbitMQ中有一个死信交换器(DLX,Dead Letter Exchange),当消息在普通队列中成为死信之后,就会被发送到DLX中,然后DLX投递到死信队列中。导致消息变成死信的原因有三个:
- 消息被拒收
- 消息TTL过期
- 队列满了
RabbitMQ延迟队列
延迟队列指的是,消息被发送到队列后,消费者过一段时间才会拿到消息。
RabbitMQ并不直接支持延迟队列,一般有两种方式实现:
- 使用DLX+TTL来模拟延迟队列,此时DLX绑定的队列就变成了延迟队列
- RabbitMQ提供了插件来实现延迟队列
RabbitMQ优先级队列
优先级高的队列会先被消费。不过,当消费速度大于生产速度且 Broker 没有堆积的情况下,优先级显得没有意义。
RabbitMQ消息怎么传输
基于一条TCP连接,建立若干个信道,每个线程对应一个信道,每个信道在 RabbitMQ 都有唯一的 ID,保证了信道私有性。这样多个线程复用同一条TCP连接,避免了频繁创建TCP连接带来的overhead。
RabbitMQ保证消息不丢失
- 保证发送到服务端成功:事务机制或确认机制。开启事务机制后,发送消息失败会进行事务回滚并重新发送,否则提交事务。但是事务是同步的,降低了发送的性能,一般不用。而确认机制也有同步和异步两种,同步的也是性能低下,而异步的则是异步接收服务端的确认,然后异步地处理发送失败的消息。
- 保证exchange投递到队列成功:配置
mandatory=true
(投递失败通知客户端),然后客户端设置路由失败的回调来处理。 - 保证消息在服务端持久化:开启exchange和队列的持久化,避免因为MQ服务端挂掉而丢失数据。但是并不能避免整个服务器宕机,因此还需要配置 镜像队列 来避免单点故障,每个镜像队列都包含一个主节点和若干从节点,从节点不提供服务,只用来备份容灾
- 保证消费者正确消费:消费者确认机制,这点跟kafka类似
RabbitMQ高可用
- 单机模式:standalone
- 普通集群模式:启动多个节点,每个queue只会放在一个节点上,但每个节点的元数据是一样的。如果消费的队列不在连接的节点上,节点会将请求转发到队列所在的节点,并接受响应,然后响应给客户端
- 镜像集群模式:每个queue都在所有节点上同步
Kafka和RabbitMQ对比
kafka基于拉的消费,rabbitmq可以选择推或拉的消费
kafka支持流式API,并且吞吐量比rabbitmq大