网站集群建设是,广西住房建设厅网站,巩义自助建站优化,做网站要素最近项目组的kafka集群#xff0c;老是由于应用端写入kafka topic的消息太多#xff0c;导致所在的broker节点占满#xff0c;导致其他的组件接连宕机。
这里和应用端沟通可以删除1天之前的消息来清理磁盘#xff0c;并且可以调整topic的消息存活时间。
一、调整Topic的消…最近项目组的kafka集群老是由于应用端写入kafka topic的消息太多导致所在的broker节点占满导致其他的组件接连宕机。
这里和应用端沟通可以删除1天之前的消息来清理磁盘并且可以调整topic的消息存活时间。
一、调整Topic的消息存活时长删除消息
kafka-configs --zookeeper localhost:2181 --entity-type topics --entity-name topicName --alter --add-config retention.ms86400000
如上调整topic的消息存活时长为为1天当执行完之后执行查询topic详细信息可以看到已经发生了修改并且过一会过期的消息会被删除。
kafka-topics --bootstrap-server localhost:9092 --describe --topic topicName
二、不修改Topic消息存活时长删除消息
1.登录到相应的机器上。
2.找到写满的磁盘删除掉不需要的业务数据。数据清理原则
不可直接删除Kafka的数据目录避免造成不必要的数据丢失。找到占用空间较多或者明确不需要的Topic选择其中某些Partition从最早的日志数据开始删除。删除segment及相应地index和timeindex文件。不要清理内置的Topic例如__consumer_offsets和_schema等。
3.重启磁盘被写满的相应的Broker节点使日志目录online。
参考Kafka磁盘写满时如何运维操作_开源大数据平台E-MapReduce-阿里云帮助中心 (aliyun.com)
怎么删除kafka中的数据-火山引擎 (volcengine.com) 三、Kafka消息清理策略
在Kafka中存在数据过期的机制称为data expire。如何处理过期数据是根据指定的policy策略决定的而处理过期数据的行为即为log cleanup。
在Kafka中有以下几种处理过期数据的策略
log.cleanup.policydeleteKafka中所有用户创建的topics默认均为此策略
根据数据已保存的时间进行删除默认为1周根据log的max size进行删除默认为-1也就是无限制
log.cleanup.policycompacttopic __consumer_offsets 默认为此策略
根据messages中的key进行删除操作在active segment 被commit 后会删除掉old duplicate keys无限制的时间与空间的日志保留
自动清理Kafka中的数据可以控制磁盘上数据的大小、删除不需要的数据同时也减少了对Kafka集群的维护成本。
那Log cleanup 在什么时候发生呢
首先值得注意的是log cleanup 在partition segment 上发生更小/更多的segment也就意味着log cleanup 发生的频率会上升Log cleanup 不应该频繁发生 因为它会消耗CPU与内存资源Cleaner的检查会在每15秒进行一次由log.cleaner.backoff.ms 控制
log.cleanup.policydelete
log.cleanup.policydelete 的策略根据数据保留的时间、以及log的max size对数据进行cleanup。控制数据保留时间以及log max size的参数分别为
log.retention.hours指定数据保留的时常默认为一周168
将参数调整到更高的值也就意味着会占据更多的磁盘空间更小值意味着保存的数据量会更少假如consumer 宕机超过一周则数据便会再未处理前即丢失
log.retention.bytes每个partition中保存的最大数据量大小默认为-1也就是无限大
再控制log的大小不超过一个阈值时会比较有用 在到达log cleanup 的条件后cleaner会自动根据时间或是空间的规则进行删除新数据仍写入active segment 针对于这个参数一般有以下两种使用场景分别为
log保留周期为一周根据log保留期进行log cleanup
log.retention.hours168 以及 log.retention.bytes-1
log保留期为无限制根据log大小进行进行log cleanup
log.retention.hours17520以及 log.retention.bytes524288000
其中第一个场景会更常见。 Log Compaction
Log compaction用于确保在一个partition中对任意一个key它所对应的value都是最新的。
这里举个例子我们有个topic名为employee-salary我们希望维护每个employee当前最新的工资情况。
左边的是compaction前segments中的数据右边为compaction 后segments中的数据其中有部分key对应的value有更新 可以看到在log compaction后相对于更新后的key-value message旧的message被删除。
Log Compaction 有如下特点
messages的顺序仍然是保留的log compaction 仅移除一些messages但不会重新对它们进行排序一条message的offset是无法改变的immutable如果一条message缺失则offset会直接被跳过被删除的records在一段时间内仍然可以被consumers访问到这段时间由参数delete.retention.ms默认为24小时控制
需要注意的是Kafka 本身是不会组织用户发送duplicate data的。这些重复数据也仅会在一个segment在被commit 的时候做重复数据删除所以consumer仍会读取到这部分重复数据如果客户端有发的话。
Log Compaction也会有时失败compaction thread 可能会crash所以需要确保给Kafka server 足够的内存用于做这些操作。如果log compaction异常则需要重启Kafka此为一个已知的bug。
Log Compaction也无法通过API手动触发至少到现在为止是这样只能server端自动触发。
下面是一个 Log Compaction过程的示意图 正在写入的records仍会被写入Active Segment已经committed segments会自动做compaction。此过程会遍历所有segments中的records并移除掉所有需要被移除的messages。
Log compaction由上文提到的log.cleanup.policycompact进行配置其中
Segment.ms默认为7天在关闭一个active segment前所需等待的最长时间Segment.bytes默认为1G一个segment的最大大小Min.compaction .lag.ms默认为0在一个message可以被compact前所需等待的时间Delete.retention.ms默认为24小时在一条message被加上删除标记后在实际删除前等待的时间Min.Cleanable.dirty.ratio默认为0.5若是设置的更高则会有更高效的清理但是更少的清理操作触发。若是设置的更低则清理的效率稍低但是会有更多的清理操作被触发