

Re: Why does the high level consumer block, or rather where does it?
Yes, ConsumerIterator blocks when there is no new message. This is done by
calling take() on a blocking queue.
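
To illustrate the mechanism, here is a minimal sketch of an iterator whose hasNext() blocks on take(). It is not the Kafka source: the names BlockingIteratorSketch, QueueIterator, and shutdownMarker are hypothetical, and the sketch only assumes a java.util.concurrent.BlockingQueue that some other thread fills.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingIteratorSketch {

    /** Hypothetical iterator: hasNext() blocks on queue.take() until an element arrives. */
    static final class QueueIterator<T> implements Iterator<T> {
        private final BlockingQueue<T> queue;
        private final T shutdownMarker; // hypothetical sentinel that ends iteration
        private T nextItem;             // fetched by hasNext(), not yet returned by next()
        private boolean done;

        QueueIterator(BlockingQueue<T> queue, T shutdownMarker) {
            this.queue = queue;
            this.shutdownMarker = shutdownMarker;
        }

        @Override
        public boolean hasNext() {
            if (done) return false;
            if (nextItem != null) return true;
            try {
                T item = queue.take(); // the calling thread waits here while the queue is empty
                if (item == shutdownMarker) { // identity check against the sentinel
                    done = true;
                    return false;
                }
                nextItem = item;
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                done = true;
                return false;
            }
        }

        @Override
        public T next() {
            if (!hasNext()) throw new NoSuchElementException();
            T item = nextItem;
            nextItem = null;
            return item;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        final String shutdown = new String("shutdown"); // distinct object, compared by identity

        // Stand-in for whatever thread fills the queue in the background.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(500);   // the consumer loop below is blocked during this pause
                queue.put("hello");
                queue.put(shutdown); // ends the consumer loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        Iterator<String> it = new QueueIterator<>(queue, shutdown);
        while (it.hasNext()) {
            System.out.println("Got: " + it.next());
        }
        System.out.println("Shutting down");
        producer.join();
    }
}
```

In this sketch the wait happens entirely inside the consumer process, on the in-memory queue, and the loop ends only when the sentinel is dequeued or the thread is interrupted.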

Thanks,

Jun
On Sun, Jan 5, 2014 at 5:53 PM, S Ahmed <[EMAIL PROTECTED]> wrote:

> I'm trying to trace through the codebase and figure out where exactly the
> block occurs in the high level consumer?
>
> public void run() {
>      ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>      while (it.hasNext())
>          System.out.println("Thread " + m_threadNumber + ": "
>                  + new String(it.next().message()));
>      System.out.println("Shutting down Thread: " + m_threadNumber);
>  }
> Reference:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
>
> So from what I understand, the while (it.hasNext()) loop will block if
> there are no new messages for this particular topic/partition, correct?
>
> Just to understand, can someone clarify where in the Kafka source this
> block occurs? That is, does the broker that this consumer is connected to
> keep a socket connection open to the consumer, block until a new message
> owned by this consumer thread arrives, and then push it to the consumer
> to process?
>
> Is it at the iterator level somewhere?
>
> https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
>