Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Kafka, mail # user - question about usage of SimpleConsumer


+
shangan chen 2013-05-31, 06:32
Copy link to this message
-
Re: question about usage of SimpleConsumer
Neha Narkhede 2013-05-31, 15:49
getOffsetsBefore sends an RPC call to the Kafka brokers to find out the
earliest/latest offset for that topic, partition. In your example, it will
get you the latest offset at the time of the request.

Thanks,
Neha
On Thu, May 30, 2013 at 11:31 PM, shangan chen <[EMAIL PROTECTED]>wrote:

> In Kafka, the consumers are responsible for maintaining state information
> (offset) on what has been consumed (refer from kafka design
> page).high-level consumer api will store its consumption state in
> zookeeper, while simple consumer shoud deal with these things itself.
> My doubt is  what happened when I call getOffsetsBefore(topic,
> partition,OffsetRequest.LatestTime(), maxNumOffsets) ? Where did it fetch
> offset as I didn't store the offset, it seems that kafka maintain the
> offset, anybody can give some explanation.
>
>
> public void open(Map conf, TopologyContext context,
> SpoutOutputCollector collector) {
> _collector = collector;
> _consumer = new SimpleConsumer(host, port, soTimeout, buffersize);
> long[] offsets = _consumer.getOffsetsBefore(topic, partition,
> OffsetRequest.LatestTime(), maxNumOffsets);
> offset = offsets[0];
> new StringScheme();
> }
>
> @Override
> public void nextTuple() {
> FetchRequest fetch = new FetchRequest(topic, partition, offset, maxSize);
> ByteBufferMessageSet msgSet = _consumer.fetch(fetch);
> for (MessageAndOffset msgAndOffset : msgSet) {
> String msg = getMessage(msgAndOffset.message());
> // log spout process time
> Debug.log(this.getClass().getSimpleName(), msg);
> Debug.incr(topic + "_" + this.getClass().getSimpleName(), 1);
> _collector
> .emit(new Values(msg), new KafkaMessageId(msg, offset, 1));
> offset = msgAndOffset.offset();
> }
> }
>
>
> --
> have a good day!
> chenshang'an
>

 
+
shangan chen 2013-06-01, 02:44
+
Neha Narkhede 2013-06-01, 04:02