
Kafka user mailing list: question about usage of SimpleConsumer


shangan chen 2013-05-31, 06:32
Neha Narkhede 2013-05-31, 15:49
shangan chen 2013-06-01, 02:44
Re: question about usage of SimpleConsumer
Neha Narkhede 2013-06-01, 04:02
> I mean, the simple consumer won't get the offset where it stopped, but
> the broker's latest offset.

Correct. Also, each consumer in Kafka reads data from the broker
independently.

Thanks,
Neha
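To make Neha's two points concrete, here is a toy in-memory model in Java. It does not use the Kafka client at all; ToyPartition, ToyConsumer and ToyDemo are hypothetical names. The partition only knows its own log-end offset (playing the role of getOffsetsBefore(..., OffsetRequest.LatestTime(), ...)), and every consumer keeps its own position. Offsets here are message indexes, whereas Kafka 0.7 uses byte offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point (declared first so single-file `java ToyDemo.java` works).
class ToyDemo {
    public static void main(String[] args) {
        ToyPartition p = new ToyPartition();
        p.append("m0");
        p.append("m1");
        p.append("m2");

        // Two consumers on the same partition, each with its own offset.
        ToyConsumer a = new ToyConsumer(p, 0);
        ToyConsumer b = new ToyConsumer(p, 0);
        System.out.println(a.poll(2));  // [m0, m1]
        System.out.println(b.poll(10)); // [m0, m1, m2]; unaffected by a

        // Starting at the broker's "latest" offset skips what is already there.
        ToyConsumer late = new ToyConsumer(p, p.latestOffset());
        System.out.println(late.poll(10)); // []
        p.append("m3");
        System.out.println(late.poll(10)); // [m3]
    }
}

// Hypothetical stand-in for one topic partition held by a broker.
class ToyPartition {
    private final List<String> log = new ArrayList<>();

    void append(String msg) {
        log.add(msg);
    }

    // Analogue of asking for LatestTime(): the current log-end offset.
    // It is broker state and says nothing about any consumer.
    long latestOffset() {
        return log.size();
    }

    // Analogue of a fetch: up to maxMessages messages starting at offset.
    List<String> fetch(long offset, int maxMessages) {
        int from = (int) Math.min(offset, log.size());
        int to = Math.min(from + maxMessages, log.size());
        return new ArrayList<>(log.subList(from, to));
    }
}

// Hypothetical consumer that tracks its own position; the partition stores none.
class ToyConsumer {
    private final ToyPartition partition;
    private long offset;

    ToyConsumer(ToyPartition partition, long startOffset) {
        this.partition = partition;
        this.offset = startOffset;
    }

    List<String> poll(int maxMessages) {
        List<String> batch = partition.fetch(offset, maxMessages);
        offset += batch.size(); // advances only this consumer's position
        return batch;
    }

    long position() {
        return offset;
    }
}
```

Consumer b still receives m0 after a has read it, and a consumer that starts from the latest offset only sees messages appended afterwards, which matches what shangan observed with several spouts.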
On May 31, 2013 7:44 PM, "shangan chen" <[EMAIL PROTECTED]> wrote:

> You mean the Kafka broker maintains the earliest/latest offset for each
> partition of each topic? Does this offset information have anything to do
> with consumers, or is it just the broker's own information? I mean, the
> simple consumer won't get the offset where it stopped, but the broker's
> latest offset.
>
> Another thing: if I have multiple simple consumers (in Storm there are
> multiple spouts) consuming messages from a specific partition of a specific
> topic on a single broker, will they share the same stream or consume the
> messages independently? In testing with the previous example, I found that
> each consumer established its own stream rather than all consumers sharing
> the same stream. That seems to confirm that the latest offset is the
> broker's queue information rather than the consumer's state.
>
> On Fri, May 31, 2013 at 11:49 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
>
> > getOffsetsBefore sends an RPC call to the Kafka brokers to find out the
> > earliest/latest offset for that topic and partition. In your example, it
> > will get you the latest offset at the time of the request.
> >
> > Thanks,
> > Neha
> >
> >
> > On Thu, May 30, 2013 at 11:31 PM, shangan chen <[EMAIL PROTECTED]> wrote:
> >
> > > In Kafka, the consumers are responsible for maintaining state information
> > > (the offset) about what has been consumed (see the Kafka design page).
> > > The high-level consumer API stores its consumption state in ZooKeeper,
> > > while the simple consumer has to handle this itself.
> > > My question is: what happens when I call getOffsetsBefore(topic,
> > > partition, OffsetRequest.LatestTime(), maxNumOffsets)? Where does it
> > > fetch the offset from, given that I didn't store any offset? It seems
> > > that Kafka maintains the offset. Can anybody explain?
> > >
> > >
> > > public void open(Map conf, TopologyContext context,
> > >         SpoutOutputCollector collector) {
> > >     _collector = collector;
> > >     _consumer = new SimpleConsumer(host, port, soTimeout, buffersize);
> > >     // Ask the broker for this partition's current log-end offset.
> > >     // This is broker state, not a position saved by this consumer.
> > >     long[] offsets = _consumer.getOffsetsBefore(topic, partition,
> > >             OffsetRequest.LatestTime(), maxNumOffsets);
> > >     offset = offsets[0];
> > > }
> > >
> > > @Override
> > > public void nextTuple() {
> > >     FetchRequest fetch = new FetchRequest(topic, partition, offset,
> > >             maxSize);
> > >     ByteBufferMessageSet msgSet = _consumer.fetch(fetch);
> > >     for (MessageAndOffset msgAndOffset : msgSet) {
> > >         String msg = getMessage(msgAndOffset.message());
> > >         // log spout process time
> > >         Debug.log(this.getClass().getSimpleName(), msg);
> > >         Debug.incr(topic + "_" + this.getClass().getSimpleName(), 1);
> > >         _collector.emit(new Values(msg),
> > >                 new KafkaMessageId(msg, offset, 1));
> > >         // Advance this spout's own position; in Kafka 0.7, offset()
> > >         // points just past the current message.
> > >         offset = msgAndOffset.offset();
> > >     }
> > > }
> > >
> > >
> > > --
> > > have a good day!
> > > chenshang'an
> > >
> >
>
>
>
> --
> have a good day!
> chenshang'an
>
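Because the simple consumer has to keep its own position, one way to make a spout like the one above resume where it stopped is to persist the next offset and fall back to getOffsetsBefore(..., OffsetRequest.LatestTime(), ...) only when nothing has been saved. The sketch below assumes a local file as the store (the high-level consumer uses ZooKeeper instead). FileOffsetStore and ResumeDemo are hypothetical names, and a List<String> stands in for the partition, so offsets are indexes rather than Kafka 0.7 byte offsets.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

// Demo entry point: simulates a consumer restart that resumes from a saved offset.
class ResumeDemo {
    public static void main(String[] args) throws IOException {
        // Hypothetical stand-in for one partition's messages.
        List<String> log = Arrays.asList("m0", "m1", "m2", "m3");
        Path file = Files.createTempFile("spout-offset", ".txt");

        // First run: consume two messages, saving the next offset after each.
        FileOffsetStore store = new FileOffsetStore(file, 0L);
        long offset = store.load();
        for (int i = 0; i < 2; i++) {
            System.out.println("run 1: " + log.get((int) offset));
            offset++;
            store.save(offset);
        }

        // "Restart": a fresh store over the same file resumes at offset 2
        // instead of jumping to the partition's latest offset (4).
        FileOffsetStore restarted = new FileOffsetStore(file, 0L);
        for (long o = restarted.load(); o < log.size(); o++) {
            System.out.println("run 2: " + log.get((int) o));
        }
        Files.deleteIfExists(file);
    }
}

// Hypothetical helper: keeps one consumer's next-fetch offset in a local file.
class FileOffsetStore {
    private final Path file;
    private final long defaultOffset;

    FileOffsetStore(Path file, long defaultOffset) {
        this.file = file;
        this.defaultOffset = defaultOffset;
    }

    // Returns the saved offset, or defaultOffset if nothing was saved yet.
    // A real spout might use the broker's latest offset as the default.
    long load() throws IOException {
        if (!Files.exists(file)) {
            return defaultOffset;
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return text.isEmpty() ? defaultOffset : Long.parseLong(text);
    }

    // Overwrites the file with the new offset.
    void save(long offset) throws IOException {
        Files.write(file, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    }
}
```

In a Storm spout, one option would be to call save() once a tuple is acked rather than right after emit(), so that unacknowledged messages are fetched again after a restart.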