Re: high level consumer usage question with auto topic creation
One thing is that you need to make sure the consumer starts consuming from
the beginning of the topic. Otherwise, by default, it will start from the
latest message in the topic as of the time it starts up.  Since the
consumer and producer are asynchronous, it's hard to assert that the
consumer has completely started up and is ready to go before starting the
producer.

I had this same issue when converting my tests from 0.7 to 0.8.  In 0.8,
the default for auto.offset.reset was changed from smallest to largest, so
I saw the issue you are seeing.
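To make the race concrete, here is a toy model of how auto.offset.reset picks a starting offset for a consumer group that has no committed offset yet. It is plain Java, not Kafka code. It assumes a single partition and a log from which retention has deleted nothing. The class and method names are hypothetical. The consumer finishes registering only after some of the producer's sends have already landed, which is the timing the test can't control.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not Kafka code): where a brand-new consumer group starts reading. */
public class OffsetResetDemo {

    /** Starting offset for a group with no committed offset, given the log-end offset when it joins. */
    static long startingOffset(String autoOffsetReset, long logEndOffsetAtJoin) {
        switch (autoOffsetReset) {
            case "smallest":
                return 0L;                  // earliest offset (this toy log never deletes anything)
            case "largest":
                return logEndOffsetAtJoin;  // only messages appended after the consumer joined
            default:
                throw new IllegalArgumentException("unknown policy: " + autoOffsetReset);
        }
    }

    /**
     * The producer sends `sent` messages. The consumer finishes registering after
     * `producedBeforeJoin` of them are already in the log. Returns how many it reads.
     */
    static int messagesSeen(String policy, int sent, int producedBeforeJoin) {
        List<String> log = new ArrayList<>();
        for (int i = 0; i < producedBeforeJoin; i++) {
            log.add("msg-" + i);
        }
        long start = startingOffset(policy, log.size());
        for (int i = producedBeforeJoin; i < sent; i++) {
            log.add("msg-" + i);
        }
        return (int) (log.size() - start);
    }

    public static void main(String[] args) {
        for (int before = 0; before <= 5; before++) {
            System.out.printf("consumer ready after %d sends: largest=%d, smallest=%d%n",
                    before, messagesSeen("largest", 5, before), messagesSeen("smallest", 5, before));
        }
    }
}
```

Sweeping the join point from 0 to 5 sends gives anywhere from 5 down to 0 messages under "largest", while "smallest" sees all 5 every time.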

So, you need to initialize the consumer with this property:

    // 0.8 defaults to "largest"; start from the earliest offset instead
    cProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString());
    ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
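Here is a fuller sketch of the consumer properties for such a test. It assumes a Kafka 0.8 high-level consumer and an embedded ZooKeeper at the hypothetical address localhost:2181. The helper name and the group id are illustrative. The sketch uses the literal "smallest", which is the value OffsetRequest.SmallestTimeString() holds in 0.8, so it compiles with only java.util. The consumer.timeout.ms setting is an optional addition that stops the test from blocking forever when fewer messages arrive than expected.

```java
import java.util.Properties;

/** Hypothetical helper: properties for a Kafka 0.8 high-level consumer used in a test. */
public class TestConsumerProps {

    static Properties consumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Same value as OffsetRequest.SmallestTimeString() in 0.8: start from the
        // earliest offset when this group has no committed offset yet.
        props.put("auto.offset.reset", "smallest");
        // Makes ConsumerIterator.hasNext() throw ConsumerTimeoutException after
        // 10 s with no message, instead of blocking forever.
        props.put("consumer.timeout.ms", "10000");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical embedded-ZooKeeper address and a per-run group id.
        Properties p = consumerProps("localhost:2181", "test-group-" + System.nanoTime());
        System.out.println(p.getProperty("auto.offset.reset"));
    }
}
```

The Properties would then go to new ConsumerConfig(props) and Consumer.createJavaConsumerConnector(...), as in the Consumer Group Example. The sketch uses a fresh group id per run so that an offset committed by an earlier run cannot override the reset policy if ZooKeeper state survives between runs.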

Jason
On Tue, Jul 30, 2013 at 7:44 AM, Garrett Barton <[EMAIL PROTECTED]> wrote:

> I am trying to write a JUnit test for working with an embedded Kafka 0.8
> server where I send and receive messages, just to verify that my embedded
> server, producer and consumer wrappers work.  The order of operations in
> the test looks something like:
>
> -Start zk. [own thread] (wait for init)
> -Start Kafka [own thread] (wait for init)
> -Start consumer [own thread] (I block on my thread's run method, releasing
> a lock once it starts). The consumer is copied from:
> https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
> -Start producer (I don't see anything to wait/block on, so I move along)
>
> Now that things are online, I send 5 messages out from the producer.  No
> topic exists yet, so one is created auto-magically by Kafka (btw, why is
> there no API to create topics in Java?).  I then read from the consumer
> and compare how many messages I received against how many I sent.
>
> What I notice is that when I run my test over and over, I get anywhere
> from 0-5 messages received.  From the logs, it looks like the client
> re-establishes its connection for some reason when messages start being
> sent, and how many of the messages I get to read depends on how long
> that takes.
>
> Is the high level consumer meant to be used this way? If a high level
> consumer is started first, am I not guaranteed all messages produced from
> that point on? I request a single stream from the topic, so according to
> the docs that single stream should get all messages on the topic (which
> is initialized with 2 partitions).  Does anyone have any more insight, or
> should I just migrate to the simple consumer example?
>
> ~Garrett
>