Re: Wire format for Kafka 0.7.1
Jun

I was just bashing myself for not having read the spec carefully;
particularly the magic byte.

The Java producer seems to use a different encoder for strings. I am now
able to correctly produce and consume messages (bytes in, bytes out) as per
the spec in Erlang (simple producer, simple consumer).

On disk, there seems to be a constant 10-byte gap between messages, in
addition to the message itself (which is of course variable length). This is
how I calculate the next offset. So far, it seems to work.

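For anyone following along, here is a minimal Python sketch of where a constant 10 bytes per message could come from. It assumes the 0.7 layout as I read it in the driver spec: a 4-byte length, a 1-byte magic, a 1-byte compression attribute (present only when magic is 1), and a 4-byte CRC32 of the payload, followed by the payload. The names encode_message and next_offset are made up for illustration and are not part of any Kafka client. If this reading is right, a magic-0 message carries no compression byte, so sending "magic 0, compression 0" as two bytes would shift the checksum by one byte, which would be consistent with the "size: 8" invalid-message error in the log quoted below.

```python
import struct
import zlib


def encode_message(payload: bytes, magic: int = 1, attributes: int = 0) -> bytes:
    """Encode one message (assumed Kafka 0.7 layout, see the note above)."""
    # CRC32 over the payload only, as an unsigned 32-bit value.
    crc = zlib.crc32(payload) & 0xFFFFFFFF
    if magic == 0:
        # Assumption: magic 0 has no compression/attributes byte.
        body = struct.pack(">bI", magic, crc) + payload
    else:
        # Assumption: magic 1 adds a one-byte compression attribute.
        body = struct.pack(">bbI", magic, attributes, crc) + payload
    # The length prefix counts everything after itself.
    return struct.pack(">I", len(body)) + body


def next_offset(offset: int, payload_len: int, magic: int = 1) -> int:
    """Byte offset of the following message in the log (hypothetical helper)."""
    overhead = 4 + 1 + (1 if magic == 1 else 0) + 4  # length + magic [+ attrs] + crc
    return offset + overhead + payload_len


if __name__ == "__main__":
    msg = encode_message(b"hi")
    print(len(msg), next_offset(0, len(b"hi")))
```

With magic 1 the fixed overhead works out to 4 + 1 + 1 + 4 = 10 bytes, which matches the gap observed on disk; with magic 0 the same arithmetic gives 9.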
On Sep 28, 2012 8:33 AM, "Jun Rao" <[EMAIL PROTECTED]> wrote:

> Milind,
>
> The spec that you listed seems correct. Perhaps you can send the same
> message using the java producer. Then you can look at the on disk format of
> the message and see how it differs from the one generated from your Erlang
> producer.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 26, 2012 at 11:21 PM, Milind Parikh <[EMAIL PROTECTED]> wrote:
>
> > I am writing an Erlang driver for Kafka. I am using the spec from
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> >
> > I just learnt something that I thought should be on the ML for someone
> > developing a different driver.
> >
> > My specific issue currently has to do with the PRODUCE request. It appears
> > that the request header is getting parsed correctly, BUT the specific
> > message does not seem to be parsed, with the topic "test", the message
> > "hi", partition 0, magic 0, compression 0.
> >
> >
> > [2012-09-26 22:55:30,131] INFO Created log for 'test'-0 (kafka.log.LogManager)
> > [2012-09-26 22:55:30,134] INFO Begin registering broker topic /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
> > [2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0 (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.log.Log.append(Log.scala:205)
> >     at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:679)
> > [2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)