英国购物网站排名,网络推广公司名字,phpcms 关闭网站,自建网站餐饮服务提供者应当在通信主管部门备案后文章目录 服务端日志Topic消息存储方式主体介绍log文件追加记录消息index和timeindex索引文件 日志文件清理Kafka的文件高效读写机制Kafka的文件结构顺序写磁盘零拷贝 合理配置刷盘频率客户端消费进度管理 服务端日志
Kafka的日志信息是通过conf/server.properties文件中的log… 文章目录 服务端日志Topic消息存储方式主体介绍log文件追加记录消息index和timeindex索引文件 日志文件清理Kafka的文件高效读写机制Kafka的文件结构顺序写磁盘零拷贝 合理配置刷盘频率客户端消费进度管理 服务端日志
Kafka的日志信息是通过conf/server.properties文件中的log.dirs配置项来配置的 Topic消息存储方式
主体介绍
进入到上方配置文件中指定的目录下查看topic的数据都是以topic名 partition下标的命名方式保存的 我们现在进入其中一个partition目录 .index 日志索引文件采用的稀疏索引提高查询效率记录的是消息偏移量offset 和 该消息在.log文件中的位置position .log 消息保存在.log文件中是以二进制的方式保存的。可以通过.index和.timeindex两个索引文件加速查找消息 文件大小是(log.segment.bytes参数设定)默认1GB新文件名是第一条消息的offset.log .timeindex 日志索引文件采用的也是稀疏索引结构每隔一段时间保存一条索引记录记录的是消息产生的时间戳timestamp和消息偏移量offset .snapshot 快照可以理解为一个备份文件 leader-epoch-checkpoint Leader partition新上任就会往该文件中写入一个epoch来保证HW的一致性 partition.metadata 保存着该partition对应的topic_id 最后两个文件可以直接查看就记录的一些简单的信息
[rootworker1 disTopic-0]# cat leader-epoch-checkpoint
0
3
5 0
9 6
14 18[rootworker1 disTopic-0]# cat partition.metadata
version: 0
topic_id: rDUdZBO7RH2GNPgdRXk7Tw而前三个文件是以二进制的方式保存的需要通过Kafka提供的kafka-dump-log.sh来查看文件内容如下所示
# 保存的是消息产生的时间戳 和 消息offset
[rootworker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1723254597947 offset: 51
timestamp: 1723254598224 offset: 102
timestamp: 1723254598501 offset: 152
timestamp: 1723254598816 offset: 201
timestamp: 1723254599085 offset: 250# 保存是的消息的offset 和 该消息在.log文件中对应的position位置
[rootworker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 51 position: 4160
offset: 102 position: 8324
offset: 152 position: 12428
offset: 201 position: 16544
offset: 250 position: 20660
offset: 299 position: 24776# 每一条记录保存的是一批消息信息只不过我下面刚好都只保存一条消息
[rootworker3 testTopic-0]# kafka-dump-log.sh --files 00000000000000000000.log
baseOffset: 50 lastOffset: 50 count: 1 baseSequence: 0 lastSequence: 0 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4080 CreateTime: 1723254597907 size: 80 magic: 2 compresscodec: none crc: 672861010 isvalid: true
baseOffset: 51 lastOffset: 51 count: 1 baseSequence: 1 lastSequence: 1 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4160 CreateTime: 1723254597947 size: 80 magic: 2 compresscodec: none crc: 3136762717 isvalid: true
baseOffset: 52 lastOffset: 52 count: 1 baseSequence: 2 lastSequence: 2 producerId: 5001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 4240 CreateTime: 1723254597951 size: 80 magic: 2 compresscodec: none crc: 3076149845 isvalid: truelog文件追加记录消息
.log文件是以追加的方式写入新的消息日志。position表示写入位置size表示消息的总长度通过这两个值就能从一段二进制中获取到一条具体的消息。Kafka中的消息日志只允许追加不允许删除和修改。log文件的固定大小是log.segment.bytes参数设定默认1GB。新创建的文件是以写入第一条消息的offset作为的文件名 index和timeindex索引文件
index和timeindex索引文件目的都是为了加快从log文件中读取消息的效率如下图所示 index和timeindex文件是以相对偏移量的方式建立的log消息日志数据索引 比如说0000.index和 0947.index索引文件中的内容offset都是以0开始计数的使用的是第一条消息的相对偏移量而消息绝对偏移量文件名相对偏移量 index和timeindex文件采用的是类似于数据的跳表并不是每一条消息都会记录一条索引。 由log.index.interval.bytes决定.log文件中产生多少大小的消息就生成一条index记录 官网 服务端的参数说明 log.index.interval.bytes
The interval with which we add an entry to the offset indexType: int
Default: 4096 (4 kibibytes)index文件的作用类似于数据结构中的跳表他的作用是用来加速查询log文件的效率。而timeindex文件的作用则是用来进行一些跟时间相关的消息处理。比如文件清理。 日志文件清理
Kafka为了防止过多的日志文件给服务器带来过大的压力他会定期删除过期的log文件。 判断那些日志文件过期了 log.retention.check.interval.ms 定时检测文件是否过期。默认是 300000毫秒也就是五分钟。 在检查文件是否超时时是以每个.timeindex中最大的那一条记录为准。 log.retention.hours log.retention.minutes log.retention.ms 。 这一组参数表示文件保留多长时间。默认生效的是log.retention.hours默认值是168小时也就是7天。如果设置了更高的时间精度以时间精度最高的配置为准。
官网 服务端的参数说明
# 日志清除程序检查是否有日志符合删除条件的频率(以毫秒为单位)
log.retention.check.interval.ms
Type: long
Default: 300000 (5 minutes)# 在删除日志文件之前保留它的小时数(以小时为单位)仅次于log.retention.ms属性
log.retention.hours
Type: int
Default: 168# 在删除日志文件之前保留日志文件的分钟数(以分钟为单位)仅次于log.retention.ms属性。如果没有设置则使用log.retention.hours中的值
log.retention.minutes
Type: int
Default: null# 日志文件删除前保留的毫秒数(以毫秒为单位)如果未设置则使用log.retention.minutes中的值。如果设置为-1则不应用时间限制。
log.retention.ms
Type: long
Default: null过期的日志文件如何处理
# 日志清理策略。有两个选项delete表示删除日志文件。 compact表示压缩日志文件。
log.cleanup.policy
Type: list
Default: delete
Valid Values: [compact, delete]# 日志删除前的最大容量
# 当log.cleanup.policy选择delete时 当总的日志文件大小超过这个阈值后就会删除最早的日志文件。默认是-1表示无限大。
log.retention.bytes
Type: long
Default: -1Kafka的文件高效读写机制
Kafka的文件结构
kafka的数据文件结构可以加速日志文件的读取。
Topic下的多个partition采用的是单独记录日志文件这样加快了topic下的数据读取
通过.index索引文件的稀疏索引结构进一步加快日志检索速度。 顺序写磁盘
对每个Log文件Kafka会提前规划固定的大小这样在申请文件时可以提前占据一块连续的磁盘空间。
Kafka的log文件只能以追加的方式往文件的末端添加(这种写入方式称为顺序写) 零拷贝
零拷贝是Linux操作系统提供的一种IO优化机制而Kafka大量的运用了零拷贝机制来加速文件读写。
零拷贝就是配合内核态的复制机制减少用户态和内核态之间的内容拷贝
传统的一次硬件IO是这样工作的。如下图所示 零拷贝主要有两种实现机制
1、mmap文件映射机制
不再将整个文件复制进用户态而是用户态只持有一个文件的映射信息通过这个映射信息控制内核态的文件读写。
java中大量使用该方式 可以参考下JDK中的DirectByteBuffer实现机制
适用于文件不超过2G的文件所以Kafka将日志文件设计成1G 2、sendfile文件传输机制
用户态连文件索引都不读取直接向内核态发送一个sendfile指令让内核态去进行文件拷贝
例如当Consumer要从Broker上poll消息时Broker不需要对消息进行任何的加工用户态就只需要往内核态发一个sendfile指令而不需要有任何的数据拷贝过程。Kafka大量的使用了sendfile机制用来加速对本地数据文件的读取过程。 JDK中8中java.nio.channels.FileChannel类提供了transferTo和transferFrom方法底层就是使用了操作系统的sendfile机制。 合理配置刷盘频率
应用程序读取文件是从内核态的pageCache中读取的保存文件是最终只能保存到pageCache中应用程序不能直接操作pageCache。而pageCache中的数据如果还没有来得及刷盘持久化到磁盘服务器忽然非正常断电那么pageCache中的数据就会丢失。
应用程序唯一能做的就是频繁的调用OS提供的fsync()通知OS进行刷盘操作但是则会降低应用的执行性能。所以应用程序需要在数据安全和高性能上做取舍。
Kafka在服务端设计了几个参数来控制刷盘的频率
这里可以看到Kafka为了最大化性能默认是将刷盘操作交由了操作系统进行统一管理。
# 多长时间进行一次强制刷盘。默认是Long.MAX。
flush.ms
Type: long
Default: 9223372036854775807
Valid Values: [0,...]# 表示当同一个Partiton的消息条数积累到这个数量时就会申请一次刷盘操作。默认是Long.MAX。
log.flush.interval.messages
Type: long
Default: 9223372036854775807
Valid Values: [1,...]#当一个消息在内存中保留的时间达到这个数量时就会申请一次刷盘操作。他的默认值是空。如果这个参数配置为空则生效的是下一个参数。
log.flush.interval.ms
Type: long
Default: null# 日志刷新程序检查是否需要将日志刷新到磁盘的频率(以毫秒为单位),默认也是Long.MAX。
log.flush.scheduler.interval.ms
Type: long
Default: 9223372036854775807客户端消费进度管理
消费者消费消息的进度被保存在一个名称为__consumer_offsets内置的topic中该topic默认会创建50个分区partition 该topic在zookeeper也能查看到相应的信息只不过zookeeper上只是简单记录了partition的Leader和ISR列表并没有看见真实消费者的消费进度 既然这也是一个topic下的partition我们启动一个消费者来消费其中的消息看看
可以看到下面记录的消息内容就是一个key-value的格式key为消费者组topicpartition 而value保存则offset和一些元数据信息
也就是说这里记录了消费者组在某个topic下的partition的消息消费偏移量offset
[rootworker1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic __consumer_offsets --from-beginning --formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter[test,disTopic,0]::OffsetAndMetadata(offset6, leaderEpochOptional[5], metadata, commitTimestamp1723081907174, expireTimestampNone)
[test,disTopic,3]::OffsetAndMetadata(offset6, leaderEpochOptional[7], metadata, commitTimestamp1723081907275, expireTimestampNone)
[test,disTopic,1]::OffsetAndMetadata(offset3, leaderEpochOptional[1], metadata, commitTimestamp1723081907275, expireTimestampNone)
[test,disTopic,2]::OffsetAndMetadata(offset11, leaderEpochOptional[4], metadata, commitTimestamp1723081907275, expireTimestampNone)
[test,disTopic,0]::OffsetAndMetadata(offset6, leaderEpochOptional[5], metadata, commitTimestamp1723081907275, expireTimestampNone)
[test,disTopic,3]::OffsetAndMetadata(offset6, leaderEpochOptional[7], metadata, commitTimestamp1723081907377, expireTimestampNone)
[test,disTopic,1]::OffsetAndMetadata(offset3, leaderEpochOptional[1], metadata, commitTimestamp1723081907377, expireTimestampNone)而这些Offset数据其实也是可以被消费者修改的在之前章节已经演示过消费者如何从指定的位置开始消费消息。而一旦消费者主动调整了OffsetKafka当中也会更新对应的记录。
另外这个系统Topic里面的数据是非常重要的因此Kafka在消费者端也设计了一个参数来控制这个Topic应该从订阅关系中剔除。
public static final String EXCLUDE_INTERNAL_TOPICS_CONFIG exclude.internal.topics;
private static final String EXCLUDE_INTERNAL_TOPICS_DOC Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.;
public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS true;