Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Kafka consumer not consuming events


Copy link to this message
-
Re: Kafka consumer not consuming events
Hi Jun,

Please see my comments inline again :)

On 10-Jul-2013, at 9:13 AM, Jun Rao <[EMAIL PROTECTED]> wrote:

> This indicates our in-memory queue is empty. So the consumer thread is
> blocked.

What should we do about this.
As I mentioned in the previous mail, events are there to be consumed.
Killing one consumer makes the other consumer consume events again.
> What about the Kafka fetcher threads? Are they blocked on anything?

One of the fetcher threads is blocked on putting to a queue, the other is sleeping.
Please look below:

"FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)

"FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)

>
> Thanks,
>
> Jun
>
>
> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <[EMAIL PROTECTED]> wrote:
>
>> Hello Jun,
>>
>> Please see my comments inline.
>>
>> On 09-Jul-2013, at 8:32 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
>>
>>> I assume that each consumer instance consumes all 15 topics.
>> No, we kept dedicated consumer listening to the topic in question.
>> We did this because this queue processes huge amounts of data.
>>
>>
>>> Are all your
>>> consumer threads alive? If one of your thread dies, it will eventually
>>> block the consumption in other threads.
>>
>> Yes. We can see all the threads in the thread dump.
>> We have ensured that the threads do not die due to an Exception.
>>
>> Please look at the stack trace below. We see all the threads waiting like
>> this:
>>
>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on
>> condition [0x00007efedae6d000]
>>   java.lang.Thread.State: WAITING (parking)
>>        at sun.misc.Unsafe.park(Native Method)
>>        - parking to wait for  <0x0000000640248618> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>        at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>        at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>        at
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>        at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>        at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>        at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>        at
>> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>        at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>        at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB