Re: Consuming "backwards"?
Yes, you can also read backwards on the stream if you do this in the
consumer:

val maoList: Iterable[MessageAndOffset] =
  for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages)
    yield messageAndOffset

// reverse the fetched batch so the newest message comes out first
for (messageAndOffset <- maoList.toList.reverse) {
  // process each message here, newest to oldest
}

this way, within each batch, every message you read is newer than the next
one, so when you fetch 18,19,20 you will see them come in as 20,19,18
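
For context, here is a minimal sketch of the fetch that would surround that
snippet, assuming the Kafka 0.8 SimpleConsumer API; the broker host, topic,
partition, and start offset are placeholders:

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Placeholder broker/topic/partition; substitute your own.
val consumer =
  new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "backwards-reader")

val startOffset = 18L // wherever the current read window starts

// Note: the fetch size on the wire is a byte budget, not a message count.
val request = new FetchRequestBuilder()
  .clientId("backwards-reader")
  .addFetch("my-topic", 0, startOffset, 100000)
  .build()

// messageSet is what the for-comprehension above filters and reverses
val messageSet = consumer.fetch(request).messageSet("my-topic", 0)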

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/
On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes <[EMAIL PROTECTED]> wrote:

> Right. If you're not reading contiguously, you need to remember the ranges
> that you have/haven't read. As long as you do that, it all works out, I
> think.
>
> A kafka client always has to store the last offset it read. In the
> simplest "both directions" case where you start with current and read in
> both directions, you just need to remember the first offset you've read as
> well.
>
> On Dec 6, 2013, at 3:50 PM, Joe Stein <[EMAIL PROTECTED]> wrote:
>
> > you have to allow the fetchSize to be variable, so in your example, since
> > the new highwatermark is 12 and the last consumed message is 10:
> >
> > fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark -
> > lastConsumedOffset else 3
> >
> > the real trick, though, is not missing messages: you have to keep track
> > of more than one index
> >
> > so let's say now you have 7 more published (13,14,15,16,17,18,19)
> >
> > you then read 17,18,19 (and while that happens 5 more are published ...
> > 20,21,22,23,24)
> >
> > now when you read 22,23,24 ... you have to keep track of not only 22 as
> > the last read, so you scoop up 20 and 21, but also still remember 17, so
> > you later get 16,15,14,13
> >
> > so it can be done with some fancy logic to manage the index and offsets
> > (and persist them); a sketch of that bookkeeping follows the quoted
> > thread below
> >
> >
> > On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic <[EMAIL PROTECTED]> wrote:
> >
> >> Hi,
> >>
> >> On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes <[EMAIL PROTECTED]> wrote:
> >>
> >>> On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic <[EMAIL PROTECTED]> wrote:
> >>>
> >>>> but I think the problem is that each time we grab we could get some
> >>>> of the same messages we already processed
> >>>
> >>> Doesn't setting the fetchSize to "how far back we need to grab" handle
> >>> that?
> >>
> >>
> >> I *think* it doesn't, but I'm wrong every day.... N times, so....
> >>
> >> I think this is what would happen:
> >> 1) imagine 10 messages in the broker: m1 - m10
> >> 2) the consumer grabs the last N (=3): m8, m9, m10
> >> 3) while it's doing that, and before the consumer polls for more
> >> messages, the producer publishes 2 more: m11 and m12
> >> 4) the consumer now polls again. It asks the broker for the latest
> >> offset and gets the answer: 12
> >> 5) good, says the consumer, let me then fetch everything after offset
> >> 12 - 3 = 9: m10, m11, m12
> >>
> >> Problem: consumer got m10 again, but it was already processed in 2).
> >>
> >> No?  Please correct me if I'm wrong anywhere.
> >>
> >> Thanks,
> >> Otis
> >> --
> >> Performance Monitoring * Log Analytics * Search Analytics
> >> Solr & Elasticsearch Support * http://sematext.com/
> >>
>
>
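
Pulling the thread together, here is a minimal, illustrative sketch of the
bookkeeping discussed above. The names (ReaderState, window,
forwardFetchSize) are made up for the example, offsets count messages as in
the discussion, and a real client would also persist this state:

// Illustrative state for the "fancy logic" above; not a complete client.
case class ReaderState(
  lastRead: Long,                      // newest offset consumed so far
  pendingBackfill: List[(Long, Long)]  // unread gaps left behind, newest first
)

val window = 3L

// Variable fetch size: clamp to what is actually new, so a fetch never
// overlaps messages that were already consumed.
def forwardFetchSize(highwatermark: Long, lastRead: Long): Long =
  if (highwatermark - lastRead < window) highwatermark - lastRead
  else window

// The scenario from the thread: highwatermark = 12, last consumed = 10.
// The naive start of 12 - 3 = 9 re-reads m10; the clamped size fetches
// only the two genuinely new messages, m11 and m12.
assert(forwardFetchSize(12L, 10L) == 2L)

// And after reading 17,18,19 and then 22,23,24 while 13..16 and 20,21 are
// still unread, both gaps have to be remembered (and persisted):
val state = ReaderState(lastRead = 24L,
                        pendingBackfill = List((20L, 21L), (13L, 16L)))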