更新時間:2023-04-14 來源:黑馬程序員 瀏覽量:
Kafka的消息存儲在磁盤中,為了控制磁盤占用空間,Kafka需要不斷地對過去的一些消息進行清理工作。Kafka的每個分區(qū)都有很多的日志文件,這樣也是為了方便進行日志的清理。在Kafka中,提供日志清理和日志壓縮兩種方式:
日志刪除(Log Deletion):按照指定的策略直接刪除不符合條件的日志。
日志壓縮(Log Compaction):按照消息的key進行整合,有相同key的但有不同value值,只保留最后一個版本。
在Kafka的broker或topic配置中:
以段(segment日志)為單位來進行定期清理的。本節(jié)我們來詳細介紹Kafka數(shù)據(jù)處理中的日志刪除和日志壓縮。
定時日志刪除任務
Kafka日志管理器中會有一個專門的日志刪除任務來定期檢測和刪除不符合保留條件的日志分段文件,這個周期可以通過broker端參數(shù)log.retention.check.interval.ms來配置,默認值為300,000,即5分鐘。當前日志分段的保留策略有3種:1. 基于時間的保留策略
2. 基于日志大小的保留策略
3. 基于日志起始偏移量的保留策略
基于時間的保留策略
以下三種配置可以指定如果Kafka中的消息超過指定的閾值,就會將日志進行自動清理:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
其中,優(yōu)先級為 log.retention.ms > log.retention.minutes > log.retention.hours。默認情況,在broker中,配置如下:
log.retention.hours=168
也就是,默認日志的保留時間為168小時,相當于保留7天。
刪除日志分段時:
1. 從日志文件對象中所維護日志分段的跳躍表中移除待刪除的日志分段,以保證沒有線程對這些日志分段進行讀取操作
2. 將日志分段文件添加上“.deleted”的后綴(也包括日志分段對應的索引文件)
3. Kafka的后臺定時任務會定期刪除這些“.deleted”為后綴的文件,這個任務的延遲執(zhí)行時間可以通過file.delete.delay.ms參數(shù)來設置,默認值為60000,即1分鐘。
設置topic 5秒刪除一次
1. 為了方便觀察,設置段文件的大小為1M。
key: segment.bytes
value: 1048576
1. 設置topic的刪除策略
key: retention.ms
value: 5000
嘗試往topic中添加一些數(shù)據(jù),等待一會,觀察日志的刪除情況。我們發(fā)現(xiàn),日志會定期被標記為刪除,然后被刪除。
基于日志大小的保留策略
日志刪除任務會檢查當前日志的大小是否超過設定的閾值來尋找可刪除的日志分段的文件集合??梢酝ㄟ^broker端參數(shù) log.retention.bytes 來配置,默認值為-1,表示無窮大。如果超過該大小,會自動將超出部分刪除。
注意:
log.retention.bytes 配置的是日志文件的總大小,而不是單個的日志分段的大小,一個日志文件包含多個日志分段。
基于日志起始偏移量保留策略
每個segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么這些日志文件將會標記為刪
日志壓縮(Log Compaction)
Log Compaction是默認的日志刪除之外的清理過時數(shù)據(jù)的方式。它會將相同的key對應的數(shù)據(jù)只保留一個版本。
Log Compaction執(zhí)行后,offset將不再連續(xù),但依然可以查詢Segment。
Log Compaction執(zhí)行前后,日志分段中的每條消息偏移量保持不變。Log Compaction會生成一個新的Segment文件。
Log Compaction是針對key的,在使用的時候注意每個消息的key不為空。
基于Log Compaction可以保留key的最新更新,可以基于Log Compaction來恢復消費者的最新狀態(tài)。