Kafka unable to process large number of messages
Hi,

I'm running Kafka in distributed mode with 2 nodes. It works fine at low
ingestion rates, but when I increase the ingestion rate, both nodes start
giving the following error:

[2013-03-29 14:51:45,379] ERROR Closing socket for /192.168.145.183 because of error (kafka.network.Processor)
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 414138 curr offset: 2901235 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
    at kafka.log.Log.append(Log.scala:218)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handleMultiProducerRequest$1.apply(KafkaRequestHandlers.scala:62)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:34)
    at scala.collection.mutable.ArrayOps.foreach(ArrayOps.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
    at scala.collection.mutable.ArrayOps.map(ArrayOps.scala:34)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:62)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:662)

I'm running Kafka 0.7.2 with JDK 1.6.0.43.

Any idea what I might be doing wrong here?

Regards,
Anand
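
For reference, the exception above is thrown while the broker iterates the incoming
message set and validates each message before appending it to the log: in 0.7.x,
ByteBufferMessageSet checks every message's CRC32 checksum and raises
InvalidMessageException when the bytes no longer match the stored checksum. The
sketch below mirrors that check on the client side; it assumes the 0.7.x Scala API
(kafka.message.Message and its isValid method), and the payload size is only
illustrative, taken from the size reported in the error.

import kafka.message.Message

object ChecksumCheck {
  def main(args: Array[String]): Unit = {
    // Hypothetical payload, roughly the size reported in the error (414138 bytes).
    val payload = new Array[Byte](414138)

    // A CRC32 checksum over the payload is computed when the Message is built.
    val msg = new Message(payload)

    // isValid recomputes the CRC32 and compares it to the stored checksum; the
    // broker throws InvalidMessageException when this check fails for a message
    // in an incoming producer request.
    println("valid: " + msg.isValid + ", payload bytes: " + payload.length)
  }
}

If the messages validate as they leave the producer but the broker still rejects
them, the corruption is most likely introduced between serialization and the socket
write (for example, a payload buffer being reused or mutated concurrently by
several producer threads), rather than by the message size itself.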

 