Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # dev - Review Request 17263: New producer for Kafka.

Copy link to this message
Re: Review Request 17263: New producer for Kafka.
Jay Kreps 2014-01-30, 18:54

> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > Overall, this looks great. Here are a few comments - I have some more
> > minor/stylistic comments (typos in comments and broken javadoc, code
> > conventions, minor javadoc edits and such) which I would rather defer for a
> > later more final patch.
> >
> > I only did a cursory review of Protocol (which I on the surface looks very
> > promising) and did not review the unit tests and metrics package. WRT
> > metrics I'm a bit ambivalent - i.e., my preference would be to resolve that
> > first if you intend to switch to custom metrics or just stick to coda hale.
> > The main benefits I see are full control over how metrics are computed
> > (e.g., coda hale metrics at least in the version we are using makes it
> > difficult/impossible to configure reservoir samples sizes, decay constants,
> > reporting intervals, etc.) and predictable API.
> >
> > A lot of logging and stats are missing (e.g., request rates, callback
> > execution time, ser-de time, etc.). I'm assuming these will be addressed in
> > your final patch.
> >

Yes, I held off on adding metrics and stats until we agree on how to do that.
> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 182
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line182>
> >
> >     This could add a couple minutes startup for producers that send to several
> >     topics (such as mirror-makers) since metadata lookup (at startup) will be
> >     issued serially for each topic.
> >

Wait, why? Shouldn't metadata fetch be very fast?
> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 186
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line186>
> >
> >     IIRC our current producer allows re-partitioning to an available partition.
> >     We can probably do something similar on producer errors, but arguably
> >     that approach is an artifact of longer than ideal leader election latency.
> >

You can kind of do this. That is you can mimic the 0.7 behavior by having a partitioner that only partitions to available partitions irrespective of key. Since there are no retries yet I think this is effectively as good as it gets.

I am a little averse to the idea of some kind of -1 partition that is handled magically by the producer but that is also an option.
> On Jan. 28, 2014, 8:48 p.m., Joel Koshy wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 21
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line21>
> >
> >     My reading of bufferPool - let me know if this is incorrect:
> >     - For most producers, most allocations will be of batch-size. (And most
> >       batches will not entirely fill the batch-size buffer).
> >     - Batch size needs to be properly tuned:
> >       - If too low, then you end up having (typically) one of the following:
> >         - Messages exceed batch size and thus poolable size and so you end up
> >           having to allocate on every record.
> >         - Messages fit but you have more sends than (probably) necessary given
> >           that you are sending smaller requests
> >       - If too high, then you could end up wasting a lot of memory especially
> >         for producers such as the mirror maker which produces to several
> >         partitions.
> >            
> >     It is an interesting approach, and I will think about it more. I would
> >     expect it to work very well for 90% of producers that send to a few topics,
> >     but it may be problematic for large producers such as mirror makers for
> >     which my intuition is that even very simple memory banks (i.e., a few levels
> >     of free lists) would address without fully reimplementing malloc. E.g., for
> >     each free list if we were to maintain counts on percentage of
> >     smaller-than-half allocations we could if necessary create a free list of

Yeah I originally shared a lot of your concerns, but consider a few additional things:
1. I don't think that for large messages doing an allocation and send per-message is a problem because the sending of bytes will dominate. i.e. I think we could hard code a batch size of 16k and it would be close to optimal for most producers. Basically the goal is to avoid small allocations/sends not to avoid per-message allocations/sends.
2. The memory fragmentation problem is not as bad as it seems. Keep in mind that each java object significant overhead. Keeping around a list of ProducerRecord instances is actually pretty pricy if you sit and add up the actual per message overheads. I think also internally malloc/new must have fragmentation, that is when you ask for 15 bytes you often get more than 15 bytes. I'm not convinced this strategy is that much worse.
3. One optimization that Neha pointed out that would improve things would be to take all ready batches for a partition. I think this could be added later.
4. With 16k batch size and 1000 partitions you only need 16M of buffer space which is not unreasonable so I think this may work even for mirror-maker like cases.

That said if you have some thoughts on how the pooling could work I would be interested to hear it. I considered something like what you describe where I pick a few sizes to pool at, but it definitely adds some complications and corner cases so the challenge is to algorithmically work through some of these.

Yeah this is interesting. What I interpret you to be saying is to sort all the batches by the created timestamp on the record batch which corresponds to the first message enqueue.

The advantage of this is that instead of round-robining in the check always chose the partition which has the oldest data.

Avoiding the sort on each iteration might require some kind of heap.

This would be a good optimization.

The error will be thrown by await