Kafka >> mail # user >> Exception from consumer
Re: Exception from consumer
I deleted the broker log file and that fixed the problem. But is there a
better way to fix it?
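
A note on the consumer-side error quoted below: the reported message size, 1681733685, is suspicious in itself. Assuming the size field is a 4-byte big-endian integer (an assumption about the wire format, not something the log states), its bytes can be decoded to see what the consumer actually read. A minimal Python sketch; `decode_size_field` is a hypothetical helper name:

```python
import struct


def decode_size_field(size: int) -> bytes:
    """Return the four big-endian bytes that encode a signed 32-bit size field.

    Assumption: the size prefix is a 4-byte big-endian integer.
    """
    return struct.pack(">i", size)


# Value taken from the InvalidMessageSizeException in the consumer log.
reported_size = 1681733685
raw = decode_size_field(reported_size)
print(raw)  # b'd=85'
```

The four bytes are printable ASCII (`d=85`), which looks like message payload rather than a length header. That would be consistent with the fetch offset pointing into the middle of a message, for example a stale offset left over from before the log file was deleted, though the log alone does not prove it.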

On Fri, May 11, 2012 at 12:39 PM, navneet sharma <
[EMAIL PROTECTED]> wrote:

> Hi,
>
> I tried the following scenario:
> 1) Created a producer sending messages to 3 topics.
> 2) Created 3 consumers in the same group for 1 topic, so the other 2 topics
> should remain unread.
> 3) After running the producer and consumers successfully several times, I
> decided to delete the log file because it had grown very large.
> 4) So in effect, the unread messages for the 2 topics got deleted.
> 5) I ran the above experiment again.
> 6) Then I changed the consumer code and created 3 consumers for each of the
> 3 topics, in 3 different groups, so that I could read messages for all 3
> topics.
>
> But after that, I am seeing the following exception in the broker log:
> 20736 [kafka-processor-0] ERROR kafka.server.KafkaRequestHandlers  - error
> when processing request FetchRequest(topic:orderTopic, part:0
> offset:298534904 maxSize:307200)
> kafka.common.OffsetOutOfRangeException: offset 298534904 is out of range
>     at kafka.log.Log$.findRange(Log.scala:48)
>     at kafka.log.Log.read(Log.scala:224)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$readMessageSet(KafkaRequestHandlers.scala:116)
>     at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:106)
>     at kafka.server.KafkaRequestHandlers$$anonfun$2.apply(KafkaRequestHandlers.scala:105)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>     at kafka.server.KafkaRequestHandlers.handleMultiFetchRequest(KafkaRequestHandlers.scala:105)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$3.apply(KafkaRequestHandlers.scala:45)
>     at kafka.network.Processor.handle(SocketServer.scala:289)
>     at kafka.network.Processor.read(SocketServer.scala:312)
>     at kafka.network.Processor.run(SocketServer.scala:207)
>     at java.lang.Thread.run(Thread.java:662)
>
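
The broker-side exception above can be illustrated with a small sketch. It assumes offsets are byte positions in the topic log and that the broker rejects any fetch offset outside the range its segment files cover. This is a simplified model, not the code in Log.scala; `find_segment` and `OffsetOutOfRange` are hypothetical names, and the segment boundaries and log sizes are made-up numbers. Only the offset 298534904 comes from the log.

```python
from bisect import bisect_right


class OffsetOutOfRange(Exception):
    """Raised when a fetch offset is not covered by any log segment."""


def find_segment(segment_starts, log_end, offset):
    """Return the start offset of the segment that contains `offset`.

    segment_starts: sorted byte offsets at which each segment file begins.
    log_end: byte offset just past the last message in the log.
    """
    if not segment_starts or offset < segment_starts[0] or offset > log_end:
        raise OffsetOutOfRange(f"offset {offset} is out of range")
    # The containing segment is the last one starting at or before the offset.
    return segment_starts[bisect_right(segment_starts, offset) - 1]


stored_offset = 298534904  # offset the consumer asked for, from the broker log

# Hypothetical log before deletion: the stored offset falls in the second segment.
print(find_segment([0, 268435456], 300000000, stored_offset))

# Hypothetical log after deleting the file and producing a little new data:
# the log is now much shorter than the offset the consumer still remembers.
try:
    find_segment([0], 1000000, stored_offset)
except OffsetOutOfRange as exc:
    print(exc)
```

In this model the consumer's remembered position outlives the data it pointed to, so every fetch fails until the consumer's offset is reset or the log grows past it.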
> And this exception on the consumer side:
> 12:27:36,259 [FetchRunnable-0] ERROR kafka.consumer.FetcherRunnable  -
> error in FetcherRunnable for orderTopic:1-1: fetched offset = 254633932:
> consumed offset = 254633932
> kafka.common.InvalidMessageSizeException: invalid message size: 1681733685
> only received bytes: 307196 at 254633932( possible causes (1) a single
> message larger than the fetch size; (2) log corruption )
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:103)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
>         at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
>         at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>         at kafka.message.ByteBufferMessageSet.shallowValidBytes(ByteBufferMessageSet.scala:65)
>         at kafka.message.ByteBufferMessageSet.validBytes(ByteBufferMessageSet.scala:60)
>         at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:62)
>         at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:82)
>         at kafka.consumer.FetcherRunnable$$anonfun$run$3.apply(FetcherRunnable.scala:68)
>         at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)