-
Consume from X messages ago
James Englert 2013-03-19, 15:05
Hi,

I'm using Kafka 0.8. I would like to set up a consumer to fetch the last 10,000 messages and then start consuming new messages.

I see the configuration autooffset.reset, but that isn't quite what I want. I want only the last 10,000 messages.

Is there a good way to achieve this in 0.8, besides just hacking the data in ZK?

Thanks,
Jim
-
Re: Consume from X messages ago
David Arthur 2013-03-19, 15:17
Using the Offsets API, you can get the latest offset by setting time to -1. Then you subtract 10000.

There is no guarantee that 10k prior messages exist, of course, so you'd need to handle that case.

-David
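The arithmetic David describes, including the case where fewer than 10k messages exist, might be sketched as below. This is only an illustration under stated assumptions: `StartOffset`, `startOffset`, and its parameter names are hypothetical, and the two offsets are assumed to have been fetched already through the Offsets API (time -1 for the latest offset and, as far as I know, time -2 for the earliest one still on the broker).

```scala
object StartOffset {
  /**
   * Offset to start fetching from so that roughly `messagesBack` messages are replayed.
   * `earliest` and `latest` are assumed to come from the Offsets API; this helper
   * is hypothetical and not part of Kafka.
   */
  def startOffset(earliest: Long, latest: Long, messagesBack: Long): Long = {
    require(messagesBack >= 0, "messagesBack must be non-negative")
    require(earliest <= latest, "earliest offset must not exceed latest offset")
    // Clamp to the earliest offset so we never request data the broker no longer has.
    math.max(earliest, latest - messagesBack)
  }
}
```

If the log holds fewer than `messagesBack` messages, the result is simply the earliest available offset, so the consumer replays everything that is left.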
-
Re: Consume from X messages ago
James Englert 2013-03-19, 15:26
I'm still a bit lost. Where is the Offsets API? I.e., which class?
-
Re: Consume from X messages ago
David Arthur 2013-03-19, 15:49
This API is exposed through the SimpleConsumer Scala class. See
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L60

You will need to set earliestOrLatest to -1 for the latest offset.

There is also a command line tool:
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/GetOffsetShell.scala

-David
-
Re: Consume from X messages ago
Neha Narkhede 2013-03-19, 16:16
Jim,

You can leverage the ExportZkOffsets/ImportZkOffsets tools to do this. ExportZkOffsets exports the consumer offsets for your group to a file in a certain format. You can then put the offset you want each partition reset to in the exported file.

1. Shut down the consumer.
2. Export the current offsets.
3. Compute the desired offset (current offset - 10K). As David mentions, this is approximate and might get you more than 10K messages if the data is compressed.
4. Replace the exported offsets with these offsets.
5. Restart the consumer.

HTH,
Neha
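Steps 3 and 4 of the procedure above could be scripted roughly as follows. This is a sketch, not the tools' own code: it assumes each exported line has the form `<zk-path>:<offset>` (the thread only says "a certain format", so check the file ExportZkOffsets actually produces), and `RewindExport`, `rewindLine`, and `rewindAll` are hypothetical names.

```scala
object RewindExport {
  /**
   * Rewinds one exported line, assumed to look like "<zk-path>:<offset>".
   * The offset is taken to be everything after the last ':' on the line.
   */
  def rewindLine(line: String, messagesBack: Long): String = {
    val sep = line.lastIndexOf(':')
    require(sep > 0, s"unexpected line format: $line")
    val path = line.substring(0, sep)
    val offset = line.substring(sep + 1).trim.toLong
    // Clamp at 0; the broker's real earliest offset may be higher than this.
    s"$path:${math.max(0L, offset - messagesBack)}"
  }

  /** Rewinds every non-blank line of an exported offsets file. */
  def rewindAll(lines: Seq[String], messagesBack: Long): Seq[String] =
    lines.filter(_.trim.nonEmpty).map(rewindLine(_, messagesBack))
}
```

The rewritten lines would then be saved back to the export file before the consumer is restarted. Clamping at 0 is a simplification: if old segments have already been deleted, the earliest offset on the broker is larger, and the consumer's out-of-range handling decides what happens next.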
-
Re: Consume from X messages ago
S Ahmed 2013-03-19, 18:01
I thought that since the offsets in 0.8 are numeric and not byte offsets like in 0.7.x, you can simply take, say, the current offset - 10000.
-
Re: Consume from X messages ago
Neha Narkhede 2013-03-19, 18:05
I guess I missed a step between 4 and 5 -

4. Replace the exported offsets with these offsets.
*Use ImportZkOffsets to import the offsets from the modified export file.*
5. Restart the consumer.

Thanks,
Neha
-
Re: Consume from X messages ago
James Englert 2013-03-22, 15:13
Thanks for the help.

FWIW, I ended up writing a simple util that I can call as my consumer is starting up to move the offset back. It *seems* to work decently.

Thoughts? Would this be helpful as a contribution back to Kafka, or is the idea just poor?

    import javax.annotation.Nonnull
    import com.google.common.base.Preconditions
    import kafka.utils.ZkUtils

    /**
     * Attempts to move the consumer offset back. If it has any issues, throws an exception.
     * Expects a zkClient and a warn(...) logging method to be in scope.
     */
    def moveConsumerOffsetBack(@Nonnull groupId: String,
                               @Nonnull topic: String,
                               partition: Long,
                               approximateMessagesBack: Long): Unit = {
      Preconditions.checkNotNull(groupId)
      Preconditions.checkNotNull(topic)
      Preconditions.checkArgument(partition >= 0)
      Preconditions.checkArgument(approximateMessagesBack > 0)

      val path = ZkUtils.ConsumersPath + "/" + groupId + "/offsets/" + topic + "/" + partition
      if (zkClient.exists(path)) {
        // We get this from ZK as Any because the exact type is unpredictable.
        // It might be Long or String.
        val currentOffset = zkClient.readData[Any](path)
        // Never rewind past offset 0.
        val desiredOffset = math.max(0L, currentOffset.toString.toLong - approximateMessagesBack)
        zkClient.writeData(path, desiredOffset.toString)
        warn("Reset the " + topic + " consumer to " + desiredOffset)
      } else {
        throw new RuntimeException("Unable to move the consumer back: offset path not found in ZK. " +
          "This may or may not be an issue, depending on whether you expect the path to exist. Path: " + path)
      }
    }