[jira] [Updated] (KAFKA-631) Implement log compaction
"Jay Kreps 2013-01-15, 20:06

     [ https://issues.apache.org/jira/browse/KAFKA-631?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jay Kreps updated KAFKA-631:
----------------------------

    Attachment: KAFKA-631-v1.patch

This patch implements more or less what was described above.

Specific Changes:
- OffsetCheckpoint.scala: Generalize HighwaterMarkCheckpoint to OffsetCheckpoint for use in tracking the cleaner point. In the future we could use this for the flush point too, if possible.
- Move configuration parameters in Log to a single class, LogConfig, to prepare for dynamically changing log configuration (also a nice cleanup)
- Implement a cleaner process in LogCleaner.scala that cleans logs; this is mostly standalone code. It is complicated but doesn't really touch anything else.
- Implement an efficient OffsetMap (and associated tests) for log deduplication; a sketch of the dedupe semantics follows this list.
- Add an API in Log.scala that allows swapping in segments. This API is fairly specific to the cleaner for now and is not public.
- Refactor segment delete in Log.scala to allow reuse of the async delete functionality in segment swap
- Add logic in log recovery (Log.scala) to handle the case of a crash in the middle of cleaning or file swaps.
- Add a set of unit tests on cleaner logic (CleanerTest.scala), an integration test (LogCleanerIntegrationTest.scala) for the cleaner, and a torture test to run against a standalone server (TestLogCleaning.scala). The torture test produces a stream of messages to a server over a long period of time and simultaneously logs them out to a text file. Then it uses unix sort to dedupe this text file and compares the result to the result of consuming from the topic (if the unique key-set isn't the same for both, it throws an error). It also measures the log size reduction.
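
For illustration, here is a minimal sketch of the dedupe semantics the cleaner enforces: for each key, only the most recent message survives. The types and method are hypothetical, not the patch's API; the real cleaner works against on-disk segments using the OffsetMap instead of an in-memory map:

// Hypothetical sketch of dedupe cleaning; not the patch's actual API.
case class Message(offset: Long, key: String, value: Array[Byte])

def dedupe(log: Seq[Message]): Seq[Message] = {
  // Pass 1: record the highest offset seen for each key
  // (the role the OffsetMap plays in the real cleaner).
  val latest = scala.collection.mutable.Map[String, Long]()
  for (m <- log)
    latest(m.key) = m.offset
  // Pass 2: retain a message only if it is the latest for its key.
  log.filter(m => latest(m.key) == m.offset)
}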

New configuration parameters:

# should we default to delete or dedupe for the cleanup policy?
log.cleanup.policy = delete/dedupe

# per-topic override for cleanup policy (example after this block)
topic.log.cleanup.policy = topic:delete/dedupe, …

# number of background threads to use for log cleaning
log.cleaner.threads=1

# maximum I/O the cleaner is allowed to do (read & write combined)
log.cleaner.io.max.bytes.per.second=Double.MaxValue

# the maximum memory the cleaner can use
log.cleaner.buffer.size=100MB

# the amount of time to sleep when there is no cleaning to do
log.cleaner.backoff.ms=30secs

# minimum ratio of new to old messages the log must have for cleaning to proceed
log.cleaner.min.cleanable.ratio=0.5
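
As an example of the per-topic override format (a comma-separated list of topic:policy pairs; the topic names here are made up):

# dedupe the 'users' topic, plain delete-based retention for 'clicks'
topic.log.cleanup.policy=users:dedupe,clicks:delete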

I also changed the configuration log.cleanup.interval.mins to log.retention.check.interval.ms because the word "cleanup" is confusing.

New Persistent Data

This patch adds a new persistent data structure, a per-data-directory file 'cleaner-offset-checkpoint'. It uses the exact same format and code as the existing 'replication-offset-checkpoint'. The file records the position in the log up to which the cleaner has cleaned.
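
For reference, the checkpoint format shared with 'replication-offset-checkpoint' is a short text file: a version number, an entry count, and one 'topic partition offset' line per partition. The values below are illustrative:

0
2
my-topic 0 41231
my-topic 1 39977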

Current State

This patch is mostly functional with a number of known limitations:
1. It is a lot of code, so there are likely bugs. I think most bugs would only affect log cleaning.
2. The cleaner is somewhat inefficient. Currently it does about 11MB/sec. I suspect this can be increased to around 70-100MB/sec by implementing batching of writes. I will do this as a follow-up ticket.
3. I do not properly handle compressed logs. Cleaning will work correctly, but all messages are written uncompressed. The reason for this is that it is logically pretty complex to figure out which codec messages should be written with (since there may be a mixture of compression types in the log). Rather than try to handle this now, I think it makes more sense to implement dynamic config and then add a new config for log compression, so that each topic has a single compression type that all messages are written with.
4. It would be nice to seed the hash with a different seed for each run so that collisions would get handled in the next run (a sketch follows this list). This will also be done in a follow-up patch.
5. It would be nice to integrate the torture test into the nightly integration test framework (since it is a pass/fail test). I will work to do this as a separate item.
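
To illustrate item 4, a hypothetical sketch of a seeded key hash (not the patch's code): hashing a per-run seed together with the key means keys that collide in one run will almost certainly not collide under the next run's seed, so anything wrongly retained gets cleaned in a later pass.

import java.security.MessageDigest

// Hypothetical: mix a per-run seed into the key hash so that hash
// collisions are transient across cleaning runs.
def keyHash(key: Array[Byte], runSeed: Array[Byte]): Array[Byte] = {
  val md = MessageDigest.getInstance("MD5")
  md.update(runSeed)   // vary this seed on each cleaning run
  md.update(key)
  md.digest()
}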

I would like to get this in in its current state and then work on making log config dynamic. Without that feature this is not very useful, since you have to bounce the server every time you add a new topic in order to set its cleanup policy. Once that is done we can use it for real features, which will likely uncover more issues than further testing now would.

Status of Testing

- There is reasonable unit test coverage but I will likely add additional tests as real usage uncovers corner cases
- I can run the torture test for many hours on a few dozen GB of data and get correct results.

                
