Kafka

思维脑图

项目涉及源码

消息队列概述

两种工作模式

点对点模式

一对一,消费者主动拉取数据,消息收到后消息清除

在点对点消息队列(Point-to-Point, P2P)模式中,消息生产者将消息发送至特定的队列(Queue),而消息消费者则从该队列中主动拉取并处理消息。每个消息只能被一个消费者接收和处理。一旦消费者成功处理了消息,该消息就会从队列中被移除,确保不会被其他消费者再次处理。虽然一个队列可以拥有多个消费者,每条消息只能被其中一个消费者接收。此模式确保每条消息都会被精确地处理一次,避免重复或遗漏的情况发生。

发布/订阅模式

一对多,消费者消费数据之后不会清理消息,有时间限制

在消息队列的发布-订阅(Publish-Subscribe)模式中,消息生产者(或称发布者)将消息发送到一个称作“主题”(Topic)的通道中。与点对点模式不同的是,所有订阅了该主题的消费者(或称订阅者)都有能力接收到这些消息。重要的是,一条发布到主题的消息会被所有活跃的订阅者消费,实现了一条消息能够同时被多个消费者接收的效果。此模式支持一条消息被多次处理,由不同的消费者分别进行处理和响应。发布到主题的消息通常会在所有订阅者消费完成后保留一段预定的时间或直至满足某些条件后才被清理。

kafka 是发布订阅模式,只有一个消费者组就类似点对点模式,多个消费者组就是发布/订阅模式,但是不会马上删除数据,需要配合配置文件

两种获取消息的方式

推模式

在推模式中,当 Broker 接收到 Producer 的消息后,它会主动地将这些消息推送给 Consumer。具体来说,客户端(Consumer)与服务端(Broker)之间会建立一个持久化的连接。当Broker有新的消息时,它会直接通过这个已建立的连接推送消息到客户端,从而确保客户端可以实时接收到最新的消息。

优点

  • 实时性:由于消息是被主动推送的,客户端能够快速地接收到最新的消息,确保了消息的实时传递。
  • 客户端实现简单:只需要监听服务端的推送消息

缺点

  • 无法应对不同能力的消费者:每个客户端的消费能力是不同的,如果简单粗暴进行消息推送就会出现 消息堆积 而引发宕机

拉模式

在拉模式中,Consumer 会主动向 Broker 请求消息,而不是靠Broker将消息推送给它。这意味着Consumer决定何时接收消息,并控制获取消息的频率。

长轮询:客户端向服务端发起请求,如果此时有数据就直接返回,如果没有数据就保持连接,等到有数据时就直接返回。如果一直没有数据,超时后客户端再次发起请求,保持连接

优点

  • 避免消息堆积:由于Consumer控制消息的拉取时机,它可以确保在上一批消息处理完成后再拉取新的消息,从而避免在客户端出现消息堆积。
  • 长轮询实现的拉模式实时性也能够保证消息时效性

缺点

  • 实现复杂性:客户端需要维护拉取消息的逻辑和处理潜在的超时/重试情况,又要考虑消息的时效性和避免忙等

kafka consumerbroker拉取消息

Kafka概念

定义

Kafka传统定义: Kafka 通常被定义为一个分布式的、基于发布/订阅模式的消息队列系统,它在大数据和实时分析领域具有广泛的应用。

发布/订阅:消息发布者(Producer)不会直接将消息发送到特定的接收者(Consumer)。相反,发布者将消息发送到一个中间层,通常是一个“主题”(Topic),而订阅者则从这个主题订阅它们感兴趣的消息。这种解耦的方式使得生产者和消费者可以独立地扩展和演变。

Kafka最新定义:如今通常被认为是一个开源的分布式事件流平台,它不仅仅能处理高吞吐量的事件流数据,也能支持高性能的数据管道、数据集成、实时分析和关键任务应用。其灵活的架构使其在全球范围内的数千家公司中被用于构建多种应用场景的事件驱动解决方案。

优缺点

优点

  • 高性能与高吞吐量:Kafka展现了卓越的性能表现和高吞吐量,单机事务吞吐量可达到百万条/秒级别。
  • 高可用性:作为一个分布式系统,Kafka通过数据副本保证了高可用性和数据的持久性。即使部分节点宕机,由于多副本的存在,数据仍不会丢失,系统依然可用。
  • 主动拉取消息:消费者采用Pull(拉取)方式获取消息,支持消息有序消费,并能通过控制策略确保消息被消费一次且仅被消费一次。
  • 生态支持:Kafka拥有稳定的社区支持,以及丰富的第三方工具,例如Kafka Web管理界面EFAK。在日志处理和大数据实时处理领域(如Flink、CDC、ETL等)具有广泛的应用。
  • 功能定位:虽然功能相对简洁,主要专注于消息队列(MQ)功能,但在大数据领域,这种定位使其在实时计算和日志采集等应用场景下表现卓越。

缺点

  • 再平衡延迟:消费者组成员的变化可能导致再平衡,这会引入额外的延迟。
  • 实时性与轮询策略:消费者使用轮询方式获取消息,因此消息的实时性会受轮询间隔时间的影响。
  • 不支持自动重试:在消费消息失败的场景下,Kafka本身不提供自动重试的机制。
  • 消息顺序问题:保证单分区内消息有序,但是不保证多分区间的消息有序。

应用场景

缓存/消峰

Kafka 能够作为一个高效的缓冲层,协助控制和优化数据在系统间的流动,尤其适用于生产消息的速度和消费消息的处理速度不匹配的场景。以秒杀功能为例:在高流量场景下,如用户请求量超过服务器的处理能力,可以利用Kafka进行秒杀请求的缓存。服务器根据其处理能力,稳定地从队列中拉取数据进行处理,从而有效地实现流量的削峰。

解耦

Kafka 在系统解耦方面表现卓越,特别是在需要异构数据同步的上下文中。例如,在一个需要从多个服务系统中收集日志数据、并将数据分发到不同数据库进行各异分析的日志收集系统中。通过Kafka进行数据交流和传递,不同的服务和处理模块可以解耦合,保持独立运行和演进,只需确保数据的消费处理逻辑保持一致。

异步通信

Kafka 支持异步通信模型,允许生产者将消息发布到队列中而无需立即进行处理。消费者可以根据自身的处理能力,在适当的时候从队列中拉取消息进行处理。一个常见的应用场景是电商平台的订单通知系统:当用户下单成功时,下单信息被发送到Kafka,短信通知服务作为消费者异步从Kafka中获取这些信息,进而在可控的速度和时间内,发送订单确认短信给用户。

数据聚合

Kafka 可以用于收集来自多个源的数据,将其聚合在一处,方便进行数据分析和信息提取。比如多个服务的操作日志可以汇集至Kafka,再由专门的日志分析服务消费,实现集中式的日志管理和分析。

事件驱动架构

在事件驱动架构中,Kafka 常作为事件传递的中介,各个服务发布和订阅事件,从而实现低耦合的交互和协作。通过精细定义事件的种类和格式,服务可以轻松响应其他服务的状态变化或请求,而无需直接交互。

Kafka架构

几个重要配置参数

Broker

定义

  • 消息处理中心:Kafka Broker(服务实例) 接收来自生产者的消息,存储这些消息,并处理消费者的读取请求。
  • 分布式节点:在Kafka的分布式环境中,Broker是作为独立节点运行的服务器或一组服务器。

核心功能

  • 数据存储:Broker负责将生产者发送的消息持久化到磁盘上,确保数据的安全性。
  • 数据提取:Broker允许消费者读取存储的数据,支持并行读取。
  • 分区管理:Kafka的数据存储在不同的Topic中,每个Topic可以被分成多个分区。Broker管理这些分区的数据,每个分区可以在不同的Broker上。
  • 副本管理:为了提高数据的可用性和耐用性,Kafka的数据通常会在多个Broker上进行复制。Broker负责管理这些副本,并在必要时进行故障转移。
  • 协调和同步:在多Broker环境下,Broker间会协调并同步消息和事务状态。

Topic

在Kafka中,消息被归类到不同的 topic。每条发送到Kafka集群的消息都应指定一个 topic。默认情况下,topic中的数据保留7天(168小时),但这是可以配置的。

Topic  在Kafka中是逻辑上的分区,而其下的 partition 则代表物理上的分区。

特殊Topic

特殊Topic

从Kafka 0.10.x版本开始,消费者的 offset 信息默认存储在内置的 __consumer_offsets topic中。当消费者首次尝试消费Kafka数据时,这个特殊的topic会自动被创建。需要注意的是,该topic的副本数不受集群的默认topic副本数配置的影响,而它的默认分区数为50,但这也是可以调整的。

Partition

Topic -> Partition -> 副本

Partition(分区)是用于实现消息在处理时的并行化的基本单位(物理分区)。每个Topic可以被分成一个或多个Partition,每个Partition可以存在于多个节点上以提供数据冗余,以确保数据在某些节点失败的情况下仍然可用。这样,Partition既能提供系统的横向扩展性,也提供了数据的高可用性。

核心功能

  • 并行处理:通过将Topic分区,Kafka可以在多个broker(节点)上并行处理数据,也允许在多个消费者之间并行处理数据。
  • 数据持久化:每个Partition都是一个有序的、不可变的记录集,并且可以持久地存储到磁盘上。每个在Partition中的消息都会被分配一个唯一的、递增的ID号,称为“offset”。
  • 水平扩展:通过增加Partition数量,Kafka能够水平扩展处理更多的消息。
  • 复制:Partition也支持复制,以防止数据丢失。每个Partition会有一个Leader和零个或多个Follower。所有的读写操作都由Leader处理,而Follower用来在Leader失败时提供冗余备份。
  • 消费者组内的并行消费:消费者组中的每个消费者实例都会消费Topic的一个或多个Partition的数据。一个Partition在一个消费者组内只会被一个消费者实例消费,从而实现了在消费者组内的消息处理的负载均衡。

Partition数量只能增加不能减少

Replica

Kafka中的每个分区(Partition)可以有一个或多个副本,这些副本分散在集群中的不同集群节点上,以实现数据的持久化和高可用。

Leader

在Kafka中,每个分区都有一个并且只有一个 Leader 副本,它负责处理该分区的所有读写操作。虽然一个Topic可能包含多个分区,但每个分区的Leader副本都是独立且唯一的。

Follower

Follower 副本主要用于备份和同步Leader副本中的数据。在Leader副本出现故障时,Kafka会从Follower副本中选举一个新的Leader来保证高可用性。

ISR

表示和 Leader 保持同步的副本集合,主节点宕机可作为备选节点。

OSR

表示 Follower 与 Leader 副本同步时,延迟过多的副本集合。

Producer

生产者(Producer)是消息生产和发布的实体。它负责将应用程序的消息发布到指定的 Kafka 主题。

  • main线程

    主线程主要负责将消息(Records)封装到一个 ProducerRecord 对象中,并发送给内部的 RecordAccumulator

    • 序列化: 将 Key 和 Value 对象序列化为字节数组。

    • 分区: 如果 ProducerRecord 中没有指定 Partition,Sender 线程需要通过 Partitioner 来决定消息将发送到 Topic 的哪个 Partition 上。

  • RecordAccumulator(内存缓冲区)

    RecordAccumulator 是一个消息的内存缓冲区。当主线程调用 Producer 的 send() 方法发送消息时,消息实际上被先存储在此缓冲区中。它会尽可能地批量处理这些消息(即尽可能地将多个消息打包在一个 batch 中)以提高效率。当 batch 满了或经过一定时间(linger.ms 属性设定的时间)后,消息会被发送到目标分区(Partition)。

  • send线程

    Sender 线程是 Kafka 生产者的核心,它从 RecordAccumulator 中取出消息并负责将消息发送到 Kafka Broker 上对应的 Partition 中。

    • 压缩: 根据配置,Sender 线程可能会将几个消息压缩到一起发送以节省带宽。
    • 发送: Sender 线程将消息发送到 Broker 并等待 Broker 的确认。
    • 重试: 如果消息发送失败,根据生产者的配置(例如 retriesretry.backoff.ms 等参数),Sender 线程可能会进行一定次数的重试。
    • 回调: 如果消息发送成功或者最终失败,Sender 线程将调用回调方法(如果在发送消息时提供了回调函数)。

ConsumerGroup

在Kafka中,消费者组是一种机制,使多个消费者可以协作处理同一个主题的消息。消费者组内的每个消费者负责读取该主题的不同分区,确保每条消息只被消费者组中的一个消费者消费一次。

  • 消费者组内的再分配:当消费者组中的消费者数量发生变化(例如新的消费者加入或现有的消费者离开)时,Kafka会重新分配主题分区给消费者。这种再分配确实会对系统造成一些开销,但它允许Kafka的消费模式具有弹性和容错性。
  • 分区和消费者的数量关系:为了确保每个消费者都有数据可以读取,一个主题的分区数量应当大于或等于消费者组中消费者的数量。
  • 消费者和消费者组:每个消费者都属于一个消费者组,并可以为每个消费者指定一个组名。如果不指定组名,消费者会属于默认组。一条消息可以被多个消费者组中的消费者读取,但在一个消费者组内,一条消息只会被一个消费者读取。

Consumer

消费者在Kafka中负责从Broker读取消息。Kafka采用的是发布-订阅模式,但与传统的发布-订阅系统不同的是,Kafka中的订阅者是消费者组而非单一的消费者实例。在消费者组内,每条消息只会被一个消费者实例处理。但值得注意的是,不同的消费者组可以独立地并行消费同一条消息。

Kafka集群搭建

java8环境安装

1
2
sudo apt-get update
sudo apt-get install openjdk-8-jdk

下载并解压

kafka最新版本下载

新版本Kafka不需要额外安装zookeeper,使用内置zookeeper搭建集群。目前还支持kraft方式部署,不过目前版本还未稳定故不选择

下载kafka_2.12-3.2.0.tgz(或目前最新版本)

1
2
3
4
5
6
7
8
9
# 创建目录
mkdir /data/kafka/k1
mkdir /data/kafka/k2
mkdir /data/kafka/k3

# 解压到创建目录
tar -zvxf kafka_2.12-3.2.0.tgz -C /data/kafka/k1/
tar -zvxf kafka_2.12-3.2.0.tgz -C /data/kafka/k2/
tar -zvxf kafka_2.12-3.2.0.tgz -C /data/kafka/k3/

zookeeper存储位置配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 创建zookeeper数据存储目录
mkdir /data/kafka/k1/kafka_2.12-3.2.0/zookeeper
mkdir /data/kafka/k1/kafka_2.12-3.2.0/zookeeper/data
mkdir /data/kafka/k1/kafka_2.12-3.2.0/zookeeper/log

mkdir /data/kafka/k2/kafka_2.12-3.2.0/zookeeper
mkdir /data/kafka/k2/kafka_2.12-3.2.0/zookeeper/data
mkdir /data/kafka/k2/kafka_2.12-3.2.0/zookeeper/log

mkdir /data/kafka/k3/kafka_2.12-3.2.0/zookeeper
mkdir /data/kafka/k3/kafka_2.12-3.2.0/zookeeper/data
mkdir /data/kafka/k3/kafka_2.12-3.2.0/zookeeper/log

# 创建myid文件
echo 1 > /data/kafka/k1/kafka_2.12-3.2.0/zookeeper/data/myid
echo 2 > /data/kafka/k2/kafka_2.12-3.2.0/zookeeper/data/myid
echo 3 > /data/kafka/k3/kafka_2.12-3.2.0/zookeeper/data/myid

zookeeper配置修改

将三个目录下的zookeeper配置文件分别修改下文三个配置

  • /data/kafka/k1/kafka_2.12-3.2.0/config/zookeeper.properties
  • /data/kafka/k2/kafka_2.12-3.2.0/config/zookeeper.properties
  • /data/kafka/k3/kafka_2.12-3.2.0/config/zookeeper.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#心跳时间2秒 
tickTime=2000
#Follower跟随者服务器与Leader领导者服务器之间初始化连接时能容忍的最多心跳数10*tickTime
initLimit=10
#集群中Leader与Follower之间的最大响应时间单位5*tickTime
syncLimit=5
#存储快照文件 snapshot 的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir=/data/kafka/k1/kafka_2.12-3.2.0/zookeeper/data
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能
dataLogDir=/data/kafka/k1/kafka_2.12-3.2.0/zookeeper/log
#zookeeper端口
clientPort=2181
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制
maxClientCnxns=60
#server.1代表一台服务器的编号,第一个端口为集群通讯端口,第二个端口代表Leader选举的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#心跳时间2秒 
tickTime=2000
#Follower跟随者服务器与Leader领导者服务器之间初始化连接时能容忍的最多心跳数10*tickTime
initLimit=10
#集群中Leader与Follower之间的最大响应时间单位5*tickTime
syncLimit=5
#存储快照文件 snapshot 的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir=/data/kafka/k2/kafka_2.12-3.2.0/zookeeper/data
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能
dataLogDir=/data/kafka/k2/kafka_2.12-3.2.0/zookeeper/log
#zookeeper端口
clientPort=2182
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制
maxClientCnxns=60
#server.1代表一台服务器的编号,第一个端口为集群通讯端口,第二个端口代表Leader选举的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#心跳时间2秒 
tickTime=2000
#Follower跟随者服务器与Leader领导者服务器之间初始化连接时能容忍的最多心跳数10*tickTime
initLimit=10
#集群中Leader与Follower之间的最大响应时间单位5*tickTime
syncLimit=5
#存储快照文件 snapshot 的目录。默认情况下,事务日志也会存储在这里。建议同时配置参数dataLogDir, 事务日志的写性能直接影响zk性能
dataDir=/data/kafka/k3/kafka_2.12-3.2.0/zookeeper/data
#事务日志输出目录。尽量给事务日志的输出配置单独的磁盘或是挂载点,这将极大的提升ZK性能
dataLogDir=/data/kafka/k3/kafka_2.12-3.2.0/zookeeper/log
#zookeeper端口
clientPort=2183
#单个客户端与单台服务器之间的连接数的限制,是ip级别的,默认是60,如果设置为0,那么表明不作任何限制
maxClientCnxns=60
#server.1代表一台服务器的编号,第一个端口为集群通讯端口,第二个端口代表Leader选举的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883

编写启动脚本启动

  • /data/kafka/k1/kafka_2.12-3.2.0
  • /data/kafka/k1/kafka_2.12-3.2.0
  • /data/kafka/k1/kafka_2.12-3.2.0

zk-start.sh

1
2
#!/bin/bash
nohup bin/zookeeper-server-start.sh config/zookeeper.properties > logs/zookeeper/zookeeper.log 2>1 &
1
2
3
4
5
6
7
8
9
10
# 创建log目录
mkdir /data/kafka/k1/kafka_2.12-3.2.0/logs
mkdir /data/kafka/k1/kafka_2.12-3.2.0/logs/zookeeper
mkdir /data/kafka/k2/kafka_2.12-3.2.0/logs
mkdir /data/kafka/k2/kafka_2.12-3.2.0/logs/zookeeper
mkdir /data/kafka/k3/kafka_2.12-3.2.0/logs
mkdir /data/kafka/k3/kafka_2.12-3.2.0/logs/zookeeper

# 分别启动zk
. zk-start.sh

创建kafka数据目录

1
2
3
mkdir /data/kafka/k1/kafka_2.12-3.2.0/data
mkdir /data/kafka/k2/kafka_2.12-3.2.0/data
mkdir /data/kafka/k3/kafka_2.12-3.2.0/data

kafka配置修改

将三个目录下的kafka配置文件分别修改下文三个配置

  • /data/kafka/k1/kafka_2.12-3.2.0/config/server.properties
  • /data/kafka/k2/kafka_2.12-3.2.0/config/server.properties
  • /data/kafka/k3/kafka_2.12-3.2.0/config/server.properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
# broker的全局唯一编号,不能重复
broker.id=1

# 提供给客户端响应的地址和端口
listeners = PLAINTEXT://0.0.0.0:9091

# 提供给客户端响应的地址和端口(允许外网访问)
advertised.listeners=PLAINTEXT://127.0.0.1:9091


# 处理网络请求的线程数量,也就是接收消息的线程数。
# 接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3

# 消息从内存中写入磁盘是时候使用的线程数量。
# 用来处理磁盘IO的线程数量
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

# 接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600

# kafka运行日志存放的路径
log.dirs=/data/kafka/k1/kafka_2.12-3.2.0/data

# topic在当前broker上的分片个数
num.partitions=3

# 我们知道segment文件默认会被保留7天的时间,超时的话就
# 会被清理,那么清理这件事情就需要有一些线程来做。这里就是
# 用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 副本因子
# 实际使用过程中很多用户都会将offsets.topic.replication.factor设置成大于1的数以增加可靠性,这是推荐的做法
# 存储的消费者客户端offsets偏移量
offsets.topic.replication.factor=3

# 副本因子事物状态日志数量,存储事务明细
transaction.state.log.replication.factor=3

# 事物状态日志最小数量
transaction.state.log.min.isr=3

# segment文件保留的最长时间,默认保留7天(168小时),
# 超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168

# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

# 上面的参数设置了每一个segment文件的大小是1G,那么
# 就需要有一个东西去定期检查segment文件有没有达到1G,
# 多长时间去检查一次,就需要设置一个周期性检查文件大小
# 的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

# 日志清理是否打开
log.cleaner.enable=true

# broker需要使用zookeeper保存meta数据
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

# zookeeper链接超时时间
zookeeper.connection.timeout.ms=18000

# 上面我们说过接收线程会将接收到的消息放到内存中,然后再从内存
# 写到磁盘上,那么什么时候将消息从内存中写入磁盘,就有一个
# 时间限制(时间阈值)和一个数量限制(数量阈值),这里设置的是
# 数量阈值,下一个参数设置的则是时间阈值。
# partion buffer中,消息的条数达到阈值,将触发flush到磁盘。
log.flush.interval.messages=10000

# 消息buffer的时间,达到阈值,将触发将消息从内存flush到磁盘,
# 单位是毫秒。
log.flush.interval.ms=3000

# 删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

# 防止成员加入请求后本应当即开启的rebalance
group.initial.rebalance.delay.ms=0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# broker的全局唯一编号,不能重复
broker.id=2

# 提供给客户端响应的地址和端口
listeners = PLAINTEXT://0.0.0.0:9092

# 提供给客户端响应的地址和端口(允许外网访问)
advertised.listeners=PLAINTEXT://127.0.0.1:9092

# 处理网络请求的线程数量,也就是接收消息的线程数。
# 接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3

# 消息从内存中写入磁盘是时候使用的线程数量。
# 用来处理磁盘IO的线程数量
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

# 接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600

# kafka运行日志存放的路径
log.dirs=/data/kafka/k2/kafka_2.12-3.2.0/data

# topic在当前broker上的分片个数
num.partitions=3

# 我们知道segment文件默认会被保留7天的时间,超时的话就
# 会被清理,那么清理这件事情就需要有一些线程来做。这里就是
# 用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 副本因子
# 实际使用过程中很多用户都会将offsets.topic.replication.factor设置成大于1的数以增加可靠性,这是推荐的做法
# 存储的消费者客户端offsets偏移量
offsets.topic.replication.factor=3

# 副本因子事物状态日志数量,存储事务明细
transaction.state.log.replication.factor=3

# 事物状态日志最小数量
transaction.state.log.min.isr=3

# segment文件保留的最长时间,默认保留7天(168小时),
# 超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168

# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

# 上面的参数设置了每一个segment文件的大小是1G,那么
# 就需要有一个东西去定期检查segment文件有没有达到1G,
# 多长时间去检查一次,就需要设置一个周期性检查文件大小
# 的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

# 日志清理是否打开
log.cleaner.enable=true

# broker需要使用zookeeper保存meta数据
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

# zookeeper链接超时时间
zookeeper.connection.timeout.ms=18000

# 上面我们说过接收线程会将接收到的消息放到内存中,然后再从内存
# 写到磁盘上,那么什么时候将消息从内存中写入磁盘,就有一个
# 时间限制(时间阈值)和一个数量限制(数量阈值),这里设置的是
# 数量阈值,下一个参数设置的则是时间阈值。
# partion buffer中,消息的条数达到阈值,将触发flush到磁盘。
log.flush.interval.messages=10000

# 消息buffer的时间,达到阈值,将触发将消息从内存flush到磁盘,
# 单位是毫秒。
log.flush.interval.ms=3000

# 删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

# 防止成员加入请求后本应当即开启的rebalance
group.initial.rebalance.delay.ms=0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# broker的全局唯一编号,不能重复
broker.id=3

# 提供给客户端响应的地址和端口
listeners = PLAINTEXT://0.0.0.0:9093

# 提供给客户端响应的地址和端口(允许外网访问)
advertised.listeners=PLAINTEXT://127.0.0.1:9093

# 处理网络请求的线程数量,也就是接收消息的线程数。
# 接收线程会将接收到的消息放到内存中,然后再从内存中写入磁盘。
num.network.threads=3

# 消息从内存中写入磁盘是时候使用的线程数量。
# 用来处理磁盘IO的线程数量
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

# 接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600

# kafka运行日志存放的路径
log.dirs=/data/kafka/k3/kafka_2.12-3.2.0/data

# topic在当前broker上的分片个数
num.partitions=3

# 我们知道segment文件默认会被保留7天的时间,超时的话就
# 会被清理,那么清理这件事情就需要有一些线程来做。这里就是
# 用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 副本因子
# 实际使用过程中很多用户都会将offsets.topic.replication.factor设置成大于1的数以增加可靠性,这是推荐的做法
# 存储的消费者客户端offsets偏移量
offsets.topic.replication.factor=3

# 副本因子事物状态日志数量,存储事务明细
transaction.state.log.replication.factor=3

# 事物状态日志最小数量
transaction.state.log.min.isr=3

# segment文件保留的最长时间,默认保留7天(168小时),
# 超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168

# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

# 上面的参数设置了每一个segment文件的大小是1G,那么
# 就需要有一个东西去定期检查segment文件有没有达到1G,
# 多长时间去检查一次,就需要设置一个周期性检查文件大小
# 的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

# 日志清理是否打开
log.cleaner.enable=true

# broker需要使用zookeeper保存meta数据
zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183

# zookeeper链接超时时间
zookeeper.connection.timeout.ms=18000

# 上面我们说过接收线程会将接收到的消息放到内存中,然后再从内存
# 写到磁盘上,那么什么时候将消息从内存中写入磁盘,就有一个
# 时间限制(时间阈值)和一个数量限制(数量阈值),这里设置的是
# 数量阈值,下一个参数设置的则是时间阈值。
# partion buffer中,消息的条数达到阈值,将触发flush到磁盘。
log.flush.interval.messages=10000

# 消息buffer的时间,达到阈值,将触发将消息从内存flush到磁盘,
# 单位是毫秒。
log.flush.interval.ms=3000

# 删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true

# 防止成员加入请求后本应当即开启的rebalance
group.initial.rebalance.delay.ms=0

编写启动脚本启动

  • /data/kafka/k1/kafka_2.12-3.2.0
  • /data/kafka/k2/kafka_2.12-3.2.0
  • /data/kafka/k3/kafka_2.12-3.2.0

在上面三个目录分别创建 kafka-start.sh 启动脚本

1
2
#!/bin/bash
nohup ./bin/kafka-server-start.sh config/server.properties > logs/kafka/kafka.log 2>1 &
1
2
3
4
# 创建日志目录
mkdir /data/kafka/k1/kafka_2.12-3.2.0/logs/kafka
mkdir /data/kafka/k2/kafka_2.12-3.2.0/logs/kafka
mkdir /data/kafka/k3/kafka_2.12-3.2.0/logs/kafka

启动

1
. kafka-start.sh

安装kafka-manager

kafka-manager 安装

Kafka命令

Topic

参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称
--create创建主题
--delete删除主题
--alter修改主题
--list查看所有主题
--describe查看主题详细描述
--partitions <Integer: # of partitions>设置分区数
--replication-factor<Integer: replication factor>设置分区副本
--config <String: name=value>更新系统默认的配置

列出所有topic

1
sh kafka-topics.sh --list --bootstrap-server localhost:9092

创建topic

1
2
sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 3 --replication-factor 3
参数 --topic 指定 Topic 名,--partitions 指定分区数,--replication-factor 指定备份数

查看topic

1
2
sh kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
可以查看每个broker上的topic分区副本信息

增加topic的partition数量

Partition数量只能增加不能减少

1
sh kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic test --partitions 2

删除topic

1
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test3 --delete

生产消息

参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称
1
2
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
此时控制台会出现光标,这时候就可以发送消息

消费消息

参数描述
--bootstrap-server <String: server toconnect to>连接的 Kafka Broker 主机名称和端口号
--topic <String: topic>操作的 topic 名称
--from-beginning从头开始消费
--group <String: consumer group id>指定消费者组名称
1
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first
1
sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic first

Kafka生产者

消息发送过程

  1. 创建Producer实例:当我们创建一个Kafka Producer实例,后台会启动几个线程,其中最重要的是 mainSender 线程。
  2. 序列化:使用配置的序列化器对消息的 key 和 value 进行序列化。
  3. 分区:消息在被添加到 RecordAccumulator 之前需要确定目标分区。这是通过Partitioner完成的,它可以基于消息键、消息内容或自定义策略来决定消息应该进入哪个分区。
  4. 添加消息到缓冲区:在应用程序的 main 线程中,当调用Producer的 send 方法时,它实际上并不直接发送消息。消息首先会被添加到一个批次中(称为Batch),然后存储在一个名为 RecordAccumulator 的内部结构中。这个 RecordAccumulator 实际上是由多个双端队列组成的,每个队列对应一个Kafka的分区
  5. 消息批处理:Kafka Producer将消息分组成批次,这样可以一次性发送多个消息,从而提高吞吐量。每个批次都有一个大小上限(由配置参数 batch.size 确定),当批次满了,或达到一定的延迟时(由 linger.ms 确定),Sender线程就会发送这些消息。
  6. Sender线程:这是一个后台线程,不断地从 RecordAccumulator 中拉取待发送的批次,然后将它们发送到相应的Kafka Broker。发送完成后,它也会处理来自Broker的响应,例如确认消息是否已经成功写入。
  7. 消息确认:Kafka支持几种确认模式,这是由 acks 参数确定的。
    • acks=0:不进行ack确认。

    • acks=1:只要Leader收到消息,Producer就认为消息已成功发送。

    • acks=allacks=-1:Leader + ISR列表里面的同步副本收到消息,Producer才会认为消息发送成功。

  8. 重试策略:如果消息发送失败,Kafka Producer会尝试重新发送。重试的次数和间隔是可以配置的。
  9. 关闭Producer:当你完成所有的消息发送并调用 close 方法时,它会确保所有待发送的消息都被发送出去,并且所有已发送的消息都得到了确认。

Send线程拉取参数

  • batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k
  • linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟

生产者重要参数

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地址清单,多个逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker 里查找到其他 broker 信息
key.serializervalue.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间
acks0:生产者发送过来的数据,不需要等数据落盘应答。
1:生产者发送过来的数据,Leader 收到数据后应答。
-1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。
默认值是-1,-1 和 all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字
retries当消息发送出现错误的时候,系统会重发消息。retries 表示重试次数。默认是 int 最大值,2147483647。 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送成功了
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
enable.idempotence是否开启幂等性,默认 true,开启幂等性
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd

异步发送

  1. 异步发送概述:Kafka提供异步方式发送消息,主要特点是消息发送操作不会因等待Broker的确认而阻塞。
  2. 消息异步提交到 RecordAccumulator :在异步模式下,Main线程调用 send 方法将消息提交消息到 RecordAccumulator 时是非阻塞的。这意味着Main线程可以迅速地继续其它任务,而不需等待消息实际被发送到Broker。
  3. Sender线程:Sender线程在后台工作,负责从 RecordAccumulator 取出消息并发送至Kafka Broker。
  4. 非阻塞的优势:这种异步方式的主要优势是它提高了生产者的吞吐量和效率,因为Main线程不会因消息发送而被频繁阻塞。

项目添加依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.2.0</version>
</dependency>

kafka-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class ProductTest {

private KafkaProducer<String, String> kafkaProducer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}

/**
* 异步发送
*/
@Test
public void send() {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello " + i));
}

kafkaProducer.close();
}
}

带回调函数的异步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 带回调函数的异步发送
*/
@Test
public void callBack() {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (Objects.isNull(e)) {
log.info("主题:{},分区:{}", recordMetadata.topic(), recordMetadata.partition());
}
}
});
}

kafkaProducer.close();
}

Spring Boot

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4
5
6
7
8
spring:
kafka:
bootstrap-servers: localhost:9091 # 连接kafka的地址,多个地址用逗号分隔
producer:
batch-size: 16384 # batch.size 批次大小,默认16k
buffer-memory: 33554432 # RecordAccumulator 大小,默认32M
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
@Slf4j
public class ProductTest {

private KafkaProducer<String, String> kafkaProducer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "121.37.23.172:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}

/**
* 异步发送
*/
@Test
public void send() {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", i + ""));
}

kafkaProducer.close();
}


/**
* 带回调函数的异步发送
*/
@Test
public void callBack() {
for (int i = 0; i < 500; i++) {
kafkaProducer.send(new ProducerRecord<>("first", i+"","hello " + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (Objects.isNull(e)) {
log.info("主题:{},分区:{}", recordMetadata.topic(), recordMetadata.partition());
}
}
});
}

kafkaProducer.close();
}


/**
* 同步发送
*/
@Test
@SneakyThrows
public void syncSend() {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello " + i)).get();
}

kafkaProducer.close();
}
}

同步发送

  1. 同步发送概述:在Kafka的同步发送模式中,Main线程会在每次发送消息后阻塞,直到从Broker收到确认。
  2. 消息提交到 RecordAccumulator:Main线程首先将消息提交到 RecordAccumulator,这是一个内部缓冲区,用于存储待发送的消息。
  3. Sender线程:Sender线程负责从 RecordAccumulator 中提取消息并发送到Kafka Broker。
  4. 阻塞等待确认:不像异步发送,同步发送会导致Main线程在消息被Sender线程发送并收到Broker的ACK之前一直阻塞。只有在收到ACK后,Main线程才会进行下一次发送或其他操作。

只需在异步发送的基础上,再调用一下 get()方法即可

1
2
3
4
5
6
7
8
9
@Test
@SneakyThrows
public void syncSend() {
for (int i = 0; i < 5; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "hello " + i)).get();
}

kafkaProducer.close();
}

自定义拦截器

  1. 拦截器目的:Kafka的Producer拦截器提供了一种机制,允许用户在关键的消息发送阶段介入,以实现特定的自定义需求。
  2. 主要功能
    • 消息发送前处理:在消息实际被发送到Broker之前,拦截器的 onSend 方法会被调用。
    • ACK接收后处理:当Producer从Broker接收到一个ACK或消息发送失败时,onAcknowledgement 方法会被触发。
  3. 拦截链:Producer支持多个拦截器的配置。这些拦截器会按照配置的顺序依次处理消息,形成所谓的“拦截链”。
  4. 如何实现
    • 要创建自己的拦截器,用户需实现 ProducerInterceptor 接口。
    • 此接口中有两个主要方法,分别是 onSend(在消息发送前调用)和 onAcknowledgement(在从Broker接收到响应前调用)。
  5. 如何配置:通过Producer的配置属性 interceptor.classes,用户可以指定一个或多个拦截器的完整类名。Producer在初始化时会按照配置的顺序加载并激活这些拦截器。

拦截器接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface ProducerInterceptor<K, V> extends Configurable {

/**
* 获取配置信息或初始化数据时调用
*/
public void configure(Map<String, ?> configs)

/**
* 该方法封装进KafkaProducer.send方法中,即它运行在用户主线程中。Producer确保在消息被序列化以及计算分区前调用该方法。用户可以在该方法中对消息做任何操作,但最好保证不要修改消息所属的topic和分区,否则会影响目标分区计算
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);

/**
* 会在消息从RecordAccumulator成功发送到kafka broker之后,或者再发送过程中失败调用,并且通常都是在producer回调逻辑触发之前。它运行在producer的IO线程,因此不要放入很重的逻辑,否则会影响消息发送效率
*/
public void onAcknowledgement(RecordMetadata metadata, Exception exception);

/**
* 关闭interceptor,主要用于执行一些资源释放
* interceptor 可能被运行在多个线程中,因此需要注意线程安全问题。如果指定多个interceptor,则producer将按照指定顺序调用它们,并仅仅是补货每个interceptor可能抛出的一场记录到错误日志中而非向上传递
*/
public void close();
}

定义拦截器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
**
* 消息添加时间戳
*/
@Slf4j
public class TimeInterceptor implements ProducerInterceptor<String, String> {

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
String value = record.value();
value = String.format("%s_%s", value, System.currentTimeMillis());
return new ProducerRecord<String, String>(record.topic(), value);
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
log.info("metadata: {}", JSON.toJSONString(metadata));
log.info("exception: {}", JSON.toJSONString(exception));
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {
log.info(JSON.toJSONString(configs));
}
}

/**
* 计数拦截器
*/
@Slf4j
public class CountInterceptor implements ProducerInterceptor<String, String> {
private final AtomicInteger sendCount = new AtomicInteger(0);

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
if (Objects.isNull(exception)) {
log.info("发送成功:{}", sendCount.incrementAndGet());
}
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

SpringBoot方式集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 9000

spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 连接kafka的地址,多个地址用逗号分隔
producer:
retries: 0 # 若设置大于0的值,客户端会将发送失败的记录重新发送
batch-size: 16384 # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
buffer-memory: 33554432 # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化类
properties:
interceptor.classes: com.wgf.interceptor.TimeInterceptor,com.wgf.interceptor.CountInterceptor # 支持多个,逗号隔开

Kafka-client方式集成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class InterceptorTest {
private KafkaProducer<String, String> kafkaProducer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 添加拦截器
List<String> interceptors = new ArrayList<>();
interceptors.add("com.wgf.interceptor.CountInterceptor");
interceptors.add("com.wgf.interceptor.TimeInterceptor");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}

@Test
public void sendTest() {
kafkaProducer.send(new ProducerRecord<>("first", "interceptor"));
}
}

自定义序列化

Kafka 自定义消息序列化和反序列化方式

生产者

需要实现序列化接口Serializer,序列化包含key和value的序列化,两者的序列化方式可不同,分别对应key.serializervalue.serializer

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Serializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}

byte[] serialize(String var1, T var2);

default byte[] serialize(String topic, Headers headers, T data) {
return this.serialize(topic, data);
}

default void close() {
}
}

消费者

需要实现反序列化接口Deserializer,反序列化包含key和value的反序列化,两者的反序列化方式可不同,分别对应key.deserializervalue.deserializer

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Deserializer<T> extends Closeable {
default void configure(Map<String, ?> configs, boolean isKey) {
}

T deserialize(String var1, byte[] var2);

default T deserialize(String topic, Headers headers, byte[] data) {
return this.deserialize(topic, data);
}

default void close() {
}
}

生产者分区

  1. 分区的目的:Kafka使用分区实现数据的水平扩展性、并行处理和容错能力。
  2. 存储和分发
    • 多Broker存储:分散存储和IO负载,Kafka将每个Topic的数据划分为多个分区,并将这些分区分布在多个Broker上。
    • 负载均衡:通过合理地分配分区到各个Broker,可以有效地实现负载均衡,确保每个Broker承担相似的数据和流量。
  3. 提高并行度
    • 生产者并行发送:生产者可以同时向多个分区发送数据,从而提高消息的生产速率。
    • 消费者并行消费:消费者组中的不同消费者实例可以并行地从不同分区消费数据,从而提高整体的数据消费速率。

分区策略

ProducerRecord

1
2
3
4
5
6
7
8
9
10
11
public class ProducerRecord<K, V> {
// 该消息需要发往的主题
private final String topic;
// 该消息需要发往的主题中的某个分区,如果该字段有值,则分区器不起作用,直接发往指定的分区
// 如果该值为null,则利用分区器进行分区的选择
private final Integer partition;
private final Headers headers;
// 如果partition字段不为null,则使用分区器进行分区选择时会用到该key字段,该值可为空
private final K key;
private final V value;
private final Long timestamp;

指定Partition

在发送消息时,如果明确指定了分区,则消息会直接发送到该指定分区。如果同时提供了 partitionkey,则 partition 优先级高于 key

1
2
3
4
5
6
7
8
9
10
11
12
13
    private Callback callback = (metadata, exception) -> log.info("发送的分区是:{}", metadata.partition());   
/**
* 指定分区
*/
@Test
public void partitionTest() {
for (int i = 0; i < 3; i++) {
kafkaProducer.send(new ProducerRecord<>("first", 0, null, "hello"), callback);
}
}
//14:44:05.375 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:0
//14:44:05.375 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:0
//14:44:05.375 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:0

指定key

当发送消息时未显式指定 Partition,但提供了key,Kafka会使用 key 的MurmurHash值与Topic的Partition数量进行取模运算,以确定消息的目标Partition。

1
2
3
4
5
6
7
8
9
10
11
12
    /**
* 指定key
*/
@Test
public void keyTest() {
for (int i = 0; i < 3; i++) {
kafkaProducer.send(new ProducerRecord<>("first", "aaa", "hello"), callback);
}
}
//14:44:46.084 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:1
//14:44:46.085 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:1
//14:44:46.085 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:1

都不指定

若既未指定partition值,又未提供key值,Kafka将使用Sticky Partition策略。初始时,它随机选择一个整数作为起始点。对于随后的每次调用,此整数值会递增。为确定消息的目标Partition,该整数值会与Topic的可用Partition总数进行取模运算,这一策略类似于轮询(round-robin)算法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    /**
* Partition和Key不指定则使用轮询
*/
@Test
@SneakyThrows
public void roundRobinTest() {
for (int i = 0; i < 3; i++) {
TimeUnit.SECONDS.sleep(1);
kafkaProducer.send(new ProducerRecord<>("first", "hello"), callback);
}
}
//14:47:26.704 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:0
//14:47:27.700 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:1
//14:47:28.685 [kafka-producer-network-thread | producer-1] INFO com.wgf.kafka.PartitionTest - 发送的分区是:0

自定义分区策略

在Kafka中,为满足特定的业务需求或数据路由逻辑,可以自定义分区策略。这可以通过实现 Partitioner 接口并覆写其方法来完成。虽然Kafka提供了 DefaultPartitioner 作为默认的分区策略,但有时候,为了实现更优的负载均衡或根据特定的数据属性将消息路由到不同的分区,自定义的分区策略是必要的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class MyPartitioner implements Partitioner {

// 返回的是第几个分区
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 获取topic的partitions
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
int numPartitions = partitionInfoList.size();

// 根据key进行hash计算
int hashCode = key.hashCode();
int partition = Math.abs(hashCode % numPartitions);
return partition;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> configs) {

}
}

SpringBoot配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 连接kafka的地址,多个地址用逗号分隔
producer:
transaction-id-prefix: kafka-tx-
retries: 1 # 若设置大于0的值,客户端会将发送失败的记录重新发送, 若开启事务则需要大于0
acks: -1 # 若开启事务,则必须设置为-1
batch-size: 16384 # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
buffer-memory: 33554432 # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化类
properties:
enable.idempotence: true # 开启消息幂等
partitioner.class: com.wgf.partition.MyPartitioner # 自定义分区策略

kafka-client配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.wgf.partition.MyPartitioner");

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}

生产者提高吞吐量

由上图我们可知,batch.size,RecordAccumulator,linger.ms,配置的大小都会影响吞吐量

  • batch.size:控制批次大小,默认为16K。当批次大小增加时,每次发送的数据量也增加,从而提高吞吐量。
  • linger.ms:延迟等待时间,默认为0ms。适当增加这个值(例如,设置在5-100ms范围内)可以使批次中累积更多的消息,从而提高吞吐量。但是,这也意味着消息的延迟会增加。
  • RecordAccumulator:生产者用于缓存数据的总内存大小,默认为32M。在有大量的生产者分区或多个Topic的情况下,适当增大此值可以提高吞吐量。
  • compression.type:设置消息的压缩算法,可以提高网络吞吐量,但会增加CPU的使用,常用的压缩类型为 snappy

SpringBoot配置

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
kafka:
bootstrap-servers: localhost:9092,localhost:9093,localhost:9094 # 连接kafka的地址,多个地址用逗号分隔
producer:
acks: -1 # 若开启事务,则必须设置为-1
batch-size: 16384 # batch.size 批次大小,默认16k
buffer-memory: 33554432 # RecordAccumulator 大小,默认32M
compression-type: snappy # 开启压缩类型
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化类
properties:
linger.ms: 1
enable.idempotence: true # 开启消息幂等

kafka-client配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ProducerThroughputTest {
private KafkaProducer<String, String> kafkaProducer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// batch.size:批次大小,默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// linger.ms:等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

// RecordAccumulator:缓冲区大小,默认 32M:buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);

// compression.type:压缩,默认 none,可配置值 gzip、snappy、lz4 和 zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}
}

生产者消息可靠性保证

为了确保Producer发送的数据被可靠投递指定的Topic,Kafka引入了一个 acks 机制。当Topic的某个partition收到Producer发送的数据后,会根据 acks 的配置决定是否向Producer发送ACK响应。如果Producer收到 ACK,则认为数据已经被成功发送,否则会尝试重新发送。

生产者的ack机制

acks描述优点缺点
0不进行ack确认, 一旦Broker接收到消息就立即返回确认数据吞吐量最大当broker发生故障时,消息有可能会丢失
1仅leader ack,当leader落盘成功后返回ack(默认配置)平衡了性能和数据安全性如果follower未完成同步时leader发生故障,消息可能丢失
-1(all)所有的ISR (In-Sync Replicas) 成员都必须ack数据安全性最高,只有当所有ISR成员都确认后才认为消息已写入如果leader发生故障或网络问题,可能造成数据重复,原因是消息已写入,acks无法及时应答,Producer进行重试

SpringBoot 生产者ACK配置

1
2
3
4
5
6
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 连接kafka的地址,多个地址用逗号分隔
producer:
acks: -1
retries: 3 # 开启ack一般配合重试次数使用,默认Integer最大值

kafka-client配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class AckTest {
private KafkaProducer<String, String> kafkaProducer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 设置ack
properties.put(ProducerConfig.ACKS_CONFIG, "-1");

// 设置重试次数,默认是 int 最大值,2147483647
properties.put(ProducerConfig.RETRIES_CONFIG, 3);

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}
}

数据去重

由上文的ACK应答机制可知,当消息发送失败会进行重试,那么重试就有可能导致数据重复

数据传递语义

  • At most once(最多一次):
    • 消息可能会丢失,但绝不会被传递多次。
    • 生产者设置:
      • acks=0: 不等待任何确认。
      • retries=0: 不进行重新发送。
  • At least once(最少一次):
    • 消息不会丢失,但可能会被传递多次。
    • 要实现这种语义,可以考虑以下配置:
      • 生产者设置:
        • acks=1acks=all: 确保至少leader副本或所有in-sync副本都已确认。
        • retries 设置为一个较大的值(如 Integer.MAX_VALUE):确保消息在失败时会被重新发送。
      • 消费者:
        • 设置 enable.auto.commit=false 并手动提交offset,这样只有在消息被成功处理之后才提交offset。
  • Exactly once(仅一次):
    • 消息既不会丢失,也不会被重复传递。这是最强的传输保证,但也是最复杂的。
    • 要实现这种语义,可以考虑以下配置:
      • 生产者设置:
        • acks=all: 确保所有in-sync副本都已确认。
        • enable.idempotence=true: 确保消息不会在网络故障等情况下被重复发送。
        • 使用事务发送消息
      • 消费者:
        • 使用事务消费消息,并确保消费和生产是在同一事务中。
        • 设置 enable.auto.commit=false 并手动提交offset。

消息幂等

幂等性在 Kafka 中确保了,无论生产者向 broker 发送多少次重复的消息,broker 端都只会持久化一条相同的消息,从而避免了数据的重复。

重复数据的判断基于以下标准:消息具有相同的 <Producer ID, Partition, SeqNumber> 组合将被视为重复的。其中:

  • Producer ID:是由 Kafka 分配的唯一标识符。每次 Kafka 重启时都会为生产者分配一个新的 Producer ID。
  • Partition:表示该消息所在的分区号。
  • Sequence Number:是在每个分区中单调自增的数字,由生产者为每条消息分配。

只有当上述三个属性完全相同时,消息才被视为重复,Broker 在这种情况下只会持久化一条数据。

局限性

生产者重启时,ProducerID 会改变,而不同的Partition对应不同的 SequenceNumber。因此,Kafka的幂等性保证仅限于单个生产者会话。一旦生产者重启,之前的幂等性保证将不再有效,即Kafka只能保证会话内的幂等,而无法保证跨会话的幂等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
server:
port: 9000

spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 连接kafka的地址,多个地址用逗号分隔
producer:
retries: 1 # 若设置大于0的值,客户端会将发送失败的记录重新发送,如果开启事务或者幂等,则必须大于0
acks: -1 # 若开启事务,则必须设置为-1
batch-size: 16384 # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
buffer-memory: 33554432 # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化类
properties:
enable.idempotence: true # 开启消息幂等

生产者事务

  • 当进行批量消息发送时,所有消息要么全部成功发布,要么全部不发布,确保数据的一致性。

  • 开启事务,必须开启幂等性

  • 在事务启用的情况下,Kafka 将 TransactionIDProducerID 关联,从而确保在生产者重启的情况下也不会丢失 ProducerID,进一步强化了幂等性的持久保障。

跨分区跨会话事务

  • 为了支持跨分区和跨会话的事务,Kafka 引入了一个全局唯一的 TransactionID。每当一个 Producer 启动事务时,它会获得一个特定的 ProducerID ,并将这个 ProducerIDTransactionID 进行绑定。这种设计确保了,即使 Producer 重启,也能够通过 TransactionID 重新获取原始的 ProducerID ,从而保持事务的连续性。

  • Kafka 为了管理和维护事务状态,引入了一个专门的组件:Transaction Coordinator(事务协调器)。Producer 与 Transaction Coordinator 交互以获得 TransactionID 和管理事务的生命周期。为了持久化和跟踪事务的状态,Transaction Coordinator 将事务的元数据写入 Kafka 的一个专用的内部 Topic。这样,即使 Kafka 集群中的节点重启,由于事务的状态被持久化,任何进行中的事务都可以从上次的状态恢复,保证事务的完整性。

事务API

1
2
3
4
5
6
7
8
9
10
11
12
// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

前置

  • transaction.state.log.replication.factor 的值必须大于等于 transaction.state.log.min.isr 的值:事务状态日志的副本数量必须大于等于最小副本同步因子。这是为了确保事务状态日志的可靠性和一致性。
  • transaction.state.log.replication.factor 的值必须小于等于 Kafka 集群中的可用 broker 数量:事务状态日志的副本数量不能超过 Kafka 集群中可用的 broker 数量。这是为了确保事务状态日志的复制和分布在 Kafka 集群中的可行性。

SpringBoot使用事务

springboot 事务配置

producer配置

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 连接kafka的地址,多个地址用逗号分隔
producer:
transaction-id: my-transaction-id # 开启事务,设置事务id
retries: 1 # 若设置大于0的值,客户端会将发送失败的记录重新发送, 若开启事务则需要大于0
acks: -1 # 若开启事务,则必须设置为-1
batch-size: 16384 # 当将多个记录被发送到同一个分区时, Producer 将尝试将记录组合到更少的请求中。这有助于提升客户端和服务器端的性能。这个配置控制一个批次的默认大小(以字节为单位)。16384是缺省的配置
buffer-memory: 33554432 # #Producer 用来缓冲等待被发送到服务器的记录的总字节数,33554432是缺省配置
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 关键字的序列化类
value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值的序列化类
properties:
enable.idempotence: true # 开启消息幂等

consumer配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092 # 连接kafka的地址,多个地址用逗号分隔
consumer:
enable-auto-commit: false # 参数设置成true。那么offset交给kafka来管理,offset进行默认的提交模式,置成false。那么就是Spring来替为我们做人工提交,从而简化了人工提交的方式.需要手动提交还需要指定ack-model
properties:
session.timeout.ms: 15000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
listener:
ack-mode: manual # 设置手动ack
properties:
isolation.level: read_committed # 置为 read_committed ,Consumer 仅读取已提交的消息, 否则不生效

使用事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 第一种用法,注解
@Transactional
public void send(boolean flag) {
this.kafkaTemplate.send("testTopic", "before");

if (flag) {
throw new RuntimeException();
}

this.kafkaTemplate.send("testTopic", "after");
}


// 第二种用法,手动操作
public void send(boolean flag) {
kafkaTemplate.executeInTransaction(operations -> {
this.kafkaTemplate.send("testTopic", "before");

if (flag) {
throw new RuntimeException();
}

this.kafkaTemplate.send("testTopic", "after");
return true;
});
}

kafka-client使用事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class TransactionTest {
private KafkaProducer<String, String> kafkaProducer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 序列化(必须):key.serializer,value.serializer
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

// 设置事务 id(必须),事务 id 任意起名
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test");

// 3. 创建 kafka 生产者对象
kafkaProducer = new KafkaProducer<>(properties);
}

@Test
public void test() {
// 初始化事务
kafkaProducer.initTransactions();
// 开启事务
kafkaProducer.beginTransaction();

try {
for (int i = 0; i < 6; i++) {
// 发送消息
kafkaProducer.send(new ProducerRecord<>("first", "test " + i));
}

// int i = 1 / 0;

// 提交事务
kafkaProducer.commitTransaction();
} catch (Exception e) {
// 终止事务
kafkaProducer.abortTransaction();
} finally {
// 5. 关闭资源
kafkaProducer.close();
}
}
}

数据有序

由于 Topic 可以分多个 Partition,Kafka为了保证其高吞吐量,只能保证单个 Partition 数据有序,不能保证多个 Partition 之间数据的有序性,如果要保证 Topic 数据的有序性有以下几种做法:

  • 单分区顺序保证:当一个主题只有一个分区时,可以保证消息的全局有序性。因为只有一个分区,所有消息都按照发送顺序写入该分区,消费者可以按照相同的顺序读取消息,实现全局有序性。
  • 有序分区键:当一个主题有多个分区时,可以使用有序分区键来实现相对有序性。有序分区键是根据业务需求生成的键,不同的业务数据类型(如订单类型)或业务相关键将路由到不同的分区。这样,同一类型的消息将被发送到同一分区,消费者可以按照分区顺序消费消息,实现相对有序性。

数据乱序

Kafka 数据分区有序性:

Kafka 通过分区内的偏移量来保证单个分区内的消息有序。每个分区的消息都具有唯一的偏移量,它们按照偏移量的顺序被消费者读取。这确保了分区内的消息是有序的,不论是 Kafka 1.x 之前还是之后的版本。

max.in.flight.requests.per.connection 配置:

  • 它控制了生产者在与 Kafka 服务器的连接上可以同时发送的未确认请求的最大数量。这个参数影响了生产者的性能、吞吐量以及消息的顺序性。

  • Kafka 1.x 之前版本: 在 Kafka 1.x 之前的版本中,为了保证数据的有序性,生产者通常需要将 max.in.flight.requests.per.connection 设置为 1,以确保每次只发送一个请求,避免乱序。

  • Kafka 1.x 及以后版本: 在 Kafka 1.x 以及之后的版本,有两种情况:

    • 当未启用幂等性时,依然建议将 max.in.flight.requests.per.connection 设置为 1,以确保数据的有序性。
    • 当启用幂等性时,可以将 max.in.flight.requests.per.connection 设置为小于或等于 5。这是因为 Kafka 服务端会缓存生产者发送的最近 5 个请求的元数据,无论如何,都可以保证这最近的 5 个请求的数据是有序的。

生产者可以根据需要对 max.in.flight.requests.per.connection 进行配置,以在吞吐量和有序性之间找到适当的平衡。

Kafka Broker

ZK存储的Kafka信息

  • controller:负责管理集群Broker的上下线,所有Topic的分区副本分配和Leader的选举工作等
  • broker:服务节点信息,包括上线的节点id和Topic信息

Broker重要参数

参数名称描述
replica.lag.time.max.msISR 中,如果 Follower 长时间未向 Leader 发送通 信请求或同步数据,则该 Follower 将被踢出 ISR。 该时间阈值,默认 30s
auto.leader.rebalance.enable默认是 true。 自动 Leader Partition 平衡
leader.imbalance.per.broker.percentage默认是 10%。每个 broker 允许的不平衡的 leader 的比率。如果每个 broker 超过了这个值,控制器会触发 leader 的平衡
leader.imbalance.check.interval.seconds默认值 300 秒。检查 leader 负载是否平衡的间隔时间
log.segment.bytesKafka 中 log 日志是分成一块块存储的,此配置是 指 log 日志划分成块的大小,默认值 1G
log.index.interval.bytes默认 4kb,kafka 里面每当写入了 4kb 大小的日志 (.log),然后就往 index 文件里面记录一个索引
log.retention.hoursKafka 中数据保存的时间,默认 7 天
log.retention.minutesKafka 中数据保存的时间,分钟级别,默认关闭
log.retention.msKafka 中数据保存的时间,毫秒级别,默认关闭
log.retention.check.interval.ms检查数据是否保存超时的间隔,默认是 5 分钟
log.retention.bytes默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的 segment
log.cleanup.policy默认是 delete,表示所有数据启用删除策略; 如果设置值为 compact,表示所有数据启用压缩策略
num.io.threads默认是 8。负责写磁盘的线程数。整个参数值要占总核数的 50%
num.replica.fetchers副本拉取线程数,这个参数占总核数的 50%的 1/3
num.network.threads默认是 3。数据传输线程数,这个参数占总核数的 50%的 2/3
log.flush.interval.messages强制页缓存刷写到磁盘的条数,默认是 long 的最大值,9223372036854775807。一般不建议修改, 交给系统自己管理
log.flush.interval.ms每隔多久,刷数据到磁盘,默认是 null。一般不建议修改,交给系统自己管理

服役新节点

假设firstTopic有三个分区和三个副本分别分布在三台Broker上(0,1,2),这时候新服役了一台id为3的节点,如何把现有的分区和副本重新负载到这四台节点上

编写平衡主题脚本

1
vim topics-to-move.json
1
2
3
4
5
6
7
8
{
"topics":[
{
"topic":"first"
}
],
"version":1
}

生成一个负载均衡计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "0,1,2,3" --generate

这是控制台会输出均衡计划

1
2
3
4
5
Current partition replica assignment
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[0,2,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[2,1,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[1,0,2],"log_dirs":["any","any","any"]}]}

Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs"["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

创建副本存储计划

将合适的计划复制到一个文件中

1
vim increase-replication-factor.json
1
{"version":1,"partitions":[{"topic":"first","partition":0,"replicas":[2,3,0],"log_dirs":["any","any","any"]},{"topic":"first","partition":1,"replicas":[3,0,1],"log_dirs":["any","any","any"]},{"topic":"first","partition":2,"replicas":[0,1,2],"log_dirs":["any","any","any"]}]}

执行副本存储计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

验证副本存储计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify

退役旧节点

上文服役了四台节点,现在假设要退役BrokerId为3的节点,操作和服役新节点流程差不多,只不过生成负载均衡计划的 --broker-list变成 "0,1,2"即可

Kafka副本

  • 数据可靠性和高可用性:Kafka 副本确保当 Leader 副本宕机时,Follower 可以被选举为新的 Leader,保证服务可用性。
  • 副本数量考量
    • 默认设置:Kafka 的默认副本数为 1。
    • 生产推荐:在生产环境中,建议将副本数配置为 2 或更多,以确保数据冗余和可靠性。
    • 权衡:尽管增加的副本数会导致更多的磁盘使用和网络传输,但选择合适的副本数量是在可靠性和资源使用之间的一个权衡。
  • 副本角色
    • Leader:对于每个分区,Leader 负责处理所有的读写请求。
    • Follower:Follower 从 Leader 同步数据,并作为备份和故障转移角色。

AR

Kafka 分区中的所有副本被统称为 AR(All Replicas)。

AR = ISR + OSR

ISR

  • ISR 是 Kafka 分区中与 Leader 副本保持同步的 Follower 副本集合。
  • 如果 Follower 副本长时间未向 Leader 发送同步请求或同步数据,则它会从 ISR 中被移除。这个“长时间”是由 replica.lag.time.max.ms 参数控制的,默认值为 30 秒。
  • 当 Leader 副本发生故障时,一个新的 Leader 将从 ISR 中被选举。

被踢出ISR列表的副本一般有如下原因:

  • 慢副本
    • 描述:Follower 副本在一定的时间段内无法追赶 Leader 的速度。
    • 原因:常见原因是 I/O 瓶颈,导致 Follower 向 Leader 追加复制的消息速度低于从 Leader 拉取的速度。
  • 卡住的副本
    • 描述:Follower 副本在一定的时间段内停止从 Leader 拉取请求。
    • 原因:可能是由于 GC 暂停、Follower 故障或者 Follower 进程死亡。
  • 新启动副本
    • 描述:当为主题增加副本时,新的 Follower 副本最初不会在 ISR 中。
    • 原因:一旦它们完全追赶了 Leader 的日志,将加入ISR

OSR

OSR 代表与 Leader 副本同步时存在较大延迟的 Follower 副本。

  • 从 ISR 到 OSR

    触发条件:在 replica.lag.time.max.ms 时间范围内,若 Follower 的 LEO (Log End Offset) 没有追赶上该 partition 的 HW (High Watermark),则该 Follower 被移至 OSR 列表。

  • 从 OSR 到 ISR

    触发条件:当 OSR 列表中的 Follower 的 LEO 同步并追赶上该 partition 的 HW 时,该 Follower 重新被加入到 ISR 列表。

Leader选举流程

  1. Broker 注册:
    • 当 Kafka Broker 启动后,它向 ZooKeeper 的 /brokers/ids 路径注册其当前节点 ID。
  2. Controller 节点选举
    • 在所有的Broker中选举一个Controller节点。Controller节点负责管理所有Partition的Leader选举。
    • 第一个在 Zookerper 成功创建 /controller node的 Broker 将成为 Controller。
  3. Controller 监听节点上下线
    • 选举出来的 Controller 会监听 /brokers 路径的变化。
  4. Controller 选举 Leader
    • Controller 根据特定规则来决定 Partition 的 Leader。
    • 选举规则是:在 ISR 列表中处于活跃状态,且按照 AR 列表的顺序排列。例如:如果 AR 是 [1,0,2] 且 ISR 是 [1,0,2],那么 Leader 就会按照 1, 0, 2 的顺序来轮询选举。
  5. 选举结果上报 ZooKeeper
    • Controller 将选举的结果更新到 ZooKeeper 的 /brokers/topics/xxx/partitions/xxx 路径。
  6. Broker 同步
    • 其他 Broker 会从 ZooKeeper 的相应路径同步选举结果。
  7. Leader 宕机处理
    • 假设 Broker1 中的 Leader 宕机了。
    • Controller 会监听到这个节点的变化。
    • Controller 获取该 Partition 的 ISR 列表。
    • 根据上述选举规则,选举新的 Leader。例如,如果 0 在 AR 列表中的位置较前,且在 ISR 中处于活跃状态,它就会被选为新的 Leader。
    • Controller 更新新的 Leader 和 ISR 列表。

验证

此时如果停止Broker为3的节点,那么Partition2的Leader将替换为1

重新启动Broker为3的节点,经过一段时间的 自动平衡 后,Leader将重新替换为3

故障处理

  • HW: (High Watermark)
    • 俗称高水位,代表消费者可以安全读取的最后一条消息的位置。
    • 因为HW之前的消息已被所有ISR节点确认过。
    • 作用:确保消费者读取的是持久化的消息,保证数据一致性。
    • 移动:当 ISR 副本都确认某个消息,HW往前移动。
  • LEO: (Log End Offset)
    • 代表下条消息写入的位置。
    • 作用:这个标识用于从 Leader 拉取要同步的数据。

如图所示,它代表一个日志文件,这个日志文件中有 8 条消息,第一条消息的offset(LogStartOffset)为0,最后一条消息的offset为7,offset为8代表下一条待写入的消息。分区的HW为5,表示消费者只能拉取到offset在0至4之间的消息,而offset为5的消息对消费者而言是不可见的,是为了保证数据存储的一致性。

Follower 故障

  • 当 Follower 发生故障,它会被临时移出 ISR 列表。
  • 在此期间,Leader 继续处理新的数据写入请求,而正常的 Follower 也继续从 Leader 同步数据。
  • 一旦故障的 Follower 恢复,它首先会读取本地磁盘上记录的最近的 HW,并将 log 文件中高于此 HW 的记录截断。
  • 该 Follower 从 HW 开始向 Leader 请求同步,此时它位于 OSR 列表中。
  • 一旦 Follower 的 LEO 达到或超过该 Partition 的 HW,意味着 Follower 已经追赶上 Leader,它将重新被加入 ISR 列表。

Leader 故障

  • 当 Leader 发生故障,新的 Leader 会从 ISR 中选举出来。选举基于 AR 列表的排序,并确保选举的是 ISR 中的成员。
  • 为确保数据一致性,所有 Follower(包括新选举的 Leader)会截断其 log 文件中超出 HW 的部分。
  • 随后,这些 Follower 会从新的 Leader 开始同步数据。

只能保证副本之间数据的一致性,消费数据一致性,不能保证数据是否丢失或不重复

分区副本分配

如果 Kafka 服务器只有 4 个节点,那么设置 Kafka 的分区数大于服务器台数,在 kafka底层如何分配存储副本呢

创建 16 分区,3 个副本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
hadoop102Topic: second4 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 2 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 3 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

Topic: second4 Partition: 4 Leader: 0 Replicas: 0,2,3 Isr: 0,2,3
Topic: second4 Partition: 5 Leader: 1 Replicas: 1,3,0 Isr: 1,3,0
Topic: second4 Partition: 6 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: second4 Partition: 7 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

Topic: second4 Partition: 8 Leader: 0 Replicas: 0,3,1 Isr: 0,3,1
Topic: second4 Partition: 9 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: second4 Partition: 10 Leader: 2 Replicas: 2,1,3 Isr: 2,1,3
Topic: second4 Partition: 11 Leader: 3 Replicas: 3,2,0 Isr: 3,2,0

Topic: second4 Partition: 12 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: second4 Partition: 13 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: second4 Partition: 14 Leader: 2 Replicas: 2,3,0 Isr: 2,3,0
Topic: second4 Partition: 15 Leader: 3 Replicas: 3,0,1 Isr: 3,0,1

  1. 均匀分布 Leader: Kafka 会尽量确保每个 broker 上的 leader partition 数量均匀分布,以便平衡 broker 之间的读写负载。
  2. 错开 Leader: Kafka 在分配 leader 时,会尽量错开 broker,确保不同 partition 的 leader 分布在不同的 broker 上,以提升集群的容错性。
  3. 分散副本: Kafka 会尽量确保一个 partition 的所有副本分散在不同的 broker 上,避免因单个 broker 宕机导致的数据不可用。

手动调整分区副本分配

在生产环境中,每台服务器的配置和性能不一致,但是Kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器存储压力较大。所有需要手动调整分区副本的存储

创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上

创建一个新的Topic

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 4 --replication-factor 2 --topic three

查看分区副本存储情况

1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three

创建副本存储计划

所有副本都指定存储在 broker0、broker1 中

1
vim increase-replication-factor.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
{
"version":1,
"partitions":[
{
"topic":"three",
"partition":0,
"replicas":[
0,
1
]
},
{
"topic":"three",
"partition":1,
"replicas":[
0,
1
]
},
{
"topic":"three",
"partition":2,
"replicas":[
1,
0
]
},
{
"topic":"three",
"partition":3,
"replicas":[
1,
0
]
}
]
}

执行副本存储计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

验证副本存储计划

1
bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --verify

Leader Partition自动平衡

在正常情况下,Kafka 会自动将 leader partition 均匀分散在各个 broker 上,以保证每台机器的读写吞吐量均衡。然而,当一些 broker 宕机时,leader partition 可能会过度集中在少数活跃的 broker 上,导致这些 broker 承受较高的读写请求压力。与此同时,重启后的 broker 将作为 follower partition,它们主要负责与 leader 同步数据,处理的读写请求相对较少,从而导致集群的负载不均衡。

自动平衡参数

参数名称描述
auto.leader.rebalance.enable控制是否自动进行 Leader Partition 平衡。默认值为 true。尽管平衡可以增强集群的健壮性,但在生产环境中,频繁的 leader 重选举可能会带来性能影响。因此,建议根据实际情况考虑将其设置为 false 或调整其他相关参数来减少影响。
leader.imbalance.per.broker.percentage定义每个 broker 上允许的不平衡 leader 比例。默认值为 10%。当任意 broker 的不平衡 leader 比例超过此值时,控制器会触发 leader 重新分配过程。
leader.imbalance.check.interval.seconds控制检查 leader 分布是否平衡的时间间隔。默认值为 300 秒。

平衡算法

分区2的AR优先副本是0节点,但是0节点却不是Leader节点,所以不平衡数加1,AR副本总数是4,所以broker0节点不平衡率为1/4>10%,需要再平衡

增加副本因子

先创建一个三个分区一个副本的Topic

1
sh kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 3 --replication-factor 1

查看Topic

创建存储计划

1
vim increase-replication-factor.json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
{
"version":1,
"partitions":[
{
"topic":"test",
"partition":0,
"replicas":[
1,
2,
3
]
},
{
"topic":"test",
"partition":1,
"replicas":[
1,
2,
3
]
},
{
"topic":"test",
"partition":2,
"replicas":[
1,
2,
3
]
}
]
}

执行副本存储计划

1
sh kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file increase-replication-factor.json --execute

Kafka文件存储机制

每个Topic包含一个或多个Partition,每个Partition在物理存储层面对应一个文件夹。在这个文件夹下存储了Partition的 数据索引 文件。Partition内部的消息是有序的,但不保证多个Partition之间消息的顺序。

  • Topic是一类消息的集合,每条消息都需要指定一个Topic。在物理层面上,一个Topic会被划分为一个或多个Partition,每个Partition会有多个副本分布在不同的Broker中。
  • Partition在存储层面是由一系列 append log 文件组成,发布到此Partition的消息会被追加到log文件的尾部。这种 顺序写入 磁盘的方式较随机写入效率更高。每条消息在log文件中的位置由一个长整数型的offset(偏移量)表示,该offset唯一标识了一条消息。
  • 每个Partition的log文件会被分割成多个 Segment,每个Segment大小为1GB,并由 log 文件和 index 文件组成。
  • 消费者保存的唯一元数据是 offset 值,该值由消费者完全控制,保存在一个特殊的Topic或Zookeeper中。维度是consumer-group.topic.partition。
  • 不同于传统的消息队列,Kafka集群会保留所有消息,而不是在消费后立即删除,以优化IO操作。由于磁盘空间的限制,Kafka不可能永久保留所有消息,消息的保存期限可以通过配置来指定。

消息索引过程

log 文件和 index 文件的命名规则都是基于该文件中第一条消息的 offset。具体地,文件名是这个起始 offset 的长整数形式。这种命名方式便于在多个 log 文件中快速定位到包含特定 offset 的消息的文件。

稀疏索引:每当 log 文件增加了约4KB的数据,index 文件就会为这部分数据生成一条索引记录,这是稀疏的方式,而不是为 log 文件中的每一条消息都生成索引。

索引过程:

  • 通过偏移量定位到Partition目录下对应的index文件(文件名也是索引)
  • 通过index文件定位到指定offset消息的log文件偏移量(index是相对偏移量,可降低文件大小)
  • 通过index文件提供的偏移量读取log文件进行遍历(最坏情况下至多遍历4KB)

通过工具查看文件内容

1
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log

文件清理策略

Kafka 中默认的日志保存时间为 7 天,可以通过调整如下参数修改保存时间

参数说明
log.retention.hours最低优先级小时,默认 7 天(168)
log.retention.minutes分钟级别
log.retention.ms最高优先级毫秒
log.retention.check.interval.ms负责设置检查周期,默认 5 分钟
log.retention.bytes所有日志段的总大小上限。一旦达到这个大小,最旧的日志段将被删除

Kafka 中提供的日志清理策略有 delete(删除)compact(压缩) 两种

delete

log.cleanup.policy = delete 所有数据启用删除策略

  • 基于时间:默认打开。使用每个 Segment 中最大时间戳作为该日志段的时间戳。当消息在Kafka中存在的时间超过一定阈值时,Kafka将删除旧的日志段以释放磁盘空间。
  • 基于大小:当所有日志段的总大小超过设置的限制时,Kafka删除最早的日志段,以控制磁盘使用。

如果一个 segment 中有一部分数据过期,一部分没有过期,怎么处理?

等待 timeindex 文件的所有数据过期直接删除整个文件

compact

compact日志压缩:对于相同key的不同value值,只保留最后一个版本

log.cleanup.policy = compact 所有数据启用压缩策略

压缩后的offset可能是不连续的,比如上图中没有6,当从这些offset消费消息时,将会拿到比这个offset大的offset对应的消息,实际上会拿到offset为7的消息,并从这个位置开始消费。

这种策略适合一些特殊的场景,比如数据同步,同一条数据只保留最新版本。

数据高效读写

集群,分片技术

  • Kafka本身就是一个支持分布式的集群系统。
  • Topic采用分区技术,将多个分区分布在不同的Broker上。

顺序写盘

Kafka的producer负责生产数据,并将其顺序地追加到log文件的尾部。这种顺序写的方式在性能上有明显优势。据官方数据显示,相同的磁盘在顺序写的情况下能达到600M/s的速度,而随机写的速度仅为100K/s。这种性能差异主要是因为顺序写减少了磁头寻址的时间,而这与磁盘的机械结构有直接关系。

稀疏索引

在读取数据时利用稀疏索引,使得在大量的log文件中能迅速定位到数据对应的文件和该数据在文件中的偏移量位置。

页缓存和零拷贝

  • PageCache(页缓存):Kafka高度依赖操作系统提供的PageCache机制。当发生写操作时,数据首先被写入到PageCache中,而不是直接写入磁盘。读操作先从PageCache中搜索数据;如果PageCache中没有所需的数据,系统才会从磁盘读取。简言之,PageCache将大部分的空闲内存利用作为磁盘缓存。

  • 零拷贝:数据不会从内核空间拷贝到用户空间。实际上,在数据传输时,数据直接从内核空间复制到网卡,避免了不必要的数据复制操作。

Producer的零拷贝

  1. 消息存储:当你使用Kafka Producer发送消息时,消息首先被放入 RecordAccumulator。这是一个内部缓冲区,它负责批处理和组织消息,以便于高效发送。

  2. 准备发送:Kafka的 Sender 线程会从 RecordAccumulator 中提取批量消息。这些消息通常被组织为按目标分区和Broker的顺序。

  3. 零拷贝技术的实现:在传统的数据发送方法中,数据需要从应用程序的用户空间被拷贝到操作系统的内核空间,然后再从内核空间被拷贝到 Socket。这涉及至少两次数据拷贝。但是,使用零拷贝技术,Kafka可以直接从 RecordAccumulator(用户空间)发送数据到 Socket(内核空间),避免中间的拷贝步骤。

    这主要是通过Java的 FileChannel.transferTo 方法实现的。这允许数据从文件或缓冲区直接传输到网络套接字,绕过了额外的用户空间和内核空间之间的数据拷贝。

Broker的零拷贝

  • PageCache与磁盘交互:当数据被写入Kafka Broker或从Broker读取时,它们首先写入操作系统的 PageCache。这是操作系统为文件系统提供的一个高速缓存。
  • 实现:Kafka使用Java的NIO(非阻塞I/O)来实现这种零拷贝技术。特别是,它使用 FileChannel.transferTo 方法来从 PageCache 中直接将数据拷贝到 Socket。

Kafka消费者

消费者组重要参数

参数名称描述
bootstrap.servers向 Kafka 集群建立初始连接用到的 host/port 列表
key.deserializer
value.deserializer
指定接收消息的 key 和 value 的反序列化类型。一定要写全类名
group.id标记消费者所属的消费者组
enable.auto.commit默认值为 true,消费者会自动周期性地向服务器提交偏移量
auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s
auto.offset.reset当 Kafka 中没有初始偏移量或当前偏移量在服务器中不存在 (如,数据被删除了),该如何处理?
earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest:默认,当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常
anything:向消费者抛异常
offsets.topic.num.partitions__consumer_offsets 的分区数,默认是 50 个分区
heartbeat.interval.msKafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的 1/3
session.timeout.msKafka 消费者和 coordinator 之间连接超时时间,默认 45s。 超过该值,该消费者被移除,消费者组执行再平衡
max.poll.interval.ms消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡
fetch.min.bytes默认 1 个字节。消费者获取服务器端一批消息最小的字节数
fetch.max.wait.ms默认 500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据
fetch.max.bytes默认 Default: 52428800(50 m)。消费者获取服务器端一批 消息最大的字节数。如果服务器端一批次的数据大于该值 (50m)仍然可以拉取回来这批数据,因此,这不是一个绝 对最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影响
max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条。

消费者组原理

Consumer Group (CG): 定义为一组具有相同 groupId 的消费者集合。

  • 在消费者组内,每个消费者专门负责消费某个或某些分区的数据。这确保了同一消息只会被组内的一个消费者处理。
  • 不同的消费者组间是独立的,它们的消费活动不会相互干扰。
  • 每个消费者必须关联到一个消费者组,可以将消费者组视作逻辑上的订阅者。
  • 为了确保有效的消费,消费者组中的消费者数量不应该超过其所消费 Topic 的分区数量。超过时,部分消费者将会处于空闲状态。
  • 具有相同 groupId 的消费者自动归入同一消费者组。

消费者组初始化流程

coordinator 协调器

作用

  1. 消费者组管理:
    • 再平衡: 如果消费者组内的成员关系发生变化(例如,新消费者加入、现有消费者离开或失败),coordinator 负责触发再均衡,重新分配分区给组内的消费者。
    • offset管理: coordinator 负责记录和存储消费者组内消费者所消费的消息的偏移量,这样消费者在失败后重启时可以从上次的位置继续消费。
    • 心跳检测: 通过消费者发送的心跳,coordinator 能够监控消费者的健康状态。如果在指定时间内没有收到某消费者的心跳,coordinator 会认为该消费者已经死亡并触发再均衡。
  2. 事务管理
    • 事务状态维护: coordinator 负责维护与生产者事务相关的状态和元数据。
    • 幂等性: 为了确保生产者重试不会导致数据的重复写入,coordinator 与生产者协作确保消息的幂等性。
    • 跨分区的事务原子性: 在涉及多个分区的事务中,coordinator 确保这些分区中的所有写操作要么全部成功,要么全部失败。
  3. 协调器选择:
    • 每个消费者组都有一个与之关联的 coordinator,这个 coordinator 是 Kafka 集群中的某个 broker。Kafka 的设计确保了协调器的负载均衡,每个 broker 都可以成为 coordinator。

Coordinator选择coordinator节点选择 = groupId的hashCode值 % __consumer_offsets的分区数。例如,若__consumer_offsets的分区数为50,且groupId的hashCode为1,则1 % 50 = 1。假设第1分区的Leader位于Broker1上,则Broker1将被选为该消费者组的coordinator节点。此消费者组内的所有消费者在提交offset时都会向此协调分区提交。

初始化流程

  1. 寻找协调器 (Coordinator):
    • 当一个消费者启动并尝试加入消费者组时,它首先需要找到负责其消费者组的协调器。消费者会向任意一个 Broker 发送 FindCoordinator 请求来找到负责其 groupId 的协调器。
  2. 发送 JoinGroup 请求:
    • 消费者找到协调器后,会发送一个 JoinGroup 请求来加入消费者组。
    • 如果这是一个全新的 groupId(即组内没有任何消费者),则协调器会立即接受此消费者作为组的首个成员。如果消费者组已经存在,消费者需要等待协调器触发重新平衡来加入组。
  3. 消费者组 Leader 选举:
    • 对于消费者组中的所有成员,协调器会选择一个作为 leader。leader 的职责是根据消费者的数量和主题的分区来确定 分区的分配策略
    • 由Coordinator自动选举,第一个发出 JoinGroup 的 Broker 当选。
  4. 分区分配:
    • 协调器会向消费者组 Leader 提供组内所有消费者的信息。leader 根据指定的分区分配策略将分区分配给组内的消费者,并将分配的结果返回给协调器。
  5. 同步消费者:
    • 协调器将分区的分配结果发送给组内的所有消费者。这是通过 SyncGroup 请求完成的。
  6. 开始消费:
    • 一旦消费者收到其分配的分区,它会开始从指定的 offset 开始消费消息。
  7. 维持心跳:
    • 为了告诉协调器它仍然是活跃的,并且正在消费消息,消费者会定期发送心跳。如果在指定的会话超时时间内,协调器没有收到来自消费者的心跳,它会认为该消费者已经失败,并可能触发再平衡。

再平衡

  • 心跳与会话超时:消费者确实会向其coordinator发送心跳以表示它仍然活跃。heartbeat.interval.ms 定义了发送心跳的频率,而 session.timeout.ms 定义了协调器等待心跳的时间。如果在 session.timeout.ms (45s)时间内,coordinator没有收到心跳,消费者将被认为已经死亡。

  • 最大轮询间隔max.poll.interval.ms (5分钟)定义了消费者可以在两次 poll() 调用之间的最长时间。如果超过此时间,消费者会被认为是不活跃的,并从组中移除,从而可能触发再平衡。

  • 消费者的加入和离开:消费者的加入和离开确实都可能触发再平衡。频繁的再平衡不仅可能影响Kafka集群的性能,而且可能导致消息处理的延迟。

消费者拉取流程

  1. 初始化
    • 当消费者启动时,它会创建一个NetworkClient实例,消费者与Kafka建立网络连接。
  2. 查找Coordinator
    • 在发送拉取请求之前,消费者首先需要找到Consumer Group的Coordinator,以便正确跟踪偏移量和管理组成员关系。
  3. Partition分配
    • 如果是新的消费者或者消费者组重新平衡后,消费者会通过Coordinator确定自己要消费哪些Topic的哪些分区。
  4. 发送拉取请求
    • 一旦知道要从哪些分区拉取,消费者会使用sendFetches方法发送请求。请求中指定了Topic、分区和开始的偏移量。
  5. Broker处理
    • 当Broker接收到拉取请求后,它会开始检查自己是否有满足请求条件的数据。Broker会等待,直至数据量达到1KB至50MB(默认500条数据),或达到了预设的超时时间。
  6. 数据接收与缓存
    • 消费者接收到数据后,会将其暂存到内部的队列中,此队列主要用于暂存数据,以便逐条处理。
  7. 数据处理
    • 从内部队列中拉取数据,并开始处理。如果数据处理出现异常,消费者可以配置重试策略或直接跳过。
  8. 偏移量提交
    • 为了避免重复消费或消息遗失,消费者在处理完消息后,会将当前的偏移量提交回Coordinator。这样,在下一次启动或者重新平衡后,消费者知道从哪里开始拉取数据。

消费者API

kafka-client

监听Topic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@Slf4j
// 订阅主题测试
public class TopicTest{

protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}

@Test
public void listenerTest() {
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

// 打印数据
for (ConsumerRecord<String, String> record : consumerRecords) {
log.info(record.toString());
}
}
}
}

监听指定分区

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
public class PartitionTest {

protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}

@Test
public void listenerPartitionTest() {
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);

while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
log.info(consumerRecord.toString());
}
}
}
}

Spring Boot

添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
server:
port: 9001

spring:
kafka:
bootstrap-servers: localhost:9092 # 连接kafka的地址,多个地址用逗号分隔
consumer:
properties:
session.timeout.ms: 15000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

监听Topic

1
2
3
4
5
6
7
8
@Slf4j
@Component
public class ListenerTopic {
@KafkaListener(groupId = "test", topics = "first")
public void listener(ConsumerRecord<String, String> record) {
log.info("监听消息:{}", record.topic(), record.value());
}
}

监听指定指定分区

1
2
3
4
5
6
7
8
9
@Slf4j
@Component
public class ListenerPartition {

@KafkaListener(groupId = "test", topicPartitions = @TopicPartition(topic = "first", partitions = "0"))
public void listener(String msg) {
log.info("监听消息:{}", msg);
}
}

分区分配策略

Topic 可以被划分为多个分区,一个消费组内可以有多个消费者,因此分区和消费者之间是多对多的关系。为了实现分区的合理分配,必然需要一种策略将 Topic 的分区分配给对应的消费者进行消费。

需要注意的是,消费者的上下线会触发分区分配的再平衡过程,该过程会根据所选的分区分配策略重新分配分区,以保证分区数据的正常消费。

Range(默认,范围)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
assign(topic, consumers) {
// 对分区和Consumer进行排序
List<Partition> partitions = topic.getPartitions();
sort(partitions);
sort(consumers);
// 计算每个Consumer分配的分区数
int numPartitionsPerConsumer = partition.size() / consumers.size();
// 额外有一些Consumer会多分配到分区
int consumersWithExtraPartition = partition.size() % consumers.size();
// 计算分配结果
for (int i = 0, n = consumers.size(); i < n; i++) {
// 第i个Consumer分配到的分区的index
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
// 第i个Consumer分配到的分区数
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);
// 分装分配结果
assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}

Kafka的 Range 分区分配策略是消费者 默认 的分区分配策略。它的工作原理是为每个消费者分配连续的分区范围。

工作原理

  1. 排序:首先,所有的Topic的分区和消费者都会被排序。
    • 分区是基于Topic名称和分区号进行排序的,通常是字典序。
    • 消费者是基于它们的消费者ID进行排序的。
  2. 分区分配:每个Topic下的分区都会按照消费者数量进行划分。每个消费者首先会获得 总分区数 ÷ 总消费者数 的分区。然后,如果存在余数,会按照排序的顺序,将额外的分区分配给前面的消费者。

例子

假设我们有一个Topic A,它有8个分区:A-0, A-1, ... A-7。现在有3个消费者 C1, C2, C3

  1. 按照 Range 策略,首先将分区和消费者进行排序。在这种情况下,分区和消费者已经是有序的。
  2. 接下来,8个分区被3个消费者平均分配。这意味着每个消费者应该处理2到3个分区。
  3. 8 ÷ 3 = 2 与余数 2。因此,每个消费者最初获得2个分区,但由于有2个额外的分区,它们会被分配给C1C2
  4. 结果是:C1 负责处理 A-0, A-1, A-2C2 负责处理 A-3, A-4, A-5;而 C3 只处理 A-6, A-7

数据倾斜

按照Topic维度分配

Range 分配策略下,当处理单一Topic时,如果分区数不能被消费者数整除,额外的分区分配给前面的消费者,但这种差异相对较小。然而,当消费者组订阅了多个这样的Topic时,这种不均衡的情况会被放大。具体地说,对于每一个这样的Topic,排序靠前的消费者会多被分配一个分区。随着Topic数量的增加,这种不均衡会累积,导致排序靠前的消费者,被分配到的分区数远多于其他消费者,从而产生明显的数据倾斜。

适用场景

  • 单 Topic 场景:当只有一个 Topic 时,Range 分区策略可以保证分区间的负载均衡,每个消费者处理的分区数量相近。
  • Topic 分区数量都能整除消费者数量:在多个 Topic 的情况下,如果 Topic 的数量和分区数量能整除消费者数量,Range 分区策略也能够保证负载均衡。

RoundRobin(轮询)

RoundRobin 轮询分区策略,主要特点是以轮询的方式平均分配分区,确保每个消费者处理的分区数量大致相同,从而实现负载均衡。

工作原理:

  1. 首先,将所有的Topic分区和消费者分别进行排序。
  2. 然后,依次遍历每个Topic的分区,并按照轮询的方式分配给消费者。

例子

假设有两个Topic:TopicA有3个分区(A0, A1, A2),TopicB有2个分区(B0, B1),同时有两个消费者C1和C2。根据RoundRobin策略,分区分配如下:

  • C1分配到分区:A0, A2, B1
  • C2分配到分区:A1, B0

轮询分区,为每个消费者分配

数据倾斜

假设有三个Topic:T1(拥有三个分区)、T2(一个分区)和T3(一个分区)。一个消费者组内包含三个消费者:C0、C1和C2。其中,C0订阅了T1、T2和T3;C1和C2只订阅了T1。在这种情况下,由于RoundRobin分配策略的机制,C0会承担更多的分区消费任务,从而可能导致数据倾斜问题。

适用场景

  • 多Topic:适用于多个Topic的场景,可以更均匀地分配各个Topic的分区,减少数据倾斜的可能性。
  • 负载均衡:通过轮询的方式,确保每个消费者分配到的分区数量大致相同,从而实现负载均衡。

RoundRobin分配策略适合消费组内消费者订阅的Topic列表是相同的,在这种情况下,通过轮询分配可以确保各个消费者尽可能平均地分配到分区。

Sticky(粘性)

Sticky 分区分配策略是为了解决消费者上下线时引起的分区再平衡。尽可能保持之前的分配关系避免再平衡带来的性能损耗。

  1. 首次分配:识别每个消费者可以接受的分区数量的上下限,然后尽量均衡分配
  2. 维持当前的分配:当需要再平衡时,策略首先考虑维持当前的分区-消费者关系。
  3. 最小化重新分配:如果消费者组发生变化,该策略会尽量只重新分配受影响的分区,而不是所有分区。

适用场景

消费者频繁上下线场景,可以通过参考上次分配的结果减少调整分配的变动

分区策略配置

Kafka提供的分区策略

  • org.apache.kafka.clients.consumer.RangeAssignor

  • org.apache.kafka.clients.consumer.RoundRobinAssignor

  • org.apache.kafka.clients.consumer.StickyAssignor

kafka-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
public class PartitionAssignorTest {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 分区分配策略
//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName());
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
//properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}
}

Spring Boot

1
2
3
4
5
6
7
8
9
spring:
kafka:
bootstrap-servers: localhost:9092 # 连接kafka的地址,多个地址用逗号分隔
consumer:
properties:
partition.assignment.strategy: org.apache.kafka.clients.consumer.RoundRobinAssignor # 分区策略
session.timeout.ms: 15000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

自动提交offset

为了简化offset管理,从而让开发者更加专注于业务逻辑,Kafka提供了自动提交offset的功能。

关于自动提交offset的相关参数:

  • enable.auto.commit:决定是否启用自动提交offset功能,默认值为true
  • auto.commit.interval.ms:定义了两次自动提交之间的时间间隔,默认是5000毫秒(即5秒)。

自动提交存在的问题

  • 数据丢失:当消息刚被消费,但还未在业务逻辑中完全处理结束时,如果到达自动提交周期,offset就会被自动提交。此时,如果消费者宕机,那么在重启并重新消费处理时,可能会导致数据丢失。
  • 数据重复:如果消息已经被成功消费并处理,但在自动提交offset的周期到达前发生宕机,那么offset将不会被自动提交。因此,在重启服务后,系统将根据上一次提交的offset重新消费消息,这就导致了消息的重复消费。

kafka-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Slf4j
public class AutoCommitTest {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 是否自动提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
// 提交 offset 的时间周期 1000ms,默认 5s
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}
}

Spring Boot

1
2
3
4
5
6
7
8
spring:
kafka:
bootstrap-servers: localhost:9092 # 连接kafka的地址,多个地址用逗号分隔
consumer:
enable-auto-commit: true # 开启自动提交(默认)
auto-commit-interval: 1000 # 自动提交间隔时间(默认5秒)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

手动提交offset

虽然自动提交offset十分简单便利,但由于其是基于时间周期性提交,开发人员难以精准把握offset提交的时机。因此,Kafka还提供了手动提交offset的API。

手动提交offset的方法主要有两种:分别是commitSync(同步提交)和commitAsync(异步提交)。两者的共同点在于,都会提交消费者成功处理的最后一条消息的偏移量 + 1。

  • commitSync(同步提交):这种方式会阻塞当前线程,直到offset提交成功或失败。如果提交失败,可以选择重试。因为该方法会返回提交成功或失败的结果,可以在需要确保offset准确性的场景下使用。
  • commitAsync(异步提交):这种方式在提交offset时不会阻塞当前线程,同时可以提供一个回调函数来处理提交成功或失败的情况。由于是异步的,所以性能较好,但需要注意处理可能出现的提交失败的情况。

kafka-client

同步提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class CommitSyncTest {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 取消自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}

@Test
public void listenerPartitionTest() {
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);

while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
log.info(consumerRecord.toString());
}

// 手动同步提交
kafkaConsumer.commitSync();
}
}
}

手动异步提交

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class CommitASyncTest {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 取消自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}

@Test
public void listenerPartitionTest() {
// 消费某个主题的某个分区数据
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0));
kafkaConsumer.assign(topicPartitions);

while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
log.info(consumerRecord.toString());
}

// 手动异步提交
kafkaConsumer.commitAsync();
}
}
}

Spring Boot

  • 关闭手动提交:spring.kafka.consumer.enable-auto-commit: false
  • 配置监听应答模式:spring.kafka.listener.ack-mode: manual
1
2
3
4
5
6
7
8
9
spring:
kafka:
bootstrap-servers: localhost:9092 # 连接kafka的地址,多个地址用逗号分隔
consumer:
enable-auto-commit: false # 关闭自动提交
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
@Component
public class TestListener {

@KafkaListener(topics = "first", groupId = "test")
public void testTopic(String message, Acknowledgment acknowledgment, Consumer<String, String> consumer) {
log.info("接收到消息:{}", message);

// 手动同步提交
// consumer.commitSync();

// 手动异步提交
// consumer.commitAsync();

// ACK手动提交offset, 如果整合了spring boot 建议使用这种
// acknowledgment.acknowledge();

// 不进行ack确认,让监听器线程休眠指定毫秒后重新开始消费此条消息
// acknowledgment.nack(100);
}
}

offset消费策略

auto.offset.reset 可选值: earliestlatestnone。默认设置为 latest

策略说明
earliest当指定分区有已提交的offset时,从提交的offset开始消费;如果没有提交的offset,则从分区起始位置开始消费。
latest当指定分区有已提交的offset时,从提交的offset开始消费;如果没有提交的offset,则只消费该分区新产生的数据。
none如果指定分区没有已提交的offset,则会抛出异常,不会从任何位置开始消费。

kafka-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Slf4j
public class AutoOffsetResetTest {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 指定offset策略
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);
}

@Test
public void listenerTest() {
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

// 打印数据
for (ConsumerRecord<String, String> record : consumerRecords) {
log.info(record.toString());
}
}
}
}

Spring Boot

1
2
3
4
5
6
7
8
9
10
11
spring:
kafka:
bootstrap-servers: localhost:9092 # 连接kafka的地址,多个地址用逗号分隔
consumer:
auto-offset-reset: earliest # offset 消费策略
enable-auto-commit: false # 开启自动提交(默认)
auto-commit-interval: 1000 # 自动提交间隔时间(默认5秒)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: manual

如何重新消费队列数据

  • 更换新的消费者组
  • 设置 auto.offset.resetearliest;重新从头消费

指定offset消费

异常恢复

任意指定 offset 位移开始消费,需要对每个分区进行设置

kafka-client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
@Slf4j
public class SpecifyOffsetTest {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test2");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);

// 2 订阅一个主题
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 获取分区信息
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}

// 遍历所有分区,并指定 offset 从 100 的位置开始消费
for (TopicPartition tp : assignment) {
kafkaConsumer.seek(tp, 100);
}
}


@Test
public void listenerTest() {
// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);

// 拉取数据打印
while (true) {
// 设置 1s 中消费一批数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

// 打印数据
for (ConsumerRecord<String, String> record : consumerRecords) {
log.info("topic:{},offset:{},value:{}", record.topic(), record.offset(), record.value());
}
}
}
}

Spring Boot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@Component
public class SpecifyOffsetListener {

@KafkaListener(groupId = "test3", topics = "first", topicPartitions = {
@TopicPartition(topic = "first", partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "500"),
@PartitionOffset(partition = "1", initialOffset = "500"),
@PartitionOffset(partition = "2", initialOffset = "500")
})
})
public void listener(ConsumerRecord<String, String> record) {
log.info("topic:{},offset:{},value:{}", record.topic(), record.offset(), record.value());
}
}

指定时间消费offset

异常恢复

在生产环境中,会遇到最近消费的几个小时数据异常,想重新按照时间消费。例如要求按照时间消费前一天的数据,怎么处理?

从Kafka 0.10.0版本开始,每条消息(即每个record)都有一个关联的时间戳。时间戳可以是消息创建时间(默认)或消息追加到log时的时间。

  1. CreateTime:这是默认值。时间戳代表消息创建的时间。通常,它是在生产者客户端设置的,代表消息被发送之前的时间。
  2. LogAppendTime:时间戳代表消息被追加到log的时间。这是在broker上设置的,代表消息被写入日志的时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
public class TimeOffsetListener {
protected KafkaConsumer kafkaConsumer;

@BeforeEach
public void init() {
// 1. 创建 kafka 生产者的配置对象
Properties properties = new Properties();
// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091,localhost:9092,localhost:9093");
// key,value 反序列化(必须)
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());

// 指定offset策略
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

// 创建消费者对象
kafkaConsumer = new KafkaConsumer<String, String>(properties);

// 注册要消费的主题(可以消费多个主题)
ArrayList<String> topics = new ArrayList<>();
topics.add("first");
kafkaConsumer.subscribe(topics);
}

/**
* kafka 提供基于时间获取offset的API
*/
@Test
public void offsetForTimesTest() {

// 获取分区信息
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
kafkaConsumer.poll(Duration.ofSeconds(1));
// 获取消费者分区分配信息(有了分区分配信息才能开始消费)
assignment = kafkaConsumer.assignment();
}

HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();

// 封装集合存储,每个分区对应一天前的数据
for (TopicPartition topicPartition : assignment) {
timestampToSearch.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);
}
// 获取从 1 天前开始消费的每个分区的 offset
Map<TopicPartition, OffsetAndTimestamp> offsets = kafkaConsumer.offsetsForTimes(timestampToSearch);

offsets.forEach((k ,v) -> {
log.info("partition:{}", k.partition());
log.info("offset:{}", v.offset());
});
}

// 根据时间获取到每个分区对应的offset,再调用kafkaConsumer.seek(...),设置每个分区的offset即可
}

SpringBoot

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
public class KafkaTimeBasedConsumerService {

@Autowired
private ConsumerFactory<String, String> consumerFactory;

// 这里使用了Java 8的Instant类作为时间的表示
public void consumeFromTimestamp(String topic, Instant timestampToStartFrom) {
try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
List<TopicPartition> topicPartitions = partitionInfos.stream()
.map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
.collect(Collectors.toList());

Map<TopicPartition, Long> timestampToSearch = topicPartitions.stream()
.collect(Collectors.toMap(Function.identity(), tp -> timestampToStartFrom.toEpochMilli()));

Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);

if (offsets != null) {
consumer.assign(offsets.keySet());
offsets.forEach((tp, ot) -> {
if (ot != null) {
consumer.seek(tp, ot.offset());
}
});

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
// Process your record here
System.out.println("Received Message with timestamp: " + record.timestamp() + " Value: " + record.value());
}
}
}
}
}

消费者事务

实现Kafka的精确一次(Exactly Once Semantics, EOS)消费需要综合考虑生产、消费、处理以及偏移量存储的方面。

  1. 生产者
    • 开启幂等性,enable.idempotence=true
    • 使用事务,transactional.id=<some-unique-id>
  2. 消费者
    • 为了消费那些已提交的事务消息,消费者需要将 isolation.level 配置设置为 read_committed。这样,消费者只会读取已提交事务的消息,并忽略未提交的消息。
    • 默认情况下,isolation.levelread_uncommitted,这意味着消费者会读取所有消息,无论它们是否在事务中。
    • 手动提交 offset
  3. 消费者与Offset管理
    • 当消费者处理事务消息时,为了确保EOS,它们需要在处理消息的同时,也在同一个事务中提交offset。
    • 这确保了处理消息和提交offset是原子操作,从而实现了end-to-end的EOS。
  4. 同步处理和外部系统
    • 如果消费者的处理逻辑涉及与外部系统的交互,那么需要确保这种交互是幂等的。
    • 例如,如果消费者需要将消息数据写入一个数据库,那么这个写操作应该是可以安全地重复的,以确保在面对失败和重试时不会有不一致的情况。

数据积压

消费能力不足

当Kafka遭遇消费能力不足导致数据积压时,可以通过以下策略来解决:

  1. 增加Topic的分区数以提高并行处理的能力。
  2. 增加消费组内的消费者数量。为了实现最佳的负载均衡,消费者的数量应该等于或小于分区数。

注意,仅增加分区数而不增加消费者数量(或反之)可能不会显著提高消费速度。两者应协同工作以实现最佳效果。

数据处理不及时

参数描述
fetch.max.bytesfetch.max.bytes 是 Kafka 消费者的一个配置参数,其默认值为 52428800(约为 50MB)。这个参数定义了消费者在单次拉取操作中可以从Kafka服务器获取的消息的最大字节数。
max.poll.records一次 poll 拉取数据返回消息的最大条数,默认是 500 条

当Kafka的数据积压是由于下游处理不及时引起的,可以考虑以下策略来解决:

  1. 增加每批次从Kafka拉取的消息数量,从而提高处理的吞吐量。
  2. 分析并优化数据处理的效率,确保处理速度至少与生产速度相匹配。

EFAK 安装

java 环境变量配置

EFAK 安装

Kafka调优

调优篇

Kraft模式

Kraft模式搭建

左图为 Kafka 现有架构,元数据在 zookeeper 中,运行时动态选举 controller,由 controller 进行 Kafka 集群管理。右图为 kraft 模式架构(实验性),不再依赖 zookeeper 集群, 而是用三台 controller 节点代替 zookeeper,元数据保存在 controller 中,由 controller 直接进 行 Kafka 集群管理

这样做的好处有以下几个

  • Kafka 不再依赖外部框架,而是能够独立运行
  • controller 管理集群时,不再需要从 zookeeper 中先读取数据,集群性能上升
  • 由于不依赖 zookeeper,集群扩展时不再受到 zookeeper 读写能力限制
  • controller 不再动态选举,而是由配置文件规定。这样我们可以有针对性的加强controller 节点的配置,而不是像以前一样对随机 controller 节点的高负载束手无策

接入外部系统

Flume

Spark


Kafka
https://wugengfeng.cn/2022/02/24/Kafka/
作者
wugengfeng
发布于
2022年2月24日
许可协议