Kafka 概念及用法

QuickStart

根据 https://hub.docker.com/r/apache/kafka 上的说明,使用docker compose进行集群部署:

services:
  controller-1:
    image: apache/kafka:latest
    container_name: controller-1
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  controller-2:
    image: apache/kafka:latest
    container_name: controller-2
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  controller-3:
    image: apache/kafka:latest
    container_name: controller-3
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_PROCESS_ROLES: controller
      KAFKA_LISTENERS: CONTROLLER://:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
 
  broker-1:
    image: apache/kafka:latest
    container_name: broker-1
    ports:
      - 29092:9092
    environment:
      KAFKA_NODE_ID: 4
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-1:19092,PLAINTEXT_HOST://localhost:29092'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
 
  broker-2:
    image: apache/kafka:latest
    container_name: broker-2
    ports:
      - 39092:9092
    environment:
      KAFKA_NODE_ID: 5
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-2:19092,PLAINTEXT_HOST://localhost:39092'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3
 
  broker-3:
    image: apache/kafka:latest
    container_name: broker-3
    ports:
      - 49092:9092
    environment:
      KAFKA_NODE_ID: 6
      KAFKA_PROCESS_ROLES: broker
      KAFKA_LISTENERS: 'PLAINTEXT://:19092,PLAINTEXT_HOST://:9092'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker-3:19092,PLAINTEXT_HOST://localhost:49092'
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
    depends_on:
      - controller-1
      - controller-2
      - controller-3

注意Kafka在4.0+后引入了KRaft模式,该模式去掉了zookeeper的依赖,以后会成为主流。这里有两个角色:Controller负责管理,Broker负责实际业务。注意因为是Raft,任意时刻也只有一个Controller在工作。

常见用法

  1. 创建Topic

    kafka-topics.sh --bootstrap-server localhost:29092 --create --topic my-cluster-topic --partitions 3 --replication-factor 2
  2. 查询Topic

    kafka-topics.sh --bootstrap-server localhost:29092 --describe --topic my-cluster-topic
  3. 删除Topic

    kafka-topics.sh --bootstrap-server localhost:29092 --delete --topic my-cluster-topic
  4. 生产消息

    kafka-console-producer.sh --bootstrap-server localhost:29092 --topic my-cluster-topic

    如果需要带上key的话,可以加上,那么输入消息的时候结构为key:val

    --property "parse.key=true" --property "key.separator=:"
  5. 消费消息

    kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my-cluster-topic --from-beginning

    如果需要打印key的话

    --property print.key=true --property key.separator=:

    如果需要打印offset

    --property print.offset=true

消费组测试

首先安装对应依赖

pip3 install kafka-python

用Python脚本定时产生消息

from kafka import KafkaProducer
import time
import random
 
producer = KafkaProducer(
    bootstrap_servers='localhost:29092',
    key_serializer=str.encode,
    value_serializer=str.encode
)
 
i = 0
while True:
    key = f"user-{random.randint(0, 9)}"
    value = f"message-{i}"
    producer.send('my-cluster-topic', key=key, value=value)
    print(f"Produced: {key}:{value}")
    i += 1
    time.sleep(0.5)

终端启动消费者,分配对应group

kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic my-cluster-topic --group groupA --property print.partition=true

查看消费组内每个成员分配到的partition

./kafka-consumer-groups.sh --bootstrap-server localhost:29092 --describe --group groupA

有几个核心字段

  • LOG-END-OFFSET Kafka 当前写到的位置
  • CURRENT-OFFSET 消费者已经消费并提交到的位置
  • LAG 尚未消费的消息,高LAG就有问题了

一些概念

什么是消息的key

Kafka 中的 消息 key(键) 是消息的一个可选字段,它在某些场景下非常重要,尤其是在分区路由、消息去重和日志压缩时。Kafka 中的每条消息都有如下结构:ProducerRecord<K, V>

  • 决定消息路由到哪个分区: 用户 ID 作为 key,可以保证每个(同一)用户的行为消息都被写入同一个分区,这样消费时可以按顺序处理。
    • 缺点: 分区负载可能不均衡, 增加分区数时不能保持之前的 hash 结果
  • 启用日志压缩(cleanup.policy=compact)时使用
  • 用于幂等写入/去重(少见)

消息什么时候清理

两种策略,参考文档 https://kafka.apache.org/documentation/ ,可以做delete或者compact。compact就根据key只保存最后一条,delete就按时间或者体积来清理。

消息是怎么写入的

如果一个 topic 有多个分区,这些分区所在的 broker 都可以分别处理写操作,但前提是:每条消息只会写入一个分区的 leader。

Kafka 的 多个分区可以分布在多个 broker 上,各自的 leader broker 会并发接收写请求。Producer 会根据分区路由逻辑,把每条消息写入对应分区的 leader 上。

幂等

Kafka可以通过--producer-property enable.idempotence=true 来启用幂等。

  1. Kafka broker 会为每个幂等 producer 分配一个唯一的 producer ID
  2. 每个 partition 有自己的 消息序号(sequence number)。每发一条消息,序号 +1。
  3. Broker 维护每个 partition 的 (PID, sequence number) 状态:如果收到了重复的 sequence(已经写过了),会丢弃;如果是正常的下一个 sequence,才会写入。

同时存在一些问题:

  1. 幂等性只能保证 producer 到 broker 的一次性写入不能跨多个 session(比如重启) 保证“最终去重”。
  2. 如果 producer 重启,它会获取新的 PID,之前的幂等性上下文会丢失。
  3. 幂等性不等于消费端去重。如果你消费端重复处理数据,那是你的业务逻辑要处理的。

ISR

ISR(In-Sync Replicas,同步副本集合):一个分区中,和 leader 副本保持同步的 follower 副本集合。如果follower落后太多,会被踢出。

批量发送和拉取

提高Kafka的Producer和Consumer的性能。

Offset

Kafka 的每个 分区(partition) 是一组有序、不可变的消息日志。每条消息在这个日志中都有一个递增的编号,这个编号就是 offset

  • 每个分区内部唯一、顺序递增
  • 不会因消费者消费而删除或修改

Kafka 不记录谁消费了哪条消息,消费者自己决定从哪个 offset 开始消费

消费组

消费组就是一组消费者的集合,它们协作消费一个或多个 topic 的数据。

  • 一个分区只会被一个组内成员消费。
  • 横向扩展 同一个 topic 的多个分区可以被多个消费者同时处理,提升吞吐量
  • 不重复消费 同一个 group 内,一个分区只会被一个消费者消费

Kafka 为 每个消费组的每个分区 单独维护一个 offset,用来记录该 group 消费到哪了。

在设计时,应让消费组内的消费者数 小于或等于 topic 的分区数,才能确保每个分区被处理且资源不浪费。

Kafka 消费组在实际业务中的作用是:让不同的系统模块独立、安全、高效地消费同一个 topic 的消息流,既支持并发消费,又保证逻辑解耦和数据一致性。

一些配置

acks

  • 0 : producer 不等待任何响应,直接认为发送成功
  • 1 : 只要 leader broker 写入成功,就返回成功
  • all : 所有 ISR 副本都确认写入,producer 才认为成功

消费者确认

  • enable.auto.commit 是否由 Kafka consumer 自动周期性地提交 offset
  • auto.commit.interval.ms 当 enable.auto.commit=true 时,这个值决定 Kafka 多久自动提交一次 offset
  • auto.offset.reset 当 Kafka 中 找不到消费者的 offset(比如首次消费、offset 过期)时,应该从哪里开始读

auto.offset.reset

当消费者所属的 group 在某个 topic 分区上没有已提交的 offset,或者该 offset 已过期不可用时,Kafka 会参考 auto.offset.reset 的值决定从哪开始消费。

什么是已过期不可用,当 Kafka 已经清理掉了某个分区中早期的消息,而你的消费者还试图从那些已经被删除的 offset 开始消费时,就会出现 “offset 已过期” 的情况。

含义场景举例
earliest最早的可用消息开始(即 offset = 0)第一次启动消费者,想读取全部历史数据
latest最新的消息开始(忽略历史)实时处理,不关心旧数据
none如果没有 offset,就报错强制使用提交过的 offset,适合事务

核心语义

  • 当我们说 Kafka 的 “at most once”“at least once” 语义时,默认是指从 Kafka 到消费者 的消息交付语义,不是整个 end-to-end 流程(即 producer → Kafka → consumer)。
  • 当我们说“exactly once” 时,Kafka能实现到的是从生产者到Kafka这一段,要实现全流程的话,需要业务端配合。

At most once

  • 消息最多处理一次,如果失败就丢弃,不会重试。
  • 开启 enable.auto.commit=true(默认)
  • Kafka 在 消费之前就提交 offset
  • 如果消费或处理失败,offset 已经提交,不会再拉取那条消息

At least once

  • 消息至少被处理一次,可能被重复处理(例如消费失败后重试)
  • 设置 enable.auto.commit=false
  • 应用处理完消息后,手动提交 offset(commitSync())
  • 如果处理成功之前崩溃,offset 没提交,Kafka 会 再次发送那条消息

Exactly once

  • 每条消息只被处理一次,不多不少、不丢不重
  • 需要 生产端 + 消费端 + Kafka 自身 三者配合
  • 生产端:
    • 设置:enable.idempotence=true
    • 幂等写入,Kafka 自动去重(PID + seq)
  • 消费 + 生产联动(读处理写场景):
    • 使用 transactional.id 启动事务性 producer
    • 读 offset + 写消息 → 在同一个事务中提交
    • 使用 commitTransaction() 实现写入+offset原子提交
  • 消费端读取配置:
    • 设置:isolation.level=read_committed,只读事务提交的数据

幂等保证了同一个消息重发不会被记录两次,事务保证了类似转账之类的需求产生的消息会原子写入,但是最终消费者如何处理,处理后ACK是否正常响应,是达成全链路Exactly Once的关键。

而这里恰恰是最难的地方,确认消息被消费和业务端的业务提交如何绑定,比较简单的做法是在事务中记录消息的offset,下次再有消息过来时进行验证。

性能原理

Kafka 的每个 broker 是高度多线程架构,不同类型的线程(网络、I/O、副本同步、日志清理)协同工作,支持高并发、高吞吐,同时保持分区级顺序性。

顺序追加写(Append-only log)——极高磁盘写入性能

  • 不可更新,只能追加
  • 每条消息都直接写入一个连续日志文件尾部(顺序磁盘写)

零拷贝(Zero-Copy)+ Page Cache

传统 I/O 消息发送流程

  1. 数据从磁盘读取 → 拷贝到内核缓冲区(kernel space)
  2. 再拷贝到应用缓冲区(user space)
  3. 应用处理 → 发送 → 拷贝回内核缓冲区
  4. 最终发到网卡(socket buffer)

Kafka 使用零拷贝的流程

  1. Kafka 直接调用 sendfile(fd_disk, fd_socket)
  2. 内核直接从磁盘页缓存 → 拷贝到 socket buffer
  3. 不经过用户空间,无需多次拷贝

Page Cache 是 Linux 的文件系统缓存,Kafka 并不会每次都从磁盘真实读写数据,而是尽量操作内存缓存。

步骤说明
Producer 发来消息Kafka broker 接收并写入日志(append-only)
Kafka 写入 .log 文件实际是写入 Page Cache(内存),不是直接写磁盘
异步刷盘Kafka 定期刷 Page Cache 到磁盘(由 OS 或 Kafka 控制)

批量写入机制(Batching)

  • Producer 默认聚合消息再发送(linger.ms, batch.size)
  • Broker 也按 batch 写入磁盘、刷写日志

I/O 多线程 + 后台刷盘

  • 数据写入后并不是立刻刷到磁盘,而是先写 OS page cache,再异步刷盘

内存缓冲 + 压缩(可选)

Kafka 支持 Snappy、LZ4 等压缩算法,可在网络层或写入磁盘前压缩,节省 I/O。类似gzip,但是注意减少I/O的时候也增加了计算消耗。

商业转载请联系站长获得授权,非商业转载请注明本文出处及文章链接,您可以自由地在任何媒体以任何形式复制和分发作品,也可以修改和创作,但是分发衍生作品时必须采用相同的许可协议。

本文采用CC BY-NC-SA 4.0 - 非商业性使用 - 相同方式共享 4.0 国际进行许可。