Re: High Level Consumer error handling and clean exit
Hey Chris,

The way I handled this in my application using the High Level Consumer was to turn off auto-commit and commit manually after finishing a batch of messages. (Obviously you could do it after every message, but for my purposes it was better to work in batches.)
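
Here's a rough sketch of that pattern, assuming the 0.8 high-level consumer API; the topic name, ZooKeeper address, group id, batch size, and the process() helper are all placeholders:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class BatchCommitConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "my-group");
            props.put("auto.commit.enable", "false"); // turn off auto-commit

            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put("my-topic", 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(topicCountMap);
            ConsumerIterator<byte[], byte[]> it =
                streams.get("my-topic").get(0).iterator();

            final int batchSize = 100;
            int sinceCommit = 0;
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msg = it.next();
                process(msg.message()); // your application logic goes here
                if (++sinceCommit >= batchSize) {
                    connector.commitOffsets(); // commit the finished batch
                    sinceCommit = 0;
                }
            }
            connector.shutdown();
        }

        private static void process(byte[] payload) {
            // placeholder for real processing
        }
    }

If process() throws, you just exit without calling commitOffsets(); everything since the last commit gets re-delivered on restart, so processing has to be idempotent for that window.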

--
Ian Friedman
On Tuesday, July 9, 2013 at 4:09 PM, Chris Curtin wrote:

> Enhancement submitted: https://issues.apache.org/jira/browse/KAFKA-966
>
>
>
> On Tue, Jul 9, 2013 at 3:53 PM, Chris Curtin <[EMAIL PROTECTED]> wrote:
>
> > Thanks. I know I can write a SimpleConsumer to do this, but it feels like
> > the High Level consumer is _so_ close to being robust enough to handle
> > what I'd think people want to do in most applications. I'm going to submit
> > an enhancement request.
> >
> > I'm trying to understand the level of data loss in this situation, so I
> > looked deeper into the KafkaStream logic: it looks like a KafkaStream
> > includes a BlockingQueue for transferring the messages to my code from
> > Kafka. If I call shutdown() when I detect the problem, are the messages
> > already in the BlockingQueue considered 'read' by Kafka, or does the
> > shutdown peek into the Queue to see what is still there before updating
> > ZooKeeper?
> >
> > My concern is if that queue is not empty I'll be losing more than the one
> > message that led to the failure.
> >
> > I'm also curious how others are handling this situation. Do you assume the
> > message that is causing problems is lost or somehow know to go get it
> > later? I'd think others would have this problem too.
> >
> > Thanks,
> >
> > Chris
> >
> >
> >
> > On Tue, Jul 9, 2013 at 3:23 PM, Philip O'Toole <[EMAIL PROTECTED]> wrote:
> >
> > > OK.
> > >
> > > It sounds like you're requesting functionality that the high-level
> > > consumer simply doesn't have. As I am sure you know, there is no API
> > > call that supports "handing back a message".
> > >
> > > I might be missing something, but if you need this kind of control, I
> > > think you need to code your application differently. You could try
> > > creating a ConsumerConnection per partition (your clients will then
> > > need to know the number of partitions out there). That way
> > > commitOffsets() will actually only apply to that partition.
> > > Auto-commit the same way. It might give you the level of control you
> > > need.
> > >
> > > Philip
> > >
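
A rough sketch of what Philip describes, using one ConsumerConnector (what he calls a ConsumerConnection) per partition. Same imports and process() placeholder as the sketch above; the partition count, topic, and ZooKeeper address are assumptions, and it relies on the rebalancer handing each single-stream connector exactly one partition:

    // one ConsumerConnector per partition: if the group has as many
    // connectors as the topic has partitions, the rebalancer should give
    // each connector exactly one partition, so commitOffsets() on a
    // connector only moves that partition's offset
    int numPartitions = 4; // the client has to know this up front
    for (int i = 0; i < numPartitions; i++) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "my-group");
        props.put("auto.commit.enable", "false");
        final ConsumerConnector cc =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", 1); // one stream per connector
        final KafkaStream<byte[], byte[]> stream =
            cc.createMessageStreams(topicCountMap).get("my-topic").get(0);

        new Thread(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    process(it.next().message()); // same placeholder as above
                    cc.commitOffsets(); // only this connector's partition moves
                }
            }
        }).start();
    }
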
> > > On Tue, Jul 9, 2013 at 2:22 PM, Chris Curtin <[EMAIL PROTECTED]> wrote:
> > >
> > > > Hi Philip,
> > > >
> > > > Correct, I don't want to explicitly control the offset committing. The
> > > > ConsumerConnector handles that well enough, except for when I want to
> > > > shut down and NOT have Kafka think I consumed that last message for a
> > > > stream. This isn't the crash case; it is a case where the logic
> > > > consuming the message detects an error and wants to cleanly exit until
> > > > that issue can be resolved, but not lose the message it was trying to
> > > > process when the problem is resolved.
> > > >
> > > > My understanding is that the commitOffsets() call applies across all
> > > > threads, not just the stream my thread is reading from. So knowing it
> > > > is okay to call this requires coordination across all my threads,
> > > > which makes a High Level Consumer a lot harder to write correctly.
> > > >
> > > > Thinking about what I'd like to happen: my code hands the message back
> > > > to the KafkaStream (or whatever level knows about the consumed offsets)
> > > > and says:
> > > > - set the next start offset for this topic/partition to this message in
> > > >   ZooKeeper
> > > > - cleanly shut down the stream from the broker(s)
> > > > - don't force a rebalance on the consumer, since something is wrong with
> > > >   the processing of the data in the message, not the message itself.
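
For what it's worth, what Chris describes above might look something like this hypothetical interface. Nothing like it exists in Kafka today (that's what the KAFKA-966 enhancement asks for), so this is illustration only:

    // hypothetical, for illustration only -- not a real Kafka API
    public interface RejectingStream<K, V> {
        // hand the message back: on shutdown, record this message's own
        // offset (not the next one) in ZooKeeper as the restart point
        // for its topic/partition
        void reject(MessageAndMetadata<K, V> message);

        // close the broker connection(s) cleanly without triggering a
        // consumer rebalance, since the problem is with processing the
        // data, not with the message itself
        void shutdownCleanly();
    }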
 