Kafka Configuration
1. Producer Configuration
1.1 Mandatory Properties
bootstrap.servers
该属性为host:port格式的列表, 表示producer与Kafka集群初次建立连接时所需的broker列表. 该属性不必包含所有broker, 但推荐包含至少两个broker地址, 若其中一个broker宕机, 可连接另外一个broker.
key.serializer
该属性为一个class名称, 用于序列化record的key. 任何Java object都可作为key, 因此serializer必须知道如何将object转换为byte array, 因此该属性对应的class应实现org.apache.kafka.common.serialization.Serializer接口. kafka client已经提供了一个serializer, 如ByteArraySerializer, StringSerializer, IntegerSerializer, VoidSerializer. 即使record中的key为空, 也必须设置属性.
value.serializer
该属性为一个class名称, 用于序列化record的value.
1.2 Optional Properties
client.id
该属性可为任意字符串, 表示client的标识符. Broker用其区分不同的client, 通常用于日志记录和配额. 推荐将该属性配置为client所处的服务名称, 如order service. 默认为空字符串.
acks
该属性表示在producer认为写入成功前, 需有多少个partition replica接收到该record. 默认情况下, 当leader接收到record时, client就认为该record已成功写入. 以下是该属性的三种选项:
ack=0: producer不等待任何broker回复, 默认信息成功发送, 因此producer不知道信息是否发送至broker, 虽然可能发生信息丢失, 但信息吞吐量最高.ack=1: leader replica收到信息并回复producer时, producer认为信息已成功写入; 若leader宕机或还没选举出leader, producer会受到错误回复, 并重新发送信息. 若leader收到信息后宕机, 但信息还没复制到其他replica, 则信息丢失.acks=all: 所有sync replica都受到信息时, producer认为信息已成功写入. 该选项可确保即使leader宕机, 信息依然不会丢失, 但延迟最高, 因为producer需等待所有replica收到信息.
需要注意的是, 该选项只影响producer latency, 不影响end-to-end latency. Producer latency表示从producer生成一个record, 到producer确认该消息写入Kafka集群的时间; end-to-end latency表示从producer生成一个record, 到consumer可读取该record的时间. 由于一条record只有被所有replica写入后才能被consumer消费, 因此上述三个选项的end-to-end latency相同.
max.block.ms
该属性针对两个KafkaProducer的两个函数:
send(): 获取metadata和申请buffer的最长时长partitionFor(): 当metadata不可用, 等待新的metadata的最长时长
若调用上述函数时, 对应的操作阻塞达到该属性指定的时长, 则返回timeout exception.
request.timeout.ms
该属性规定了从producer发送数据到接收到broker的响应之间的最大时长, 不包括重试时间. 若超出该时长, 则返回timeout exception.
linger.ms
该属性规定了发送batch前等待下一条消息的最长时长. producer在生成一条消息后不会立即发送, 而是一次发送多个消息, 从而降低网络; 但producer无法确定下一条消息何时生成, 若等待时间超过该属性, 则直接发送当前batch.
delivery.timeout.ms
该属性规定了从send()成功返回(将record放入batch中)到broker响应或producer放弃重试之间的最大时长, 其中包括重试所需的时间. 该属性需大于linger.ms和request.timeout.ms, 否则返回exception. 若重试时超出该属性值, 则抛出重试之前错误对应的exception; 若还没重试前就超出属性值, 则返回timeout exception.
retries
该属性表示producer在放弃向broker发送前需重试多少次. 需要注意的是, 并不是错误都会让producer重试, 若消息大小超出阈值, 则producer不会重试. 若不想让producer重试, 则设置retries=0.
retry.backoff.ms
该属性表示每次重试的间隔时间, 避免在某些故障发生时重复发送请求.
buffer.memory
该属性为producer用于临时存放消息的内存大小, 若消息发送速率比消息生成速度低, 则会导致buffer耗尽, 下次调用send()时会阻塞进程, 直到超过max.block.ms时间并抛出exception.
compression.type
该属性为producer压缩消息的算法, 默认不压缩, 可选项为snappy, gzip, lz4和zstd.
- snappy: CPU负载低, 且压缩率优秀
- gzip: CPU负载高, 但压缩率最高
batch.size
该属性为一个batch的最大内存大小. batch中消息的总内存量超过该属性时, producer会立即发送该batch; 但即使调高该属性值, producer也不会一直等待, 而是等待linger.ms时长后发送batch.
max.in.flight.requests.per.connection
该属性为producer可以同时发送的batch数量, 该属性值越高, producer的吞吐量越大. 需要注意的是, 若该属性值大于1且entries不为0, 则可能无法保证一个partition内的消息顺序存储: 假设producer先发送消息A, 之后发送消息B, A由于网络故障而发送失败, B成功被broker写入partition, producer再次尝试发送A并成功被broker写入. 两条消息都被成功写入, 但写入顺序与发送顺序颠倒, 导致consumer也会按照错误顺序消费. 为保证顺序一致性, 可设置enable.idempotence=true, 这样可以保证在最多5个并发请求的情况下依然保证顺序一致, 且不会因重试产生重复写入.
max.request.size
该属性为单个请求的最大大小, 既表示单个消息的最大值, 也表示单个请求的最大值. 默认下该属性为1MB, 因此可以发送一个1MB大小的消息, 也可以发送包含1024个1KB大小消息的batch. 需要注意的是, broker端也会限制可收到消息的最大值(message.max.bytes), 超出大小会被broker拒绝.
receive.buffer.bytes and send.buffer.bytes
该属性为TCP send/receive buffer的大小, 若设置为-1, 则使用操作系统的默认值.
enable.idempotence
从0.11版本开始, kafka支持exacylt once语义, 将该属性设置为true可保证producer端生成消息的顺序和broker端的写入顺序相同, 且不会造成重复写入(若producer发送后, leader写入消息并宕机, producer未收到确认并再次发送消息, 新的leader会返回DuplicateSequenceException).