Kafka, mail # dev - Client improvement discussion


Re: Client improvement discussion
Jay Kreps 2013-08-03, 02:42
Cool. With respect to compression performance, we definitely see the same
thing, no debate.

Of course, if you just want to compress the message payloads you can do that
now without needing much help from Kafka--just pass in the compressed data.
Whether or not it will do much depends on the size of the message body--for
small messages you basically need batch compression, but for large messages
just compressing the body is fine. Our extra effort was to get the better
compression ratio that comes from compressing messages in batches.
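
To make that concrete, here is a rough sketch of the large-message case:
compress on the application side and hand Kafka plain bytes. This assumes
snappy-java as the codec (just an example choice) and leaves out the
producer send call itself:

  import java.io.IOException;
  import java.nio.charset.StandardCharsets;

  import org.xerial.snappy.Snappy;  // snappy-java, a JNI-backed Snappy binding

  public class PayloadCompressionSketch {
      public static void main(String[] args) throws IOException {
          // A single large message body; for small messages this buys little,
          // which is why batch compression matters there.
          byte[] payload = bigJsonBlob().getBytes(StandardCharsets.UTF_8);

          // Compress on the application side and hand the result to the
          // producer as an ordinary byte[] value, with Kafka's own
          // compression left disabled.
          byte[] compressed = Snappy.compress(payload);

          // The consumer does the inverse after fetching the message.
          byte[] restored = Snappy.uncompress(compressed);

          System.out.printf("original=%d bytes, compressed=%d bytes%n",
                  payload.length, compressed.length);
          assert new String(restored, StandardCharsets.UTF_8).equals(bigJsonBlob());
      }

      // Stand-in for a real large payload.
      private static String bigJsonBlob() {
          StringBuilder sb = new StringBuilder();
          for (int i = 0; i < 10_000; i++) sb.append("{\"field\":\"value\"},");
          return sb.toString();
      }
  }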

What I was saying about snappy performance is that I think it may be our
inefficiency in the compression code path rather than the underlying
slowness of snappy. For example, on this page
  https://github.com/dain/snappy
the compression performance they list for JNI (the library we use) tends to
be around 200MB per core-second, with decompression around 1GB per
core-second. So on a modern machine with umpteen cores that should not be a
bottleneck, right? I don't know this to be true, but I am wondering whether
the underlying bottleneck is the compression algorithm or our inefficient
code. If you look at kafka.message.ByteBufferMessageSet.{create,
decompress, assignOffsets} it is pretty inefficient. I did a round of
improvement there, but we are still recopying stuff over and over and
creating zillions of little buffers and objects. It is a little tricky to
clean up, but probably just a 1-2 day project.
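
One quick way to tell those two apart would be a standalone microbenchmark
of the raw codec, so the numbers can be compared against the figures above
and against what the ByteBufferMessageSet path achieves end to end. A rough
sketch (single-threaded, no serious warmup handling, and it assumes
snappy-java as the binding):

  import java.io.IOException;
  import java.util.Random;

  import org.xerial.snappy.Snappy;

  public class SnappyThroughputCheck {
      public static void main(String[] args) throws IOException {
          // Roughly message-batch-sized chunks of semi-compressible data.
          byte[] input = new byte[64 * 1024];
          new Random(42).nextBytes(input);
          for (int i = 0; i < input.length; i += 4) input[i] = 'a';

          byte[] compressed = Snappy.compress(input);
          int iterations = 20_000;

          long start = System.nanoTime();
          for (int i = 0; i < iterations; i++) {
              Snappy.compress(input);
          }
          double secs = (System.nanoTime() - start) / 1e9;
          double mbPerSec = (input.length / (1024.0 * 1024.0)) * iterations / secs;
          System.out.printf("compress:   %.0f MB per core-second%n", mbPerSec);

          start = System.nanoTime();
          for (int i = 0; i < iterations; i++) {
              Snappy.uncompress(compressed);
          }
          secs = (System.nanoTime() - start) / 1e9;
          mbPerSec = (input.length / (1024.0 * 1024.0)) * iterations / secs;
          System.out.printf("decompress: %.0f MB per core-second%n", mbPerSec);
      }
  }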

I would rather establish that it is really the compression itself that is
the root cause, rather than just our inefficiency, before we do anything too
drastic design-wise. If this is really killing you guys, and if that turns
out to be the cause, we would definitely take a patch to optimize that path
now.

-Jay
On Fri, Aug 2, 2013 at 4:55 PM, Chris Hogue <[EMAIL PROTECTED]> wrote:

> Thanks for the responses. Additional follow-up inline.
>
>
> On Fri, Aug 2, 2013 at 2:21 PM, Jay Kreps <[EMAIL PROTECTED]> wrote:
>
> > Great comments, answers inline!
> >
> > On Fri, Aug 2, 2013 at 12:28 PM, Chris Hogue <[EMAIL PROTECTED]> wrote:
> >
> > > These sounds like great steps. A couple of votes and questions:
> > >
> > > 1.  Moving serialization out and basing it all off of byte[] for key
> and
> > > payload makes sense. Echoing a response below, we've ended up doing
> that
> > in
> > > some cases anyway, and the others do a trivial transform to bytes with
> an
> > > Encoder.
> > >
> >
> > Cool.
> >
> >
> > > 2. On the single producer thread, we're actually suffering a bit from
> > this
> > > in 0.8, but it's mostly because compression and the blocking send
> happen
> > on
> > > this thread. In 0.7 since there was a thread-per-broker, a nice
> > side-effect
> > > was that compression and the blocking could "go wide", at least to the
> > > number of brokers. If compression is moved out and the sends are now
> > > non-blocking then this sounds like a nice improvement.
> > >
> >
> > I think even in 0.7 there was only one thread, right?
> >
> >
> I believe it was actually one per broker. Producer.scala iterates the brokers
> and adds a new producer for each. The ProducerPool.addProducer() method
> adds a new AsyncProducer instance for the broker (assuming async mode), and
> each AsyncProducer creates and starts its own ProducerSendThread.
>
> In either case, going to multiplexed I/O and not having the compression on
> this thread probably solves any issue there.
>
>
>
> >
> > > 3. The wiki talks about static partition assignment for consumers. Just
> > > adding a vote for that as we're currently working through how to do
> that
> > > ourselves with the 0.8 consumer.
> > >
> >
> > Cool, yeah currently you must use the simple consumer to get that which
> is
> > a pain.
> >
> >
> > > 4. I'm curious how compression would interact with the new ByteBuffer
> > > buffering you've described. If I'm following correctly you've said that
> > > rather than queueing objects you'd end up doing in-place writes to the
> > > pre-allocated ByteBuffer. Presumably this means the compression has