Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Wire format for Kafka 0.7.1


Copy link to this message
-
Re: Wire format for Kafka 0.7.1
Jun

I was just bashing myself for not having read the spec carefully;
particularly the magic byte.

The Java producer seems to use a different encoder for strings. I am, now,
able to correctly produce and consume messages (bytes in, bytes out) as per
the spec in erlang. (simple producer, simple consumer).

On disk, there seems to be a constant 10 byte gap between each message;
apart from the message itself (which ofcourse is variable length). This is
how I calculate offset. So far, it seems to work (next offset).
On Sep 28, 2012 8:33 AM, "Jun Rao" <[EMAIL PROTECTED]> wrote:

> Milind,
>
> The spec that you listed seems correct. Perhaps you can send the same
> message using the java producer. Then you can look at the on disk format of
> the message and see how it differs from the one generated from your Erlang
> producer.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 26, 2012 at 11:21 PM, Milind Parikh <[EMAIL PROTECTED]
> >wrote:
>
> > I am writing an erlang driver for Kafka. I am using the spec from
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > .
> > Just learnt something that I thought should be the ML for someone
> > developing a different driver.
> >
> > My specific issue currently has to do with the PRODUCE request. It
> appears
> > that the request header is getting parsed correctly. BUT the specific
> > message does not seem to parsed with the topic of "test" and the message
> of
> > "hi", partition 0, magic 0, compression 0.
> >
> >
> > [2012-09-26 22:55:30,131] INFO Created log for 'test'-0
> > (kafka.log.LogManager)
> > [2012-09-26 22:55:30,134] INFO Begin registering broker topic
> > /brokers/topics/test/0 with 1 partitions (kafka.server.KafkaZooKeeper)
> > [2012-09-26 22:55:30,138] ERROR Error processing ProduceRequest on test:0
> > (kafka.server.KafkaRequestHandlers)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> >     at
> > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >     at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> >     at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> >     at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> >     at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> >     at kafka.log.Log.append(Log.scala:205)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at
> >
> >
> kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
> >     at kafka.network.Processor.handle(SocketServer.scala:296)
> >     at kafka.network.Processor.read(SocketServer.scala:319)
> >     at kafka.network.Processor.run(SocketServer.scala:214)
> >     at java.lang.Thread.run(Thread.java:679)
> > [2012-09-26 22:55:30,143] ERROR Closing socket for /127.0.0.1 because of
> > error (kafka.network.Processor)
> > kafka.message.InvalidMessageException: message is invalid, compression
> > codec: NoCompressionCodec size: 8 curr offset: 0 init offset: 0
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:130)
> >     at
> >
> >
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB