Kafka >> mail # user >> Kafka throw InvalidMessageException and lost data


Kafka throws InvalidMessageException and loses data
Hi,
We were running some performance tests with Kafka 0.7.2, using only 1 broker.
On the producer client we use 8 threads to send logs. Each thread uses a sync
producer and sends 100 logs at a time (each log is about 1~2K bytes long).
The total QPS is about 30K.
But the number of logs that the consumer reads, and that the broker counts,
is less than the number the producer sends. We believe the data is lost while
the producer is sending logs to the broker.
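
To put the load in perspective, here is a rough back-of-the-envelope calculation from the figures above. It assumes that "QPS" counts individual logs rather than send calls, and that 1K means 1024 bytes; the function name `load_estimate` is only for illustration.

```python
# Figures quoted in the message above.
THREADS = 8
LOGS_PER_SEND = 100             # logs per sync send call
LOG_SIZE_BYTES = (1024, 2048)   # "about 1~2K bytes"; 1K = 1024 bytes is an assumption
TOTAL_LOGS_PER_SEC = 30_000     # assumes "QPS" counts logs, not send calls


def load_estimate(total_logs_per_sec, threads=THREADS,
                  logs_per_send=LOGS_PER_SEND, log_size_bytes=LOG_SIZE_BYTES):
    """Derive per-thread and per-broker load from the test parameters."""
    small, large = log_size_bytes
    logs_per_thread = total_logs_per_sec / threads
    return {
        "logs_per_thread_per_sec": logs_per_thread,
        "sends_per_thread_per_sec": logs_per_thread / logs_per_send,
        # Size of one send call (100 logs), as a (low, high) range in KiB.
        "send_payload_kib": (logs_per_send * small / 1024,
                             logs_per_send * large / 1024),
        # Total data arriving at the single broker, as a (low, high) range in MiB/s.
        "broker_ingest_mib_per_sec": (total_logs_per_sec * small / 2**20,
                                      total_logs_per_sec * large / 2**20),
    }


if __name__ == "__main__":
    for name, value in load_estimate(TOTAL_LOGS_PER_SEC).items():
        print(name, value)
```

Under these assumptions each thread issues about 37.5 send calls per second, each carrying roughly 100 to 200 KiB, and the single broker receives roughly 29 to 59 MiB per second.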

We lowered the QPS to 10K and still lost logs.
We found some exceptions in the broker logs:

9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers  - Error processing ProduceRequest on abc:0
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
    at kafka.log.Log.append(Log.scala:218)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:636)

Or this:

1406871 [kafka-processor-2] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
    at java.nio.ByteBuffer.get(ByteBuffer.java:692)
    at kafka.utils.Utils$.readShortString(Utils.scala:123)
    at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
    at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
    at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
    at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)

Or this:

1830146 [kafka-processor-0] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:266)
    at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
    at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
    at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
    at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:636)
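
Both socket-closing traces fail while a request is being parsed out of a byte buffer (`readShortString` in one, `Buffer.limit` in the other). As a rough illustration only, and not a diagnosis of this case, the Python sketch below shows how a parser for length-prefixed fields fails in two comparable ways: a length that runs past the end of the buffer, and a payload size larger than the buffer. The field layout (2-byte topic length, topic, 4-byte partition, 4-byte payload size, payload) and the function names are assumptions made for the example, not taken from the Kafka source, and the Python `ValueError` stands in for the Java exceptions.

```python
import struct


def encode_request(topic, partition, payload):
    """Build a request using the assumed length-prefixed layout."""
    t = topic.encode("utf-8")
    return (struct.pack(">h", len(t)) + t
            + struct.pack(">ii", partition, len(payload)) + payload)


def read_short_string(buf, pos):
    """Read a 2-byte big-endian length, then that many bytes of UTF-8."""
    if pos + 2 > len(buf):
        raise ValueError("underflow: no room for the length prefix")
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    if length < 0 or pos + length > len(buf):
        raise ValueError(
            f"underflow: string claims {length} bytes, {len(buf) - pos} remain")
    return buf[pos:pos + length].decode("utf-8"), pos + length


def read_request(buf, pos=0):
    """Parse one request; returns (topic, partition, payload, next_pos)."""
    topic, pos = read_short_string(buf, pos)
    if pos + 8 > len(buf):
        raise ValueError("underflow: no room for partition and size")
    partition, size = struct.unpack_from(">ii", buf, pos)
    pos += 8
    if size < 0 or pos + size > len(buf):
        raise ValueError(
            f"invalid size: payload claims {size} bytes, {len(buf) - pos} remain")
    return topic, partition, buf[pos:pos + size], pos + size


if __name__ == "__main__":
    good = encode_request("abc", 0, b"x" * 10)
    print(read_request(good))

    # Starting one byte off turns ordinary data into a bogus string length.
    try:
        read_request(good, 1)
    except ValueError as e:
        print("misaligned read:", e)

    # A corrupted size field claims more payload than the buffer holds.
    corrupt = good[:9] + struct.pack(">i", 10**6) + good[13:]
    try:
        read_request(corrupt)
    except ValueError as e:
        print("corrupt size:", e)
```

In both failing cases the bytes are individually well formed; the parser simply interprets them at the wrong position or trusts a wrong length field.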

This has bothered us for a few days. At first we thought it might be caused
by wrong configuration settings, so we switched to the configuration
recommended on the wiki, but unfortunately the exceptions still occur.

In what situations can these exceptions be thrown? What can we do to
avoid them?

THANKS

*Best Regards

Xiang Helin*

 
Jun Rao 2013-03-19, 04:58
Helin Xiang 2013-03-19, 05:13
Neha Narkhede 2013-03-19, 05:55
王国栋 2013-03-20, 07:34
Jun Rao 2013-03-19, 16:10
王国栋 2013-03-20, 07:29
Jun Rao 2013-03-20, 14:29
Yang Zhou 2013-03-21, 02:09
Yang Zhou 2013-03-21, 02:33
Jun Rao 2013-03-21, 04:20
王国栋 2013-03-21, 04:43
Neha Narkhede 2013-03-21, 05:05
王国栋 2013-03-25, 03:58
Neha Narkhede 2013-03-25, 04:01
王国栋 2013-03-25, 04:23