Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> Consume from X messages ago


Copy link to this message
-
Re: Consume from X messages ago
I guess I missed a step between 4 and 5 -

4. Replace the exported offsets with these offsets
*Use ImportZkOffsets to import the offsets from the modified export file.*
5. Restart the consumer.

Thanks,
Neha
On Tue, Mar 19, 2013 at 11:00 AM, S Ahmed <[EMAIL PROTECTED]> wrote:

> I thought since the offsets in .8 are numeric and not byte offsets like in
> 0.7x, you can simply just take say the current offset - 10000.
>
>
> On Tue, Mar 19, 2013 at 12:16 PM, Neha Narkhede <[EMAIL PROTECTED]
> >wrote:
>
> > Jim,
> >
> > You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this.
> > ExportZkOffsets exports the consumer offsets for your group to a file in
> a
> > certain format. You can then place the desired offset per partition you
> > want to reset your consumer to in the exported file.
> >
> > 1. Shutdown the consumer
> > 2. Export current offsets
> > 3. Get the desired offset (current offset - 10K). As David mentions, this
> > is approximate and might get you more than 10K messages if the data is
> > compressed.
> > 4. Replace the exported offsets with these offsets
> > 5. Restart the consumer.
> >
> > HTH,
> > Neha
> >
> >
> > On Tue, Mar 19, 2013 at 8:49 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> >
> > > This API is exposed through the SimpleConsumer scala class. See
> > > https://github.com/apache/**kafka/blob/trunk/core/src/**
> > > main/scala/kafka/consumer/**SimpleConsumer.scala#L60<
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60
> > >
> > >
> > > You will need to set earliestOrLatest to -1 for the latest offset.
> > >
> > > There is also a command line tool https://github.com/apache/**
> > >
> >
> kafka/blob/trunk/core/src/**main/scala/kafka/tools/**GetOffsetShell.scala<
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
> > >
> > >
> > > -David
> > >
> > >
> > > On 3/19/13 11:25 AM, James Englert wrote:
> > >
> > >> I'm still a bit lost.  Where is the offsets API?  I.e. which class?
> > >>
> > >>
> > >> On Tue, Mar 19, 2013 at 11:16 AM, David Arthur <[EMAIL PROTECTED]>
> > wrote:
> > >>
> > >>  Using the Offsets API, you can get the latest offset by setting time
> to
> > >>> -1. Then you subtract 10000
> > >>>
> > >>> There is no guarantee that 10k prior messages exist of course, so
> you'd
> > >>> need to handle that case.
> > >>>
> > >>> -David
> > >>>
> > >>>
> > >>> On 3/19/13 11:04 AM, James Englert wrote:
> > >>>
> > >>>  Hi,
> > >>>>
> > >>>> I'm using Kafka 0.8.  I would like to setup a consumer to fetch the
> > last
> > >>>> 10,000 messages and then start consuming messages.
> > >>>>
> > >>>> I see the configuration autooffset.reset, but that isn't quite what
> I
> > >>>> want.  I want only the last 10,000 messages.
> > >>>>
> > >>>> Is there a good way to achieve this in 0.8, besides just hacking the
> > >>>> data
> > >>>> in ZK?
> > >>>>
> > >>>> Thanks,
> > >>>> Jim
> > >>>>
> > >>>>
> > >>>>
> > >>
> > >
> >
>