Kafka >> mail # user >> Kafka Consumer does not receive any message after a while


Re: Kafka Consumer does not receive any message after a while
Thanks a lot, very good hints. I am trying to see what happened in my case.

best,
/Shahab
On Wed, Dec 11, 2013 at 5:16 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Have you looked at
>
> https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped%2Cwhy%3F
> ?
>
> Thanks,
>
> Jun
>
>
> On Wed, Dec 11, 2013 at 3:59 AM, shahab <[EMAIL PROTECTED]> wrote:
>
> > Hi,
> >
> > I have a problem fetching messages from Kafka. I am using the simple
> > consumer API in Java to fetch messages from Kafka (the same one that is
> > shown in the Kafka introduction example). The problem is that after a while
> > (it could be 30 minutes or a couple of hours), the consumer stops receiving
> > messages from Kafka, even though the data exists there (data is still being
> > streamed to Kafka, so Kafka has input).
> > I can see that the data exists in Kafka by running the following command,
> > which lists the messages in Kafka. Each message is around 80 bytes:
> >
> > *bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
> > --from-beginning*
> >
> >
> > Any idea what could be the source of the problem? I also noticed that if I
> > stress the input to Kafka (sending 1000 messages per second) for an hour or
> > more, the same situation happens again. It seems that something is
> > wrong with the fetching (consumer) part, right?
> >
> > best,
> > /Shahab
> >
> >
> >
> > Kafka runs on a single machine, with no cluster or replication: a very
> > basic configuration.
> >
> > The consumer configuration is:
> >
> > props.put("zookeeper.connect", "myserver:2181");
> > props.put("group.id", "group1");
> > props.put("zookeeper.session.timeout.ms", "400");
> > props.put("zookeeper.sync.time.ms", "200");
> > props.put("auto.commit.interval.ms", "1000");
> > props.put("fetch.message.max.bytes", "1048576");
> > props.put("auto.offset.reset", "smallest");
> >
> >
> >
> >
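For anyone who wants to try the consumer settings quoted above in compilable form, here is a minimal sketch that only builds the java.util.Properties object. The class name ConsumerSettings and the build method are hypothetical, and the sketch deliberately has no Kafka dependency, so it does not reproduce the original consumer code.

```java
import java.util.Properties;

public class ConsumerSettings {
    // Builds the consumer settings listed in the message above.
    // The ZooKeeper address and group id are passed in; the rest are the quoted values.
    public static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.message.max.bytes", "1048576");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("myserver:2181", "group1");
        // Assumption: with the Kafka 0.8 jars on the classpath, these properties
        // would be wrapped in kafka.consumer.ConsumerConfig before creating the
        // consumer connector. Here they are only printed for inspection.
        props.list(System.out);
    }
}
```

Printing the properties before starting the consumer is a cheap way to confirm that every key is spelled as intended and that each value is a quoted string.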
> > and the server.config looks like this:
> >
> > broker.id=0
> > port=9092
> > num.network.threads=5
> > num.io.threads=2
> > socket.send.buffer.bytes=1048576
> >
> > socket.receive.buffer.bytes=1048576
> >
> > socket.request.max.bytes=104857600
> >
> > log.dirs=/tmp/kafka-logs
> >
> > num.partitions=2
> >
> > ############################# Log Flush Policy #############################
> > log.flush.interval.messages=1000
> >
> > # The maximum amount of time a message can sit in a log before we force a flush
> > log.flush.interval.ms=1000
> >
> > ############################# Log Retention Policy #############################
> >
> > # The minimum age of a log file to be eligible for deletion
> > log.retention.hours=1
> >
> >
> > log.retention.bytes=10485760
> >
> > # The maximum size of a log segment file. When this size is reached a new
> > # log segment will be created.
> > log.segment.bytes=536870912
> >
> > # The interval at which log segments are checked to see if they can be
> > # deleted according to the retention policies
> > log.cleanup.interval.mins=1
> > zookeeper.connect=localhost:2181
> >
> > # Timeout in ms for connecting to zookeeper
> > zookeeper.connection.timeout.ms=1000000
> >
>
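
A rough back-of-the-envelope check that may help when reading the broker settings quoted above: the sketch below estimates how long one partition takes to accumulate log.retention.bytes and log.segment.bytes at the stress rate mentioned in the thread (1000 messages per second, about 80 bytes each). The class and method names are hypothetical. The even spread across the 2 partitions and the absence of per-message storage overhead are assumptions, so treat the results as order-of-magnitude only. The sketch does not establish that retention is the cause of the stalled consumer.

```java
public class RetentionEstimate {
    // Seconds for ONE partition to accumulate targetBytes.
    // Assumes messages are spread evenly over the partitions and ignores
    // any per-message overhead added by the on-disk log format.
    public static double secondsToWrite(long targetBytes, long messagesPerSecond,
                                        long bytesPerMessage, int partitions) {
        double bytesPerSecondPerPartition =
                (double) messagesPerSecond * bytesPerMessage / partitions;
        return targetBytes / bytesPerSecondPerPartition;
    }

    public static void main(String[] args) {
        long retentionBytes = 10485760L;   // log.retention.bytes from the thread
        long segmentBytes = 536870912L;    // log.segment.bytes from the thread
        long rate = 1000;                  // messages per second (stress test)
        long size = 80;                    // approximate bytes per message
        int partitions = 2;                // num.partitions from the thread

        double retentionSecs = secondsToWrite(retentionBytes, rate, size, partitions);
        double segmentSecs = secondsToWrite(segmentBytes, rate, size, partitions);

        System.out.println("Seconds to write log.retention.bytes per partition: " + retentionSecs);
        System.out.println("Hours to fill one log segment per partition: " + segmentSecs / 3600.0);
    }
}
```

Comparing these two durations with log.retention.hours=1 gives a feel for which limit is reached first under load.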