Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # user - Kafka unable to process large number of messages


Copy link to this message
-
Re: Kafka unable to process large number of messages
Jun Rao 2013-03-29, 14:55
This indicates that the messages sent to the broker are somehow corrupted.
Are you using a java producer? How many instances of producers do you have?

Thanks,

Jun

On Fri, Mar 29, 2013 at 2:46 AM, anand nalya <[EMAIL PROTECTED]> wrote:

> Hi,
>
> I'm running kafka in distributed mode with 2 nodes. It works fine with slow
> ingestion rates but when I increase the ingestion rate, both the nodes
> starts giving the following error:
>
> [2013-03-29 14:51:45,379] ERROR Closing socket for /192.168.145.183because
> of error (kafka.network.Processor)
> kafka.message.InvalidMessageException: message is invalid, compression
> codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset: 0
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>     at
>
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at
>
> kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
>     at kafka.log.Log.append(Log.scala:218)
>     at
>
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
>     at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at
>
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
>     at
>
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
>     at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
>     at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
>     at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
>     at
>
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at
>
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:662)
>
> I'm running kafak 0.7.2 with jdk 1.6.0.43.
>
> Any idea what I might be doing wrong here?
>
> Regards,
> Anand
>