-
Notifications
You must be signed in to change notification settings - Fork 257
Mastering Message Cleanup with Apache Kafka
Message cleanup is a fundamental capability of MQ middleware, preventing the unbounded growth of storage usage in MQ systems. Unlike other messaging products, Apache Kafka® (hereinafter referred to as Kafka) does not immediately delete messages from a topic once they are consumed. Instead, it relies on topic-level cleanup policies. This article will briefly introduce Kafka's two message cleanup strategies: deletion and compaction, discussing their application scenarios, configuration parameters, and some technical details.
AutoMQ[1] is a next-generation cloud-native Kafka product that is 100% compatible with Apache Kafka. It has redesigned and re-implemented Kafka's storage layer, enabling it to build on object storage solutions like S3. Thanks to AutoMQ's full compatibility with Apache Kafka, the principles and parameters discussed in this article are also applicable to AutoMQ.
-
Message : Kafka officially refers to messages as events or records. An event consists of a key (optional) and a value (the message body);
-
Message batch : Kafka aggregates multiple messages into a batch. Specifically, the client produces or consumes messages to or from the server in batch form, and the server also stores messages in batches.
-
Topic partition : A partition of a topic. In Kafka, a topic is divided into multiple topic partitions to support load balancing between consumers and servers.
-
Segment : The basic unit of message storage in Kafka. A topic partition is divided into multiple segments, which are also the basic units for message cleanup.
We can configure the "cleanup.policy" parameter for a topic in Kafka to specify its cleanup strategy. The options include:
-
delete : The default policy where segments are deleted once their size or age reaches a specified threshold.
-
compact : A compression strategy based on the key; this strategy retains only the latest message bound to the same key while deleting other messages. Kafka’s internal topic "__consumer_offsets" uses the compact strategy.
-
delete + compact : A hybrid strategy where older segments are deleted based on size or time, and the topic partition is also compacted.
Generally, if your business focuses on the final value of a key (such as in KV scenarios), like recording a user's daily steps or the balance of an account, compact is more suitable. Additionally, under the compact strategy, it's best if the business keys are limited to fewer possible values. If the key values are too dispersed, it will significantly reduce the effectiveness of compaction. In such scenarios, you may consider using the delete + compact strategy. If there are no obvious KV characteristics, it is usually sufficient to use the delete strategy.
Note: Kafka supports modifying the topic’s cleanup policy without requiring a restart.
The following threads are responsible for executing cleaning logic within Kafka:
-
Scheduler thread : Executes the "kafka-log-retention" task, periodically checking whether topics with a purely delete policy need cleaning.
-
N CleanerThreads held by LogCleaner : Perform log compaction and execute deletions for topics under the “delete + compact” mixed strategy.
LogCleaner requires the "log.cleaner.enable" configuration on the server side to be set to true to be activated (set to true by default starting from version 0.9.0.1).
Below, the details of these two types of cleanup actions will be discussed separately.
Scheduler periodically executes the "kafka-log-retention" task, which triggers segment deletions based on time or size within this task.
-
log.retention.hours : The retention time of messages in Kafka, with a default value of 168, meaning messages are retained for one week. As previously mentioned, the basic unit of cleanup is a segment. Therefore, an entire segment will only be deleted if all messages within the segment have exceeded the retention time. Similar time-based parameters include log.retention.minutes and log.retention.ms.
-
log.retention.bytes : The maximum size of messages retained in a topic partition, with a default value of -1, meaning there is no size limit.
-
log.retention.check.interval.ms : The interval at which the "kafka-log-retention" task runs. The default value is 300000, which is equivalent to 5 minutes.
-
Filter topic partitions with a delete-only policy.
-
Invoke kafka.log.UnifiedLog#deleteOldSegments() to clean up three types of segments:
-
deleteLogStartOffsetBreachedSegments: Deletes segments with baseOffset <= logStartOffset;
-
deleteRetentionSizeBreachedSegments: Deletes excess segments based on size;
-
deleteRetentionMsBreachedSegments: Deletes expired segments based on time;
-
LogCleaner is a component introduced to support compaction, controlled by the `log.cleaner.enable` setting. LogCleaner hosts multiple CleanerThread instances, with each thread independently cleaning stale messages based on their key.
Under the compaction strategy, messages can also be "deleted." If the latest value for a key is null, the server will regard it as a deletion "marker," and the key will be permanently deleted once the tombstone expires (as discussed later).
Regarding compaction, Kafka can provide the following guarantees[3]:
-
Tail-read consumers can read all messages written by producers, and these messages have sequential offsets;
-
The order of messages will not change; compaction only removes some messages.
-
Offsets of messages will not change; once an offset is created, it is persisted.
-
A consumer starting from the beginning can at least consume the final state value.
The last point implies two things:
-
The latest value will certainly be preserved (unless it is a null value);
-
It is possible to consume the value corresponding to an earlier key due to the conditions for compaction not being met yet, or due to a tombstone marker (see later sections) that has not been deleted yet.
Besides the aforementioned "log.cleaner.enable," important configurations include:
-
log.cleaner.min.compaction.lag.ms : The minimum retention time that messages need to meet to participate in compaction. This can prevent newer messages from being compacted. Default value is 0.
-
log.cleaner.max.compaction.lag.ms : The retention time threshold that triggers compaction, mainly to ensure that inactive topic partitions can also participate in compaction. Default value is 9223372036854775807, which means this feature is not enabled.
-
log.cleaner.min.cleanable.ratio : The dirty ratio of a topic partition. A partition is considered for compaction only if its dirty ratio exceeds this threshold. A lower value means a higher frequency of cleaning. The default value is 0.5;
-
delete.retention.ms : The "grace period" for tombstone-marked messages. The default value is 86400000, which is one day;
-
log.cleaner.threads : The number of CleanerThreads managed by LogCleaner. The default value is 1;
-
log.cleaner.backoff.ms : The idle time before a CleanerThread checks again when no topic partition is found for compaction. The default value is 15000 (15s);
Before we begin, let's introduce a few concepts:
-
active segment : The currently active segment, which is also the most recent segment. It can accept new message writes;
-
cleaned segments : The product of previous compactions. All messages within these segments have no duplicate keys;
-
dirty segments : The segments that come after the cleaned segments but before the active segment. These segments have not undergone compaction.
Broadly, they can be categorized as:
-
Selection of topic partitions for compaction;
-
Compaction of topic partitions;
-
Execution of deletion for topic partitions configured with "delete + compact" mixed strategy;
At this stage, the server will filter out the "dirtiest" topic partitions. Specifically, Kafka will filter out the following topic partitions as candidates:
-
If there are dirty messages in an inactive topic partition, ensure they are not left uncleaned for long periods;
-
Active topic partitions that exceed the cleanup threshold;
The overall logic is as follows:
Where:
-
inProgress lock : This lock is used for inter-thread synchronization to prevent a topic partition from being selected by multiple CleanerThreads simultaneously and to prevent a topic partition from being selected by both the Scheduler thread and the CleanerThread when the cleaning strategy changes;
-
partition uncleanable : Indicates that an unexpected exception (not ThreadShutdownException or ControlThrowable) occurred during the thread cleaning process, marking these partitions as "blacklisted";
-
firstDirtyOffset : Generally read from the checkpoint file, it holds the value of the last cleaned offset + 1;
-
firstUncleanableDirtyOffset: The starting offset that cannot be cleaned, taking the minimum value among {log.lastStableOffset, log.activeSegment.baseOffset, segments that do not meet minCompactionLagMs.segment.baseOffset};
-
needCompactionNow : The value is determined by (now - min(dirty segment's FirstBatchTimestamp)) > maxCompactionLagMs. When true, it often signifies that this topic partition has not been hit for a long time;
-
Taking the max value : Essentially, it involves selecting the topic partition with the highest dirty ratio. A CleanerThread will perform compaction on only one topic partition at a time;
-
dirty ratio = the size of messages between [firstDirtyOffset, firstUncleanableDirtyOffset) / the size of messages between [startOffset, firstUncleanableDirtyOffset);
In this phase, an offsetMap is constructed, where the key is the message key and the value is the latest message offset bound to that key. Based on this map, new segments are constructed in batches. The rough process is as follows:
First, build the offsetMap within the range of [firstDirtyOffset, firstUncleanableDirtyOffset). Note that this map does not contain any control information and messages of interrupted transactions. To prevent the map from growing indefinitely due to an excessively long range, the size of the offsetMap is limited (the total memory usage of the offsetMap for all CleanerThreads must not exceed 128 MB). Therefore, the latestOffset in the final offsetMap is < firstUncleanableDirtyOffset.
Next, within the range of [0, offsetMap.latestOffset], group all segments to be cleaned. The total logSize, indexSize, and timeIndexSize within each group must not exceed the respective topic configurations. Each group is expected to correspond to one cleaned segment.
After that, based on offsetMap, filter all batches in each group's segment and write them to a new segment. The deletion rules for records across different batches are as follows:
-
ControlBatch : Empty batches or ControlBatches corresponding to empty transactions (transaction data has been previously removed), and tombstones that have expired;
-
dataBatch : Meets any one of the following conditions:
-
Messages within a transaction are disrupted;
-
record.offset() < offsetMap.get(key) or (value is null and the tombstone has expired);
-
The aforementioned tombstone is a two-phase deletion mechanism introduced by Kafka to ensure that downstream consumers have the chance to fully acquire all messages. This means that consumers must consume from the beginning to the latest offset within the "delete.retention.ms" time window to entirely "replay" all messages. Essentially, the tombstone is like granting a "reprieve" to transaction markers and null value messages.
It's important to note that for newer version batches with a magic value >= 2, the tombstone time is marked into the batch during the first compaction. For older version batches with a magic value < 2, the expiration of the tombstone is approximately inferred based on the last modification time of the segment.
Finally, the filtered record is written into a new segment. The new segment goes online, and the old segment is deleted. The value of offsetMap.latestOffset + 1 is stored in the clean offset checkpoint file.
Overall, after compaction, the log startOffset will move slightly forward, and the existing messages will be "compacted" to some extent:
Similar to the deletions triggered by Scheduler, the topic partition configured with the "delete + compact" mixed policy will also delete old segments. However, the deletion is completed by the CleanerThread. The deletion logic will not be repeated here.
This article introduces two message cleanup policies in Kafka. First, it provides an overview of two message cleanup strategies in Kafka and discusses the choice of cleanup strategies for business topics. Then, it briefly introduces the threads involved in the cleanup process. Finally, it details the parameters and technical specifics of cleanups triggered by Scheduler and LogCleaner. For the delete policy, segment deletion is based on partition size or message expiration time; for the compact policy, an offset map is built, and the latest value corresponding to the same key is retained based on this map.
References:
[1] AutoMQ: https://www.automq.com
[2] Kafka Topic Configuration: Log Compaction: https://www.conduktor.io/kafka/kafka-topic-configuration-log-compaction/
[3] Compaction guarantees https://docs.confluent.io/kafka/design/log_compaction.html#compaction-guarantees
- What is automq: Overview
- Difference with Apache Kafka
- Difference with WarpStream
- Difference with Tiered Storage
- Compatibility with Apache Kafka
- Licensing
- Deploy Locally
- Cluster Deployment on Linux
- Cluster Deployment on Kubernetes
- Example: Produce & Consume Message
- Example: Simple Benchmark
- Example: Partition Reassignment in Seconds
- Example: Self Balancing when Cluster Nodes Change
- Example: Continuous Data Self Balancing
-
S3stream shared streaming storage
-
Technical advantage
- Deployment: Overview
- Runs on Cloud
- Runs on CEPH
- Runs on CubeFS
- Runs on MinIO
- Runs on HDFS
- Configuration
-
Data analysis
-
Object storage
-
Kafka ui
-
Observability
-
Data integration