-Re: High Level Consumer error handling and clean exit
Chris Curtin 2013-07-09, 20:09
Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966
On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin <[EMAIL PROTECTED]> wrote:
> Thanks. I know I can write a SimpleConsumer to do this, but it feels like
> the High Level consumer is _so_ close to being robust enough to handle
> what I'd think people want to do in most applications. I'm going to submit
> an enhancement request.
> I'm trying to understand the level of data loss in this situation, so I
> looked deeper into the KafkaStream logic: it looks like a KafkaStream
> includes a BlockingQueue for transferring the messages to my code from
> Kafka. If I call shutdown() when I detect the problem, are the messages
> already in the BlockingQueue considered 'read' by Kafka, or does the
> shutdown peek into the Queue to see what is still there before updating
> My concern is if that queue is not empty I'll be losing more than the one
> message that led to the failure.
> I'm also curious how others are handling this situation. Do you assume the
> message that is causing problems is lost or somehow know to go get it
> later? I'd think others would have this problem too.
> On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole <[EMAIL PROTECTED]> wrote:
>> It sounds like you're requesting functionality that the high-level
>> simply doesn't have. As I am sure you know, there is no API call that
>> supports "handing back a message".
>> I might be missing something, but if you need this kind of control, I
>> you need to code your application differently. You could try creating a
>> ConsumerConnection per partition (your clients will then need to know the
>> number of partitions out there). That way commitOffsets() will actually
>> only apply to that partition. Auto-commit the same way. It might give you
>> the level of control you need.
>> On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <[EMAIL PROTECTED]>
>> > Hi Philip,
>> > Correct, I don't want to explicitly control the offset committing. The
>> > ConsumerConnector handles that well enough except for when I want to
>> > shutdown and NOT have Kafka think I consumed that last message for a
>> > stream. This isn't the crash case, it is a case where the logic
>> > the message detects and error and wants to cleanly exit until that issue
>> > can be resolved, but not lose the message it was trying to process when
>> > problem is resolved.
>> > My understanding is that the commitOffsets() call is across all threads,
>> > not just for the stream my thread is reading from. So knowing it is
>> okay to
>> > call this requires coordination across all my threads, which makes a
>> > Level Consumer a lot harder to write correctly.
>> > Thinking about what I'd like to happen is: my code hands the message
>> > to the KafkaStream (or whatever level knows about the consumed offsets)
>> > says
>> > - set the next start offset for this topic/partition to this message in
>> > ZooKeeper
>> > - cleanly shutdown the stream from the broker(s)
>> > - don't force a rebalance on the consumer since something is wrong with
>> > processing of the data in the message, not the message.
>> > - If I try to use the stream again I should get an exception
>> > - I don't think I would want this to cause a complete shutdown of the
>> > ConsumerConnector, in case other threads are still processing. If all
>> > threads have the same issue they will all fail soon enough and do the
>> > logic. But if only one thread fails, our Operations teams will need to
>> > resolve the issue then do a clean restart to recover.
>> > I think this logic would only happen when the down stream system was
>> > issues since the iterator would be drained correctly when the 'shutdown'
>> > call to ConsumerConnector is made.
>> > Thanks,
>> > Chris
>> > On Tue, Jul 9, 2013 at 11:21 AM, Philip O'Toole <[EMAIL PROTECTED]>