Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Kafka Consumer does not receive any message after a while


Copy link to this message
-
Kafka Consumer does not receive any message after a while
Hi,

I have a problem in fetching messages from Kafka. I am using  simple
consumer API in Java to fetch messages from kafka ( the same one which is
stated in Kafka introduction example).  The problem is that after a while
(could be 30min or couple of hours), the consumer does not receive any
messages from Kafka, while the data exist there (while the streaming of
data to Kafka still running, so Kafka has inputs).
I can see that data exist in Kafka by just running  the following command
and getting the list of messages exist in Kafka, Each message is around 80
bytes :

*bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
--from-beginning*
Any idea what could be the source of problem?? I also notices that if i
stress the input to kafka (sending 1000 messages per scond) for an hour or
more , the same situation happens again. ?? It seems that something is
wrong with fetching (consumer) part, right?

best,
/Shahab

The kafka is run in one machine, no clusters, replications,....etc, very
basic configuration.

The consumer config file is ;

"zookeeper.connect", myserver:2181);
"group.id", group1);
"zookeeper.session.timeout.ms", "400");
"zookeeper.sync.time.ms", "200");
"auto.commit.interval.ms", "1000");
"fetch.message.max.bytes", "1048576");
"auto.offset.reset", "smallest";
and the server.config looks like this:

Boker.id=0
port=9092
num.network.threads=5
num.io.threads=2
socket.send.buffer.bytes=1048576

socket.receive.buffer.bytes=1048576

socket.request.max.bytes=104857600

log.dirs=/tmp/kafka-logs

num.partitions=2

############################# Log Flush Policy #############################
log.flush.interval.messages=1000

# The maximum amount of time a message can sit in a log before we force a
flush
log.flush.interval.ms=1000

############################ Log Retention Policy
#############################

# The minimum age of a log file to be eligible for deletion
log.retention.hours=1
log.retention.bytes=10485760

# The maximum size of a log segment file. When this size is reached a new
log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be
deleted according
# to the retention policies
log.cleanup.interval.mins=1
ookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000

 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB