Kafka, mail # dev - Trouble producing gzip messages


Trouble producing gzip messages
David Arthur 2012-09-26, 23:43
I'm working on adding gzip support to the Python client, and I'm running into some issues. I think I might not understand exactly how the compression is supposed to be implemented.

My initial approach was to set the compression byte to 1 to indicate gzip, and then simply gzip the payload. Here is an example request sent to Kafka (with byte-by-byte breakdown). I am sending the payload "test" to "my-topic" partition 0:

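\x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00

(The breakdown below was captured from a separate run. gzip stores a timestamp in its header, so the compressed bytes and the checksum differ from this dump, but the layout is identical.)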
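The dump above is a valid Python bytes literal, so the breakdown can be checked mechanically. This is a minimal sketch that assumes the field layout exactly as the breakdown lists it (big-endian lengths, a magic-1 message with a 1-byte compression flag and a 4-byte checksum). `decode_produce_request` is a hypothetical helper, not part of any Kafka client:

```python
import gzip
import struct

# Raw produce request copied verbatim from the dump above.
RAW = (b'\x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"'
       b'\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP'
       b'\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00')


def decode_produce_request(raw: bytes) -> dict:
    """Split a single-message produce request into the fields of the breakdown."""
    off = 0

    def take(fmt: str):
        # Read one big-endian value at the current offset and advance past it.
        nonlocal off
        (value,) = struct.unpack_from(fmt, raw, off)
        off += struct.calcsize(fmt)
        return value

    fields = {}
    fields["request_length"] = take(">i")
    fields["request_type"] = take(">h")
    topic_len = take(">h")
    fields["topic"] = raw[off:off + topic_len].decode("utf-8")
    off += topic_len
    fields["partition"] = take(">i")
    fields["messageset_length"] = take(">i")
    fields["message_length"] = take(">i")
    fields["magic"] = take(">b")
    fields["compression"] = take(">b")
    fields["checksum"] = take(">I")
    # The rest of the message (length minus magic, compression, checksum) is payload.
    payload_len = fields["message_length"] - 6
    fields["payload"] = raw[off:off + payload_len]
    return fields


f = decode_produce_request(RAW)
print(f["request_length"], f["topic"], f["messageset_length"], f["message_length"])
# prints: 54 my-topic 34 30
print(gzip.decompress(f["payload"]))
# prints: b'test'
```

The payload decompresses back to `b'test'`, which suggests the gzip layer itself is well formed.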

\x00 \x00 \x00 6  
'--------------'
 request length = 54

\x00 \x00
'-------'
  type = 0

\x00 \x08 m y - t o p i c
'-------' '-------------'
  len = 8     topic

\x00 \x00 \x00 \x00
'-----------------'
  partition = 0
  
\x00 \x00 \x00 "
'--------------'
 messageset length = 34

\x00 \x00 \x00 \x1e
'-----------------'
  message length = 30

\x01 magic = 1
\x01 compression = 1
\xbb\x83\x82\xe0 checksum = -1149009184

\x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00 = "test" gzipped, length=24
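The approach described above (compression byte set to 1, value gzipped directly) can be sketched as an encoder that reproduces this layout. This mirrors the request that the broker rejects; it is not a known-good Kafka encoder. `encode_gzip_produce_request` is a hypothetical name, and the assumption that the checksum is a CRC32 over the payload bytes only is labeled in the code:

```python
import gzip
import struct
import zlib


def encode_gzip_produce_request(topic: str, partition: int, value: bytes) -> bytes:
    """Build a single-message produce request with the value gzipped directly."""
    payload = gzip.compress(value)
    # Assumption: the checksum is CRC32 over the payload bytes only.
    checksum = zlib.crc32(payload) & 0xFFFFFFFF
    # message = magic (1 byte) | compression (1 byte) | checksum (4 bytes) | payload
    message = struct.pack(">bbI", 1, 1, checksum) + payload
    # message set = 4-byte message length + message
    message_set = struct.pack(">i", len(message)) + message
    topic_bytes = topic.encode("utf-8")
    body = (struct.pack(">h", 0)                      # request type 0
            + struct.pack(">h", len(topic_bytes)) + topic_bytes
            + struct.pack(">i", partition)
            + struct.pack(">i", len(message_set)) + message_set)
    # Request length prefix counts everything after itself.
    return struct.pack(">i", len(body)) + body


req = encode_gzip_produce_request("my-topic", 0, b"test")
```

Because gzip embeds a timestamp, two calls generally produce different payload and checksum bytes, which is why the dump and the breakdown above disagree in those fields.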

This all seems fine to me, but Kafka is throwing a strange error message:

[2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 (kafka.network.Processor)
[2012-09-26 19:36:55,253] TRACE Handling produce request from /127.0.0.1:64196 (kafka.request.logger)
[2012-09-26 19:36:55,256] TRACE Producer request ProducerRequest(my-topic,0,34) (kafka.request.logger)
[2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE size of data = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE size of data = 1952805748 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 (kafka.server.KafkaRequestHandlers)
kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 only received bytes: 0 at 0( possible causes (1) a single message larger than the fetch size; (2) log corruption )
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.log.Log.append(Log.scala:205)
at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:680)

The strangest part is that the "invalid message size", 1952805748, is exactly the decompressed payload "test" (bytes 0x74 0x65 0x73 0x74) read as a big-endian int32. Any ideas?
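One reading of the log that fits the numbers: after "Message is compressed", the broker decompresses the payload and iterates the result as another message set, so the first four bytes of "test" are taken as a message length with 0 bytes left. The sketch below is a simplified model built only from the log lines and the breakdown, not the broker's actual Scala code; `iterate_messages` and `InvalidMessageSize` are hypothetical names, and only magic-1 messages are handled:

```python
import gzip
import struct


class InvalidMessageSize(Exception):
    def __init__(self, size: int, remaining: int):
        super().__init__(f"invalid message size: {size} only received bytes: {remaining}")
        self.size = size
        self.remaining = remaining


def iterate_messages(buf: bytes):
    """Yield uncompressed payloads, recursing into gzip-compressed messages."""
    off = 0
    while off + 4 <= len(buf):
        (size,) = struct.unpack_from(">i", buf, off)
        off += 4
        remaining = len(buf) - off
        if size > remaining:
            raise InvalidMessageSize(size, remaining)
        message = buf[off:off + size]
        off += size
        compression = message[1]   # byte 0 is magic (assumed 1)
        payload = message[6:]      # skip magic, compression, 4-byte checksum
        if compression == 1:
            # Modeled behavior: decompressed bytes are parsed as a message set.
            yield from iterate_messages(gzip.decompress(payload))
        else:
            yield payload


# Message set shaped like the one in the post; the checksum is not verified here.
payload = gzip.compress(b"test")
message = struct.pack(">bbI", 1, 1, 0) + payload
message_set = struct.pack(">i", len(message)) + message

try:
    list(iterate_messages(message_set))
except InvalidMessageSize as err:
    print(err)                               # invalid message size: 1952805748 only received bytes: 0
print(struct.unpack(">i", b"test")[0])       # 1952805748
```

If this model is right, the gzipped bytes would need to be a complete message set (a 4-byte length followed by a full message) rather than the bare value.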

-David
David Arthur 2012-09-27, 02:37