Kafka user mailing list: High Level Consumer error handling and clean exit


Chris Curtin 2013-07-09, 15:16
Re: High Level Consumer error handling and clean exit
It seems like you're not explicitly controlling the offsets. Is that
correct?

If so, the moment you pull a message from the stream, the client framework
considers it processed. So if your app subsequently crashes before the
message is fully processed, and "auto-commit" updates the offsets in
Zookeeper, you will drop that message.

The solution to this is to call commitOffsets() explicitly.
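
A minimal sketch of that pattern (untested; it assumes the 0.8 high-level
consumer API with auto.commit.enable turned off, and the group id, topic name,
and writeToDatabase() method are placeholders):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder
        props.put("group.id", "my-group");                // placeholder
        props.put("auto.commit.enable", "false");         // we commit ourselves

        ConsumerConnector consumer =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            consumer.createMessageStreams(Collections.singletonMap("my-topic", 1));
        ConsumerIterator<byte[], byte[]> it =
            streams.get("my-topic").get(0).iterator();

        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            writeToDatabase(msg);     // business logic; let failures throw
            consumer.commitOffsets(); // mark consumed only after success
        }
    }

    // Placeholder for the destination write that might fail.
    private static void writeToDatabase(MessageAndMetadata<byte[], byte[]> msg) {
        System.out.println(msg.offset() + ": " + new String(msg.message()));
    }
}

Note that commitOffsets() commits the current position of every partition this
connector owns, so with several worker threads this gives at-least-once
delivery rather than per-message precision.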

Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I'm working through a production-level High Level Consumer app and have a
> couple of error/shutdown questions to understand how the offset storage is
> handled.
>
> Test case: simulate an error writing to the destination application (for
> example, a database), so that an offset is 'lost'.
>
> Scenario
> - write 500 messages for each topic/partition
> - use the example High Level Consumer code I wrote for the Wiki
> - Change the code so that after ten successful reads, the next message from
> the 'hasNext()' ConsumerIterator breaks out of the loop and returns from the
> thread, simulating a hard error. I write the offset to System.out to see
> what was provided.
> - startup again and look to see what offset was first emitted for a
> partition
>
> Issue: Kafka treats the offset of the message that made me break out of the
> loop as processed (as expected), but I actually failed to process it. How do
> I tell Kafka that I didn't really consume that offset?
>
> Here is the example code in the 'business logic':
>
> public void run() {
>     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>     int counter = 0;
>     while (it.hasNext()) {
>         MessageAndMetadata<byte[], byte[]> msg = it.next();
>         if (counter == 10) {
>             // After ten messages, simulate a hard error: bail out
>             // without processing this one.
>             System.out.println("Stopping Thread " + m_threadNumber +
>                     ": Partition: " + msg.partition() +
>                     ": Offset: " + msg.offset() + " :" + new String(msg.message()));
>             break;
>         }
>         System.out.println("Thread " + m_threadNumber +
>                 ": Partition: " + msg.partition() +
>                 ": Offset: " + msg.offset() + " :" + new String(msg.message()));
>         counter++;
>     }
>
>     System.out.println("Shutting down Thread: " + m_threadNumber);
> }
>
> I understand that 'hard' errors like JVM crashes, kill -9, etc. may leave
> the offsets in ZooKeeper incorrect, but I'm trying to understand what
> happens in a clean shutdown where Kafka and the Consumer are behaving
> correctly but I can't process what I read.
>
> This also feels like I'm blurring SimpleConsumer concerns into a High Level
> Consumer question, but except for the exception/shutdown case the High Level
> Consumer does everything I want.
>
>
> Thanks,
>
> Chris
>
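For the clean-shutdown half of the question, one sketch (again untested, and
again assuming auto-commit is disabled; 'consumer' is the same
ConsumerConnector as in the earlier sketch) is a JVM shutdown hook that
commits and then releases the partitions:

import kafka.javaapi.consumer.ConsumerConnector;

public class CleanShutdown {
    // Install before the consume loop starts.
    static void installShutdownHook(final ConsumerConnector consumer) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // commitOffsets() marks everything already handed out by
                // next() as consumed, so call it only once the workers have
                // finished their in-flight messages.
                consumer.commitOffsets();
                // Release this consumer's partitions so the group rebalances.
                consumer.shutdown();
            }
        });
    }
}

With manual commits done only after successful processing, a restart should
re-read anything that failed mid-processing, since ZooKeeper only ever
reflects completed work.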
Other messages in this thread:

Chris Curtin 2013-07-09, 18:23
Philip OToole 2013-07-09, 19:24
Chris Curtin 2013-07-09, 19:54
Chris Curtin 2013-07-09, 20:09
Ian Friedman 2013-07-09, 21:51
Chris Curtin 2013-07-10, 11:14