Kafka >> mail # user >> 0.8 producer -- many questions


ben fleis 2012-11-23, 15:51
Jun Rao 2012-11-26, 05:30
Jason Rosenberg 2012-11-26, 06:53

Re: 0.8 producer -- many questions
@Jason - I am trying to document what I write, as I write it, with the
hopes of handing off a small "docs" set to the kafka team.  IMHO, protocol,
expectations, etc., are best documented with the repository, so that the
information is version tagged and, via the standard release process, should
remain reasonably fresh.  I haven't pushed it to git in a while, but I will
share my github fork here if it's valuable.

@Jun, thanks for the first set.  I have a couple more follow-up questions.

*Single socket multiplexing* -- the current protocol "standard header"
doesn't include 'request_type_id'.  This implies that socket multiplexing
is either unwelcome or unexpected.  Is the expectation that I send
Metadata requests via a separate socket?  Or instead that if I do send one,
the Metadata reply is prioritized over any other outstanding
ProduceResponses?  (As an aside, I've assumed that ProduceResponse ordering
from a single broker is undefined?  I use the correlation_id, but am
curious if there are guarantees.)
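To be concrete about how I'm using correlation_id: I keep a map of
in-flight requests keyed by id, so arrival order from the broker doesn't
matter.  A stripped-down sketch (the names are my own, not anything from
the Kafka code):

```python
# Sketch: matching responses to in-flight requests by correlation_id.
# Class and method names here are illustrative, not from Kafka itself;
# the point is that lookup by id avoids assuming FIFO response order.

class PendingRequests:
    """Tracks in-flight requests keyed by correlation_id."""

    def __init__(self):
        self._inflight = {}

    def register(self, correlation_id, payload):
        # Called when a request is written to the socket.
        self._inflight[correlation_id] = payload

    def complete(self, correlation_id):
        # Called when a response arrives; arrival order doesn't matter,
        # since we look the request up by id rather than assuming FIFO.
        return self._inflight.pop(correlation_id)

pending = PendingRequests()
pending.register(1, "produce batch A")
pending.register(2, "produce batch B")

# Responses may come back in any order:
assert pending.complete(2) == "produce batch B"
assert pending.complete(1) == "produce batch A"
```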

*ProduceRequest + MessageAndOffset* -- I haven't learned Scala that far
yet, and the many layers of abstraction make it difficult (for the naïve)
to trace.  In the ProduceRequest's message_set, it appears to read in an
array of "MessageAndOffset".  Am I misreading?  If not, what is the
offset, and why?  If so, what are those bytes?

For the record, this is what I recorded in my pythonic protocol dummy code:

def pack_produce_message_set(pusher, partition_id, msg_set):
    pusher.push_int(partition_id)                   # partition_id
    pusher.push_int_marker()                        # msg_set size (backpatched)
    for msg in msg_set:
        pusher.push_long(0)                         # offset (always 0?)
        pusher.push_int_marker()                    # msg size (backpatched)
        pusher.push_int_marker()                    # CRC32 (backpatched)
        pusher.push_byte(_message_magic_byte)       # magic byte
        pusher.push_byte(0)                         # attributes (compression codec)
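In case the marker/backpatch helper isn't obvious: push_int_marker
reserves 4 bytes whose value isn't known yet, and they get filled in after
the following payload is written.  Roughly, the idea is (simplified
sketch, big-endian sizes only, no CRC handling):

```python
import struct

class Pusher:
    """Minimal sketch of the buffer used above: push values big-endian,
    and allow 4-byte int slots to be reserved and backpatched later."""

    def __init__(self):
        self.buf = bytearray()
        self.markers = []  # offsets of reserved int slots, LIFO

    def push_int(self, v):
        self.buf += struct.pack(">i", v)

    def push_long(self, v):
        self.buf += struct.pack(">q", v)

    def push_byte(self, v):
        self.buf += struct.pack(">b", v)

    def push_int_marker(self):
        # Reserve 4 bytes now; fill them in once the size is known.
        self.markers.append(len(self.buf))
        self.buf += b"\x00\x00\x00\x00"

    def backpatch(self):
        # Patch the most recently reserved slot with the number of
        # bytes written after it.
        off = self.markers.pop()
        size = len(self.buf) - off - 4
        self.buf[off:off + 4] = struct.pack(">i", size)

p = Pusher()
p.push_int_marker()      # size field (backpatched below)
p.push_long(0)           # 8 bytes of payload
p.backpatch()
assert p.buf == struct.pack(">iq", 8, 0)
```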
Thanks!

ben
Joel Koshy 2012-11-27, 00:32