Kafka, mail # user - 0.8 producer -- many questions

Re: 0.8 producer -- many questions
ben fleis 2012-11-26, 11:20
@Jason - I am trying to document what I write, as I write it, in the hope
of handing off a small "docs" set to the Kafka team.  IMHO, protocol,
expectations, etc., are best documented within the repository, so that the
information is version-tagged and, via the standard release process, stays
reasonably fresh.  I haven't pushed it to git in a while, but I will share
my GitHub fork here if it's valuable.

@Jun, thanks for the first set.  I have a couple more follow up questions.

*Single socket multiplexing* -- the current protocol "standard header"
doesn't include 'request_type_id'.  This implies that socket multiplexing
is either unwelcome or unexpected.  Is the expectation that I send
Metadata requests via a separate socket?  Or is it instead that, if I do
send one, the Metadata reply is prioritized over any other outstanding
ProduceResponses?  (As an aside, I've assumed that ProduceResponse ordering
from a single broker is undefined?  I use the correlation_id, but am
curious if there are guarantees.)
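
To make the correlation_id approach concrete, here is a minimal sketch of
how a client could track what it sent on one socket and use the returned id
to decide how to decode each reply.  It assumes (not confirmed in this
thread) that each response, once its length prefix is stripped, starts with
a big-endian int32 correlation_id.  The class name, method names, and the
"produce"/"metadata" labels are hypothetical, purely for illustration:

```python
import struct


class PendingRequests:
    """Tracks in-flight requests on one socket, keyed by correlation_id."""

    def __init__(self):
        self._next_id = 0
        self._pending = {}  # correlation_id -> request kind (hypothetical label)

    def register(self, kind):
        """Reserve a correlation_id for an outgoing request of the given kind."""
        cid = self._next_id
        # Wrap around so the id always fits in a signed int32.
        self._next_id = (self._next_id + 1) & 0x7FFFFFFF
        self._pending[cid] = kind
        return cid

    def resolve(self, response_bytes):
        """Return (kind, payload) for a response.

        ASSUMPTION: the response starts with an int32 correlation_id.
        Raises KeyError if the id does not match any request we sent.
        """
        (cid,) = struct.unpack_from(">i", response_bytes, 0)
        kind = self._pending.pop(cid)
        return kind, response_bytes[4:]


pending = PendingRequests()
produce_id = pending.register("produce")
metadata_id = pending.register("metadata")
# Replies may arrive in any order; the id says which decoder applies.
kind, payload = pending.resolve(struct.pack(">i", metadata_id) + b"body")
```

With this bookkeeping the client does not need a request type in the
response header, though it does not answer whether the broker gives any
ordering guarantee.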

*ProduceRequest + MessageAndOffset* -- I haven't learned Scala well enough
yet, and the many layers of abstraction make it difficult (for the naïve)
to trace.  In the ProduceRequest's message_set, it kinda appears that it
reads in an array of "MessageAndOffset".  Am I misreading?  If not, what is
the offset, and why?  If so, what are those bytes?

For the record, this is what I recorded in my pythonic protocol dummy code:

def pack_produce_message_set(pusher, partition_id, msg_set):
    pusher.push_int(partition_id)                   # partition_id
    pusher.push_int_marker()                        # msg_set size
    for msg in msg_set:
        pusher.push_long(0)                         # offset (always 0?)
        pusher.push_int_marker()                    # msg size (backpatched)
        pusher.push_int_marker()                    # CRC32 (backpatched)
        pusher.push_byte(_message_magic_byte)       # Magic byte
        pusher.push_byte(0)                         # attributes (compr,