Thanks for the help.
FWIW, I ended up writing a simple utility that I call while my consumer is
starting up to move its offset back. It *seems* to work decently.
Thoughts? Would this be worth contributing back to Kafka, or is the idea
just a poor one?
import com.google.common.base.Preconditions
import javax.annotation.Nonnull
import kafka.utils.ZkUtils

/**
 * Attempts to move the consumer offset back. If it has any issues,
 * throws an exception.
 *
 * Assumes the enclosing class has a `zkClient: org.I0Itec.zkclient.ZkClient`
 * field and mixes in kafka.utils.Logging (which provides `warn`).
 */
def moveConsumerOffsetBack(@Nonnull groupId: String, @Nonnull topic: String,
                           partition: Long, approximateMessagesBack: Long): Unit = {
  Preconditions.checkNotNull(groupId)
  Preconditions.checkNotNull(topic)
  Preconditions.checkArgument(partition >= 0)
  Preconditions.checkArgument(approximateMessagesBack > 0)
  val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition
  if (zkClient.exists(path)) {
    // We read this from ZK as Any because the exact type is unpredictable:
    // it might be a Long or a String.
    val currentOffset = zkClient.readData[Any](path)
    // Never rewind past offset 0.
    val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
    zkClient.writeData(path, desiredOffset.toString)
    warn("Reset the " + topic + " consumer to " + desiredOffset)
  } else {
    throw new RuntimeException("Unable to move the consumer back: offset path not found in ZK. " +
      "This may or may not be an issue, depending on whether you expect the path to exist. " +
      "Path: " + path)
  }
}
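The rewind arithmetic in the util above can be pulled out of the ZooKeeper calls so it can be unit-tested without a running ZK. A minimal sketch: `OffsetRewind`, `offsetPath`, `rewind` and `parseStoredOffset` are hypothetical names, and the `/consumers/<group>/offsets/<topic>/<partition>` layout assumes `ZkUtils.ConsumersPath` is `/consumers`, as in Kafka 0.8.

```scala
object OffsetRewind {
  // Hypothetical helper: the same path the util builds, assuming
  // ZkUtils.ConsumersPath == "/consumers".
  def offsetPath(groupId: String, topic: String, partition: Long): String =
    s"/consumers/$groupId/offsets/$topic/$partition"

  // Moves an offset back by `messagesBack`, never going below 0.
  def rewind(currentOffset: Long, messagesBack: Long): Long = {
    require(currentOffset >= 0, "current offset must be non-negative")
    require(messagesBack > 0, "messagesBack must be positive")
    math.max(0L, currentOffset - messagesBack)
  }

  // ZK may hand back the stored offset as a String or a boxed number,
  // so normalise via toString (mirroring `toString.toLong` in the util).
  def parseStoredOffset(raw: Any): Long = raw.toString.trim.toLong
}
```

For example, `OffsetRewind.rewind(5000L, 10000L)` returns `0L`. Keep in mind that 0 may be earlier than the oldest offset the broker still retains, and, as noted further down the thread, the message count is only approximate.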
On Tue, Mar 19, 2013 at 2:05 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
> I guess I missed a step between 4 and 5 -
>
> 4. Replace the exported offsets with these offsets
> *Use ImportZkOffsets to import the offsets from the modified export file.*
> 5. Restart the consumer.
>
> Thanks,
> Neha
>
>
> On Tue, Mar 19, 2013 at 11:00 AM, S Ahmed <[EMAIL PROTECTED]> wrote:
>
> > I thought that since offsets in 0.8 are numeric rather than byte offsets
> > as in 0.7.x, you can simply take the current offset minus 10000.
> >
> >
> > On Tue, Mar 19, 2013 at 12:16 PM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
> >
> > > Jim,
> > >
> > > You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this.
> > > ExportZkOffsets exports the consumer offsets for your group to a file in
> > > a certain format. You can then put the desired offset for each partition
> > > you want to reset your consumer to into the exported file.
> > >
> > > 1. Shut down the consumer.
> > > 2. Export the current offsets.
> > > 3. Compute the desired offset (current offset - 10K). As David mentions,
> > >    this is approximate and might get you more than 10K messages if the
> > >    data is compressed.
> > > 4. Replace the exported offsets with these offsets.
> > > 5. Restart the consumer.
> > >
> > > HTH,
> > > Neha
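Steps 2 to 4 can be scripted. A minimal sketch, assuming each line of the ExportZkOffsets output has the form `<zk offset path>:<offset>` (verify this against the file your Kafka version actually produces). `RewindExportFile`, `rewindLine` and `rewindFile` are hypothetical names; the rewritten file is what you would then load with ImportZkOffsets before restarting the consumer.

```scala
import java.io.PrintWriter
import scala.io.Source

object RewindExportFile {
  // Assumed line format: "<zk offset path>:<offset>",
  // e.g. "/consumers/group1/offsets/topic1/0:286894308".
  def rewindLine(line: String, messagesBack: Long): String = {
    val idx = line.lastIndexOf(':')
    require(idx > 0, s"unexpected line: $line")
    val path = line.substring(0, idx)
    val offset = line.substring(idx + 1).trim.toLong
    // Clamp at 0 so a short partition is not rewound to a negative offset.
    s"$path:${math.max(0L, offset - messagesBack)}"
  }

  // Reads the exported file, rewinds every offset, and writes the result
  // to `out`, ready to be imported (step 4).
  def rewindFile(in: String, out: String, messagesBack: Long): Unit = {
    val src = Source.fromFile(in)
    val lines = try src.getLines().filter(_.trim.nonEmpty).toList finally src.close()
    val writer = new PrintWriter(out)
    try lines.foreach(l => writer.println(rewindLine(l, messagesBack)))
    finally writer.close()
  }
}
```

Writing to a separate output file keeps the original export intact, so you can re-import it if the rewind turns out to be wrong.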
> > >
> > >
> > > On Tue, Mar 19, 2013 at 8:49 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> > >
> > > > This API is exposed through the SimpleConsumer Scala class. See
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60
> > > >
> > > > You will need to set earliestOrLatest to -1 for the latest offset.
> > > >
> > > > There is also a command-line tool:
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala
> > > >
> > > > -David
> > > >
> > > >
> > > > On 3/19/13 11:25 AM, James Englert wrote:
> > > >
> > > >> I'm still a bit lost. Where is the offsets API? I.e. which class?
> > > >>
> > > >>
> > > >> On Tue, Mar 19, 2013 at 11:16 AM, David Arthur <[EMAIL PROTECTED]> wrote:
> > > >>
> > > >>> Using the Offsets API, you can get the latest offset by setting time
> > > >>> to -1. Then you subtract 10000.
> > > >>>
> > > >>> There is no guarantee that 10k prior messages exist of course, so
Jim Englert
Gilt Groupe