I am using Kafka 0.7.1, using the low-level SyncProducer to send messages to a *single* partition from a *single* thread. The client sends messages containing sequential numbers, so it is obvious at the consumer when message order has been shuffled. I have noticed that messages can be saved out-of-order by Kafka when there are connection problems, and am looking for possible solutions (I think I already know the cause).
The client sends messages in a retry loop: on any IO error it waits for a short period and then retries the send. In SyncProducer, any IOException triggers a disconnect, and the next time send is called a new connection is established. I believe it is this disconnect/reconnect cycle that can cause messages to be saved to the Kafka log in a different order to that in which the client sent them.
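A minimal sketch of the failure mode I am describing (this is a hypothetical model, not the Kafka API): the client's write reaches the broker's request queue, but the response is lost, so the client retries on a new connection while the original request is still pending.

```python
broker_log = []   # order in which the broker appends messages
stalled = []      # requests written to the old connection but not yet processed

def send(msg, stall=False):
    if stall:
        # The TCP write succeeded, but the response never arrives: the request
        # lingers in the broker's request-processing queue for a while.
        stalled.append(msg)
        raise IOError("connection lost before acknowledgement")
    broker_log.append(msg)

def drain_stalled():
    # The broker eventually processes the old connection's pending requests.
    broker_log.extend(stalled)
    stalled.clear()

try:
    send(1, stall=True)   # client cannot tell whether message 1 landed
except IOError:
    send(1)               # retry on a new connection succeeds immediately
send(2)
drain_stalled()           # the original request is processed last

print(broker_log)         # [1, 2, 1] -- both a duplicate and out of order
```

With sequential message numbers, the consumer sees exactly this kind of `[1, 2, 1]` pattern after a connection problem.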
If I can assume that all messages within a single partition are ordered the same as delivery order, the state management to eliminate duplicates is far simpler.
I am using Kafka as the infrastructure for a streaming map/reduce style solution, where throughput is critical. Events are sent into topic A, which is partitioned based on event id. Consumers of topic A generate data that is sent to a different topic B, which is partitioned by a persistence key. Consumers of topic B save the data to a partitioned store. Each stage can be single-threaded by partition, which results in zero contention on the partitioned data store and massively improves throughput. Message offsets are used end-to-end to eliminate duplicates, so the application effectively achieves guaranteed once-only processing of messages. Currently, any out-of-order messages result in data being dropped because duplicate tracking is based *only* on message offsets. If ordering within a partition is not guaranteed, I would need to maintain a list of message offsets that have been processed, rather than tracking just the latest message offset for a partition (and would need to persist this list of offsets to allow resume after failure).
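A sketch of the offset-only duplicate tracking described above (a hypothetical helper, not part of Kafka): each partition remembers only the highest offset processed, which is correct only while offsets arrive in order.

```python
latest_offset = {}   # partition -> highest offset processed so far

def process(partition, offset, handler):
    # Anything at or below the watermark is treated as a duplicate.
    if offset <= latest_offset.get(partition, -1):
        return False
    handler(partition, offset)
    latest_offset[partition] = offset
    return True

seen = []
handler = lambda p, o: seen.append(o)
process(0, 10, handler)   # processed
process(0, 10, handler)   # duplicate, correctly dropped
process(0, 12, handler)   # processed
process(0, 11, handler)   # out of order: wrongly dropped as a "duplicate"
```

This is why out-of-order delivery costs data here: a single high-water mark cannot distinguish a late message from a redelivered one.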
The assumption of guaranteed order is essential for the throughput the application achieves.
On 23 August 2013 14:36, Philip O'Toole <[EMAIL PROTECTED]> wrote:
How much code are you writing to do all this, post-Kafka? Have you considered Storm? I believe the Trident topologies can give you guaranteed-once semantics, so you may be interested in checking that out, if you have the time (I have not yet played with Trident stuff myself, but Storm in general, yes). Coupling Storm to Kafka is a very popular thing to do. Even without Trident, and just using Storm in a simpler mode, may save you from writing a ton of code.
On Thu, Aug 22, 2013 at 11:59 PM, Ross Black <[EMAIL PROTECTED]> wrote:
This is a general issue with resending. Since resending is typically done on a new socket, the new messages are essentially sent from a new instance of the producer, so there is no easy way to ensure that they are ordered behind the ones sent by the old instance. 0.8 will have similar issues. It may be possible to add some sort of per-client sequence id and track that in the broker, but this may not be trivial and will need more thought.
On Thu, Aug 22, 2013 at 9:32 PM, Ross Black <[EMAIL PROTECTED]> wrote:
An auto-increment index can be assigned to a message as a key when it is being published. The consumer can monitor this index when receiving. If the expected message does not show up, buffer all received messages in a hashtable (use index as hash key) until it is received. Then handle all messages in the hashtable.
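The consumer-side reordering suggested above can be sketched as follows (a hypothetical helper, with a plain dict as the hashtable): messages that arrive early are buffered by their index until the expected one shows up, then the contiguous run is delivered.

```python
buffer = {}           # index -> message, for messages that arrived early
expected = 0          # next index we are waiting to deliver
delivered = []

def receive(index, msg):
    global expected
    buffer[index] = msg
    # Deliver every message that is now contiguous with the last delivered one.
    while expected in buffer:
        delivered.append(buffer.pop(expected))
        expected += 1

receive(1, "b")       # early: held in the buffer
receive(0, "a")       # fills the gap, so "a" then "b" are delivered
receive(2, "c")

print(delivered)      # ['a', 'b', 'c']
```

Note this only masks reordering; it does not bound the buffer if a message is lost outright, and duplicates with the same index would still need separate handling.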
The issue is that a produce request which is either in the network buffer or in the request processing queue on the broker may still be processed after a disconnect. So there is a race condition between that processing and the reconnect/retry logic. You could work around this in a hacky way using the reconnect backoff time, but the fundamental race condition exists. We could easily make this more transparent by having some mode where disconnection throws an error back to the client, but in fact there is no way for the client to solve this either.
Neither Storm nor Samza nor any other framework would actually fix this issue for you, since they are in turn dependent on Kafka's ordering (though they might solve a lot of other problems).
As Jun mentions we have been thinking of having a per-producer sequence number to enforce ordering. This would allow us to make produce calls idempotent, enforce strong ordering in the case of retries, as well as fix a number of other corner cases. I think it would handle this issue as well. But it's not a quick patch.
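The per-producer sequence idea might look something like this (a hypothetical model, not the eventual Kafka design): the broker accepts a message only if its sequence number is exactly one greater than the last it accepted from that producer, which makes retries idempotent and preserves order.

```python
class Broker:
    def __init__(self):
        self.log = []
        self.last_seq = {}    # producer_id -> last accepted sequence number

    def produce(self, producer_id, seq, msg):
        last = self.last_seq.get(producer_id, -1)
        if seq <= last:
            return "duplicate"        # a retry of an already-appended message
        if seq != last + 1:
            return "out_of_sequence"  # a gap: reject rather than reorder
        self.log.append(msg)
        self.last_seq[producer_id] = seq
        return "ok"

b = Broker()
b.produce("p1", 0, "m0")   # appended
b.produce("p1", 0, "m0")   # a blind retry is safely ignored as a duplicate
b.produce("p1", 2, "m2")   # rejected: the producer must resend seq 1 first
b.produce("p1", 1, "m1")   # appended

print(b.log)               # ['m0', 'm1']
```

Under this scheme the retry race described earlier becomes harmless: the stalled original request arrives with an already-used sequence number and is rejected as a duplicate instead of being appended out of order.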
I will try to get a design proposal up by next week so we have something concrete to discuss.
-Jay

On Thu, Aug 22, 2013 at 9:32 PM, Ross Black <[EMAIL PROTECTED]> wrote:
Thanks for your comments - you have confirmed what I thought was most likely the case. For the moment I will attempt to work around the issue in the client to minimise the chance of the out-of-order problem occurring (probably by stopping retries and triggering a fail-fast of the JVM, so that by the time it restarts there is little chance of pending requests remaining on the prior connection).
I look forward to seeing a design proposal.
On 24 August 2013 01:34, Jay Kreps <[EMAIL PROTECTED]> wrote:
Thanks for your input. I did evaluate Storm about 9 months ago before going down the path of developing this myself on top of Kafka. The primary reason for not using Storm was the inability to control the allocation of requests to processing elements. This same requirement was the reason for using the low-level Kafka consumer and producer rather than the higher-level Kafka APIs (something I hope will be possible with the redesigned APIs - https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Client+Re-Design). As Jay mentioned, using Storm would not fix the out-of-order delivery issue. I will probably eventually couple Storm to our Kafka messaging, but would need https://github.com/nathanmarz/storm/issues/115 fixed before I could use it.
I am also about to look at Samza to see if it can help me avoid having to write more code :-)
On 24 August 2013 00:34, Philip O'Toole <[EMAIL PROTECTED]> wrote: