Hey guys! We recently deployed our Kafka data pipeline application over the weekend, and it is working out quite well now that we've ironed out all the issues. There is one behavior we've noticed that is mildly troubling, though not a deal breaker. We're using a single topic with many partitions (1,200 total) to load balance our 300 consumers, but some partitions end up much more backed up than others. This is probably due to the specifics of our application, since some messages take much longer than others to process.
I'm thinking that the random partitioning in the producer is unsuited to our specific needs. One option I was considering is to write an alternate partitioner that looks at the consumer offsets in ZooKeeper (as the ConsumerOffsetChecker does) and probabilistically weights the partitions by their lag. Does this sound like a good idea to anyone else? Is there a better, or preferably already-built, solution? If anyone has any ideas or feedback I'd sincerely appreciate it.
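To make the idea concrete, here is a rough Java sketch of just the weighting step. Everything in it is hypothetical: the class and method names are made up, fetching the lag figures from ZooKeeper is assumed to happen elsewhere, and wiring this into the producer's partitioner interface (which differs between Kafka versions) is omitted.

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical lag-weighted partition chooser. The lag map would be
 * refreshed periodically from ZooKeeper (the same consumed-offset vs.
 * log-end-offset data ConsumerOffsetChecker reads); that plumbing is
 * omitted here.
 */
public class LagWeightedChooser {

    /**
     * Picks a partition with probability inversely proportional to its
     * lag, so backed-up partitions receive fewer new messages. Assumes
     * a non-empty map.
     *
     * @param lagByPartition partition id -> current consumer lag in messages
     */
    public int choosePartition(Map<Integer, Long> lagByPartition) {
        // Weight each partition by 1 / (1 + lag): caught-up partitions
        // get full weight, heavily lagged ones approach zero.
        double totalWeight = 0.0;
        for (long lag : lagByPartition.values()) {
            totalWeight += 1.0 / (1.0 + lag);
        }
        double r = ThreadLocalRandom.current().nextDouble(totalWeight);
        for (Map.Entry<Integer, Long> e : lagByPartition.entrySet()) {
            r -= 1.0 / (1.0 + e.getValue());
            if (r <= 0) {
                return e.getKey();
            }
        }
        // Floating-point edge case: fall back to any partition.
        return lagByPartition.keySet().iterator().next();
    }
}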
Thanks so much in advance.
P.S. thanks especially to everyone who's answered my dumb questions on this mailing list over the past few months, we couldn't have done it without you!
Making producer-side partitioning depend on consumer behavior might not be such a good idea. If consumption is the bottleneck, changing producer-side partitioning may not help. To relieve a consumption bottleneck, you may need to increase the number of partitions for those topics and increase the number of consumer instances.
You mentioned that the consumers take longer to process certain kinds of messages. What you can do is place the messages that require slower processing in separate topics, so that you can scale the number of partitions and consumer instances for those messages independently.
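For example, a sketch against the 0.8-era Java producer API; the broker list, topic names, and the isSlow() check are all placeholders:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Route slow and fast work to separate topics so each can be scaled
// independently. Topic names and the isSlow() heuristic are placeholders.
public class CostBasedRouter {
    private final Producer<String, String> producer;

    public CostBasedRouter() {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    public void send(String message) {
        // Slow work gets its own topic, whose partition and consumer
        // counts can then grow independently of the fast path.
        String topic = isSlow(message) ? "events-slow" : "events-fast";
        producer.send(new KeyedMessage<String, String>(topic, message));
    }

    private boolean isSlow(String message) {
        return message.length() > 10000; // placeholder heuristic
    }
}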
Thanks,
Neha

On Sat, Aug 24, 2013 at 9:57 AM, Ian Friedman <[EMAIL PROTECTED]> wrote:
When I said "some messages take longer than others," that may have been misleading. What I meant is that the performance of the entire application is inconsistent, mostly due to pressure from other applications (MapReduce) on our HBase and MySQL backends. On top of that, some messages just contain more data.

Now, I suppose what you're suggesting is that I segment my messages by the average or expected time the payloads take to process, but I suspect that if I do that, several consumers will be doing nothing most of the time while the rest are backlogged just as inconsistently as they are now. The problem isn't so much the size of the payloads as the fact that some messages, which I suspect are in partitions with lots of longer-running processing tasks, sit around for hours without getting consumed. That's what I'm trying to solve.
Is there any way to "add more consumers" without actually adding more consumer JVM processes? We've hit something of a saturation point for our MySQL database. Is this maybe where having multiple consumer threads would help? If so, given that I have a single shared processing queue in each consumer, how would I leverage that to solve this problem? (I imagine it would look roughly like the sketch below.)
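For context, here is roughly what I picture, based on the stock multi-stream pattern for the high-level consumer: one JVM asks for several streams on the topic and hands each stream to its own worker thread. The ZooKeeper address, group id, topic, and thread count are all placeholders.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

// One JVM, several streams, one worker thread per stream.
public class MultiStreamConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181"); // placeholder
        props.put("group.id", "my-group");          // placeholder
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        int numThreads = 4; // placeholder
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("my-topic", numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topicCountMap);

        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (final KafkaStream<byte[], byte[]> stream : streams.get("my-topic")) {
            pool.submit(new Runnable() {
                public void run() {
                    for (MessageAndMetadata<byte[], byte[]> msg : stream) {
                        process(msg.message());
                    }
                }
            });
        }
    }

    private static void process(byte[] payload) {
        // placeholder for the real HDFS/HBase/MySQL work
    }
}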
Ian Friedman

On Sunday, August 25, 2013 at 12:13 PM, Mark wrote:
Sorry, I reread what I've written so far and found that it doesn't state the actual problem very well. Let me clarify once again:
The problem we're trying to solve is that we can't let messages go unprocessed for unbounded amounts of time, yet something about our setup (I suspect the fact that a consumer owns several partitions but consumes from only one of them at a time until it's caught up) is causing a small number of messages to sit around for hours and hours, even while other consumers idle because they're fully caught up on the partitions they own. We've found that requeueing the oldest messages (consumers ignore messages that have already been processed) is fairly effective at making them go away, but I'm looking for a more stable solution.
Ian Friedman

On Sunday, August 25, 2013 at 1:15 PM, Ian Friedman wrote:
I'm still a little confused by your description of the problem. It might be easier to understand if you listed out the exact things you have measured, what you saw, and what you expected to see.
Since you mentioned the consumer, I can give a little info on how that works. The consumer consumes from all the partitions it owns simultaneously. The behavior is that we interleave fetched chunks of messages from each partition the consumer is processing; the chunk size is controlled by the fetch size set in the consumer. So the behavior you would expect is a bunch of messages from one partition followed by a bunch from another partition. The reason for doing this instead of, say, interleaving individual messages is that it is a big performance boost: making every message an entry in a blocking queue gives a 5x performance hit in high-throughput cases. Perhaps this interleaving is the problem?
-Jay

On Sun, Aug 25, 2013 at 10:22 AM, Ian Friedman <[EMAIL PROTECTED]> wrote:
Just to make sure I have this right: on the producer side we'd set max.message.size, and then on the consumer side we'd set fetch.size? I admittedly didn't research how all the tuning options would affect us, so thank you for the info. Would queuedchunks.max have any effect?
Ian Friedman

On Monday, August 26, 2013 at 1:26 PM, Jay Kreps wrote:
On Sunday, August 25, 2013 at 3:11 PM, Jay Kreps wrote:
The problem is that some consumers are slower than others, due to a number of factors: resource contention on the box itself, contention on our HBase cluster, and the actual processing each one is doing. We are sending very small messages that are actually HDFS paths; the consumers then open and read those files. Each file takes between 1 and 15 minutes to process, and sometimes up to 30 minutes when the load on our HBase cluster is very high from certain MR jobs. We were hoping to get some experience with Kafka and flush out any issues with our use of the project before implementing a solution that actually queues all the data in those HDFS files into Kafka itself, and this seemed like a good intermediate step.
Lowering queuedchunks.max shouldn't help if the problem is what I described. That option controls how many chunks the consumer has ready in memory for processing. But we are hypothesizing that your problem is actually that the individual chunks are just too large, leading to the consumer spending a long time processing from one partition before it gets the next chunk. The knob to turn for that is the fetch size, as in the sketch below.
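For example, something along these lines in the consumer config. The property names are the 0.7-style ones you mentioned and differ in later versions, and the values are made-up starting points rather than recommendations:

import java.util.Properties;

// Illustrative only: smaller fetch chunks make the consumer rotate
// between its partitions more often. Property names vary by Kafka
// version; the values below are made up and would need tuning.
public class ConsumerTuning {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk1:2181"); // placeholder
        props.put("group.id", "my-group");          // placeholder
        props.put("fetch.size", "102400");          // ~100 KB chunks
        props.put("queuedchunks.max", "10");        // buffered chunks per queue
        return props;
    }
}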
-Jay

On Mon, Aug 26, 2013 at 11:18 AM, Ian Friedman <[EMAIL PROTECTED]> wrote: