Kafka >> mail # user >> High Level Consumer error handling and clean exit
High Level Consumer error handling and clean exit
Hi,

I'm working through a production-level High Level Consumer app and have a
couple of error/shutdown questions to understand how the offset storage is
handled.

Test case: simulate an error writing to the destination application (for
example, a database), so that the offset is 'lost'.

Scenario:
- Write 500 messages for each topic/partition.
- Use the example High Level Consumer code I wrote for the Wiki.
- Change the code so that the tenth read from the 'hasNext()'
ConsumerIterator breaks out of the loop and returns from the thread,
simulating a hard error. I write the offset to System.out to see what was
provided.
- Start up again and look to see what offset was first emitted for a
partition.

Issue: Kafka treats the offset of the message whose read caused me to break
out of the loop as processed (as expected), but I actually failed to process
it. How do I tell Kafka that I didn't really consume that offset?
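What I think I want is to take over committing myself. Here's a minimal
sketch of the consumer config I have in mind, assuming the 0.8-era
high-level consumer property names (auto.commit.enable turned off, with an
explicit ConsumerConnector.commitOffsets() call only after a successful
write to the destination):

```java
import java.util.Properties;

public class ConsumerProps {
    // Build a high-level consumer config with auto-commit disabled, so
    // offsets only move when we explicitly call commitOffsets() on the
    // ConsumerConnector after the destination write succeeds.
    public static Properties build(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Assumed key: disable the periodic background offset commit
        props.put("auto.commit.enable", "false");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("localhost:2181", "group1");
        System.out.println(p.getProperty("auto.commit.enable"));
    }
}
```

The idea would be: process the message, and only if that succeeds call
commitOffsets(); on failure, skip the commit and shut down, so the next
startup re-reads from the last committed offset. I haven't verified this is
the intended usage, hence the question.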

Here is the example code in the 'business logic':

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    int counter = 0;
    while (it.hasNext()) {
        MessageAndMetadata<byte[], byte[]> msg = it.next();
        if (counter == 10) {
            System.out.println("Stopping Thread " + m_threadNumber
                    + ": Partition: " + msg.partition()
                    + ": Offset: " + msg.offset()
                    + " :" + new String(msg.message()));
            break;
        }
        System.out.println("Thread " + m_threadNumber
                + ": Partition: " + msg.partition()
                + ": Offset: " + msg.offset()
                + " :" + new String(msg.message()));
        counter++;
    }

    System.out.println("Shutting down Thread: " + m_threadNumber);
}

I understand that 'hard' errors like JVM crashes, kill -9, etc. may leave
the offsets in ZooKeeper incorrect, but I'm trying to understand what
happens in a clean shutdown where Kafka and the consumer are behaving
correctly but I can't process what I read.

This also feels like I'm blurring SimpleConsumer territory into this, but
except for the exception/shutdown case the High Level Consumer does
everything I want.
Thanks,

Chris
