Kafka, mail # user - KafkaStream bug?

Re: KafkaStream bug?
Joel Koshy 2013-10-15, 20:53
This is probably because KafkaStream is a Scala Iterable, so the call
resolves to toString on an Iterable. Per the Scaladoc: "returns a string
representation of this collection. By default this string consists of
the stringPrefix of this immutable iterable collection, followed by all
elements separated by commas and enclosed in parentheses." Building that
string means iterating over the stream, and that blocks while the
consumer waits for more messages.
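
To make the effect concrete, here is a minimal sketch in plain Java. It is not Kafka's code: QueueStream, describe() and toStringReturnsWithin() are hypothetical names made up for illustration, and the queue-backed iterator only assumes that a stream iterator waits for the next message instead of ending. The toString() here imitates the "all elements separated by commas" behavior quoted above.

```java
import java.util.Iterator;
import java.util.StringJoiner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingToStringDemo {

    // Hypothetical stand-in for a message stream; not a Kafka class.
    static class QueueStream implements Iterable<String> {
        private final BlockingQueue<String> queue;

        QueueStream(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public Iterator<String> iterator() {
            return new Iterator<String>() {
                @Override
                public boolean hasNext() {
                    return true; // assumption: the stream never ends on its own
                }

                @Override
                public String next() {
                    try {
                        return queue.take(); // waits until a message is available
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    }
                }
            };
        }

        // Collection-style toString: walks every element to build the string.
        @Override
        public String toString() {
            StringJoiner joiner = new StringJoiner(", ", "QueueStream(", ")");
            for (String message : this) {
                joiner.add(message);
            }
            return joiner.toString();
        }

        // Debug-friendly alternative that does not touch the iterator.
        String describe() {
            return getClass().getSimpleName() + "@"
                    + Integer.toHexString(System.identityHashCode(this));
        }
    }

    // Runs toString() on a worker thread and reports whether it finished in time.
    static boolean toStringReturnsWithin(QueueStream stream, long millis) {
        Thread worker = new Thread(() -> {
            try {
                stream.toString();
            } catch (IllegalStateException ignored) {
                // Raised when the blocked worker is interrupted below.
            }
        });
        worker.setDaemon(true);
        worker.start();
        try {
            worker.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        boolean finished = !worker.isAlive();
        worker.interrupt(); // release the worker if it is still waiting
        return finished;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        QueueStream stream = new QueueStream(queue);

        System.out.println("describe(): " + stream.describe());
        System.out.println("toString() returned within 500 ms: "
                + toStringReturnsWithin(stream, 500));
        System.out.println("messages left in queue: " + queue.size());
    }
}
```

In this sketch toString() should not return within the timeout, and the two queued messages are consumed by it along the way. For a debug line, printing something like describe() (or the thread number) avoids iterating the stream at all.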

On Mon, Oct 14, 2013 at 12:23 PM, Bruno D. Rodrigues wrote:
> Yes, it is effectively blocking. It basically tries to consume every message and build a representation of it, much like a List.toString(). Why that would make sense, I have no idea. I'm just mentioning that I made the same mistake when trying to get something out of toString() for the connection, and I got scared when I stepped into the code, mostly because I don't know much Scala. And yes, it confused me that Eclipse reports toString() as the non-overridden version from Object, while stepping in with the debugger enters the Kafka code and shows non-Java code that tries to "consume them all". I may be completely wrong, though.
> On 14/10/2013, at 20:05, "[EMAIL PROTECTED]" <[EMAIL PROTECTED]> wrote:
>> I found some weird behavior.
>> I followed the exact code example for the high-level consumer at
>> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example#
>> but added one debug line:
>> "
>>    public void run() {
>>        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
>>        while (it.hasNext()) {
>>            // ---- my line here ----
>>            // This line blocks. Is KafkaStream.toString() a blocking method?
>>            System.out.println("from the stream" + m_stream);
>>            // ---- end of my line ----
>>
>>            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
>>        }
>>        System.out.println("Shutting down Thread: " + m_threadNumber);
>>    }
>> "