Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Dealing with errors when using Kafka Consumer

Copy link to this message
Dealing with errors when using Kafka Consumer
We have a fairly simple class that runs in a loop and consumes
messages from Kafka and feeds it to our stream processing system.

    consumerConnector = Consumer.create(new ConsumerConfig(props))
    val topicMessageStreams consumerConnector.createMessageStreams(Map(topic -> 1))

    // We only care about the first streamList from topicMessageStreams ...
    kafkaStream = topicMessageStreams(topic)(0)
    while (true) {
       val logMessage: String Utils.toString(kafkaStream.head.payload, "UTF-8")
       // do stuff with the message.

When this code gets an exception, it swallows it on the assumption
that the error is transient, and continues on its merry way. Obviously
this isn't the right thing to do in all cases (or even any case
perhaps)-- over a weekend, this code kept getting the same exception
and eventually logged many hundred gigs of error messages before it
got restarted. The exception we were getting from Kafka was:

java.lang.IllegalStateException: Iterator is in failed state
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:46)
at kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:35)
at kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:38)"

I was wondering what exceptions are transient, which ones need special
handing (say reconnecting to kafka? or just exiting the JVM and have
our job monitors restart the process again). For example, with the
iterator in an invalid state, would creating a new connector have
helped? Any help would be appreciated.