Kafka >> mail # dev >> Trouble producing gzip messages


Trouble producing gzip messages
I'm working on adding gzip support to the Python client, and I'm running into some issues. I think I might not understand exactly how the compression is supposed to be implemented.

My initial approach was to set the compression byte to 1 to indicate gzip, and then simply gzip the payload. Here is an example request sent to Kafka (with byte-by-byte breakdown). I am sending the payload "test" to "my-topic" partition 0:

\x00\x00\x006\x00\x00\x00\x08my-topic\x00\x00\x00\x00\x00\x00\x00"\x00\x00\x00\x1e\x01\x011\xc6\xb4\x08\x1f\x8b\x08\x00\x97\x91cP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00

\x00 \x00 \x00 6  
'--------------'
 request length = 54

\x00 \x00
'-------'
  type = 0

\x00 \x08 m y - t o p i c
'-------' '-------------'
  len = 8     topic

\x00 \x00 \x00 \x00
'-----------------'
  partition = 0
  
\x00 \x00 \x00 "
'--------------'
 messageset length = 34

\x00 \x00 \x00 \x1e
'-----------------'
  message length = 30

\x01 magic = 1
\x01 compression = 1
\xbb\x83\x82\xe0 checksum = -1149009184

\x1f\x8b\x08\x00\x8b\x8acP\x02\xff+I-.\x01\x00\x0c~\x7f\xd8\x04\x00\x00\x00 = "test" gzipped, length=24
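For reference, here is roughly how I'm packing the request in the client. This is a simplified sketch of my approach, not code from the Kafka source; the helper name is mine, and I'm using gzip.compress for brevity:

```python
import gzip
import struct
import zlib

def encode_produce_request(topic, partition, payload):
    # message = magic (1) + compression attr (1 = gzip) + crc32 + gzipped payload
    compressed = gzip.compress(payload)
    crc = zlib.crc32(compressed) & 0xffffffff
    message = struct.pack('>BBI', 1, 1, crc) + compressed
    # each message in the set is prefixed with its own 4-byte length
    message_set = struct.pack('>i', len(message)) + message
    # body = request type (0 = produce) + topic length/name + partition + message set
    body = struct.pack('>H', 0)
    body += struct.pack('>H', len(topic)) + topic.encode('ascii')
    body += struct.pack('>i', partition)
    body += struct.pack('>i', len(message_set)) + message_set
    # the whole request is prefixed with its length (excluding the prefix itself)
    return struct.pack('>i', len(body)) + body
```

The lengths line up with the dump above: message = 1 + 1 + 4 + len(gzipped payload) = 30, message set = 4 + 30 = 34, body = 2 + 2 + 8 + 4 + 4 + 34 = 54.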

This all seems fine to me, but Kafka is throwing a strange error message:

[2012-09-26 19:36:55,253] TRACE 54 bytes read from /127.0.0.1:64196 (kafka.network.Processor)
[2012-09-26 19:36:55,253] TRACE Handling produce request from /127.0.0.1:64196 (kafka.request.logger)
[2012-09-26 19:36:55,256] TRACE Producer request ProducerRequest(my-topic,0,34) (kafka.request.logger)
[2012-09-26 19:36:55,259] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE Remaining bytes in iterator = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,260] TRACE size of data = 30 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,264] DEBUG Message is compressed. Valid byte count = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] DEBUG makeNext() in internalIterator: innerDone = true (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE Remaining bytes in iterator = 0 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,276] TRACE size of data = 1952805748 (kafka.message.ByteBufferMessageSet)
[2012-09-26 19:36:55,277] ERROR Error processing ProduceRequest on my-topic:0 (kafka.server.KafkaRequestHandlers)
kafka.common.InvalidMessageSizeException: invalid message size: 1952805748 only received bytes: 0 at 0( possible causes (1) a single message larger than the fetch size; (2) log corruption )
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:120)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:149)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.log.Log.append(Log.scala:205)
at kafka.server.KafkaRequestHandlers.kafka$server$KafkaRequestHandlers$$handleProducerRequest(KafkaRequestHandlers.scala:69)
at kafka.server.KafkaRequestHandlers.handleProducerRequest(KafkaRequestHandlers.scala:53)
at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
at kafka.server.KafkaRequestHandlers$$anonfun$handlerFor$1.apply(KafkaRequestHandlers.scala:38)
at kafka.network.Processor.handle(SocketServer.scala:296)
at kafka.network.Processor.read(SocketServer.scala:319)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:680)

The strangest part is that the "invalid message size" 1952805748 is exactly the decompressed message payload "test" interpreted as a big-endian int32 (0x74657374). Any ideas?
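As a quick sanity check of that claim (plain Python, nothing Kafka-specific):

```python
import struct

# the four bytes of "test" read as a big-endian int32
print(struct.unpack('>i', b'test')[0])  # 1952805748, i.e. 0x74657374
```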

-David
David Arthur 2012-09-27, 02:37