
Kafka, mail # user - Consume from X messages ago


Re: Consume from X messages ago
James Englert 2013-03-22, 15:13
Thanks for the help.

FWIW, I ended up writing a simple util that I call while my consumer is
starting up to move its offset back. It *seems* to work decently.
Thoughts? Would this be helpful to contribute back to Kafka, or is the
idea just a poor one?

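  // Assumed context (not shown in the original): this method lives in a class that
  // mixes in kafka.utils.Logging (for warn) and holds a connected ZkClient named zkClient.
  import com.google.common.base.Preconditions
  import javax.annotation.Nonnull
  import kafka.utils.ZkUtils

  /**
   * Attempts to move the consumer offset back. If it has any issues,
   * throws an exception. Run it before the consumer starts consuming;
   * otherwise the consumer may overwrite the new offset on its next commit.
   */
  def moveConsumerOffsetBack(@Nonnull groupId: String, @Nonnull topic: String,
                             partition: Long, approximateMessagesBack: Long): Unit = {
    Preconditions.checkNotNull(groupId)
    Preconditions.checkNotNull(topic)
    Preconditions.checkArgument(partition >= 0)
    Preconditions.checkArgument(approximateMessagesBack > 0)

    // /consumers/<groupId>/offsets/<topic>/<partition>
    val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition

    if (zkClient.exists(path)) {
      // Read as Any because the stored type is unpredictable: it might be a Long or a String.
      val currentOffset = zkClient.readData[Any](path)
      // Clamp at 0 so rewinding past the stored offset never yields a negative value.
      val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
      zkClient.writeData(path, desiredOffset.toString)
      warn("Reset the " + topic + " consumer to " + desiredOffset)
    } else {
      throw new RuntimeException("Unable to move the consumer back: path not found in ZK. " +
        "This may or may not be an issue, depending on whether you expect the path to exist. " +
        "Path: " + path)
    }
  }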
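
The offset arithmetic in moveConsumerOffsetBack can be pulled out into a pure function and checked without a ZooKeeper connection. This is a minimal sketch; OffsetMath, parseStoredOffset and moveBack are hypothetical names (not part of Kafka), and the parsing assumes the stored value comes back as a Long, an Int, or a numeric String, mirroring the util's toString.toLong approach.

```scala
// Hypothetical helper (not part of Kafka): the arithmetic that
// moveConsumerOffsetBack applies to the value it reads from ZooKeeper.
object OffsetMath {
  /** Parses a stored offset that may come back as a Long, an Int, or a String. */
  def parseStoredOffset(raw: Any): Long = raw match {
    case l: Long   => l
    case i: Int    => i.toLong
    case s: String => s.trim.toLong
    case other     => other.toString.trim.toLong
  }

  /** Moves an offset back by messagesBack, never going below zero. */
  def moveBack(current: Long, messagesBack: Long): Long = {
    require(messagesBack > 0, "messagesBack must be positive")
    math.max(0L, current - messagesBack)
  }
}
```

Like the util, this clamps at 0. If the broker has already deleted old log segments, the earliest retained offset can be well above 0, so a clamped value may still be out of range.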
On Tue, Mar 19, 2013 at 2:05 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:

> I guess I missed a step between 4 and 5:
>
> 4. Replace the exported offsets with these offsets
> *Use ImportZkOffsets to import the offsets from the modified export file.*
> 5. Restart the consumer.
>
> Thanks,
> Neha
>
>
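
Computing the desired offsets and editing the export file before running ImportZkOffsets can be done by one small program. This sketch assumes ExportZkOffsets writes one `<zookeeper path>:<offset>` entry per line (check this against the tool version you actually run); RewindExportFile, rewindLine and rewindFile are hypothetical names.

```scala
import java.nio.file.{Files, Path}
import scala.io.Source

// Hypothetical helper: rewinds every offset in an exported offsets file.
// ASSUMPTION: each non-empty line looks like "<zk path>:<offset>", e.g.
// "/consumers/my-group/offsets/my-topic/0:15000".
object RewindExportFile {
  def rewindLine(line: String, messagesBack: Long): String = {
    val sep = line.lastIndexOf(':')
    if (line.trim.isEmpty || sep < 0) line // leave blank or unrecognised lines untouched
    else {
      val path = line.substring(0, sep)
      val offset = line.substring(sep + 1).trim.toLong
      // Approximate rewind, clamped at 0 so no offset goes negative.
      path + ":" + math.max(0L, offset - messagesBack)
    }
  }

  def rewindFile(in: Path, out: Path, messagesBack: Long): Unit = {
    val src = Source.fromFile(in.toFile, "UTF-8")
    // Materialise the lines before closing the source.
    val rewritten =
      try src.getLines().map(rewindLine(_, messagesBack)).toList
      finally src.close()
    Files.write(out, rewritten.mkString("", "\n", "\n").getBytes("UTF-8"))
  }
}
```

Run it on the exported file while the consumer is shut down, then pass the output file to ImportZkOffsets before restarting the consumer.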
> On Tue, Mar 19, 2013 at 11:00 AM, S Ahmed <[EMAIL PROTECTED]> wrote:
>
> > I thought that since offsets in 0.8 are sequential message numbers rather
> > than byte offsets as in 0.7.x, you could simply take, say, the current
> > offset - 10000.
> >
> >
> > On Tue, Mar 19, 2013 at 12:16 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
> >
> > > Jim,
> > >
> > > You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this.
> > > ExportZkOffsets exports the consumer offsets for your group to a file in
> > > a certain format. You can then edit the exported file, setting each
> > > partition's offset to the value you want your consumer to reset to.
> > >
> > > 1. Shut down the consumer.
> > > 2. Export the current offsets.
> > > 3. Get the desired offset (current offset - 10K). As David mentions,
> > > this is approximate and might get you more than 10K messages if the
> > > data is compressed.
> > > 4. Replace the exported offsets with these offsets.
> > > 5. Restart the consumer.
> > >
> > > HTH,
> > > Neha
> > >
> > >
> > > On Tue, Mar 19, 2013 at 8:49 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> > >
> > > > This API is exposed through the SimpleConsumer Scala class. See
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60
> > > >
> > > > You will need to set earliestOrLatest to -1 for the latest offset.
> > > >
> > > > There is also a command-line tool:
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
> > > >
> > > > -David
> > > >
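
Because there is no guarantee that 10k prior messages still exist, a rewound start offset is safer when clamped to the earliest offset the broker still retains. The Offsets API returns that offset for time -2, just as time -1 returns the latest. This sketch assumes both values have already been fetched, for example with GetOffsetShell or SimpleConsumer; StartOffset.rewindFromLatest is a hypothetical helper, not a Kafka API.

```scala
// Hypothetical helper: picks a start offset roughly messagesBack before the
// latest offset, without going below the earliest offset still on the broker.
object StartOffset {
  def rewindFromLatest(earliest: Long, latest: Long, messagesBack: Long): Long = {
    require(earliest <= latest, "earliest must not exceed latest")
    require(messagesBack >= 0, "messagesBack must be non-negative")
    math.max(earliest, latest - messagesBack)
  }
}
```

With compressed data the result is still approximate, as noted in the thread: the consumer may receive more messages than requested.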
> > > >
> > > > On 3/19/13 11:25 AM, James Englert wrote:
> > > >
> > > >> I'm still a bit lost.  Where is the offsets API?  I.e. which class?
> > > >>
> > > >>
> > > >> On Tue, Mar 19, 2013 at 11:16 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> > > >>
> > > >>> Using the Offsets API, you can get the latest offset by setting time
> > > >>> to -1. Then you subtract 10000.
> > > >>>
> > > >>> There is no guarantee that 10k prior messages exist of course, so

Jim Englert
Gilt Groupe