I designed my consumer app (running on 0.7) to run with autocommit off and to commit manually once it was done processing a record. The intent was that if a consumer died while processing a message, the offset would not be committed, and another box would pick up the partition and reprocess the message. This seemed to work fine with a small number of consumers (~10). But now that I'm scaling out, I'm running into a problem where messages that consumers picked up and then errored on are not getting reprocessed on another machine.
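For context, the loop is essentially the following. This is a minimal sketch against the 0.7 Scala consumer API, not our actual code; the property names and the process() hook are illustrative:

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}
    import kafka.message.Message

    object ManualCommitLoop {
      def process(message: Message): Unit = {
        // application-specific work; may throw on a bad record
      }

      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zk.connect", "zk1:2181")      // placeholder ZK quorum
        props.put("groupid", "flurry1")
        props.put("autocommit.enable", "false")  // we commit manually below
        val connector = Consumer.create(new ConsumerConfig(props))

        // one stream for the topic; 0.7 message streams are iterable
        val streams = connector.createMessageStreams(Map("dataLogPaths" -> 1))
        for (message <- streams("dataLogPaths").head) {
          process(message)           // if this throws, no commit happens...
          connector.commitOffsets()  // ...so another box should reprocess
        }
      }
    }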
After investigating the logs and the partition offsets in ZooKeeper, I found that closeFetchersForQueues in ZookeeperConsumerConnector.scala, which is called during the rebalance process, commits the offset regardless of the autocommit setting. So even if my consumer is in the middle of processing a message, the offset gets committed, and if the processing then fails, the message will never be picked up again. Now that I have a lot of consumer nodes, the rebalancer is going off much more often and I'm hitting this constantly.
Were my assumptions faulty? Did I design this wrong? After reading the comment in the code, I understand why it commits there: if it didn't, the message would just get immediately consumed by whoever ended up owning the partition, even while we were still consuming it elsewhere, and we'd get unintentional duplicate delivery. Is there any way to make it work the way I've described?
closeFetchersForQueues in ZookeeperConsumerConnector.scala is only called during a rebalance. If your consumer died in the middle of processing a message, that function would not be called, since the dead consumer would no longer be participating in the rebalance.
On Wed, Aug 14, 2013 at 5:21 PM, Ian Friedman <[EMAIL PROTECTED]> wrote:
Is there any way to make this work in 0.7, or is transitioning to 0.8 the only way? My operations engineers spent a lot of effort configuring and hardening our 0.7 production install, and 0.8 isn't released yet, not to mention that we'd have to integrate the new client-side code.
Either way, thanks for all your help Jun.
Ian Friedman On Thursday, August 15, 2013 at 12:21 AM, Jun Rao wrote:
We are only patching blocker issues in 0.7. 0.8 beta1 has been released, and most dev effort will be on 0.8 and beyond. That said, this particular case is easy to fix. If you can port the patch in https://issues.apache.org/jira/browse/KAFKA-919 to the 0.7 branch, we can commit it there.
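The change itself is small. Roughly, the commit in closeFetchersForQueues becomes conditional on the autocommit config. This is a sketch only, not the verbatim patch, and the exact field name may differ in the 0.7 config class; see the JIRA for the actual diff:

    // in ZookeeperConsumerConnector.closeFetchersForQueues (sketch):
    // instead of unconditionally committing offsets when fetchers are
    // stopped for a rebalance, only commit if autocommit is enabled
    if (config.autoCommit)
      commitOffsets()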
Jun On Wed, Aug 14, 2013 at 9:30 PM, Ian Friedman <[EMAIL PROTECTED]> wrote:
It's a simple enough patch, but wouldn't this mean that messages still being processed when a rebalance happens could get delivered to another consumer if we end up losing the partition? Rebalances seem to happen very frequently with a lot of consumers for some reason… and it doesn't seem like a consumer is guaranteed, or even likely, to retain ownership of a partition it's in the middle of consuming after a rebalance.
Ian Friedman On Thursday, August 15, 2013 at 10:53 AM, Jun Rao wrote:
Jun, I read that FAQ entry you linked, but I'm not seeing any ZooKeeper connection loss in the logs. It's rebalancing multiple times per minute, though. Any idea what else could cause this? We're running Kafka 0.7.2 with approximately 400 consumers against a topic with 400 partitions * 3 brokers.
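For reference, these are the 0.7 consumer knobs I understand to be relevant to rebalance churn. The names are as I read them in the docs and the values are placeholders, so treat this as approximate:

    props.put("zk.sessiontimeout.ms", "12000")  // default 6000; guards against GC-induced session expiry
    props.put("zk.synctime.ms", "4000")         // how far a ZK follower may lag the leader
    props.put("rebalance.retries.max", "10")    // default 4; retries before a rebalance fails outright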
Ian Friedman On Thursday, August 15, 2013 at 11:52 AM, Jun Rao wrote:
That's not it either. I just had all my consumers shut down on me with this:
INFO 21:51:13,948 () ZkUtils$ - conflict in /consumers/flurry1/owners/dataLogPaths/1-183 data: flurry1_hs1030-1376964634130-dcc9192a-0 stored data: flurry1_hs1061-1376964609207-4b7f348b-0
INFO 21:51:13,948 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a waiting for the partition ownership to be deleted: 1-183
INFO 21:51:13,950 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a flurry1_hs1030-1376964634130-dcc9192a-0 successfully owned partition 1-180 for topic dataLogPaths
and I've also been seeing:
INFO 21:51:15,971 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a begin rebalancing consumer flurry1_hs1030-1376964634130-dcc9192a try #3
INFO 21:51:16,038 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a exception during rebalance
org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/flurry1/ids/flurry1_hs676-1376964612747-6f532caa
    at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750)
    at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744)
    at kafka.utils.ZkUtils$.readData(ZkUtils.scala:162)
    at kafka.consumer.TopicCount$.constructTopicCount(TopicCount.scala:66)
    at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:259)
    at kafka.utils.ZkUtils$$anonfun$getConsumersPerTopic$1.apply(ZkUtils.scala:258)
    at scala.collection.Iterator$class.foreach(Iterator.scala:631)
    at scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:474)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
    at scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:521)
    at kafka.utils.ZkUtils$.getConsumersPerTopic(ZkUtils.scala:258)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:478)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:449)
    at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:285)
    at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:444)
    at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anon$1.run(ZookeeperConsumerConnector.scala:401)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/flurry1/ids/flurry1_hs676-1376964612747-6f532caa
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1151)
    at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1180)
    at org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:103)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:770)
    at org.I0Itec.zkclient.ZkClient$9.call(ZkClient.java:766)
    at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:675)
    ... 19 more
INFO 21:51:16,039 () ZookeeperConsumerConnector - flurry1_hs1030-1376964634130-dcc9192a end rebalancing consumer flurry1_hs1030-1376964634130-dcc9192a try #3

Any ideas?
Ian Friedman On Monday, August 19, 2013 at 11:58 PM, Jun Rao wrote:
Sorry, ignore that first exception; I believe that was caused by an actual manual shutdown. The NoNodeException, though, has been popping up a lot. I'm not sure if it's relevant, but it seems to show up a bunch whenever the consumers decide it's time to rebalance continuously.
Ian Friedman On Tuesday, August 20, 2013 at 2:17 AM, Ian Friedman wrote: