Kafka >> mail # user >> Kafka throw InvalidMessageException and lost data


Re: Kafka throw InvalidMessageException and lost data
Hmm, both log4j messages suggest that the broker received some corrupted
produce requests. Are you using the Java producer? Also, we have seen
network router problems cause corrupted requests before.

Thanks,

Jun

On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <[EMAIL PROTECTED]> wrote:

> Hi,
> We were doing some performance tests using Kafka 0.7.2. We use only 1
> broker.
> On the producer client, we use 8 threads to send logs; each thread uses a
> sync producer and sends 100 logs at a time (each log is about 1~2 KB long).
> The total QPS is about 30K.
> But the number of logs that both the consumer reads and the broker counts
> is less than the number the producer sends, so we believe the data is lost
> while the producer is sending logs to the broker.
>
> We lowered the QPS to 10K and still lost logs.
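
For illustration, a minimal sketch of the kind of sync-producer loop described
above, assuming the Kafka 0.7 Java producer API; the class names, config
properties, broker address, and topic name below are assumptions made for the
example, not details taken from the original setup:

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class SyncBatchProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed 0.7-style settings: a static broker list ("brokerid:host:port"),
        // a string serializer, and synchronous sends.
        props.put("broker.list", "0:broker-host:9092");  // hypothetical broker address
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "sync");

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

        // Each of the 8 sender threads in the report would run a loop like this,
        // building a batch of 100 log lines and sending it as one request.
        List<String> batch = Arrays.asList("log line 1", "log line 2" /* ... up to 100 */);
        producer.send(new ProducerData<String, String>("test-topic", batch)); // hypothetical topic

        producer.close();
    }
}
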
> We found some exceptions in broker logs:
>
> 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers  - Error processing ProduceRequest on abc:0
> kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
>     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
>     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>     at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
>     at kafka.log.Log.append(Log.scala:218)
>     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
>     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
>     at kafka.network.Processor.handle(SocketServer.scala:296)
>     at kafka.network.Processor.read(SocketServer.scala:319)
>     at kafka.network.Processor.run(SocketServer.scala:214)
>     at java.lang.Thread.run(Thread.java:636)
>
> Or this:
>
> 1406871 [kafka-processor-2] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
> java.nio.BufferUnderflowException
>     at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
>     at java.nio.ByteBuffer.get(ByteBuffer.java:692)
>     at kafka.utils.Utils$.readShortString(Utils.scala:123)
>     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
>     at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
>     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
>     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
>     at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
>     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
>
> Or this:
>
> 1830146 [kafka-processor-0] ERROR kafka.network.Processor  - Closing socket for /10.0.2.140 because of error
> java.lang.IllegalArgumentException
>     at java.nio.Buffer.limit(Buffer.java:266)
>     at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
>     at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
>     at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
>     at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
>     at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
>     at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
>     at