0.8 producer -- many questions
ben fleis 2012-11-23, 15:51
Hello all,
I am writing a producer client for the v0.8 protocol. I have the wire format encoding working, and much of the functionality works well. However, I have made too many assumptions along the way, and it's time that I figure out the answers to these things. Sorry for the 'laundry list' -- most of them are related, and can possibly be answered in batch form.
TopicMetadata updates
- is there a particular expectation of polling frequency? (per topic, per broker, etc.?)
- should a particular topic's metadata request only go to the "current" leader of that topic?
- if such a request fails to receive a response, is there a particular expected behavior?
- I am assuming that producers always push to the leader (feels like it's in the docs someplace, but can't confirm)
- if the leader changes, what is the protocol for a producer? must the producer await ACK from outstanding requests before making a change, and if so, presumably re-push those requests?
I'm sure there will be more, but this opens the door.
Cheers,
ben
Re: 0.8 producer -- many questions
Jun Rao 2012-11-26, 05:30
Ben,
Please see the inlined answers below.
Thanks,
Jun
On Fri, Nov 23, 2012 at 7:51 AM, ben fleis <[EMAIL PROTECTED]> wrote:
> Hello all,
>
> I am writing a producer client for the v0.8 protocol. I have the wire
> format encoding working, and much of the functionality works well.
> However, I have made too many assumptions along the way, and it's time
> that I figure out the answers to these things. Sorry for the 'laundry
> list' -- most of them are related, and can possibly be answered in batch
> form.
>
> TopicMetadata updates
>
> - is there a particular expectation of polling frequency? (per topic,
>   per broker, etc.?)

You only need to update the metadata the first time you send a message to a topic/partition, or when you get an error or an exception while sending a message (likely the leader has moved to a different broker). In general, you should be updating the metadata infrequently.

> - should a particular topic's metadata request only go to the "current"
>   leader of that topic?

A metadata request can be answered by any broker.

> - if such a request fails to receive a response, is there a particular
>   expected behavior?

Then you send the same metadata request to another broker. Also, if the leader in the metadata response is null, it means that the leader is not available yet. You will need to wait a bit and retry the metadata request.

> - I am assuming that producers always push to the leader (feels like
>   it's in the docs someplace, but can't confirm)

That's right. Producer/Fetch requests can only be served by the leader of a partition.

> - if the leader changes, what is the protocol for a producer? must the
>   producer await ACK from outstanding requests before making a change,
>   and if so, presumably re-push those requests?

If the leader changes, the current producer will receive an error code or an exception. This means that the produced data is not guaranteed to be in the broker. Depending on the application, the producer may choose to re-send.

> I'm sure there will be more, but this opens the door.
>
> Cheers,
>
> ben
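The metadata rules in the answers above (refresh on first send or after an error, ask any broker, fall back to another broker when one does not respond, wait and retry while the leader is null) could be sketched roughly as follows. This is a minimal illustration, not the Kafka client: `fetch_metadata`, `refresh_leader`, `LeaderCache`, and the retry/backoff values are all hypothetical names and numbers chosen for the example, and the actual wire request is left to the caller.

```python
import time


class MetadataUnavailable(Exception):
    """Raised when no broker returned a usable leader for the partition."""


def refresh_leader(brokers, fetch_metadata, topic, partition,
                   retries=3, backoff_s=0.5, sleep=time.sleep):
    """Ask brokers in turn for the leader of (topic, partition).

    `fetch_metadata(broker, topic)` is a hypothetical callable supplied by
    the caller. It should return a {partition: leader_or_None} dict, or
    raise OSError when the broker does not respond.
    """
    for _ in range(retries):
        for broker in brokers:
            try:
                leaders = fetch_metadata(broker, topic)
            except OSError:
                continue  # no response: send the same request to another broker
            leader = leaders.get(partition)
            if leader is not None:
                return leader
            break  # null leader: not available yet, so wait and retry
        sleep(backoff_s)
    raise MetadataUnavailable(f"no leader for {topic}/{partition}")


class LeaderCache:
    """Remember leaders so that metadata is requested infrequently."""

    def __init__(self, lookup):
        self._lookup = lookup  # callable (topic, partition) -> leader
        self._leaders = {}

    def leader_for(self, topic, partition):
        key = (topic, partition)
        if key not in self._leaders:  # first send to this topic/partition
            self._leaders[key] = self._lookup(topic, partition)
        return self._leaders[key]

    def invalidate(self, topic, partition):
        # Call after a send error; the next send triggers a fresh lookup.
        self._leaders.pop((topic, partition), None)
```

A producer built this way would call `leader_for` before each send and `invalidate` whenever a send returns an error code or raises, which keeps metadata traffic limited to the two cases named in the reply.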
Re: 0.8 producer -- many questions
Jason Rosenberg 2012-11-26, 06:53
Is there any 0.8 documentation available yet for all this?
Re: 0.8 producer -- many questions
ben fleis 2012-11-26, 11:20
@Jason - I am trying to document what I write, as I write it, with the hope of handing off a small "docs" set to the kafka team. IMHO, protocol, expectations, etc., are best documented with the repository, so that the information is version tagged and, via the standard release process, should remain reasonably fresh. I haven't pushed it to git in a while, but I will share my github fork here if it's valuable.
@Jun, thanks for the first set. I have a couple more follow up questions.
*Single socket multiplexing* -- the current protocol "standard header" doesn't include 'request_type_id'. This implies that socket multiplexing is either unwelcome, or unexpected. Is the expectation that I send Metadata requests via a separate socket? Or instead that if I do send one, that the Metadata reply is prioritized over any other outstanding ProduceResponses? (As an aside, I've assumed that ProduceResponse ordering from a single broker is undefined? I use the correlation_id, but am curious if there are guarantees.)
*ProduceRequest + MessageAndOffset* -- I haven't learned scala that far yet, and the many layers of abstraction make it difficult (for the naïve) to trace. In the ProduceRequest's message_set, it kinda appears that it reads in an array of "MessageAndOffset". Am I misreading? If not, what is the offset, and why? If so, what are those bytes?
For the record, this is what I recorded in my pythonic protocol dummy code:
    def pack_produce_message_set(pusher, partition_id, msg_set):
        pusher.push_int(partition_id)              # partition_id
        pusher.push_int_marker()                   # msg_set size (backpatched)
        for msg in msg_set:
            pusher.push_long(0)                    # offset (always 0?)
            pusher.push_int_marker()               # msg size (backpatched)
            pusher.push_int_marker()               # CRC32 (backpatched)
            pusher.push_byte(_message_magic_byte)  # Magic byte
            pusher.push_byte(0)                    # attributes (compr, codec)

Thanks!
ben
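The "backpatched" fields in the dummy code above rely on reserving space for a size or checksum and filling it in once the following bytes are known. A minimal sketch of such a buffer is shown below. The method names `push_int`, `push_long`, `push_byte`, and `push_int_marker` come from the message; everything else (the `Pusher` class itself, `push_bytes`, `patch_size`, `patch_crc32`, the marker returning its position, big-endian packing, and the CRC covering every byte after the CRC field) is an assumption made for illustration, not something the thread confirms.

```python
import struct
import zlib


class Pusher:
    """Append-only buffer with 4-byte placeholders that are patched later."""

    def __init__(self):
        self.buf = bytearray()

    def push_byte(self, value):
        self.buf += struct.pack(">b", value)   # signed 8-bit

    def push_int(self, value):
        self.buf += struct.pack(">i", value)   # signed 32-bit, big-endian

    def push_long(self, value):
        self.buf += struct.pack(">q", value)   # signed 64-bit, big-endian

    def push_bytes(self, data):
        self.buf += data                       # raw bytes, no length prefix

    def push_int_marker(self):
        """Reserve 4 bytes and return their position for later patching."""
        pos = len(self.buf)
        self.buf += b"\x00\x00\x00\x00"
        return pos

    def patch_size(self, pos):
        """Store the number of bytes that follow the marker at `pos`."""
        struct.pack_into(">i", self.buf, pos, len(self.buf) - pos - 4)

    def patch_crc32(self, pos):
        """Store the CRC32 of every byte that follows the marker at `pos`."""
        crc = zlib.crc32(bytes(self.buf[pos + 4:])) & 0xFFFFFFFF
        struct.pack_into(">I", self.buf, pos, crc)


# Usage: one message with a stand-in payload.
pusher = Pusher()
size_pos = pusher.push_int_marker()   # msg size (backpatched)
crc_pos = pusher.push_int_marker()    # CRC32 (backpatched)
pusher.push_byte(0)                   # magic byte (placeholder value)
pusher.push_byte(0)                   # attributes
pusher.push_bytes(b"hello")           # stand-in payload
pusher.patch_crc32(crc_pos)           # checksum first: it covers later bytes only
pusher.patch_size(size_pos)
```

Patching the checksum before the size is safe here because the size field sits ahead of the CRC field and is therefore outside the checksummed range.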
Re: 0.8 producer -- many questions
Joel Koshy 2012-11-27, 00:32
> *Single socket multiplexing* -- the current protocol "standard header"
> doesn't include 'request_type_id'. This implies that socket multiplexing
> is either unwelcome, or unexpected. Is the expectation that I send
> Metadata requests via a separate socket? Or instead that if I do send one,
> that the Metadata reply is prioritized over any other outstanding
> ProduceResponses? (As an aside, I've assumed that ProduceResponse ordering
> from a single broker is undefined? I use the correlation_id, but am
> curious if there are guarantees.)

The requestId for each request type is written out to the wire (see BoundedByteBufferSend).

> *ProduceRequest + MessageAndOffset* -- I haven't learned scala that far
> yet, and the many layers of abstraction make it difficult (for the naïve)
> to trace. In the ProduceRequest's message_set, it kinda appears that it
> reads in an array of "MessageAndOffset". Am I misreading? If not, what is
> the offset, and why? If so, what are those bytes?
>
> For the record, this is what I recorded in my pythonic protocol dummy code:
>
>     def pack_produce_message_set(pusher, partition_id, msg_set):
>         pusher.push_int(partition_id)              # partition_id
>         pusher.push_int_marker()                   # msg_set size (backpatched)
>         for msg in msg_set:
>             pusher.push_long(0)                    # offset (always 0?)
>             pusher.push_int_marker()               # msg size (backpatched)
>             pusher.push_int_marker()               # CRC32 (backpatched)
>             pusher.push_byte(_message_magic_byte)  # Magic byte
>             pusher.push_byte(0)                    # attributes (compr, codec)

The offsets are largely irrelevant to the producer request, i.e., the entire data buffer is just dumped into the request. If you're writing a native producer then you don't need to deal with the offsets of each individual message (i.e., at the time you actually write the data out to the wire). The best way to follow this is to trace through the writeTo method in ProducerRequest.
Joel
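The thread does not settle whether responses from one broker arrive in request order, so a client that shares a socket may prefer to match responses by correlation_id, as the question mentions doing. The sketch below shows one way to keep that bookkeeping. It assumes each response carries the correlation id of its request, which the thread does not confirm, and `PendingRequests` with its methods is a hypothetical helper, not part of Kafka.

```python
import itertools


class PendingRequests:
    """Track in-flight requests on one socket by correlation id."""

    def __init__(self):
        self._ids = itertools.count(1)  # monotonically increasing ids
        self._pending = {}

    def register(self, request):
        """Assign a fresh correlation id and remember the request."""
        correlation_id = next(self._ids)
        self._pending[correlation_id] = request
        return correlation_id

    def resolve(self, correlation_id):
        """Return and forget the request that a response belongs to."""
        return self._pending.pop(correlation_id)

    def drain(self):
        """Return all unanswered requests, e.g. to re-send after an error."""
        outstanding = list(self._pending.values())
        self._pending.clear()
        return outstanding
```

With this in place, a metadata response and a produce response can arrive in either order and still be routed correctly, and `drain` gives the application the set of requests whose fate is unknown if the connection or the leader changes.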