I am trying to use the code supplied in the hadoop-consumer package. I am running into the following issues:
1) This code uses SimpleConsumer, which contacts the Kafka broker directly without ZooKeeper. Because of this, messages are not getting cleared from the broker, and I am getting duplicate messages in each run.
2) The retention policy specified as log.retention.hours in server.properties is not working. I am not sure if it's due to SimpleConsumer.
Is this expected behaviour? Is there any code using the high-level consumer for the same work?
I think you may be misunderstanding the way Kafka works.
A Kafka broker is never supposed to clear messages just because a consumer has read them.
The Kafka broker will instead clear messages after their retention period ends, though it will not delete them at the exact time they expire. Instead, a background process periodically deletes batches of expired messages. The retention policies guarantee a minimum retention time, not an exact retention time.
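For example, a 4-hour retention would look something like this in server.properties (log.cleanup.interval.mins is the 0.7/0.8-era name for the cleaner's check interval, so verify it against your version's docs). Also keep in mind that deletion happens at log segment granularity: a segment is only eligible once it has rolled and fully aged past the retention window.

    # server.properties (sketch)
    # Keep messages for at least 4 hours. Expired segments are removed by a
    # periodic background task, not at the exact moment they expire.
    log.retention.hours=4
    # How often the background cleaner checks for expired segments
    # (property name from the 0.7/0.8 era; check your version's docs).
    log.cleanup.interval.mins=10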
It is the responsibility of each consumer to keep track of which messages it has already consumed (by recording an offset for each consumed partition). The high-level consumer stores these offsets in ZK. SimpleConsumer has no built-in capability to store and manage offsets, so it is the developer's responsibility to do so. In the case of the Hadoop consumer in the contrib package, these offsets are stored in offset files within HDFS.
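To make that concrete, here is a rough sketch of a fetch loop with manual offset tracking, written against the 0.8-style SimpleConsumer javaapi (the broker host, topic, and partition are placeholders, and the signatures may differ slightly in other versions):

    import java.nio.ByteBuffer;

    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.javaapi.message.ByteBufferMessageSet;
    import kafka.message.MessageAndOffset;

    public class ManualOffsetFetch {
        public static void main(String[] args) throws Exception {
            String topic = "my-topic"; // placeholder
            int partition = 0;
            long offset = 0L;          // in a real job, load this from your offset store

            SimpleConsumer consumer =
                    new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "my-client");
            try {
                FetchRequest req = new FetchRequestBuilder()
                        .clientId("my-client")
                        .addFetch(topic, partition, offset, 100000)
                        .build();
                FetchResponse response = consumer.fetch(req);
                ByteBufferMessageSet messages = response.messageSet(topic, partition);
                for (MessageAndOffset messageAndOffset : messages) {
                    ByteBuffer payload = messageAndOffset.message().payload();
                    byte[] bytes = new byte[payload.limit()];
                    payload.get(bytes);
                    System.out.println(messageAndOffset.offset() + ": " + new String(bytes, "UTF-8"));
                    // The broker never tracks this for you: remember where to resume.
                    offset = messageAndOffset.nextOffset();
                }
            } finally {
                consumer.close();
            }
            // Persist `offset` somewhere durable (ZK, HDFS, a local file, ...) so the
            // next run starts where this one left off instead of re-reading everything.
        }
    }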
I wrote a blog post a while ago that explains how to use the offset files generated by the contrib consumer to do incremental consumption (so that you don't get duplicate messages by re-consuming everything in subsequent runs).
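The contrib consumer has its own offset file format, but the basic idea can be sketched with a couple of hypothetical helpers (readOffset/writeOffset are made-up names, and this is not the contrib package's actual serialization):

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Persist the last consumed offset in HDFS so the next run can resume
    // from it instead of re-consuming the partition from the beginning.
    public final class OffsetFiles {
        public static long readOffset(FileSystem fs, Path p) throws IOException {
            if (!fs.exists(p)) {
                return 0L; // first run: start from the beginning
            }
            FSDataInputStream in = fs.open(p);
            try {
                return in.readLong();
            } finally {
                in.close();
            }
        }

        public static void writeOffset(FileSystem fs, Path p, long offset) throws IOException {
            FSDataOutputStream out = fs.create(p, true); // overwrite the previous value
            try {
                out.writeLong(offset);
            } finally {
                out.close();
            }
        }
    }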
Thanks, Felix, for sharing your work. The contrib hadoop-consumer looks like it works the same way.
I think I need to really understand this offset stuff. So far I have used only the high-level consumer. When the consumer was done reading all the messages, I used to kill the process (because it won't exit on its own).
Then I used the producer to pump in more messages and a consumer to read the new messages (a new process, since I had killed the last consumer).
But I never saw messages getting duplicated.
Now it's not very clear to me how offsets are tracked, specifically when I re-launch the consumer. And why is the retention policy not working when used with SimpleConsumer? For my experiment I set it to 4 hours.
Please help me understand.
Thanks,
Navneet

On Tue, Jan 15, 2013 at 4:12 AM, Felix GV <[EMAIL PROTECTED]> wrote:
It may look a little long, but it's as short as it can be. Kafka differs from other messaging systems in a couple of ways, and it's important to understand the fundamental design choices that were made in order to understand the way Kafka works.
I believe my previous email already answers both your offset tracking and retention questions, but if my explanations are not clear enough, then the next best thing is probably to read the design paper :)
Felix

On Tue, Jan 15, 2013 at 12:01 PM, navneet sharma <[EMAIL PROTECTED]> wrote:
One question still remains: why SimpleConsumer? Why not the high-level consumer? If I change the code to the high-level consumer, will it create any challenges?

Navneet

On Tue, Jan 15, 2013 at 11:46 PM, Felix GV <[EMAIL PROTECTED]> wrote:
I think the main reason for using SimpleConsumer is to manage offsets explicitly. For example, this is useful when Hadoop retries failed tasks. Another reason is that Hadoop already does load balancing, so there is not much need to balance the load again with the high-level consumer.
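For contrast, this is roughly what the high-level consumer looks like. Offsets are committed to ZK for you, which is convenient for a long-running consumer but gives a retried Hadoop task no way to re-read exactly its slice (the property names below are the 0.8 ones; 0.7 used zk.connect and groupid):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class HighLevelExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zk-host:2181"); // placeholder address
            props.put("group.id", "my-group");              // offsets are stored in ZK per group
            props.put("auto.commit.interval.ms", "1000");   // progress is committed automatically

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    connector.createMessageStreams(Collections.singletonMap("my-topic", 1));
            ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
            while (it.hasNext()) { // blocks waiting for new messages; kill the process to stop
                byte[] payload = it.next().message();
                // process payload; no manual offset bookkeeping needed
            }
        }
    }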
On Wed, Jan 16, 2013 at 4:40 PM, navneet sharma <[EMAIL PROTECTED]> wrote: