Exception from consumer
Hi,

I tried the following scenario:
1) Created a producer that sends messages to 3 topics.
2) Created 3 consumers in the same group for 1 topic, so messages for the other 2 topics should remain unread.
3) After running the producer and consumers successfully several times, I decided to delete the log files because they had grown very large.
4) So, in effect, the unread messages for the other 2 topics were deleted as well.
5) I ran the above experiment again.
6) Then I changed the consumer code and created 3 consumers, one per topic, in 3 different groups, so that messages for all 3 topics would now be read (a rough sketch of this wiring is below).
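
For reference, the change in step 6 amounts to giving each consumer its own group id. Here is a minimal sketch of that wiring using the high-level consumer API (assumptions: the 0.7-era property spellings "zk.connect" and "groupid", and placeholder topic and host names; exact class and property names differ between Kafka versions):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class OneGroupPerTopic {
    public static void main(String[] args) {
        // Placeholder topic names; orderTopic is the one from the logs below.
        String[] topics = { "orderTopic", "topicB", "topicC" };
        for (String topic : topics) {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181"); // placeholder ZooKeeper address
            props.put("groupid", "group-" + topic);    // one group per topic instead of one shared group
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);               // one stream per topic
            // Iterate the stream(s) returned here to actually consume messages;
            // the stream type varies by version, so it is left unassigned in this sketch.
            connector.createMessageStreams(topicCountMap);
        }
    }
}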

But after that, I am seeing the following exception in the broker log:
20736 [kafka-processor-0] ERROR kafka.server.KafkaRequestHandlers  - error when processing request FetchRequest(topic:orderTopic, part:0 offset:298534904 maxSize:307200)
kafka.common.OffsetOutOfRangeException: offset 298534904 is out of range
    at kafka.log.Log$.findRange(Log.scala:48)
    at kafka.log.Log.read(Log.scala:224)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116)
    at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106)
    at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
    at kafka.network.Processor.handle(SocketServer.scala:289)
    at kafka.network.Processor.read(SocketServer.scala:312)
    at kafka.network.Processor.run(SocketServer.scala:207)
    at java.lang.Thread.run(Thread.java:662)

and this exception on the consumer side:
12:27:36,259 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable  - error in FetcherRunnable for orderTopic:1-1: fetched offset = 254633932: consumed offset = 254633932
kafka.common.InvalidMessageSizeException: invalid message size: 1681733685 only received bytes: 307196 at 254633932( possible causes (1) a single message larger than the fetch size; (2) log corruption )
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
        at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:65)
        at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:60)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:62)
        at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:82)
        at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:68)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:68)
In fact, the consumer gets killed after throwing this exception.

I suspect I got into trouble because I deleted the logs in between runs, and the consumer, for some reason, is still trying to fetch messages from an old offset. Is that the case?
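
(For context: my understanding is that the high-level consumer stores its last consumed offset in ZooKeeper, so after the log files were deleted that stored offset points past the data the broker still has. Below is a minimal sketch of the config that governs what happens on an out-of-range fetch; "autooffset.reset" is the 0.7-era property name as far as I know, it is an assumption on my part and is spelled differently in later versions, e.g. auto.offset.reset.)

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

public class OffsetResetConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181"); // placeholder ZooKeeper address
        props.put("groupid", "group-orderTopic");  // the group whose stored offset is stale
        // When the broker replies with OffsetOutOfRangeException, this decides
        // where the consumer restarts: "smallest" = earliest offset still on
        // disk, "largest" = only new messages. (Assumed 0.7-era property name.)
        props.put("autooffset.reset", "smallest");
        // Pass this config to Consumer.createJavaConsumerConnector(...) as above.
        ConsumerConfig config = new ConsumerConfig(props);
    }
}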

How can I get past this problem?

Thanks,
Navneet Sharma