Top 5 things every apache kafka developer should know

top 5 things every apache kafka developer should know

翻译自:https://www.confluent.io/blog/top-5-things-every-apache-kafka-developer-should-know
介绍下面5个内容

  • 理解消息传递和持久化保证
  • 学习producer api中的新的粘性分区(learn about the new sticky partitioner in the producer API)
  • 利用cooperative rebalancing(协同重平衡)来避免消费者组执行rebalance时的stop the world
  • 掌握常用命令行工具
    • kafka console producer
    • kafka consule consumer
    • dump log
    • delete records
  • 使用record headers的能力
    • 为kafka记录增加headers
    • 检索headers

Tip1:理解消息传递和持久化保证

针对数据持久化,KafkaProducer提供了不同的配置。acks 配置指定了当生产者接受到多少消息确认后,才认为记录已经成功发送到broker上。kafka提供了以下三种选择:

  • none: 生产者不等待broker的确认,发送消息后就认为已经成功发送到broker上。
  • one: 生产者等待leader broker的确认(leader broker有一个),一定收到确认,就认为消息发送成功
  • all: 生产者需要等待所有的ISR(in-sync replicas) broker都确认消息后,才认为消息发送成功。
    如果需要更到的发送吞吐量,可以损失一定的数据,那么可以使用none或one。而如果应用不能容忍数据丢失,那么可以设置all,但是这样吞吐量会降低。
    这里需要说明下acks=all的情况。下面的场景描述中,producer都是使用acks=all来发送消息,并且topic副本数是3,一个leader,两个follower。
    情况1:如果这些副本中的记录偏移量是一致的,那么他们被认为是in-sync的。如下面所示,producer采用acks=all的情况: 情况2:假设由于某些情况(网络分区,负载过高等),导致两个follower没有跟上leader,那么follower就不是in sync的。此时生产者发送消息,那么实际的确认只会有一个。acks=all并不是指定有多少副本必须在in-sync。leader broker始终跟自己是同步的。

一般来说,设置acks=all, 我们的要求通常都是所有副本都应该确认,或者至少大量的in sync副本应该确认。如果不是这样,那么应该抛出异常知道所有副本都在in sync中。
为了满足这个要求,kafka提供了这样一个配置:min.insync.replicas. 这个配置强制指定多少个副本写成功才被认为真正写成功。需要注意的是,min.insync.replicas配置在是broker或者topic级别,而不是在producer上。min.insync.replicas默认值是1。所以为了避免上面说的情况,在三个副本的情况下,需要将min.insync.replicas设置为2。

上图中展示了in sync中的副本不满足min.insync.replicas要求的情况,此时producer发送的消息,leader broker不会将记录添加到log中,而是会抛出NotEnoughReplicasException 或者 NotEnoughReplicasAfterAppendException。副本与leader不一致被认为是一种可以重试的错误,所以producer会重试直到成功或者达到超时时间(默认值两分钟)delivery.timeout.ms
如果需要非常高的数据持久化保证,那么应该同时设置min.insync.replicas和acks=all.

Tip 2: Learn about the new sticky partitioner in the producer API

kafka需要partition来提升吞吐量并且将消息均衡到不同的broker上。kafka的消息记录是key/value格式,其中key可以为null。kafka producer在发送消息时,不会立即发送,而是将消息放置到对应的partition batch中(类似缓存),待缓存满了,在一次发送。batch是一种增加网络利用的有效方式。在将消息发送到partition中,通常有三种方式来决定发送到哪个partition上。

  • 方式1:在发送消息时,直接指定消息对应的partition。这种情况,producer直接使用这个partition
  • 方式2:如果没有提供partition,消息包含key,那么producer会使用key的hash值来决定partition。
  • 方式3:如果既没有key也没有提供partition信息,那么kafka会使用round-robin的方式将消息发送到不同的partition中。producer会将第一个消息发送到partition 0,第二个消息发送到partition 1,以此类推。

下图展示了方式3:

round robin方法对于将消息均衡到不同的partition上工作的很好。但是存在一个缺点,由于producer是依次将消息发送到不同的partition batch中,那么有可能会出现每个partition中的batch都填充不满。比如下面展示的,topic有三个partition。假设应用产生了9条消息,并且消息没有key,所有的消息几乎是同时发送,如下图:

9条记录分散到是三个batch中,每个batch有三条。但是如果我们将9条消息放到一个batch中会更好。更少的batch使用更少的网络带宽并且对于broker的负载更小。
kafka 2.4.0新增了sticky partitioner approach. 这种方法能够将消息发送到一个partition的batch中直到此batch满了。然后,发送这个batch,sticky partitioner使用下一个partition的batch。如下图展示了使用sticky partitioner的例子:

通过使用sticky partitioner方法,我们减少了请求次数,同时也减少了请求队列上的负载,也减少了系统延迟。需要注意的时,sticky partitioner仍然是将消息均衡放置到不同的partition batch中。可以将这种认为是per-batch round robin 或者 eventually even approach。
如果想要更多了解sticky 模式,可以参考 Apache Kafka Producer Improvements with the Sticky Partitioner

Tip 3: Avoid “stop-the world” consumer group rebalances by using cooperative rebalancing

kafka是一个分布式系统,而分布式系统中一个重要的事情就是如何处理失败。kafka处理失败的方式之一是使用consumer group,consumer group管理多个consumer。如果其中一个consumer停止,kafka会进行rebalance从而确保另一个consumer能够接管这个工作。
从2.4版本开始,kafka引入了一个新的rebalance协议,cooperative rebalancing。在深入了解cooperative rebalancing之前,先来了解一下consumer group的基础。
假设一个分布式应用(比如一个微服务的多个副本)有个多个consumer,订阅同一个topic。这些consumer组成了一个consumer group,具有同样的.group.id。在consumer group中的每个consumer负责从一个或多个partition中消费消息。这些partition的分配是由consumer group中的leader进行的。如下图所示:

从图中可以看到,总共有6个partition,在理想的情况下,每个consumer负责消费两个partition。但是如果其中的某个应用失败了或者不能连接网络。那么对应的partition中的消息是不是就不能被消费直到应用恢复?幸运的是,由于consumer rebalancing协议的存在,不会发生这种情况。
下图展示了consumer group protocal过程:

如上图,consumer2由于某些原因失败了。group coordinator将它从组中移除然后触发rebalance。rebalance尝试将工作负载在组内所有工作的consumer上进行均衡分布。在这个例子中,consumer2离开了组,rebalance会将consumer2拥有的partition分配给组内其他的consumer。所以对于一个consumer group,如果其中consumer失败了, 那么对于这些partition的处理不会产生影响。
但是,默认的rebalance协议有个缺点。在rebalance过程中,每个consumer都会放弃之前获得的partition(这会造成consumer停止消费),知道topic下所有的partition都被重新分配。这种情况被称为stop the world rebalance。为了解决这个问题,依靠ConsumerPartitionAssignor实例,consumer简单的重新获取之前分配的partition,所以在这些partition上仍然能够继续消费。
上述描述的实现被称为eager rebalancing, 因为它优先考虑的是针对一个consumer group中,不会有两个consumer同时对于一个partition拥有主权。
虽然对于同一个topic下的某个partition不能具有相同的consumer非常重要,但是有一种更好的方法,既能够提供安全性同时还不会暂停处理,既incremental cooperative rebalancing。这个方法在kafka2.3版本的kafka connect中被首次引入,现在已经在consumer group 协议中实现了。利用cooperative 方法,消费者不会在rebalance开始时主动放弃partition的所有权。在cooperative方法中,consumer group中的所有成员会将当前的分配进行编码然后将信息发送到group leader中。group leader决定那个partition需要修改对应的consumer。而不是一开始就完全从新分配。之后第二次rebalance发起,但是这一次,仅仅涉及到那些需要改变所有权的分区。这有可能是撤销不在用的partition或者新增的partition。对于那些没有改变所有权的分区,这些分区中的数据会继续进行处理。
这种处理办法解决了stop-the-world,而仅仅是暂停了哪些需要修改所有权分区的消费。这带来了更少的rebalance代驾以及降低了完成rebalance的时间。即使rebalance时间很长也没有关系,因为现在数据仍然被处理。使用CooperativeStickyAssignor能够开启这个功能。
如果要开启这个功能,则需要将partition.assignment.strategy设置为使用CooperativeStickyAssignor。这种设置完全是在客户端测,所以仅仅更新客户端版本即可。而在Kafka Stream中,这个功能是默认开启的。

Tip 4:掌握命令行工具

下面介绍了4种在平时工作中使用最多的工具。

kafka console producer

1
2
3
4
# 开启发送者程序, 发送的消息只有value,没有key
kafka-console-producer --topic <topic> --broker-list <broker-host:port>
# 发送消息,发送的消息包含key 和 value
kafka-console-producer --topic <topic> --broker-list <broker-host:port> --property parse.key=true --property key.separator=":"

kafka console consumer

1
2
3
4
5
6
# 消费指定topic中的消息
kafka-console-consumer --topic <topic> --bootstrap-server <broker-host:port>
# 指定从开始的地方消费
kafka-console-consumer --topic <topic> --bootstrap-server <broker-host:port> --from-beginning
# 默认情况下consumer只会打印消息的value,如果想要打印消息的key,则输入下面命令
kafka-console-consumer --topic <topic> --bootstrap-server <broker-host:port> --property print.key=true --property key.separator=":"

Dump log

1
2
3
# 指定打印topic为example-0中的日志,参数--print-data-log表示输出日志
# 不过一般在生产环境中不会使用这个命令
kafka-dump-log --print-data-log --files ./var/lib/kafka/data/example-0/00000000000000000000.log

delete records

kafka提供了配置来控制数据保留,包括时间和数据大小

  • 数据保留的时间由 log.retention.hours 控制,默认值是168hour,也就是一周
  • configuration.log.retention.bytes 控制segment文件最大是多少。默认值是-1, 也就是不限制大小

如果想要删除数据,可以使用下述命令:

1
2
kafka-delete-records --bootstrap-server <broker-host:port> \
--offset-json-file offsets.json

offsets.json 文件内容如下:
1
2
3
4
5
6
{
"partitions": [
{"topic": "example", "partition": 0, "offset": -1}
],
"version":1
}

参数介绍如下:

  • topic:指定要删除数据对应的topic
  • partition:指定需要删除数据对应的partition
  • offset:指定从哪个offset开始删除,注:是删除offset之前的数据。-1表示删除当前HW之前的数据,HW(high watermark)表示能够开始消费的位置

Tip5:使用record headers的能力

Record headers可以给kafka消息添加一些元数据,并且不是给消息的key value添加额外的信息。比如如果你想要在消息中嵌入一些信息,如表示消息来源系统,也是是想要增加一些审计功能。
为什么不能将这些额外的数据添加到key中。因为给key中添加数据会带来两个潜在的问题

  1. 首先,如果你使用的是压缩主题,那么给key添加信息会使得消息不正确。这样压缩不会像之前起作用
  2. 其次,给key添加额外的信息有可能会影响数据的partition分布

回顾

我们了解了kafka的五个tips,我们理解了下面的知识点

  1. 消息持久性以及和消息传递之间的关系
  2. producer API中的sticky partitioner
  3. command line tools
  4. record headers的能力