Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Kafka, mail # user - Kafka throw InvalidMessageException and lost data


+
Helin Xiang 2013-03-19, 03:22
+
Jun Rao 2013-03-19, 04:58
Copy link to this message
-
Re: Kafka throw InvalidMessageException and lost data
Helin Xiang 2013-03-19, 05:13
thanks Jun.

we are using java producer.
does the last exception
"java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:266)
"
also means the broker received corrupted messages ?  sorry i am not
familiar with java nio.
On Tue, Mar 19, 2013 at 12:58 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Hmm, both log4j messages suggest that the broker received some corrupted
> produce requests. Are you using the java producer? Also, we have seen that
> network router problems caused corrupted requests before.
>
> Thanks,
>
> Jun
>
> On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <[EMAIL PROTECTED]> wrote:
>
> > Hi,
> > We were doing some performance test using kafka 0.7.2. We use only 1
> > broker.
> > On producer client, we use 8 threads to send logs, each thread use sync
> > producer and send 100 logs at a time, (each log is about 1~2K bytes
> long),
> > The total QPS is about 30K.
> > But the number of logs both consumer read and the broker counts is less
> > than the producer send. we believe the data lost when producer sending
> logs
> > to broker.
> >
> > We settle the QPS down to 10K, still lost logs.
> > We found some exceptions in broker logs:
> >
> > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers  -
> > Error processing ProduceRequest on abc:0
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
> >     at kafka.log.Log.append(Log.scala:218)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:636)
> >
> > Or this:
> >
> > 1406871 [kafka-processor-2] ERROR kafka.network.Processor  - Closing
> socket
> > for /10.0.2.140 because of error
> > java.nio.BufferUnderflowException
> >     at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
> >     at java.nio.ByteBuffer.get(ByteBuffer.java:692)
> >     at kafka.utils.Utils$.readShortString(Utils.scala:123)
> >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
> >     at
> >
> >
> kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
> >     at
> >
> >
> scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
> >     at
> > scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
> >     at
> > kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
> >
> > Or this:
> >
> > 1830146 [kafka-processor-0] ERROR kafka.network.Processor  - Closing
> socket
> > for /10.0.2.140 because of error
> > java.lang.IllegalArgumentException
> >     at java.nio.Buffer.limit(Buffer.java:266)
> >     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)

*Best Regards

向河林*

 
+
Neha Narkhede 2013-03-19, 05:55
+
王国栋 2013-03-20, 07:34
+
Jun Rao 2013-03-19, 16:10
+
王国栋 2013-03-20, 07:29
+
Jun Rao 2013-03-20, 14:29
+
Yang Zhou 2013-03-21, 02:09
+
Yang Zhou 2013-03-21, 02:33
+
Jun Rao 2013-03-21, 04:20
+
王国栋 2013-03-21, 04:43
+
Neha Narkhede 2013-03-21, 05:05
+
王国栋 2013-03-25, 03:58
+
Neha Narkhede 2013-03-25, 04:01
+
王国栋 2013-03-25, 04:23