|
Helin Xiang
2013-03-19, 03:22
Jun Rao
2013-03-19, 04:58
Helin Xiang
2013-03-19, 05:13
Neha Narkhede
2013-03-19, 05:55
Jun Rao
2013-03-19, 16:10
王国栋
2013-03-20, 07:29
王国栋
2013-03-20, 07:34
Jun Rao
2013-03-20, 14:29
Yang Zhou
2013-03-21, 02:09
Yang Zhou
2013-03-21, 02:33
Jun Rao
2013-03-21, 04:20
王国栋
2013-03-21, 04:43
Neha Narkhede
2013-03-21, 05:05
王国栋
2013-03-25, 03:58
Neha Narkhede
2013-03-25, 04:01
王国栋
2013-03-25, 04:23
|
-
Kafka throw InvalidMessageException and lost data
Helin Xiang 2013-03-19, 03:22

Hi,

We were running a performance test with Kafka 0.7.2, using only one broker. On the producer client we use 8 threads to send logs; each thread uses a sync producer and sends 100 logs at a time (each log is about 1~2K bytes long). The total QPS is about 30K.

But the number of logs that the consumer reads, and that the broker counts, is less than the number the producer sent. We believe the data is lost while the producer is sending logs to the broker.

We lowered the QPS to 10K and still lost logs. We found some exceptions in the broker logs:

9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers - Error processing ProduceRequest on abc:0
kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
    at kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
    at kafka.log.Log.append(Log.scala:218)
    at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
    at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:636)

Or this:

1406871 [kafka-processor-2] ERROR kafka.network.Processor - Closing socket for /10.0.2.140 because of error
java.nio.BufferUnderflowException
    at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
    at java.nio.ByteBuffer.get(ByteBuffer.java:692)
    at kafka.utils.Utils$.readShortString(Utils.scala:123)
    at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:29)
    at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
    at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
    at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)

Or this:

1830146 [kafka-processor-0] ERROR kafka.network.Processor - Closing socket for /10.0.2.140 because of error
java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:266)
    at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
    at kafka.api.MultiProducerRequest$$anonfun$readFrom$1.apply$mcVI$sp(MultiProducerRequest.scala:28)
    at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
    at scala.collection.immutable.Range$$anon$2.foreach$mVc$sp(Range.scala:265)
    at kafka.api.MultiProducerRequest$.readFrom(MultiProducerRequest.scala:27)
    at kafka.server.KafkaRequestHandlers.handleMultiProducerRequest(KafkaRequestHandlers.scala:59)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$4.apply(KafkaRequestHandlers.scala:41)
    at kafka.network.Processor.handle(SocketServer.scala:296)
    at kafka.network.Processor.read(SocketServer.scala:319)
    at kafka.network.Processor.run(SocketServer.scala:214)
    at java.lang.Thread.run(Thread.java:636)

This has bothered us for a few days. At first we thought it might be a wrong configuration setting, so we switched to the configuration recommended on the wiki, but unfortunately the exceptions still occurred.

In what situations can these exceptions be thrown? What can we do to avoid them?

Thanks.

Best Regards,
Xiang Helin
-
Re: Kafka throw InvalidMessageException and lost data
Jun Rao 2013-03-19, 04:58

Hmm, both log4j messages suggest that the broker received some corrupted produce requests. Are you using the java producer? Also, we have seen that network router problems caused corrupted requests before.

Thanks,

Jun
-
Re: Kafka throw InvalidMessageException and lost data
Helin Xiang 2013-03-19, 05:13

Thanks, Jun.

We are using the Java producer. Does the last exception,

java.lang.IllegalArgumentException
    at java.nio.Buffer.limit(Buffer.java:266)

also mean that the broker received corrupted messages? Sorry, I am not familiar with Java NIO.

Best Regards,
向河林
-
Re: Kafka throw InvalidMessageException and lost data
Neha Narkhede 2013-03-19, 05:55

Do you mind trying out the DumpLogSegment tool on the log segment for the corrupted topic? That will validate whether the log data is corrupted. Also, is your test reproducible? We ran into a similar issue in production but could not reproduce it.

Thanks,
Neha
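For readers unfamiliar with this kind of check: a common way to tell whether stored or received message bytes are corrupted is to keep a checksum next to the payload and recompute it on read. The sketch below is a minimal Java illustration of that idea only. The frame layout (4-byte length, 4-byte CRC32, payload) and the names `ChecksumFrameDemo`, `encode`, and `isValid` are assumptions made for this example; it is not the DumpLogSegment tool and not Kafka's actual message format.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

class ChecksumFrameDemo {

    // CRC32 of the whole byte array, as an unsigned 32-bit value held in a long.
    static long crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data, 0, data.length);
        return crc.getValue();
    }

    // Hypothetical frame layout (an assumption for this sketch):
    // 4-byte payload length, 4-byte CRC32 of the payload, then the payload.
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + payload.length);
        buf.putInt(payload.length);
        buf.putInt((int) crc32(payload));
        buf.put(payload);
        buf.flip();
        return buf;
    }

    // Returns true when the length field is plausible and the stored
    // checksum matches a checksum recomputed over the payload bytes.
    static boolean isValid(ByteBuffer frame) {
        ByteBuffer buf = frame.duplicate(); // leave the caller's position untouched
        int length = buf.getInt();
        long stored = buf.getInt() & 0xFFFFFFFFL;
        if (length < 0 || length > buf.remaining()) {
            return false;
        }
        byte[] payload = new byte[length];
        buf.get(payload);
        return stored == crc32(payload);
    }

    public static void main(String[] args) {
        ByteBuffer frame = encode("one log line".getBytes(StandardCharsets.UTF_8));
        System.out.println("intact frame valid: " + isValid(frame));

        // Flip one bit of the first payload byte to simulate corruption.
        frame.put(8, (byte) (frame.get(8) ^ 0x01));
        System.out.println("corrupted frame valid: " + isValid(frame));
    }
}
```

A check of this shape can only say that the bytes changed somewhere between writer and reader; it cannot say whether the producer, the network, or the disk changed them, which is why the thread goes on to narrow down the setup.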
-
Re: Kafka throw InvalidMessageException and lost data
Jun Rao 2013-03-19, 16:10

It basically means that the broker expects to read a certain number of bytes from a buffer received from the socket, but the buffer holds fewer bytes than expected. Possible causes are (1) a bug in Kafka's request serialization/deserialization logic; (2) corruption in the underlying system, such as the network.

BTW, did you enable compression in your producer?

Thanks,

Jun
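Both socket-closing exceptions quoted at the start of the thread can be reproduced with plain Java NIO when a length or size field claims more bytes than the buffer holds. The sketch below is a hedged illustration, not Kafka's code: the two field layouts and the names `TruncatedRequestDemo`, `readShortString`, and `readSizedPayload` are assumptions chosen to mirror the shape of the stack traces.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class TruncatedRequestDemo {

    // Hypothetical field layout: a 2-byte length followed by that many bytes.
    static String readShortString(ByteBuffer buf) {
        short length = buf.getShort();
        byte[] bytes = new byte[length];
        buf.get(bytes); // BufferUnderflowException if fewer than `length` bytes remain
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Hypothetical field layout: a 4-byte size followed by a payload of that size.
    static ByteBuffer readSizedPayload(ByteBuffer buf) {
        int size = buf.getInt();
        ByteBuffer payload = buf.slice();
        payload.limit(size); // IllegalArgumentException if size is negative or exceeds capacity
        return payload;
    }

    // A well-formed field: length 3 followed by the three bytes "abc".
    static ByteBuffer goodString() {
        ByteBuffer buf = ByteBuffer.allocate(5);
        buf.putShort((short) 3).put("abc".getBytes(StandardCharsets.UTF_8));
        buf.flip();
        return buf;
    }

    // A field whose length claims 100 bytes while only 3 follow.
    static ByteBuffer truncatedString() {
        ByteBuffer buf = ByteBuffer.allocate(5);
        buf.putShort((short) 100).put("abc".getBytes(StandardCharsets.UTF_8));
        buf.flip();
        return buf;
    }

    // A field whose size claims 1000 bytes while only 4 follow.
    static ByteBuffer oversizedPayload() {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putInt(1000).putInt(42);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        System.out.println(readShortString(goodString()));
        try {
            readShortString(truncatedString());
        } catch (BufferUnderflowException e) {
            System.out.println("underflow: length field exceeds the bytes received");
        }
        try {
            readSizedPayload(oversizedPayload());
        } catch (IllegalArgumentException e) {
            System.out.println("bad limit: size field exceeds the buffer capacity");
        }
    }
}
```

In both failing cases the parser itself behaves as designed; the exception only reports that the bytes it was handed do not match the sizes they declare, which is consistent with either cause listed above.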
-
Re: Kafka throw InvalidMessageException and lost data
王国栋 2013-03-20, 07:29

Hi Jun,

We do not use any compression in our test.

We deployed the producer and the broker on the same machine, and the problem still exists. We use the sync producer and send one message at a time (no batching now). We find that the exception appears once the QPS exceeds 40K, so I don't think it is an underlying system error.

Any suggestions if we want to debug Kafka's serialization/deserialization?

Thanks.

Guodong Wang 王国栋
-
Re: Kafka throw InvalidMessageException and lost data
王国栋 2013-03-20, 07:34

Hi Neha,

Our test is reproducible. We use a log replay tool to send the same part of the log to Kafka. We will check the log file later and try to find some more hints.

So far, it seems that the problem is hard to reproduce when the batch size in the sync producer is small.

Thanks.

Guodong Wang 王国栋
-
Re: Kafka throw InvalidMessageException and lost data
Jun Rao 2013-03-20, 14:29

How many producer instances do you have? Can you reproduce the problem with a single producer?

Thanks,

Jun
-
Re: Kafka throw InvalidMessageException and lost data
Yang Zhou 2013-03-21, 02:09
There was only one producer running in all our tests. Besides, we also tried the low-level Java API, and the problem still shows up.

Thanks

Yang Zhou(周阳)
Department of Computer Science and Engineering
Shanghai Jiao Tong University
-
Re: Kafka throw InvalidMessageException and lost data
Yang Zhou 2013-03-21, 02:33
Sorry, I made a mistake: we use many threads producing at the same time.

Yang Zhou(周阳)
Department of Computer Science and Engineering
Shanghai Jiao Tong University
-
Re: Kafka throw InvalidMessageException and lost data
Jun Rao 2013-03-21, 04:20
How many threads are you using?

Thanks,

Jun
-
Re: Kafka throw InvalidMessageException and lost data
王国栋 2013-03-21, 04:43
Hi Jun,

We use one thread with one sync producer to send data to the broker (QPS: 10k-15k, each log is about 1k bytes). The problem is reproduced.

We have used both Producer and SyncProducer in our test. The same exception appears.

Thanks.

Guodong Wang
王国栋
-
Re: Kafka throw InvalidMessageException and lost data
Neha Narkhede 2013-03-21, 05:05
Do you mind filing a bug and attaching the reproducible test case there?

Thanks,
Neha
-
Re: Kafka throw InvalidMessageException and lost data
王国栋 2013-03-25, 03:58
Hi Neha & Jun,

I think we have found the cause of this bug. It is related to the JDK version. In the beginning, we used jdk1.6.20 to run the test. Yesterday, we switched to jdk1.6.38, and everything is OK now.

Thanks a lot for all your kind help and answers.

--
Guodong Wang
王国栋
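Since the behaviour reported here depended on the JDK version, it can help to record the exact runtime in the logs of both the producer and the broker when reproducing such a problem. The snippet below is a minimal sketch using standard system properties; the class name `RuntimeInfo` and the method `describe` are hypothetical and are not part of Kafka.

```java
public class RuntimeInfo {
    /** Returns vendor, VM name and version of the JVM this code is running on. */
    public static String describe() {
        return System.getProperty("java.vendor") + " "
             + System.getProperty("java.vm.name") + " "
             + System.getProperty("java.version");
    }

    public static void main(String[] args) {
        // Print once at startup so the version is visible next to any later error.
        System.out.println("Running on: " + describe());
    }
}
```

Comparing this line between a failing and a working run makes a version difference visible without guessing which `java` binary was on the path.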
-
Re: Kafka throw InvalidMessageException and lost data
Neha Narkhede 2013-03-25, 04:01
That's great. Would you know the details of the bug though? It will be helpful for the community to understand.

Thanks,
Neha
-
Re: Kafka throw InvalidMessageException and lost data
王国栋 2013-03-25, 04:23
Hi Neha,

I am sorry, we couldn't find the detailed cause of this bug. But we do believe that the bug is in the java.nio package. Something bad may happen while the sync producer sends data in the ByteBuffer or while the broker receives data from the channel.

The serialize/deserialize process is right: we enabled valid-message in the producer, and the CRC checksum is correct on the client, but it is invalid on the server.

BTW: the bug is only reproduced under high concurrency.

Thanks.

Guodong Wang
王国栋
|
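The client-side versus server-side CRC comparison described in the last message can be sketched with the JDK's own `java.util.zip.CRC32`. This is an illustration under assumptions, not Kafka's implementation: the class `CrcCheckDemo` and its methods are hypothetical, and the payload is an arbitrary string. It shows that a checksum computed before sending no longer matches once a single bit of the payload changes in transit.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class CrcCheckDemo {
    /** Computes the CRC32 checksum of the whole payload. */
    public static long crcOf(byte[] payload) {
        CRC32 crc = new CRC32();
        crc.update(payload, 0, payload.length);
        return crc.getValue();
    }

    /** Returns true if the received bytes still match the checksum computed by the sender. */
    public static boolean isIntact(byte[] received, long expectedCrc) {
        return crcOf(received) == expectedCrc;
    }

    public static void main(String[] args) {
        byte[] sent = "log line 1".getBytes(StandardCharsets.UTF_8);
        long crcAtClient = crcOf(sent);

        byte[] received = sent.clone();
        System.out.println("intact: " + isIntact(received, crcAtClient));

        received[0] ^= 0x01; // simulate one flipped bit between sender and receiver
        System.out.println("intact after bit flip: " + isIntact(received, crcAtClient));
    }
}
```

A mismatch of this kind only says that the bytes changed somewhere between the two checks; it does not by itself identify whether the producer, the network, or the receiving JVM altered them.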