Kafka 概念及用法
QuickStart
根据 https://hub.docker.com/r/apache/kafka 上的说明,使用docker compose进行集群部署:
services:
controller-1:
image: apache/kafka:latest
container_name: controller-1
environment:
KAFKA_NODE_ID: 1
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
controller-2:
image: apache/kafka:latest
container_name: controller-2
environment:
KAFKA_NODE_ID: 2
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
controller-3:
image: apache/kafka:latest
container_name: controller-3
environment:
KAFKA_NODE_ID: 3
KAFKA_PROCESS_ROLES: controller
KAFKA_LISTENERS: CONTROLLER://:9093
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
broker-1:
image: apache/kafka:latest
container_name: broker-1
ports:
- 29092:9092
environment:
KAFKA_NODE_ID: 4
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
broker-2:
image: apache/kafka:latest
container_name: broker-2
ports:
- 39092:9092
environment:
KAFKA_NODE_ID: 5
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
broker-3:
image: apache/kafka:latest
container_name: broker-3
ports:
- 49092:9092
environment:
KAFKA_NODE_ID: 6
KAFKA_PROCESS_ROLES: broker
KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092'
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- controller-1
- controller-2
- controller-3
注意Kafka在4.0+后引入了KRaft模式,该模式去掉了zookeeper的依赖,以后会成为主流。这里有两个角色:Controller负责管理,Broker负责实际业务。注意因为是Raft,任意时刻也只有一个Controller在工作。
常见用法
-
创建Topic
kafka-topics.sh --bootstrap-server localhost:29092 --create --topic my-cluster-topic --partitions 3 --replication-factor 2
-
查询Topic
kafka-topics.sh --bootstrap-server localhost:29092 --describe --topic my-cluster-topic
-
删除Topic
kafka-topics.sh --bootstrap-server localhost:29092 --delete --topic my-cluster-topic
-
生产消息
kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my-cluster-topic
如果需要带上key的话,可以加上,那么输入消息的时候结构为key:val
--property "parse.key=true" --property "key.separator=:"
-
消费消息
kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my-cluster-topic --from-beginning
如果需要打印key的话
--property print.key=true --property key.separator=:
如果需要打印offset
--property print.offset=true
消费组测试
首先安装对应依赖
pip3 install kafka-python
用Python脚本定时产生消息
from kafka import KafkaProducer
import time
import random
producer = KafkaProducer(
bootstrap_servers='localhost:29092',
key_serializer=str.encode,
value_serializer=str.encode
)
i = 0
while True:
key = f"user-{random.randint(0, 9)}"
value = f"message-{i}"
producer.send('my-cluster-topic', key=key, value=value)
print(f"Produced: {key}:{value}")
i += 1
time.sleep(0.5)
终端启动消费者,分配对应group
kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my-cluster-topic --group groupA --property print.partition=true
查看消费组内每个成员分配到的partition
./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group groupA
有几个核心字段
- LOG-END-OFFSET Kafka 当前写到的位置
- CURRENT-OFFSET 消费者已经消费并提交到的位置
- LAG 尚未消费的消息,高LAG就有问题了
一些概念
什么是消息的key
Kafka 中的 消息 key(键) 是消息的一个可选字段,它在某些场景下非常重要,尤其是在分区路由、消息去重和日志压缩时。Kafka 中的每条消息都有如下结构:ProducerRecord<K, V>
。
- 决定消息路由到哪个分区: 用户 ID 作为 key,可以保证每个(同一)用户的行为消息都被写入同一个分区,这样消费时可以按顺序处理。
- 缺点: 分区负载可能不均衡, 增加分区数时不能保持之前的 hash 结果
- 启用日志压缩(cleanup.policy=compact)时使用
- 用于幂等写入/去重(少见)
消息什么时候清理
两种策略,参考文档 https://kafka.apache.org/documentation/ ,可以做delete或者compact。compact就根据key只保存最后一条,delete就按时间或者体积来清理。
消息是怎么写入的
如果一个 topic 有多个分区,这些分区所在的 broker 都可以分别处理写操作,但前提是:每条消息只会写入一个分区的 leader。
Kafka 的 多个分区可以分布在多个 broker 上,各自的 leader broker 会并发接收写请求。Producer 会根据分区路由逻辑,把每条消息写入对应分区的 leader 上。
幂等
Kafka可以通过--producer-property enable.idempotence=true
来启用幂等。
- Kafka broker 会为每个幂等 producer 分配一个唯一的 producer ID。
- 每个 partition 有自己的 消息序号(sequence number)。每发一条消息,序号 +1。
- Broker 维护每个 partition 的 (PID, sequence number) 状态:如果收到了重复的 sequence(已经写过了),会丢弃;如果是正常的下一个 sequence,才会写入。
同时存在一些问题:
- 幂等性只能保证 producer 到 broker 的一次性写入,不能跨多个 session(比如重启) 保证“最终去重”。
- 如果 producer 重启,它会获取新的 PID,之前的幂等性上下文会丢失。
- 幂等性不等于消费端去重。如果你消费端重复处理数据,那是你的业务逻辑要处理的。
ISR
ISR(In-Sync Replicas,同步副本集合):一个分区中,和 leader 副本保持同步的 follower 副本集合。如果follower落后太多,会被踢出。
批量发送和拉取
提高Kafka的Producer和Consumer的性能。
Offset
Kafka 的每个 分区(partition) 是一组有序、不可变的消息日志。每条消息在这个日志中都有一个递增的编号,这个编号就是 offset。
- 每个分区内部唯一、顺序递增
- 不会因消费者消费而删除或修改
Kafka 不记录谁消费了哪条消息,消费者自己决定从哪个 offset 开始消费。
消费组
消费组就是一组消费者的集合,它们协作消费一个或多个 topic 的数据。
- 一个分区只会被一个组内成员消费。
- 横向扩展 同一个 topic 的多个分区可以被多个消费者同时处理,提升吞吐量
- 不重复消费 同一个 group 内,一个分区只会被一个消费者消费
Kafka 为 每个消费组的每个分区 单独维护一个 offset,用来记录该 group 消费到哪了。
在设计时,应让消费组内的消费者数 小于或等于 topic 的分区数,才能确保每个分区被处理且资源不浪费。
Kafka 消费组在实际业务中的作用是:让不同的系统模块独立、安全、高效地消费同一个 topic 的消息流,既支持并发消费,又保证逻辑解耦和数据一致性。
一些配置
acks
- 0 : producer 不等待任何响应,直接认为发送成功
- 1 : 只要 leader broker 写入成功,就返回成功
- all : 所有 ISR 副本都确认写入,producer 才认为成功
消费者确认
- enable.auto.commit 是否由 Kafka consumer 自动周期性地提交 offset。
- auto.commit.interval.ms 当 enable.auto.commit=true 时,这个值决定 Kafka 多久自动提交一次 offset
- auto.offset.reset 当 Kafka 中 找不到消费者的 offset(比如首次消费、offset 过期)时,应该从哪里开始读
auto.offset.reset
当消费者所属的 group 在某个 topic 分区上没有已提交的 offset,或者该 offset 已过期不可用时,Kafka 会参考 auto.offset.reset 的值决定从哪开始消费。
什么是已过期不可用,当 Kafka 已经清理掉了某个分区中早期的消息,而你的消费者还试图从那些已经被删除的 offset 开始消费时,就会出现 “offset 已过期” 的情况。
值 | 含义 | 场景举例 |
---|---|---|
earliest | 从 最早的可用消息开始(即 offset = 0) | 第一次启动消费者,想读取全部历史数据 |
latest | 从 最新的消息开始(忽略历史) | 实时处理,不关心旧数据 |
none | 如果没有 offset,就报错 | 强制使用提交过的 offset,适合事务 |
核心语义
- 当我们说 Kafka 的 “at most once” 或 “at least once” 语义时,默认是指从 Kafka 到消费者 的消息交付语义,不是整个 end-to-end 流程(即 producer → Kafka → consumer)。
- 当我们说“exactly once” 时,Kafka能实现到的是从生产者到Kafka这一段,要实现全流程的话,需要业务端配合。
At most once
- 消息最多处理一次,如果失败就丢弃,不会重试。
- 开启 enable.auto.commit=true(默认)
- Kafka 在 消费之前就提交 offset
- 如果消费或处理失败,offset 已经提交,不会再拉取那条消息
At least once
- 消息至少被处理一次,可能被重复处理(例如消费失败后重试)
- 设置 enable.auto.commit=false
- 应用处理完消息后,手动提交 offset(commitSync())
- 如果处理成功之前崩溃,offset 没提交,Kafka 会 再次发送那条消息
Exactly once
- 每条消息只被处理一次,不多不少、不丢不重
- 需要 生产端 + 消费端 + Kafka 自身 三者配合:
- 生产端:
- 设置:enable.idempotence=true
- 幂等写入,Kafka 自动去重(PID + seq)
- 消费 + 生产联动(读处理写场景):
- 使用 transactional.id 启动事务性 producer
- 读 offset + 写消息 → 在同一个事务中提交
- 使用 commitTransaction() 实现写入+offset原子提交
- 消费端读取配置:
- 设置:isolation.level=read_committed,只读事务提交的数据
幂等保证了同一个消息重发不会被记录两次,事务保证了类似转账之类的需求产生的消息会原子写入,但是最终消费者如何处理,处理后ACK是否正常响应,是达成全链路Exactly Once的关键。
而这里恰恰是最难的地方,确认消息被消费和业务端的业务提交如何绑定,比较简单的做法是在事务中记录消息的offset,下次再有消息过来时进行验证。
性能原理
Kafka 的每个 broker 是高度多线程架构,不同类型的线程(网络、I/O、副本同步、日志清理)协同工作,支持高并发、高吞吐,同时保持分区级顺序性。
顺序追加写(Append-only log)——极高磁盘写入性能
- 不可更新,只能追加
- 每条消息都直接写入一个连续日志文件尾部(顺序磁盘写)
零拷贝(Zero-Copy)+ Page Cache
传统 I/O 消息发送流程
- 数据从磁盘读取 → 拷贝到内核缓冲区(kernel space)
- 再拷贝到应用缓冲区(user space)
- 应用处理 → 发送 → 拷贝回内核缓冲区
- 最终发到网卡(socket buffer)
Kafka 使用零拷贝的流程
- Kafka 直接调用 sendfile(fd_disk, fd_socket)
- 内核直接从磁盘页缓存 → 拷贝到 socket buffer
- 不经过用户空间,无需多次拷贝
Page Cache 是 Linux 的文件系统缓存,Kafka 并不会每次都从磁盘真实读写数据,而是尽量操作内存缓存。
步骤 | 说明 |
---|---|
Producer 发来消息 | Kafka broker 接收并写入日志(append-only) |
Kafka 写入 .log 文件 | 实际是写入 Page Cache(内存),不是直接写磁盘 |
异步刷盘 | Kafka 定期刷 Page Cache 到磁盘(由 OS 或 Kafka 控制) |
批量写入机制(Batching)
- Producer 默认聚合消息再发送(linger.ms, batch.size)
- Broker 也按 batch 写入磁盘、刷写日志
I/O 多线程 + 后台刷盘
- 数据写入后并不是立刻刷到磁盘,而是先写 OS page cache,再异步刷盘
内存缓冲 + 压缩(可选)
Kafka 支持 Snappy、LZ4 等压缩算法,可在网络层或写入磁盘前压缩,节省 I/O。类似gzip,但是注意减少I/O的时候也增加了计算消耗。
商业转载请联系站长获得授权,非商业转载请注明本文出处及文章链接,您可以自由地在任何媒体以任何形式复制和分发作品,也可以修改和创作,但是分发衍生作品时必须采用相同的许可协议。