I've been doing some prototyping with Kafka for a few months now and like what I see. It's a good fit for some of my use cases around data distribution but also for processing - I'm liking a lot of what I see in Samza too. I'm now working through some of the operational issues and have a question for the community.
I have several data sources that I want to push into Kafka, but some of the most important arrive as a stream of files dropped into either an SFTP location or S3. Conceptually the data really is a stream, but it's being chunked and made more batch-like by the deployment model of the operational servers. So pulling the data into Kafka and seeing it as a stream again is a big plus.
But I really don't want duplicate messages. I know Kafka provides at-least-once semantics and that's fine; I'm happy to have the de-dupe logic external to Kafka. And if I look at my producer I can build a protocol around adding record metadata and using ZooKeeper to give me pretty high confidence that my clients will know whether they are reading from a file that was fully published into Kafka or not.
I had assumed this wouldn't be a unique use case, but after a bunch of searches I really don't find much in terms of tools that help, or even just best-practice patterns, for supporting exactly-once message processing in this scenario.
So now I'm thinking that either I just need better web search skills, or this isn't something many others are doing - and if so, there's likely a reason for that.
The out-of-the-box support for this in Kafka isn't great right now.
Exactly-once semantics has two parts: avoiding duplication during data production and avoiding duplicates during data consumption.
There are two approaches to getting exactly once semantics during data production.
1. Use a single writer per partition, and every time you get a network error check the last message in that partition to see if your last write succeeded.
2. Include a primary key (UUID or something) in the message and deduplicate on the consumer.
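The second approach can be sketched in a few lines. This is illustrative plain Python, not the Kafka client API: the message dict, `make_message`, and `DedupingConsumer` are names made up for this sketch, and a real implementation would need a bounded or persistent store of seen keys rather than an in-memory set.

```python
import uuid

def make_message(payload):
    """Producer side: attach a unique primary key so redelivered
    copies of the same logical message are detectable downstream."""
    return {"key": str(uuid.uuid4()), "payload": payload}

class DedupingConsumer:
    """Consumer side: remember keys already processed and skip repeats."""

    def __init__(self):
        self.seen = set()

    def process(self, message):
        if message["key"] in self.seen:
            return None  # duplicate delivery (e.g. producer retry), ignore it
        self.seen.add(message["key"])
        return message["payload"]
```

Processing the same message twice, as can happen under at-least-once delivery, yields the payload once and `None` on the repeat.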
If you do one of these things, the log that Kafka hosts will be duplicate-free. However, reading without duplicates depends on some co-operation from the consumer too. If the consumer is periodically checkpointing its position, then on failure and restart it will resume from the checkpointed position. Thus if the data output and the checkpoint are not written atomically, it will be possible to get duplicates here as well. This problem is particular to your storage system: for example, if you are using a database you could commit the data and the checkpoint together in a transaction. The HDFS loader Camus that LinkedIn wrote does something like this for Hadoop loads. The other alternative, which doesn't require a transaction, is to store the offset with the data loaded and deduplicate using the topic/partition/offset combination.
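A minimal sketch of that non-transactional alternative, with a dict standing in for whatever sink you actually load into (the class and method names here are invented for illustration):

```python
class OffsetDedupingLoader:
    """Store each record keyed by its topic/partition/offset coordinates.
    On a replay after a consumer restart, records whose coordinates are
    already present are skipped, so the sink ends up duplicate-free even
    though the same offsets may be consumed more than once."""

    def __init__(self):
        self.store = {}  # (topic, partition, offset) -> record

    def load(self, topic, partition, offset, record):
        key = (topic, partition, offset)
        if key in self.store:
            return False  # already loaded, e.g. replay from a checkpoint
        self.store[key] = record
        return True
```

The key point is that the offset travels with the data into the sink, so the dedup check needs no coordination with the consumer's checkpointing.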
I think there are two improvements that would make this a lot easier:
1. Producer idempotence is something that could be done automatically and much more cheaply by optionally integrating support for it on the server.
2. The existing high-level consumer doesn't expose a lot of the finer-grained control of offsets (e.g. to reset your position). We will be working on that soon.
On Mon, Feb 10, 2014 at 9:11 AM, Garry Turkington < [EMAIL PROTECTED]> wrote:
Thanks Jay for the info, and Neha for adding it to the FAQ!
On the producer side I've been going down Jay's second route, i.e. adding metadata to the messages as they are published. In my case, though, I don't just want to avoid duplicates on a per-message basis but also to be able to quickly identify a partially ingested file so I can drop any related messages.
Since I'll have multiple producers, I'm looking to ZooKeeper to help ensure only one reads a given file at a time. I can then add the filename and a producer uuid to each message, and after a file is fully written either publish a completion notice to a different topic or mark the file's ZNode appropriately. A client can then tell whether the copy of a given message it is reading comes from the 'committed' ingest of the file (matching producer uuid) or from a file that was only partially ingested and should be ignored.
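The consumer-side check described above amounts to a lookup against the committed filename-to-uuid mapping. A minimal sketch, with a plain dict standing in for the real store (ZooKeeper or the separate completion topic) and all names invented for illustration:

```python
committed = {}  # filename -> producer uuid of the completed ingest

def mark_committed(filename, producer_uuid):
    """Called once a producer has fully written a file into Kafka."""
    committed[filename] = producer_uuid

def is_committed_copy(message):
    """Keep a message only if its producer uuid matches the committed
    ingest of its source file; messages from a partial ingest (different
    uuid, or no committed uuid yet) are ignored."""
    return committed.get(message["filename"]) == message["producer_uuid"]
```

Messages from an aborted ingest carry a uuid that never gets committed for that file, so they fail the check and are dropped.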
I think this holds together, and given my single-file-reader requirement I'll always need extra machinery outside of Kafka, but if things like producer idempotence really are possible and cheaper server side then that'd be very interesting.
I found Jay's wiki page on the idempotent producer support and that looks really good. Since in that model the pid is something the client sends with each message, I could change my workflow to be:
1. Producer gains ZK lock on a file ZNode
2. Producer adds the pid as an attribute on the file ZNode if none is already associated with it
3. Producer starts reading/sending messages
4. If a producer fails, another can look for the pid attribute and use it when resending the messages
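The steps above can be sketched with a dict-like object standing in for the file's ZNode; in the real design ZooKeeper would provide the lock and the attribute storage (and ephemeral nodes would handle releasing the lock when a producer dies). The class and method names are made up for this sketch.

```python
class FileNode:
    """Stand-in for a per-file ZNode holding a lock and an optional pid."""

    def __init__(self):
        self.locked_by = None
        self.pid = None

    def acquire(self, producer_id):
        # Step 1: gain the lock, but only if no other producer holds it.
        if self.locked_by is not None:
            return False
        self.locked_by = producer_id
        return True

    def attach_pid(self, pid):
        # Step 2: record the idempotent-producer pid only if none is
        # already associated with the file; always return the effective
        # pid, so a failover producer (step 4) reuses the original one
        # when resending messages.
        if self.pid is None:
            self.pid = pid
        return self.pid

    def release(self):
        # Stand-in for the lock lapsing when a producer fails.
        self.locked_by = None
```

The invariant that matters is that `attach_pid` is first-write-wins: a second producer taking over a partially ingested file sends with the original pid, so the broker-side idempotence check can suppress the duplicates.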
Very interested in this whole topic.