Kafka >> mail # user >> Consume from X messages ago


Re: Consume from X messages ago
Thanks for the help.

FWIW, I ended up writing a simple util that I can call while my consumer is
starting up to move the offset back.  It *seems* to work decently.
Thoughts?  Would this be worth contributing back to Kafka, or is the idea
just a poor one?

  // Imports assumed by this snippet (not shown in the original message):
  import javax.annotation.Nonnull
  import com.google.common.base.Preconditions
  import kafka.utils.ZkUtils

  /**
   * Attempts to move the consumer offset back.  If it has any issues,
   * throws an exception.  Relies on a zkClient and a warn() logger from
   * the enclosing class.
   */
  def moveConsumerOffsetBack(@Nonnull groupId: String, @Nonnull topic: String,
                             partition: Long, approximateMessagesBack: Long): Unit = {
    Preconditions.checkNotNull(groupId)
    Preconditions.checkNotNull(topic)
    Preconditions.checkArgument(partition >= 0)
    Preconditions.checkArgument(approximateMessagesBack > 0)

    val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition

    if (zkClient.exists(path)) {
      // We get this from ZK as Any because the exact type is unpredictable.
      // It might be Long or String.
      val currentOffset = zkClient.readData[Any](path)
      val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
      zkClient.writeData(path, desiredOffset.toString)
      warn("Reset the " + topic + " consumer to " + desiredOffset)
    }
    else {
      throw new RuntimeException("Unable to find the consumer offset path in ZK, so the " +
        "consumer was not moved back.  This may or may not be an issue, depending on " +
        "whether you expect the path to exist. Path: " + path)
    }
  }
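
The offset arithmetic in the util can be pulled out and checked without ZooKeeper. A minimal sketch, assuming the stored value arrives as a Long, an Int, or a numeric String; the names OffsetMath, parseOffset and rewind are made up for illustration and are not part of Kafka:

```scala
// Hypothetical helper, not part of Kafka: isolates the "move back N,
// but never below zero" arithmetic so it can be tested on its own.
object OffsetMath {
  /** Parses an offset that may come back from ZK as a number or a String. */
  def parseOffset(raw: Any): Long = raw match {
    case n: Long => n
    case n: Int  => n.toLong
    case other   => other.toString.trim.toLong // throws NumberFormatException on junk
  }

  /** Moves the offset back by messagesBack, clamping the result at 0. */
  def rewind(raw: Any, messagesBack: Long): Long = {
    require(messagesBack > 0, "messagesBack must be positive")
    math.max(0L, parseOffset(raw) - messagesBack)
  }
}
```

With this, OffsetMath.rewind("15000", 10000L) gives 5000, and rewinding an offset smaller than messagesBack gives 0 rather than a negative value.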

On Tue, Mar 19, 2013 at 2:05 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:

> I guess I missed a step between 4 and 5 -
>
> 4. Replace the exported offsets with these offsets
> *Use ImportZkOffsets to import the offsets from the modified export file.*
> 5. Restart the consumer.
>
> Thanks,
> Neha
>
>
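
The "edit the exported file" part of the steps quoted above could be scripted. A rough sketch, assuming each exported line has the form <zk path>:<offset>; that format is an assumption to check against your own export file, and ExportedOffsets, rewindLine and rewindAll are hypothetical names, not Kafka APIs:

```scala
// Hypothetical helper, not part of Kafka. Assumes one "<zk path>:<offset>"
// entry per line in the exported file; verify this against a real export.
object ExportedOffsets {
  /** Rewinds the offset on a single exported line, clamping at 0. */
  def rewindLine(line: String, messagesBack: Long): String = {
    val sep = line.lastIndexOf(':')
    require(sep > 0, "expected '<path>:<offset>' but got: " + line)
    val path = line.substring(0, sep)
    val offset = line.substring(sep + 1).trim.toLong
    path + ":" + math.max(0L, offset - messagesBack)
  }

  /** Rewinds every non-blank line; blank lines are dropped. */
  def rewindAll(lines: Seq[String], messagesBack: Long): Seq[String] =
    lines.filter(_.trim.nonEmpty).map(rewindLine(_, messagesBack))
}
```

The rewritten lines would then be saved to a file and fed to ImportZkOffsets while the consumer is shut down, as in the quoted steps.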
> On Tue, Mar 19, 2013 at 11:00 AM, S Ahmed <[EMAIL PROTECTED]> wrote:
>
> > I thought that since the offsets in 0.8 are numeric and not byte offsets
> > like in 0.7.x, you can simply take, say, the current offset - 10000.
> >
> >
> > On Tue, Mar 19, 2013 at 12:16 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
> >
> > > Jim,
> > >
> > > You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this.
> > > ExportZkOffsets exports the consumer offsets for your group to a file in a
> > > certain format. You can then place the desired offset per partition you
> > > want to reset your consumer to in the exported file.
> > >
> > > 1. Shutdown the consumer
> > > 2. Export current offsets
> > > 3. Get the desired offset (current offset - 10K). As David mentions, this
> > > is approximate and might get you more than 10K messages if the data is
> > > compressed.
> > > 4. Replace the exported offsets with these offsets
> > > 5. Restart the consumer.
> > >
> > > HTH,
> > > Neha
> > >
> > >
> > > On Tue, Mar 19, 2013 at 8:49 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> > >
> > > > This API is exposed through the SimpleConsumer scala class. See
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60
> > > >
> > > > You will need to set earliestOrLatest to -1 for the latest offset.
> > > >
> > > > There is also a command line tool:
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
> > > >
> > > >
> > > > -David
> > > >
> > > >
> > > > On 3/19/13 11:25 AM, James Englert wrote:
> > > >
> > > >> I'm still a bit lost.  Where is the offsets API?  I.e. which class?
> > > >>
> > > >>
> > > > >> On Tue, Mar 19, 2013 at 11:16 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> > > > >>
> > > > >>> Using the Offsets API, you can get the latest offset by setting time to
> > > > >>> -1. Then you subtract 10000
> > > >>>
> > > >>> There is no guarantee that 10k prior messages exist of course, so

Jim Englert
Gilt Groupe
2 Park Ave South, 5th Floor
New York, NY 10011
M: 847-707-2942
Please accept my invitation to join Gilt:
http://www.giltgroupe.com/invite/jenglert
