config
目录下的文件列表
config/
├── connect-console-sink.properties # Kafka Connect的控制台Sink连接器配置,用于从Kafka读取数据并写入控制台
├── connect-console-source.properties # Kafka Connect的控制台Source连接器配置,用于从控制台读取数据并写入Kafka
├── connect-distributed.properties # Kafka Connect分布式模式配置,用于在集群中运行多个Kafka Connect实例
├── connect-file-sink.properties # Kafka Connect的文件Sink连接器配置,用于将数据从Kafka写入文件
├── connect-file-source.properties # Kafka Connect的文件Source连接器配置,用于从文件读取数据并写入Kafka
├── connect-log4j.properties # Kafka Connect的Log4j日志配置
├── connect-mirror-maker.properties # Kafka MirrorMaker配置,用于在不同Kafka集群之间复制数据
├── connect-standalone.properties # Kafka Connect独立模式配置,用于单实例运行Kafka Connect
├── consumer.properties # Kafka消费者配置文件,用于配置如何消费消息
├── kraft # KRaft模式配置文件夹(KRaft是去Zookeeper的Kafka模式)
│ ├── broker.properties # KRaft模式中的Kafka broker配置
│ ├── controller.properties # KRaft模式中的控制器配置,用于管理Kafka集群
│ ├── reconfig-server.properties # KRaft模式中的重新配置服务配置
│ └── server.properties # KRaft模式中的Kafka服务器配置,类似于传统的`server.properties`
├── log4j.properties # Kafka的Log4j日志配置文件
├── producer.properties # Kafka生产者配置文件,用于配置如何生产消息
├── server.properties # Kafka服务器配置文件,配置Kafka broker的基本设置
├── tools-log4j.properties # Kafka工具的Log4j日志配置,通常用于Kafka命令行工具
├── trogdor.conf # Trogdor配置文件,Trogdor是用于生成负载的Kafka性能测试工具
└── zookeeper.properties # Zookeeper配置文件,用于配置Zookeeper服务(如果你使用的是传统的Zookeeper模式)
-
Kafka Connect 配置文件:用于配置 Kafka Connect 的连接器(Source、Sink),这允许你通过连接器将数据从外部系统(如文件、数据库等)导入 Kafka,或者将 Kafka 数据导出到外部系统。
-
KRaft 配置文件:用于配置 Kafka 去 Zookeeper 模式(KRaft模式),这是 Kafka 近期的一个新特性,旨在消除对 Zookeeper 的依赖,所有的元数据和集群管理功能由 Kafka 本身管理。
常规配置文件
-
server.properties
用于配置 Kafka broker,包括端口、日志目录、Zookeeper 连接等。 -
consumer.properties
和producer.properties
分别用于配置 Kafka 消费者和生产者。 -
log4j.properties
用于日志配置,定义日志输出格式和级别。
config/producer.properties
[root@VM-0-16-centos config]# cat producer.properties
# 由 Apache 软件基金会(ASF)许可,依据一个或多个贡献者许可协议。有关版权所有权的更多信息,请参见随此工作一起分发的 NOTICE 文件。
# ASF 根据 Apache 许可协议第 2.0 版("许可")授权您使用此文件;除非符合许可的规定,否则不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有规定,否则根据许可分发的软件是按 "原样" 提供的,不附有任何明示或暗示的担保或条件。
# 请参见许可证,了解有关权限和限制的具体规定。
# 有关更多详情,请参见 org.apache.kafka.clients.producer.ProducerConfig。
############################# 生产者基本设置 #############################
# 用于启动的 broker 列表,获取集群中其余部分的信息
# 格式:host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# 这是 Kafka 集群中一个或多个 broker 的地址,生产者需要使用它来连接集群。通常可以是多个 broker 地址,生产者会从中选择一个作为启动连接的目标。
# 指定所有生成数据的压缩编解码器:none, gzip, snappy, lz4, zstd
compression.type=none
# 这是指定 Kafka 生产者在发送消息时使用的压缩方式,常见的压缩方式包括:`none`(不压缩)、`gzip`、`snappy`、`lz4` 和 `zstd`。压缩可以减少带宽的消耗,提高吞吐量,但也可能会增加 CPU 的负担。
# 用于分区记录的分区器类的名称;
# 默认使用 "sticky" 分区逻辑,它将负载均匀地分配到各个分区,但通过尝试填充发送到每个分区的批量数据来提高吞吐量。
#partitioner.class=
# Kafka 默认采用“粘性”分区器,这意味着它会尽量将消息发送到相同的分区,直到该分区的批次满载,之后才会选择其他分区。
# 客户端等待请求响应的最大时间
#request.timeout.ms=
# 这是生产者等待请求响应的最大时间,单位是毫秒。如果在此时间内没有收到响应,生产者将会抛出超时错误。
# `KafkaProducer.send` 和 `KafkaProducer.partitionsFor` 阻塞的最长时间
#max.block.ms=
# 该配置控制在调用 `send()` 或 `partitionsFor()` 时,生产者等待发送的最大阻塞时间。超过此时间将会抛出异常。
# 生产者将在给定的延迟内等待,以便允许其他记录一起发送,从而将发送的记录批量化
#linger.ms=
# 这个配置指定生产者在发送消息之前等待其他记录加入当前批次的最长时间。较高的值可以提高吞吐量,但会增加延迟。
# 请求的最大大小(以字节为单位)
#max.request.size=
# 这是生产者发送单个请求时允许的最大字节数。如果消息过大,生产者将会拒绝该请求。
# 批量发送到一个分区的多个记录时的默认批量大小(以字节为单位)
#batch.size=
# 这是生产者批量发送记录时的默认批量大小。调整这个值可以优化批量发送性能。较大的批量通常能提高吞吐量,但也可能增加延迟。
# 生产者用于缓冲等待发送到服务器的记录的总内存字节数
#buffer.memory=
# 这是生产者用来缓冲待发送记录的内存大小。如果生产者的缓存区满了,它将阻塞发送新消息直到缓存中有足够的空间。
bootstrap.servers:指定 Kafka 集群的 broker 列表,生产者会通过这些地址进行初次连接。
compression.type:控制消息的压缩类型,压缩可以节省带宽,但可能会增加 CPU 开销。none
表示不压缩,gzip
、snappy
、lz4
和 zstd
都是常见的压缩格式。
partitioner.class:用于定义分区的策略。默认使用“粘性”分区策略,通常没有必要修改这个配置。
request.timeout.ms:指定生产者请求响应的最大等待时间。如果响应超时,将会报错。
max.block.ms:控制生产者在发送消息时等待时间的上限,超过该时间没有得到响应会抛出异常。
linger.ms:设置生产者等待时间,以便在发送消息之前批量处理多个消息。增加这个时间可以提高吞吐量,但会带来更高的延迟。
max.request.size:限制生产者发送单个请求时的最大字节数。
batch.size:设置批量发送的默认批量大小,增加批量大小可以提升吞吐量,但也会增加延迟。
buffer.memory:配置生产者缓存数据的总内存大小。如果内存满了,生产者将会阻塞等待空间。
config/server.properties
[root@VM-0-16-centos config]# cat server.properties
# 由 Apache 软件基金会(ASF)许可,依据一个或多个贡献者许可协议。有关版权所有权的更多信息,请参见随此工作一起分发的 NOTICE 文件。
# ASF 根据 Apache 许可协议第 2.0 版("许可")授权您使用此文件;除非符合许可的规定,否则不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有规定,否则根据许可分发的软件是按 "原样" 提供的,不附有任何明示或暗示的担保或条件。
# 请参见许可证,了解有关权限和限制的具体规定。
#
# 该配置文件适用于基于 ZK 模式,其中需要使用 Apache ZooKeeper。
# 详细信息请参阅 kafka.server.KafkaConfig
#
############################# 服务器基本设置 #############################
# broker 的 ID。每个 broker 必须设置一个唯一的整数值。
broker.id=1
# 每个 Kafka broker 都需要一个唯一的 ID,用于标识该 broker。
############################# 套接字服务器设置 #############################
# 套接字服务器监听的地址。如果未配置,主机名将等于 java.net.InetAddress.getCanonicalHostName(),
# 并使用 PLAINTEXT 监听器名称,端口 9092。
# 格式:
# listeners = listener_name://host_name:port
# 示例:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://localhost:9092
# 配置 Kafka broker 监听的地址和端口。此处的 `PLAINTEXT` 表示非加密连接。
# broker 向客户端公开的地址和端口。 如果未设置,默认使用 "listeners" 的值。
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 这是向客户端广告的地址,客户端将使用这个地址来连接 Kafka。
# 将监听器名称映射到安全协议,默认情况下它们是相同的。更多详情请参考配置文档。
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 这是为不同的监听器指定安全协议。
# 用于接收来自网络的请求并向网络发送响应的线程数。
num.network.threads=3
# 设置处理网络请求的线程数,通常用于提高网络请求的并发处理能力。
# 用于处理请求的线程数,这些请求可能包括磁盘 I/O。
num.io.threads=8
# 设置用于处理 I/O 请求的线程数,磁盘 I/O 操作通常是瓶颈,因此可以增加线程数来提高吞吐量。
# 套接字服务器的发送缓冲区(SO_SNDBUF)。
socket.send.buffer.bytes=102400
# 控制网络传输时发送数据的缓冲区大小。
# 套接字服务器的接收缓冲区(SO_RCVBUF)。
socket.receive.buffer.bytes=102400
# 控制接收数据时的缓冲区大小。
# 套接字服务器将接受的最大请求大小(防止 OOM)。
socket.request.max.bytes=104857600
# 限制每个请求的最大大小,用于防止请求过大导致内存溢出。
############################# 日志设置 #############################
# 用于存储日志文件的目录列表,目录之间用逗号分隔。
log.dirs=/tmp/kafka-logs
# Kafka 存储日志数据的目录,多个目录可以通过逗号分隔。
# 每个主题的默认分区数。更多分区可允许更大的并行消费,但这也会导致更多的文件生成。
num.partitions=1
# 设置每个 Kafka 主题的默认分区数。分区越多,可以支持更多的并发消费。
# 用于日志恢复和关闭时刷盘的每个数据目录的线程数。对于 RAID 数组安装,建议增加此值。
num.recovery.threads.per.data.dir=1
# 设置每个数据目录用于恢复和关闭时的线程数。
############################# 内部主题设置 #############################
# `__consumer_offsets` 和 `__transaction_state` 内部主题的复制因子。
# 对于非开发测试环境,建议使用大于 1 的值以确保可用性,如 3。
offsets.topic.replication.factor=1
# 设置 Kafka 内部主题(如 `__consumer_offsets`)的副本数。生产环境推荐使用 3 以上的副本数。
transaction.state.log.replication.factor=1
# 设置 Kafka 内部事务日志副本数,通常建议大于 1。
transaction.state.log.min.isr=1
# 设置事务日志的最小 ISR(In-Sync Replicas)数量,即副本同步的最小数量。
############################# 日志刷新策略 #############################
# 消息会立即写入文件系统,但默认情况下,我们仅在懒加载时 fsync() 来同步 OS 缓存。以下配置控制数据刷盘到磁盘的方式。
# 下面的设置允许配置刷新策略,可以按时间间隔或每 N 条消息进行刷盘(或者两者都有)。
# 这样做有一些重要的权衡:
# 1. 持久性:如果不使用副本,未刷盘的数据可能会丢失。
# 2. 延迟:较大的刷新间隔可能导致刷新时延迟增加,因为会有更多的数据等待刷新。
# 3. 吞吐量:刷新是最昂贵的操作,较小的刷新间隔可能会导致过多的磁盘寻址操作。
# 接受多少条消息后强制刷新数据到磁盘
#log.flush.interval.messages=10000
# 设置刷新间隔的消息数量。达到此数量后,将强制刷新数据到磁盘。
# 消息在日志中等待的最大时间,超过此时间后强制刷新
#log.flush.interval.ms=1000
# 设置刷新间隔的时间,达到此时间后强制刷新。
############################# 日志保留策略 #############################
# 以下配置控制日志段的丢弃。策略可以根据时间或日志大小来丢弃日志段。
# 只要满足以下条件之一,就会删除日志段。删除操作总是从日志的末尾开始。
# 日志文件的最小年龄,超过此时间后该文件有资格被删除。
log.retention.hours=168
# 设置日志保留的小时数,超过该时间的日志会被删除。
# 基于大小的日志保留策略。如果剩余日志段的总大小低于 `log.retention.bytes`,将删除旧的日志段。
# 与 `log.retention.hours` 独立工作。
#log.retention.bytes=1073741824
# 设置基于大小的日志保留策略,设置日志保留的总字节数。
# 日志段文件的最大大小。当此大小被达到时,将创建一个新的日志段。
#log.segment.bytes=1073741824
# 设置日志段的最大文件大小,超过该大小后将开始新日志段。
# 检查日志段是否可以根据保留策略删除的间隔时间
log.retention.check.interval.ms=300000
# 设置检查日志段是否可以删除的间隔时间。
############################# Zookeeper 配置 #############################
# ZooKeeper 连接字符串(参见 ZooKeeper 文档获取详情)。
# 这是一个以逗号分隔的 host:port 对,每个都对应一个 zk 服务器。例如:"127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"。
# 你还可以在 URL 后添加一个可选的 chroot 字符串,以指定 Kafka 所有 ZNode 的根目录。
zookeeper.connect=localhost:2181
# 配置 Kafka 用于连接 ZooKeeper 的地址,通常是 ZooKeeper 服务的地址。
# 连接 ZooKeeper 的超时时间(单位:毫秒)
zookeeper.connection.timeout.ms=18000
# 设置 Kafka 连接到 ZooKeeper 时的超时时间,超过该时间没有连接成功将会报错。
############################# 消费者协调器设置 #############################
# 以下配置指定 GroupCoordinator 延迟初始消费者重新平衡的时间,单位为毫秒。
# 默认值为 3 秒。我们将其设置为 0,以提供更好的开发和测试体验。
# 但在生产环境中,默认的 3 秒值更适合,因为这可以避免在应用程序启动时进行不必要的重新平衡。
group.initial.rebalance.delay.ms=0
# 设置 GroupCoordinator 延迟进行初始消费者重新平衡的时间。
broker.id:每个 Kafka broker 必须有一个唯一的 ID,多个 Kafka 实例可以通过不同的 ID 区分。
listeners:指定 Kafka broker 用于通信的地址和端口。默认使用 PLAINTEXT
,即不加密的传输。
log.dirs:指定 Kafka 存储日志文件的目录
config/zookeeper.properties
[root@VM-0-16-centos config]# cat zookeeper.properties
# 由 Apache 软件基金会(ASF)许可,依据一个或多个贡献者许可协议。有关版权所有权的更多信息,请参见随此工作一起分发的 NOTICE 文件。
# ASF 根据 Apache 许可协议第 2.0 版("许可")授权您使用此文件;除非符合许可的规定,否则不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有规定,否则根据许可分发的软件是按 "原样" 提供的,不附有任何明示或暗示的担保或条件。
# 请参见许可证,了解有关权限和限制的具体规定。
# 配置存储快照的目录。
dataDir=/tmp/zookeeper
# 设置 ZooKeeper 用于存储快照(数据)的目录。
# 客户端连接的端口。
clientPort=2181
# 配置 ZooKeeper 监听客户端连接的端口,默认端口是 2181。
# 禁用每个 IP 的连接数限制,因为这是一个非生产配置。
maxClientCnxns=0
# 配置允许的最大客户端连接数。设置为 0 时禁用每个 IP 的连接数限制,适用于非生产环境。
# 默认情况下禁用 adminserver,以避免端口冲突。如果选择启用此功能,请将端口设置为非冲突值。
admin.enableServer=false
# 禁用 adminserver,这通常用于在生产环境中避免冲突。启用时,可以设置一个非冲突的端口,如 8080。
# admin.serverPort=8080
# 如果启用了 adminserver,可以指定其监听的端口。默认情况下,此项被禁用。
dataDir:指定 ZooKeeper 存储其数据(如事务日志、快照等)的目录。在此配置中,ZooKeeper 数据存储在 /tmp/zookeeper
目录下。
clientPort:设置 ZooKeeper 服务器接受客户端连接的端口,默认端口是 2181。客户端将通过该端口与 ZooKeeper 进行通信。
maxClientCnxns:配置每个 IP 地址允许的最大连接数。在非生产环境中,设置为 0 可以禁用此限制,允许更多连接。
admin.enableServer:默认情况下,管理员界面(adminserver)是禁用的。启用该功能时,可以设置 admin.serverPort
来指定管理界面的端口,用于通过浏览器访问 ZooKeeper 的监控和管理界面。
config/consumer.properties
[root@VM-0-16-centos config]# cat consumer.properties
# 受Apache软件基金会(ASF)的一项或多项贡献者许可协议的许可。有关版权拥有权的更多信息,请参见随附的NOTICE文件。
# ASF根据Apache许可协议2.0版本(“许可协议”)将此文件授权给您;除非符合许可协议,否则您不得使用此文件。
# 您可以在以下网址获取许可协议的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面协议另有规定,按“原样”分发软件,不提供任何形式的保证或条件,
# 无论是明示或暗示的。有关许可协议的具体条款,请参阅许可协议。
# 有关更多详情,请参见org.apache.kafka.clients.consumer.ConsumerConfig
# 用于引导集群信息的代理列表
# 格式:host1:port1,host2:port2 ...
bootstrap.servers=localhost:9092
# 消费者组ID
group.id=test-consumer-group
# 当Kafka中没有初始偏移量或当前偏移量在服务器上不存在时的处理方式:latest(最新)、earliest(最早)、none(无)
#auto.offset.reset=
bootstrap.servers
:指定 Kafka 集群的初始 broker 地址,客户端用来获取集群的元数据并连接到 Kafka 集群。
group.id
:定义消费者所属的消费组,每个消费组内的消费者共同消费主题的数据,确保每个分区的数据只会被一个消费者处理。
auto.offset.reset
:指定消费者在没有初始偏移量或偏移量丢失时的行为,earliest
表示从最早的消息开始消费,latest
表示从最新的消息开始消费。