Kafka >> mail # user >> High Level Consumer error handling and clean exit

Re: High Level Consumer error handling and clean exit
Hi Philip,

Correct, I don't want to explicitly control the offset committing. The
ConsumerConnector handles that well enough, except when I want to
shut down and NOT have Kafka think I consumed the last message for a
stream. This isn't the crash case; it is the case where the logic consuming
the message detects an error and wants to exit cleanly until that issue
can be resolved, so that the message it was trying to process is not lost
once the problem is resolved.

My understanding is that the commitOffsets() call is across all threads,
not just for the stream my thread is reading from. So knowing it is okay to
call this requires coordination across all my threads, which makes a High
Level Consumer a lot harder to write correctly.
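
To make the coordination concrete, here is a minimal sketch of what each
thread would have to take part in. It assumes auto-commit is turned off and
that an offset only counts as consumed once next() has returned it.
CommitGate is a hypothetical helper, not part of Kafka; the commit action
passed in would be something like a call to commitOffsets() on the connector.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical helper, not part of Kafka: lets a connector-wide commit run
// only while no worker thread is in the middle of a message, and never
// after any worker has reported a failure.
class CommitGate {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AtomicBoolean failed = new AtomicBoolean(false);
    private final Runnable commitAction; // assumed: wraps the real commit call

    CommitGate(Runnable commitAction) {
        this.commitAction = commitAction;
    }

    // Each worker wraps "pull one message and process it" in this call.
    // Many workers may be inside at once (shared read lock).
    void process(Runnable work) {
        lock.readLock().lock();
        try {
            work.run();
        } catch (RuntimeException e) {
            failed.set(true); // this message must not be committed
            throw e;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Waits until no worker is mid-message, then commits for all threads.
    // Returns false without committing if any worker has failed.
    boolean commitWhenIdle() {
        lock.writeLock().lock();
        try {
            if (failed.get()) {
                return false;
            }
            commitAction.run();
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```

Even in this small form, one failing thread blocks commits for every other
thread, which is the cross-thread coupling I would like to avoid.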

Here is what I'd like to happen: my code hands the message back
to the KafkaStream (or whatever level knows about the consumed offsets) and
- set the next start offset for this topic/partition to this message in
- cleanly shut down the stream from the broker(s)
- don't force a rebalance on the consumer, since something is wrong with
processing the data in the message, not with the message itself.
- If I try to use the stream again, I should get an exception
- I don't think I would want this to cause a complete shutdown of the
ConsumerConnector, in case other threads are still processing. If all
threads have the same issue, they will all fail soon enough and follow the
same logic. But if only one thread fails, our Operations team will need to
resolve the issue and then do a clean restart to recover.
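
As a rough illustration of the behavior in the list above, here is a toy
in-memory model. Everything in it is hypothetical: ReturnableStream,
handBack() and nextStartOffset() are names made up for this sketch, and
nothing like them exists in the Kafka API. A List stands in for one
topic/partition.

```java
import java.util.List;
import java.util.NoSuchElementException;

// Toy in-memory model of the wished-for behavior; none of this is Kafka API.
class ReturnableStream {
    private final List<String> partitionLog; // stands in for one topic/partition
    private int nextOffset;                  // offset a restart would begin from
    private int lastPulled = -1;             // offset of the last message handed out
    private boolean closed = false;

    ReturnableStream(List<String> partitionLog, int startOffset) {
        this.partitionLog = partitionLog;
        this.nextOffset = startOffset;
    }

    boolean hasNext() {
        return !closed && nextOffset < partitionLog.size();
    }

    // Pulling a message advances the offset past it.
    String next() {
        if (closed) {
            throw new IllegalStateException("stream was handed back and closed");
        }
        if (nextOffset >= partitionLog.size()) {
            throw new NoSuchElementException("no more messages");
        }
        lastPulled = nextOffset;
        return partitionLog.get(nextOffset++);
    }

    // Hand the last message back: rewind so it becomes the next start
    // offset, then close only this stream. Other streams are untouched.
    void handBack() {
        if (lastPulled >= 0) {
            nextOffset = lastPulled;
        }
        closed = true;
    }

    int nextStartOffset() {
        return nextOffset;
    }
}
```

A thread that fails on a message would call handBack() and return; any
later call to next() on that stream throws, and a restart would begin from
nextStartOffset().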

I think this logic would only come into play when the downstream system was
having issues, since the iterator would be drained correctly when the
'shutdown' call to ConsumerConnector is made.



On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <[EMAIL PROTECTED]> wrote:

> It seems like you're not explicitly controlling the offsets. Is that
> correct?
> If so, the moment you pull a message from the stream, the client framework
> considers it processed. So if your app subsequently crashes before the
> message is fully processed, and "auto-commit" updates the offsets in
> Zookeeper, you will drop that message.
> The solution to this is to call commitOffsets() explicitly.
> Philip
> On Tue, Jul 9, 2013 at 11:16 AM, Chris Curtin <[EMAIL PROTECTED]> wrote:
> > Hi,
> >
> > I'm working through a production-level High Level Consumer app and have a
> > couple of error/shutdown questions to understand how the offset storage
> > is handled.
> >
> > Test case - simulate an error writing to destination application, for
> > example a database, offset is 'lost'
> >
> > Scenario
> > - write 500 messages for each topic/partition
> > - use the example High Level Consumer code I wrote for the Wiki
> > - Change the code so that every 10th read from the 'hasNext()'
> > ConsumerIterator breaks out of the loop and returns from the thread,
> > simulating a hard error. I write the offset to System.out to see what was
> > provided
> > - start up again and look to see what offset was first emitted for a
> > partition
> >
> > Issue: Kafka treats the offset for the message read that caused me to
> > break out of the loop as processed (as expected), but I really failed.
> > How do I tell Kafka that I didn't really consume that offset?
> >
> > Here is the example code in the 'business logic':
> >
> > public void run() {
> >         ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
> >         int counter = 0;
> >         while (it.hasNext())   {
> >             MessageAndMetadata<byte[], byte[]> msg = it.next();
> >             if (counter == 10) {
> >                 System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition() +
> >                         ": Offset: " + msg.offset() + " :" + new String(msg.message()));
> >                 break;
> >             }
> >             System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition() +