Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Kafka throw InvalidMessageException and lost data


Copy link to this message
-
Re: Kafka throw InvalidMessageException and lost data
How many producer instances do you have? Can you reproduce the problem with
a single producer?

Thanks,

Jun

On Wed, Mar 20, 2013 at 12:29 AM, 王国栋 <[EMAIL PROTECTED]> wrote:

> Hi Jun,
>
> we do not use any compression in our test.
>
> We deploy producer and broker in the same machine. The problem still
> exists. We use sync producer, and send one message at a time(no batch now).
> We find that when the qps reaches more than 40k, the exception appears. So
> I don't think it's the underlying system error.
>
> Any suggestions if we want to do some debug on kafka
> serialization/deserialization?
>
> Thanks.
>
>
>
>
> On Wed, Mar 20, 2013 at 12:10 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
>
> > It basically means that the broker is expecting to read certain number of
> > bytes in a buffer received from socket, but there are fewer bytes than
> > expected in the buffer. Possible causes are (1) a bug in Kafka request
> > serialization/deserialization logic; (2) corruption in the underlying
> > system such as network.
> >
> > BTW, did you enable compression in your producer?
> >
> > Thanks,
> >
> > Jun
> >
> > On Mon, Mar 18, 2013 at 10:12 PM, Helin Xiang <[EMAIL PROTECTED]> wrote:
> >
> > > thanks Jun.
> > >
> > > we are using java producer.
> > > does the last exception
> > > "java.lang.IllegalArgumentException
> > >     at java.nio.Buffer.limit(Buffer.java:266)
> > > "
> > > also means the broker received corrupted messages ?  sorry i am not
> > > familiar with java nio.
> > >
> > >
> > >
> > >
> > > On Tue, Mar 19, 2013 at 12:58 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> > >
> > > > Hmm, both log4j messages suggest that the broker received some
> > corrupted
> > > > produce requests. Are you using the java producer? Also, we have seen
> > > that
> > > > network router problems caused corrupted requests before.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Mon, Mar 18, 2013 at 8:22 PM, Helin Xiang <[EMAIL PROTECTED]>
> > wrote:
> > > >
> > > > > Hi,
> > > > > We were doing some performance test using kafka 0.7.2. We use only
> 1
> > > > > broker.
> > > > > On producer client, we use 8 threads to send logs, each thread use
> > sync
> > > > > producer and send 100 logs at a time, (each log is about 1~2K bytes
> > > > long),
> > > > > The total QPS is about 30K.
> > > > > But the number of logs both consumer read and the broker counts is
> > less
> > > > > than the producer send. we believe the data lost when producer
> > sending
> > > > logs
> > > > > to broker.
> > > > >
> > > > > We settle the QPS down to 10K, still lost logs.
> > > > > We found some exceptions in broker logs:
> > > > >
> > > > > 9201051 [kafka-processor-2] ERROR kafka.server.KafkaRequestHandlers
> >  -
> > > > > Error processing ProduceRequest on abc:0
> > > > > kafka.message.InvalidMessageException: message is invalid,
> > compression
> > > > > codec: NoCompressionCodec size: 1021 curr offset: 0 init offset: 0
> > > > >     at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> > > > >     at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:160)
> > > > >     at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> > > > >     at
> > > > >
> > >
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> > > > >     at
> > kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> > > > >     at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.message.ByteBufferMessageSet.verifyMessageSize(ByteBufferMessageSet.scala:89)
> > > > >     at kafka.log.Log.append(Log.scala:218)
> > > > >     at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> > > > >     at
> > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)