Kafka, mail # user - High Level Consumer error handling and clean exit


Chris Curtin 2013-07-09, 15:16
Re: High Level Consumer error handling and clean exit
Philip O'Toole 2013-07-09, 15:22
It seems like you're not explicitly controlling the offsets. Is that
correct?

If so, the moment you pull a message from the stream, the client framework
considers it processed. So if your app subsequently crashes before the
message is fully processed, and "auto-commit" updates the offsets in
Zookeeper, you will drop that message.
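(A sketch of the relevant settings, assuming the 0.8-era high-level consumer property names:)

```properties
# Sketch of 0.8-era high-level consumer settings: disable the timer-based
# auto-commit so the offsets in ZooKeeper advance only when the application
# calls commitOffsets() itself.
auto.commit.enable=false
# Ignored once auto-commit is off; shown only for contrast with the default.
auto.commit.interval.ms=60000
```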

The solution to this is to call commitOffsets() explicitly.

Philip

On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I'm working through a production-level High Level Consumer app and have a
> couple of error/shutdown questions to understand how the offset storage is
> handled.
>
> Test case: simulate an error writing to the destination application (for
> example, a database), so the offset is 'lost'
>
> Scenario
> - write 500 messages for each topic/partition
> - use the example High Level Consumer code I wrote for the Wiki
> - change the code so that the 10th read from the 'hasNext()'
> ConsumerIterator breaks out of the loop and returns from the thread,
> simulating a hard error. I write the offset to System.out to see what was
> provided
> - startup again and look to see what offset was first emitted for a
> partition
>
> Issue: Kafka treats the offset for the message read that caused me to break
> out of the loop as processed (as expected), but I really failed. How do I
> tell Kafka that I didn't really consume that offset?
>
> Here is the example code in the 'business logic':
>
> public void run() {
>     ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>     int counter = 0;
>     while (it.hasNext()) {
>         MessageAndMetadata<byte[], byte[]> msg = it.next();
>         if (counter == 10) {
>             // Simulated hard error: bail out mid-stream.
>             System.out.println("Stopping Thread " + m_threadNumber +
>                     ": Partition: " + msg.partition() +
>                     ": Offset: " + msg.offset() + " :" + new String(msg.message()));
>             break;
>         }
>         System.out.println("Thread " + m_threadNumber +
>                 ": Partition: " + msg.partition() +
>                 ": Offset: " + msg.offset() + " :" + new String(msg.message()));
>         counter++;
>     }
>
>     System.out.println("Shutting down Thread: " + m_threadNumber);
> }
>
> I understand that handling 'hard' errors like JVM crashes, kill -9 etc. may
> leave the offsets in ZooKeeper incorrect, but I'm trying to understand what
> happens in a clean shutdown where Kafka and the Consumer are behaving
> correctly but I can't process what I read.
>
> This also feels like I'm blurring SimpleConsumer theory into this, but
> except for the exception/shutdown case High Level Consumer does everything
> I want.
>
>
> Thanks,
>
> Chris
>
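
The scenario in the quoted message (crash before commit, restart, replay) can be modeled without Kafka at all. The following toy program (hypothetical names, not the Kafka API) sketches why committing offsets only after successful processing gives at-least-once delivery: the message in flight at the crash is redelivered on restart rather than dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of manual offset commits. `committed` stands in for the offset
// stored in ZooKeeper; advancing it only after a message is fully handled
// plays the role of an explicit commitOffsets() call.
public class CommitOffsetsSketch {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < 5; i++) log.add("msg-" + i);

        long committed = -1;                    // last safely processed offset
        List<String> processed = new ArrayList<>();

        // First run: "crash" while handling offset 2, before committing it.
        for (long o = committed + 1; o < log.size(); o++) {
            if (o == 2) break;                  // simulated failure mid-message
            processed.add(log.get((int) o));    // fully process the message
            committed = o;                      // then commit its offset
        }

        // Second run: resume after the last committed offset, so the
        // in-flight message (offset 2) is redelivered instead of lost.
        for (long o = committed + 1; o < log.size(); o++) {
            processed.add(log.get((int) o));
            committed = o;
        }

        System.out.println(processed);
        // prints [msg-0, msg-1, msg-2, msg-3, msg-4]
    }
}
```

If the commit were instead moved before processing (auto-commit style), a crash at the same point would lose msg-2 rather than replay it.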

Later replies in this thread:
Chris Curtin 2013-07-09, 18:23
Philip OToole 2013-07-09, 19:24
Chris Curtin 2013-07-09, 19:54
Chris Curtin 2013-07-09, 20:09
Ian Friedman 2013-07-09, 21:51
Chris Curtin 2013-07-10, 11:14