I hacked infinite retention into my open source Kafka


Well, sort of. But bear with me.

background

A couple of days ago, Confluent announced a ZooKeeper-free Kafka 2.8 RC0 available for testing. A fantastic effort and a great achievement by all the contributors who made it happen.

In typical Hacker News fashion, a post about Kafka inevitably triggers a “Pulsar vs Kafka” discussion. These always remind me of one of my main gripes with Kafka: no infinite retention. I wrote about it close to five years ago[1].

So, apparently there is a KIP for tiered storage in open source Kafka[2], which would enable this kind of behavior and take it even further. There is definitely a paid feature on the Confluent Platform enabling tiered storage[3].

However, as a user of open source Kafka, I can’t use it.

after 5 years of waiting

I kinda hacked it in myself. Here’s how I did it.

I forked Kafka from https://github.com/apache/kafka and checked out the 2.8 branch, the one with no ZooKeeper. To be exact, commit 08849bc3909d4fabda965c8ca7f78b0feb5473d2.

I then applied this diff:

build Kafka from sources

./gradlewAll releaseTarGz

Now, I can start my new Kafka like this:

AWS_SHARED_CREDENTIALS_FILE=~/.aws/credentials \
    AWS_PROFILE=a-profile-with-bucket-access \
    KAFKA_AWS_S3_UPLOADS=1 \
    KAFKA_AWS_S3_REGION=eu-central-1 \
    KAFKA_AWS_S3_BUCKET=my-kafka-logs-bucket \
    ./kafka_2.13-2.8.0-SNAPSHOT/bin/kafka-server-start.sh ./kafka_2.13-2.8.0-SNAPSHOT/config/kraft/server.properties

Now, every time Kafka is about to delete a log segment, it will put it in S3 first. Only the .log files are stored, because the index can be rebuilt on demand. Whenever I need data from an older segment, I download it from S3, rebuild the index, and read out all of its data.
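The rebuild is mechanical, because a v2 log segment is just a sequence of record batches, each prefixed with its base offset and its length. A rough Python sketch of walking a downloaded .log file and regenerating offset-index entries (my own illustration, not part of the patch; real Kafka only writes an index entry every index.interval.bytes, whereas this writes one per batch):

```python
import struct

# A v2 Kafka log segment is a sequence of record batches. Each batch
# starts with an 8-byte big-endian base offset and a 4-byte length of
# the remaining batch bytes, so the file can be walked with no index.
BATCH_HEADER = struct.Struct(">qi")  # baseOffset: int64, batchLength: int32

def scan_batches(log_path):
    """Return (base_offset, file_position) for every batch in a .log file."""
    entries = []
    with open(log_path, "rb") as f:
        pos = 0
        while True:
            header = f.read(BATCH_HEADER.size)
            if len(header) < BATCH_HEADER.size:
                break  # end of segment
            base_offset, batch_length = BATCH_HEADER.unpack(header)
            entries.append((base_offset, pos))
            f.seek(batch_length, 1)  # skip over the batch payload
            pos += BATCH_HEADER.size + batch_length
    return entries

def rebuild_index(log_path, index_path, segment_base_offset):
    """Write a Kafka-style .index: (relativeOffset, position) int32 pairs."""
    with open(index_path, "wb") as out:
        for base_offset, pos in scan_batches(log_path):
            out.write(struct.pack(">ii", base_offset - segment_base_offset, pos))
```

With an index rebuilt next to the .log file, the segment looks like any other on-disk segment again and the usual tooling can read it.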

I can process dozens of segments in parallel, regardless of the total partition size.
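The fan-out itself is trivial, since archived segments are independent files. A sketch of the pattern (my own illustration; `fetch` and `handle` are hypothetical hooks, e.g. an S3 download and whatever consumes the segment):

```python
from concurrent.futures import ThreadPoolExecutor

def process_segments(segment_keys, fetch, handle, workers=16):
    """Fetch and process archived segments concurrently.

    fetch(key) is expected to retrieve one segment (e.g. from S3) and
    handle(data) to consume it; both are injected so the fan-out stays
    storage-agnostic. Results come back in input order.
    """
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda key: handle(fetch(key)), segment_keys))
```

Threads are a reasonable choice here because the work is dominated by network and disk I/O, not CPU.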

quick and dirty test method

Create a topic with a rather small segment:

~/dev/kafka-2.8.0/kafka_2.13-2.8.0-SNAPSHOT/bin/kafka-topics.sh \
    --create \
    --topic test-topic \
    --partitions 1 \
    --replication-factor 1 \
    --bootstrap-server localhost:9092 \
    --config segment.bytes=524288

Write some data to the topic with a tool of your choice. I can see new segments rolling in:

[2021-04-02 23:32:57,551] INFO [BrokerLifecycleManager id=1] The broker has been unfenced. Transitioning from RECOVERY to RUNNING. (kafka.server.BrokerLifecycleManager)
[2021-04-02 23:32:57,552] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test-topic-0) (kafka.server.ReplicaFetcherManager)
[2021-04-02 23:36:38,025] INFO [ProducerStateManager partition=test-topic-0] Writing producer snapshot at offset 3830 (kafka.log.ProducerStateManager)
[2021-04-02 23:36:38,028] INFO [Log partition=test-topic-0, dir=/tmp/kraft-combined-logs] Rolled new log segment at offset 3830 in 5 ms. (kafka.log.Log)
[2021-04-02 23:36:44,082] INFO [ProducerStateManager partition=test-topic-0] Writing producer snapshot at offset 7646 (kafka.log.ProducerStateManager)
[2021-04-02 23:36:44,083] INFO [Log partition=test-topic-0, dir=/tmp/kraft-combined-logs] Rolled new log segment at offset 7646 in 3 ms. (kafka.log.Log)

Finally, delete the topic from Kafka:

[2021-04-02 23:37:52,732] INFO [Controller 1] Removed topic test-topic with ID H6YadkN7SUGXheMu6MJ1uA. (org.apache.kafka.controller.ReplicationControlManager)
[2021-04-02 23:37:52,759] INFO [BrokerMetadataListener id=1] Processing deletion of topic test-topic with id H6YadkN7SUGXheMu6MJ1uA (kafka.server.metadata.BrokerMetadataListener)
[2021-04-02 23:37:52,761] INFO [GroupCoordinator 1]: Removed 0 offsets associated with deleted partitions: test-topic-0. (kafka.coordinator.group.GroupCoordinator)
[2021-04-02 23:37:52,768] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions Set(test-topic-0) (kafka.server.ReplicaFetcherManager)
[2021-04-02 23:37:52,768] INFO [ReplicaAlterLogDirsManager on broker 1] Removed fetcher for partitions Set(test-topic-0) (kafka.server.ReplicaAlterLogDirsManager)
[2021-04-02 23:37:52,776] INFO Log for partition test-topic-0 is renamed to /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete and is scheduled for deletion (kafka.log.LogManager)

After about 60 seconds, the segments are uploaded to S3 and deleted from disk:

[2021-04-02 23:38:52,781] INFO [Log partition=test-topic-0, dir=/tmp/kraft-combined-logs] Deleting segments as the log has been deleted: LogSegment(baseOffset=0, size=524285, lastModifiedTime=1617399398000, largestRecordTimestamp=Some(1617399398018)),LogSegment(baseOffset=3830, size=524270, lastModifiedTime=1617399404000, largestRecordTimestamp=Some(1617399404079)),LogSegment(baseOffset=7646, size=48257, lastModifiedTime=1617399404000, largestRecordTimestamp=Some(1617399404638)) (kafka.log.Log)
[2021-04-02 23:38:52,786] INFO [Log partition=test-topic-0, dir=/tmp/kraft-combined-logs] Deleting segment files LogSegment(baseOffset=0, size=524285, lastModifiedTime=1617399398000, largestRecordTimestamp=Some(1617399398018)),LogSegment(baseOffset=3830, size=524270, lastModifiedTime=1617399404000, largestRecordTimestamp=Some(1617399404079)),LogSegment(baseOffset=7646, size=48257, lastModifiedTime=1617399404000, largestRecordTimestamp=Some(1617399404638)) (kafka.log.Log)
[2021-04-02 23:38:57,798] INFO Uploading segment noan.local/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000000000.log.deleted with size 524285 prior to delete... (kafka.log.LogSegment)
[2021-04-02 23:38:59,210] INFO Uploaded segment prior to delete, S3 ETag: 769bf025a6e85a7402509142de3d149a, took 6417ms (kafka.log.LogSegment)
[2021-04-02 23:38:59,212] INFO Deleted log /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000000000.log.deleted. (kafka.log.LogSegment)
[2021-04-02 23:38:59,219] INFO Deleted offset index /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000000000.index.deleted. (kafka.log.LogSegment)
[2021-04-02 23:38:59,219] INFO Deleted time index /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000000000.timeindex.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:04,223] INFO Uploading segment noan.local/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000003830.log.deleted with size 524270 prior to delete... (kafka.log.LogSegment)
[2021-04-02 23:39:04,840] INFO Uploaded segment prior to delete, S3 ETag: 06d987e417031ca3474a4fcf6efc72de, took 5620ms (kafka.log.LogSegment)
[2021-04-02 23:39:04,842] INFO Deleted log /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000003830.log.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:04,842] INFO Deleted offset index /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000003830.index.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:04,843] INFO Deleted time index /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000003830.timeindex.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:09,848] INFO Uploading segment noan.local/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000007646.log.deleted with size 48257 prior to delete... (kafka.log.LogSegment)
[2021-04-02 23:39:10,103] INFO Uploaded segment prior to delete, S3 ETag: 624d8699a9f53ecb24143e197c4f1ccf, took 5259ms (kafka.log.LogSegment)
[2021-04-02 23:39:10,104] INFO Deleted log /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000007646.log.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:10,104] INFO Deleted offset index /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000007646.index.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:10,105] INFO Deleted time index /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete/00000000000000007646.timeindex.deleted. (kafka.log.LogSegment)
[2021-04-02 23:39:10,111] INFO Deleted log for partition test-topic-0 in /tmp/kraft-combined-logs/test-topic-0.fe590aa674124d0c81bb5b3d718a73ab-delete. (kafka.log.LogManager)

The uploads would take less time if I were running Kafka in EC2. This method works for deleted topics, regular segment rolls, and compacted topics: basically, every time a segment is about to be removed from disk.


  1. The case for Kafka cold storage ↩︎

  2. KIP-405: Kafka Tiered Storage ↩︎

  3. Confluent Platform infinite Kafka retention ↩︎