Kafka >> mail # dev >> Review Request 17263: New producer for Kafka.


Re: Review Request 17263: New producer for Kafka.


> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/KafkaProducer.java, line 150
> > <https://reviews.apache.org/r/17263/diff/1/?file=436445#file436445line150>
> >
> >     I think what you meant here when you said "the callback will execute in the I/O thread of the producer and so should be reasonably fast." is that the callback should be implemented such that it returns immediately. This is not very obvious from the current description.
>
> Jay Kreps wrote:
>     Hmm, you don't think so? I will specifically mention blocking.

OK. You may want to update the callback interface with the same description.

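
One way to keep a callback from blocking the producer's I/O thread is to hand slow work to a separate executor. The sketch below only illustrates that idea: OffsetCallback and HandOffCallback are hypothetical names, and the single long argument is a simplification of the RecordSend argument in the patch under review.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the producer's callback interface.
interface OffsetCallback {
    void onCompletion(long offset);
}

// Hypothetical callback that returns immediately and defers slow work.
final class HandOffCallback implements OffsetCallback {
    private final ExecutorService worker;
    final AtomicLong lastHandled = new AtomicLong(-1);

    HandOffCallback(ExecutorService worker) {
        this.worker = worker;
    }

    @Override
    public void onCompletion(long offset) {
        // Only enqueue here; the calling (I/O) thread is not held up.
        worker.execute(() -> handleSlowly(offset));
    }

    private void handleSlowly(long offset) {
        try {
            Thread.sleep(50); // stands in for a database write, an RPC, etc.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        lastHandled.set(offset);
    }

    // Demo helper: fires one callback, waits for the worker, returns the handled offset.
    static long demo(long offset) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        HandOffCallback callback = new HandOffCallback(worker);
        callback.onCompletion(offset);
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return callback.lastHandled.get();
    }
}
```

The trade-off is that completion handling no longer runs on the I/O thread, so ordering and back-pressure become the executor's concern.
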
> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/RecordSend.java, line 34
> > <https://reviews.apache.org/r/17263/diff/1/?file=436451#file436451line34>
> >
> >     This seems a bit confusing. It would be useful to keep the usage of RecordSend the same in the callback and in the sync producer. Is there any reason that await() throws on error when you have APIs to check for errors?
>
> Jay Kreps wrote:
>     I think I am confused. The usage in callback and directly is the same, no?
>    
>     The question of whether to have await() throw an error or not is a good one. I think that is how futures in Java work. It also forces the caller to handle the error, rather than making it possible to use the offset when there is an error. Basically, I think I wanted to avoid the MongoDB getLastError() situation, where errors are silent unless you explicitly look for them.

I was referring to the following usage -

1. The error does not need to be checked here
ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
RecordSend send = producer.send(record, null).await();
System.out.println(send.offset());
    
2. The error has to be checked
ProducerRecord record = new ProducerRecord("the-topic", "key", "value");
producer.send(record, new Callback() {
    public void onCompletion(RecordSend send) {
        try {
            if (send.hasError())
                System.out.println(send.getError());
            else
                System.out.println(send.offset());
        } catch (KafkaException e) {
            e.printStackTrace();
        }
    }
});

I was noting this inconsistency.
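
The same split between "blocking call throws" and "callback must inspect the error" exists in the JDK's own CompletableFuture, which may help frame the discussion. The sketch below uses only java.util.concurrent; TwoStyles and its helper methods are hypothetical names, and nothing here is the producer API under review.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration using the JDK's CompletableFuture, not RecordSend.
final class TwoStyles {
    private TwoStyles() {}

    // Style 1: blocking. A failure surfaces as an exception from join().
    static String blocking(CompletableFuture<Long> send) {
        try {
            return "offset " + send.join();
        } catch (CompletionException e) {
            return "error " + e.getCause().getMessage();
        }
    }

    // Style 2: callback. The error arrives as an argument and must be checked.
    // The future is assumed to be complete already, so the callback runs inline.
    static String viaCallback(CompletableFuture<Long> send) {
        StringBuilder out = new StringBuilder();
        send.whenComplete((offset, error) -> {
            if (error != null) {
                out.append("error ").append(error.getMessage());
            } else {
                out.append("offset ").append(offset);
            }
        });
        return out.toString();
    }

    // Helpers that build already-completed futures for the demo.
    static CompletableFuture<Long> succeeded(long offset) {
        return CompletableFuture.completedFuture(offset);
    }

    static CompletableFuture<Long> failed(String message) {
        CompletableFuture<Long> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException(message));
        return f;
    }

    public static void main(String[] args) {
        System.out.println(blocking(succeeded(42L)));
        System.out.println(blocking(failed("boom")));
        System.out.println(viaCallback(succeeded(42L)));
        System.out.println(viaCallback(failed("boom")));
    }
}
```
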

> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/BufferPool.java, line 79
> > <https://reviews.apache.org/r/17263/diff/1/?file=436452#file436452line79>
> >
> >     Is this exception safe?
>
> Jay Kreps wrote:
>     Hmm, I think what you are saying is that interrupting the thread may cause it to leak memory?

Yes.

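
To make the concern concrete: if a thread has taken part of its requested memory and is then interrupted while waiting for the rest, the partial amount needs to go back to the pool. The following is a toy sketch of that rollback, not the BufferPool code in the patch; ToyPool and all of its members are hypothetical.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical toy pool that tracks only a byte count.
final class ToyPool {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition moreMemory = lock.newCondition();
    private long available;

    ToyPool(long total) {
        this.available = total;
    }

    // Reserves `size` bytes, blocking until enough memory has been released.
    void reserve(long size) throws InterruptedException {
        lock.lock();
        long accumulated = 0;
        boolean done = false;
        try {
            while (accumulated < size) {
                long take = Math.min(available, size - accumulated);
                available -= take;
                accumulated += take;
                if (accumulated < size) {
                    moreMemory.await(); // may throw InterruptedException
                }
            }
            done = true;
        } finally {
            if (!done) {
                // Roll back the partial reservation so nothing leaks.
                available += accumulated;
                moreMemory.signalAll();
            }
            lock.unlock();
        }
    }

    void release(long size) {
        lock.lock();
        try {
            available += size;
            moreMemory.signalAll();
        } finally {
            lock.unlock();
        }
    }

    long available() {
        lock.lock();
        try {
            return available;
        } finally {
            lock.unlock();
        }
    }

    // Demo helper: sets the interrupt flag, attempts a reservation, reports what is left.
    static long availableAfterInterruptedReserve(long total, long request) {
        ToyPool pool = new ToyPool(total);
        Thread.currentThread().interrupt();
        try {
            pool.reserve(request);
        } catch (InterruptedException expected) {
            // The reservation was abandoned; the finally block above returned the memory.
        } finally {
            Thread.interrupted(); // clear the flag if reserve() never had to wait
        }
        return pool.available();
    }
}
```

Without the rollback in the finally block, the interrupted request in the demo helper would leave the pool permanently short by whatever it had accumulated.
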
> On Jan. 26, 2014, 11:30 p.m., Sriram Subramanian wrote:
> > clients/src/main/java/kafka/clients/producer/internals/RecordAccumulator.java, line 87
> > <https://reviews.apache.org/r/17263/diff/1/?file=436455#file436455line87>
> >
> >     We can allocate a batch larger than the batchSize?
>
> Jay Kreps wrote:
>     Yeah. This isn't terribly self-explanatory. The idea here is that you want to encourage medium-sized batches to avoid small sends. So maybe you set a batch size of 4k. If you get a 1k message, you allocate a 4k buffer, append the message, and hope more messages show up. If you get a large message, you just allocate a buffer for it and send it right away. This lets us have no artificial bound on message size, but still handle small messages efficiently.

Makes sense
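
The sizing rule described above can be sketched roughly as follows, assuming the buffer size is simply the larger of the configured batch size and the incoming record size. BatchSizing and its methods are hypothetical names for illustration and are not taken from RecordAccumulator.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the sizing rule; not the RecordAccumulator code.
final class BatchSizing {
    private BatchSizing() {}

    // Buffer size for a new batch: at least batchSize, but large enough
    // to hold a single record that exceeds it.
    static int allocationSize(int batchSize, int recordSize) {
        return Math.max(batchSize, recordSize);
    }

    // A buffer allocated for a record at or above batchSize is already full,
    // so there is no point in waiting for more records.
    static boolean sendImmediately(int batchSize, int recordSize) {
        return recordSize >= batchSize;
    }

    public static void main(String[] args) {
        int batchSize = 4096;
        ByteBuffer small = ByteBuffer.allocate(allocationSize(batchSize, 1024));
        ByteBuffer large = ByteBuffer.allocate(allocationSize(batchSize, 10_000));
        System.out.println(small.capacity()); // 4096
        System.out.println(large.capacity()); // 10000
    }
}
```
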
- Sriram
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/17263/#review32800
-----------------------------------------------------------
On Jan. 23, 2014, 8:54 p.m., Jay Kreps wrote:
>
> ---------------------------