为了保证producer的消息能可靠的投递到指定的topic,topic的每个分区partition收到消息后,都需要向producer发送ack(acknowlege确认收到),如果生产者producer收到了ack,则进行下一轮的发送,否则重新发送。

【kafka】生产者消息投递可靠性(ACK机制,ISR机制,leader选举机制)

ACK机制的实现

问题:分区中现有一个leader副本节点和多个follower副本节点,生产者将消息发送过来的时候,何时返回ack给生产者?

leader副本负责读与写,follower副本同步leader的数据。

方案1:leader和所有的follower都同步完成,才发送ack给生产者
方案2:leader+follower同步完成的数量过半,就发送ack给生产者
【kafka】生产者消息投递可靠性(ACK机制,ISR机制,leader选举机制)

kafka采用的eader+follower完全同步机制(ISR中follower),所有节点同步完成才返回ack。

存在问题:leader+follower完全同步时,假如有1个leader+4个follower,1个leader和3个follower都同步完成,1个follower同步超级慢或者挂掉,会影响返回或者不返回ack。

ISR机制的实现

  • [x] ISR机制

当leader挂掉的时候,由controller会在follower副本中选举出一个leader。但是这个剩余follower副本有一个条件,就是follower必须在ISR列表中。

leader和ISR中的follower副本都同步完成时,就返回ack。

ISR (IN-SYNC Replication) 维护了与leader信息一致的follower副本的信息,当leader挂掉的时候 就从这个ISR中选举。

ISR信息存放在zookeeper的topic信息中,由kafka动态维护
【kafka】生产者消息投递可靠性(ACK机制,ISR机制,leader选举机制)

leader如何动态维护ISR?
【kafka】生产者消息投递可靠性(ACK机制,ISR机制,leader选举机制)

  • replica.lag.max.messages 默认值4000
    如果某个follower副本落后leader副本的消息数超过了这个值,那么leader副本就会把该follower副本从ISR中移除。

在0.9之后的版本中已经移除。

存在问题,生产者批量产生多余4000条的消息,发送给leader,此时ISR中所有的follower副本全部落后于leader,都会被剔除ISR。然后又要有新的follower副本加入ISR(问题:频繁操作ISR,还要操作zookeeper)。

  • replica.lag.time.max.ms
    follower副本响应leader副本的最长等待时间。超过这个时间将会从ISR中移除。

生产者生产消息进行投递——>分区中的leader——>通知到ISR中所有的follower副本进行同步数据——>ISR中所有的follower告知leader同步完成——>leader返回ack——>生产者

  • [x] acks 生产者投递消息的ACK的级别设置
    如果需要等到ISR所有的follower副本返回消息leader,可能需要等待,便产生了不同的ack可靠性级别

  • acks=0 消费者只管投递消息,leader一接收到消息还没有写入磁盘就返回ack。

优点:延迟性最低
缺点:如果接收到消息后leader没有写入磁盘就挂掉,从ISR中的follower新选举leader后,会丢失数据。

  • acks=1 消费者只等到leader写入磁盘完成,不管follower副本是否同步完成,就返回ack。

问题:leader写入磁盘完成后挂掉了,ISR中的follower还没有来得及同步。从ISR中的follower新选举leader后,会丢失数据。

  • acks=-1或者all

消费者投递消息后,等待leader和ISR中所有的follower副本同步完成,leader才返回ack。

优点:leader和ISR中所有的follower都同步完成,不丢数据,达到副本数据一致性。

问题:leader在返回ack之前就挂掉了,会从ISR中的follower中选出leader,此时所有leader+follower数据都一致。生产者没有收到leader的ack回应会重试投递,会造成数据重复。

  • [x] leader、follower故障处理

【kafka】生产者消息投递可靠性(ACK机制,ISR机制,leader选举机制)

log中每个消息都会有对应一个offset偏移量。

  • HW 高水位线(High Watermark), LEO 日志尾部偏移量(Log End Offset)

leader与ISR中所有的follower的文件的最小LEO为HW。如上图所示,HW为12。

  • 对于消费者来说,leader中不大与HW的数据才能被consumer可见。如图所示leader文件中的0~12可以被消费者所见。

  • [x] 分区中ISR中的某follower副本挂掉后,leader会将该follower副本剔除ISR。
    待该follower恢复后,读取大发棋牌大发棋牌技巧技巧 本地 磁盘记录的上次HW,并将文件大于HW的部分截取掉,从HW位置开始请求向leader同步。当follower的LEO达到该分区partition的HW时候(即该follower赶上了leader),就可以重新加入到ISR中。

leader选举机制的实现

  • [x] 分区中的leader挂掉后,需要从ISR的follower副本中选举出新的leader。
    【kafka】生产者消息投递可靠性(ACK机制,ISR机制,leader选举机制)

图示HW为9,消费者只能看到0~9

假如leader挂掉了,选举follower2为leader,那么以新leader通知其他节点以HW 9为基准,超过HW的部分需要截取掉,leader自身的(LEO为11 大于HW 9)不用截取掉。然后从新的leader开始同步。

**

  • 保证了副本数据之间的一致性
  • 保证了消费者消费数据的一致性
  • 不能保证数据投递不丢失或者不重复。**