Kafka >> mail # user >> Kafka consumer not consuming events


Re: Kafka consumer not consuming events
Hi Jun,

Thanks for helping out so far.

Following your explanation, we are already doing exactly what you suggested in your workaround below:
> A workaround is to use different consumer connectors, each consuming a
> single topic.
Here is the problem...

We have a topic that receives a lot of events (around a million a day), so it has a high number of partitions on the server. We have dedicated consumers listening only to this topic, and our processing time is on the order of 15-30 ms, so we are confident that our consumers are not slow in processing.
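As a rough sanity check on that claim, the arithmetic can be written out. This is a sketch under assumptions the thread does not state: events arrive evenly over the day, and the 15-30 ms figure is the cost of processing one event on one thread. The class name `ThroughputCheck` is hypothetical.

```java
// Back-of-the-envelope check: can one consumer thread keep up with the topic?
// Assumptions (not from the thread): uniform arrivals over the day, and
// 15-30 ms is the per-event processing cost on a single thread.
public class ThroughputCheck {

    // Average arrival rate in events per second.
    static double arrivalRate(long eventsPerDay) {
        return eventsPerDay / 86_400.0; // seconds in a day
    }

    // Events per second one thread can handle at the given per-event cost.
    static double perThreadCapacity(double millisPerEvent) {
        return 1000.0 / millisPerEvent;
    }

    public static void main(String[] args) {
        double arrival = arrivalRate(1_000_000);    // about 11.6 events/s
        double worstCase = perThreadCapacity(30.0); // about 33.3 events/s
        System.out.printf("arrival=%.1f/s, one thread at 30 ms=%.1f/s%n", arrival, worstCase);
        System.out.println("single thread keeps up on average: " + (worstCase > arrival));
    }
}
```

Under those assumptions even a single thread at the slow end of the range exceeds the average arrival rate by roughly a factor of three; short bursts above the average are not captured by this calculation.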

Every now and then, our consumer threads stall and stop receiving events (as shown by the stacks of the idle threads in my previous email), even though we can see the offset lag for these consumers increasing.
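The check behind those idle-thread stacks can also be done in-process. The sketch below, assuming `Thread.getAllStackTraces()` is acceptable in place of an external thread dump, lists every thread parked in `LinkedBlockingQueue.put` (a producer, such as a fetcher, facing a full queue) or `LinkedBlockingQueue.take` (a consumer waiting on an empty queue). The class name `QueueStallDetector` is hypothetical, and matching on `take` assumes the consumer threads block in that method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper: reports threads that are blocked inside
// LinkedBlockingQueue.put (queue full) or LinkedBlockingQueue.take (queue empty).
public class QueueStallDetector {

    // Returns entries like "threadName: put" or "threadName: take".
    static List<String> findQueueBlockedThreads() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            Thread.State state = t.getState();
            if (state != Thread.State.WAITING && state != Thread.State.TIMED_WAITING) {
                continue; // only parked or sleeping threads can be stalled on a queue
            }
            for (StackTraceElement frame : e.getValue()) {
                String method = frame.getMethodName();
                if (frame.getClassName().equals("java.util.concurrent.LinkedBlockingQueue")
                        && (method.equals("put") || method.equals("take"))) {
                    result.add(t.getName() + ": " + method);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        // Demo: a thread that blocks on take() from an empty queue.
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
        Thread consumer = new Thread(() -> {
            try {
                q.take();
            } catch (InterruptedException ignored) {
                // demo shutdown
            }
        }, "demo-consumer");
        consumer.setDaemon(true);
        consumer.start();
        Thread.sleep(200); // give the thread time to park
        System.out.println(findQueueBlockedThreads());
        consumer.interrupt();
    }
}
```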

We also noticed that if we force a rebalance of the consumers (either by starting a new consumer or killing an existing one), data starts flowing to these consumer threads again. The consumers then remain stable (processing events) for about 20-30 minutes before the threads go idle again and the backlog starts growing. This repeats in a cycle, and we have not been able to figure out why events stop flowing.
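Jun's explanation in the quoted reply below (one shared fetcher, a bounded in-memory queue per topic, and a fetcher that blocks when any one queue is full) can be reproduced with a toy model; it also mirrors the quoted thread dump, where FetchRunnable-1 is parked in `LinkedBlockingQueue.put`. This is a sketch using plain `java.util.concurrent`, not Kafka's code; the class, thread names and queue sizes are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model (NOT Kafka's implementation): fetcher threads copy data into bounded
// in-memory queues with a blocking put(); consumer threads drain the queues.
public class SharedFetcherDemo {

    // Starts a daemon fetcher that endlessly puts 0, 1, 2, ... into each queue in turn.
    // put() parks the fetcher as soon as any one of its queues is full.
    @SafeVarargs
    static Thread startFetcher(String name, BlockingQueue<Integer>... queues) {
        Thread t = new Thread(() -> {
            try {
                for (int offset = 0; ; offset++) {
                    for (BlockingQueue<Integer> q : queues) {
                        q.put(offset);
                    }
                }
            } catch (InterruptedException e) {
                // interrupted: the demo is shutting down
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Queue A has no consumer (a stalled thread); queue B has a fast consumer.
    // Returns how many messages B's consumer received within `millis` ms.
    static int messagesSeenByB(int capacity, long millis, boolean sharedFetcher) {
        BlockingQueue<Integer> queueA = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<Integer> queueB = new ArrayBlockingQueue<>(capacity);
        List<Thread> fetchers = new ArrayList<>();
        if (sharedFetcher) {
            fetchers.add(startFetcher("shared-fetcher", queueA, queueB));
        } else {
            // The workaround: each queue is fed by its own fetcher.
            fetchers.add(startFetcher("fetcher-A", queueA));
            fetchers.add(startFetcher("fetcher-B", queueB));
        }

        int seenByB = 0;
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millis);
        while (System.nanoTime() < deadline) {
            try {
                if (queueB.poll(10, TimeUnit.MILLISECONDS) != null) {
                    seenByB++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        fetchers.forEach(Thread::interrupt);
        return seenByB;
    }

    public static void main(String[] args) {
        System.out.println("shared fetcher, B received: " + messagesSeenByB(5, 300, true));
        System.out.println("one fetcher per queue, B received: " + messagesSeenByB(5, 300, false));
    }
}
```

In the shared case, consumer B receives exactly as many messages as queue B holds (5 here) and then nothing more, however long the run, because the fetcher never gets past `queueA.put()`. With one fetcher per queue, B keeps receiving even though A stays full, which is the idea behind the one-connector-per-topic workaround.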

As a side note, we are also monitoring GC activity, and there are hardly any collections.
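The thread does not say how GC is being monitored. One in-process option, assumed here rather than taken from the thread, is the standard `java.lang.management` API; logging these counters next to consumer lag would show whether collections coincide with the stalls. The class name `GcSnapshot` is hypothetical.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Minimal sketch: read cumulative GC counters for the current JVM.
public class GcSnapshot {

    // Total collections across all collectors since JVM start.
    static long totalCollections() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long count = gc.getCollectionCount(); // -1 means undefined for this collector
            if (count > 0) {
                total += count;
            }
        }
        return total;
    }

    // Total accumulated collection time in milliseconds.
    static long totalCollectionMillis() {
        long total = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 means undefined for this collector
            if (millis > 0) {
                total += millis;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            System.out.printf("%s: %d collections, %d ms%n",
                    gc.getName(), gc.getCollectionCount(), gc.getCollectionTime());
        }
        System.out.printf("total: %d collections, %d ms%n", totalCollections(), totalCollectionMillis());
    }
}
```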

Please let us know if you need any additional details.

Thanks
Nihit.
On 10-Jul-2013, at 8:30 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Ok. One of the issues is that when you have a consumer that consumes
> multiple topics, if one of the consumer threads is slow in consuming
> messages from one topic, it can block the consumption of other consumer
> threads. This is because we use a shared fetcher to fetch all topics. There
> is an in-memory queue per topic. If one of the queues is full, the fetcher
> will block and can't put the data into other queues.
>
> A workaround is to use different consumer connectors, each consuming a
> single topic.
>
> Thanks,
>
> Jun
>
>
> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <[EMAIL PROTECTED]> wrote:
>
>> Hi Jun,
>>
>> Please see my comments inline again :)
>>
>> On 10-Jul-2013, at 9:13 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
>>
>>> This indicates our in-memory queue is empty. So the consumer thread is
>>> blocked.
>>
>> What should we do about this?
>> As I mentioned in the previous mail, events are there to be consumed.
>> Killing one consumer makes the other consumer consume events again.
>>
>>
>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>
>> One of the fetcher threads is blocked on putting to a queue, the other is
>> sleeping.
>> Please look below:
>>
>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
>>   java.lang.Thread.State: WAITING (parking)
>>        at sun.misc.Unsafe.park(Native Method)
>>        - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>        at scala.collection.immutable.List.foreach(List.scala:45)
>>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>
>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
>>   java.lang.Thread.State: TIMED_WAITING (sleeping)
>>        at java.lang.Thread.sleep(Native Method)