Subject: High Level Consumer error handling and clean exit

I'm working through a production-level High Level Consumer app and have a
couple of questions about error handling and shutdown, to understand how
offset storage works.

Test case: simulate an error writing to the destination application (for
example, a database), where the offset is 'lost'.

- write 500 messages for each topic/partition
- use the example High Level Consumer code I wrote for the Wiki
- change the code so that every 10th read from the ConsumerIterator
  ('hasNext()') breaks out of the loop and returns from the thread,
  simulating a hard error. I print each offset to System.out so I can see
  what was read
- start up again and check which offset is emitted first for each partition

Issue: Kafka treats the offset of the message whose read made me break out
of the loop as processed (as expected), but processing actually failed. How
do I tell Kafka that I didn't really consume that offset?
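The behaviour described above can be reproduced without a broker. Below is a minimal sketch, not the Kafka API: `SimulatedPartition` and `restartOffsetAfterFailure` are hypothetical names. The model assumes that, like the 0.8 `ConsumerIterator`, `next()` moves the consumed position past a message as soon as it is returned, and that `commit()` stands in for auto-commit or a commit during shutdown.

```java
// Hypothetical in-memory model of one partition read through a
// High Level Consumer. NOT the Kafka API: all names are illustrative.
public class OffsetLossDemo {

    static class SimulatedPartition {
        private final long size;   // messages written: offsets 0..size-1
        long consumedOffset;       // advanced by next(), like ConsumerIterator
        long committedOffset;      // what would be stored in ZooKeeper

        SimulatedPartition(long size) {
            this.size = size;
        }

        boolean hasNext() {
            return consumedOffset < size;
        }

        // Hands out a message offset and moves the consumed position past it
        // before the caller has processed the message.
        long next() {
            return consumedOffset++;
        }

        // Stand-in for auto-commit or a commit during shutdown.
        void commit() {
            committedOffset = consumedOffset;
        }
    }

    // Mirrors the test loop: "fail" on the read where counter == failAt,
    // break out, then let the commit happen. Returns the first offset a
    // restarted consumer would read.
    static long restartOffsetAfterFailure(long messages, int failAt) {
        SimulatedPartition p = new SimulatedPartition(messages);
        int counter = 0;
        while (p.hasNext()) {
            long offset = p.next();
            if (counter == failAt) {
                System.out.println("Processing failed at offset " + offset);
                break;
            }
            counter++;
        }
        p.commit();
        return p.committedOffset;
    }

    public static void main(String[] args) {
        // 500 messages; the read with counter == 10 is offset 10
        System.out.println("Restart at " + restartOffsetAfterFailure(500, 10));
    }
}
```

Running `main` prints that processing failed at offset 10 and that the restart begins at 11, so offset 10 is skipped even though it was never processed.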

Here is the 'business logic' part of the example code:

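public void run() {
        // Uses kafka.consumer.ConsumerIterator and kafka.message.MessageAndMetadata (0.8)
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        int counter = 0;
        while (it.hasNext()) {
            // next() returns the message; the consumer now regards its offset as consumed
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            if (counter == 10) {
                // Simulated hard error: stop without processing this message
                System.out.println("Stopping Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                        ": Offset: " + msg.offset() + " :" + new String(msg.message()));
                break;
            }
            System.out.println("Thread " + m_threadNumber + ": Partition: " + msg.partition() +
                    ": Offset: " + msg.offset() + " :" + new String(msg.message()));
            counter++;
        }
        System.out.println("Shutting down Thread: " + m_threadNumber);
}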
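One way to get the behaviour asked about is to commit only after the destination write succeeds and to skip the commit when it fails. The sketch below is a hypothetical in-memory simulation of that rule applied to the loop above; it uses no Kafka classes, and `CommitAfterProcessDemo`, `writeToDestination` and `restartOffset` are illustrative names. The committed offset stops at the failed message, so a restart re-reads it.

```java
// Hypothetical simulation (no Kafka classes): commit only after the
// destination write succeeds, and skip the commit on failure.
public class CommitAfterProcessDemo {

    // Stand-in for the database write; fails on the read where counter == failAt.
    static boolean writeToDestination(int counter, int failAt) {
        return counter != failAt;
    }

    // Offsets 0..messages-1 in one partition. Returns the first offset a
    // restarted consumer would read.
    static long restartOffset(long messages, int failAt) {
        long committed = 0;        // offset that would be stored in ZooKeeper
        int counter = 0;
        for (long offset = 0; offset < messages; offset++) {
            if (!writeToDestination(counter, failAt)) {
                // Clean exit without committing: 'offset' stays uncommitted
                break;
            }
            committed = offset + 1; // position after the last processed message
            counter++;
        }
        return committed;
    }

    public static void main(String[] args) {
        System.out.println("Restart at " + restartOffset(500, 10));
    }
}
```

Committing after every message means one ZooKeeper write per message. Committing per batch is cheaper, but after a failure the whole uncommitted batch is read again (at-least-once delivery), so the database write should tolerate duplicates.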

I understand that 'hard' errors like JVM crashes or kill -9 may leave the
offsets in ZooKeeper incorrect, but I'm trying to understand what happens on
a clean shutdown, where Kafka and the consumer are behaving correctly but I
can't process what I read.
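On a clean shutdown, as far as I can tell from the 0.8 consumer, `shutdown()` on the connector commits the current offsets when auto-commit is enabled, so the failed message's offset is stored anyway. A minimal sketch of settings for committing manually instead, assuming the 0.8 property names `zookeeper.connect`, `group.id` and `auto.commit.enable`; the host and group values and the `manualCommitProps` helper are placeholders.

```java
import java.util.Properties;

// Sketch of 0.8 High Level Consumer settings for manual offset commits.
// Host and group values are placeholders.
public class ManualCommitConfig {

    static Properties manualCommitProps(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        // No background commits; offsets are stored only when the
        // application calls commitOffsets() on the connector itself.
        props.put("auto.commit.enable", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties props = manualCommitProps("localhost:2181", "example-group");
        props.list(System.out);
    }
}
```

These properties would be passed to `new ConsumerConfig(props)` and `Consumer.createJavaConsumerConnector(...)`, with `commitOffsets()` called only after a batch has been written to the database. As I understand the 0.8 connector, `commitOffsets()` stores the consumed position of every partition it owns, so with several threads one thread's failure is not isolated from commits triggered by the others.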

This may be blurring SimpleConsumer concepts into the question, but except
for the exception/shutdown case, the High Level Consumer does everything I
want.
