Kafka >> mail # user >> Consuming "backwards"?


Re: Consuming "backwards"?
Yes, you can also read backwards on the stream if you do this in the
consumer:

val maoList: Iterable[MessageAndOffset] =
  for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages)
    yield messageAndOffset

for (messageAndOffset <- maoList.toList.reverse) {
  // process each message here, newest first
}

This way every read is the latest before the next earliest, so when you
fetch 18, 19, 20 you will see them coming in as 20, 19, 18.
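A self-contained sketch of that reverse-iteration idea (plain Scala, using a stand-in case class rather than Kafka's real MessageAndOffset, so the names here are illustrative only):

```scala
// Stand-in for Kafka's MessageAndOffset; purely illustrative.
case class MessageAndOffset(offset: Long, message: String)

object ReverseRead {
  // Cap the batch at maxMessages, then walk it newest-first,
  // mirroring the maoList.toList.reverse pattern above.
  def reverseBatch(messageSet: Seq[MessageAndOffset],
                   maxMessages: Int): Seq[MessageAndOffset] =
    messageSet.take(maxMessages).reverse

  def main(args: Array[String]): Unit = {
    // Pretend a fetch returned offsets 18, 19, 20.
    val fetched = Seq(18L, 19L, 20L).map(o => MessageAndOffset(o, s"m$o"))
    reverseBatch(fetched, 3).foreach(m => println(m.offset)) // 20, 19, 18
  }
}
```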

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/
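The variable fetchSize logic from the quoted discussion below (shrink the fetch when fewer than 3 new messages exist past the last consumed offset) could be sketched as:

```scala
object BackwardFetch {
  // Clamp the fetch size so we never re-read already-consumed offsets;
  // mirrors: fetchSize = if (hw - last < 3) hw - last else 3
  def fetchSize(highWatermark: Long,
                lastConsumedOffset: Long,
                maxFetch: Long = 3L): Long =
    math.min(highWatermark - lastConsumedOffset, maxFetch)
}
```

With a high watermark of 12 and last consumed offset 10, fetchSize(12, 10) is 2, so the consumer fetches only m11 and m12 instead of re-reading m10.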
On Fri, Dec 6, 2013 at 7:02 PM, Steven Parkes <[EMAIL PROTECTED]> wrote:

> Right. If you're not reading contiguously, you need to remember the ranges
> that you have/haven't read. As long as you do that, it all works out, I
> think.
>
> A kafka client always has to store the last offset it read. In the
> simplest "both directions" case where you start with current and read in
> both directions, you just need to remember the first offset you've read as
> well.
>
> On Dec 6, 2013, at 3:50 PM, Joe Stein <[EMAIL PROTECTED]> wrote:
>
> > you have to allow the fetchSize to be variable, so in your example, since
> > the new high watermark is 12 and the last consumed message is 10:
> >
> > fetchSize = if (highwatermark - lastConsumedOffset < 3) highwatermark -
> > lastConsumedOffset else 3
> >
> > the real trick though is missing messages, having to keep track of more
> > than one index
> >
> > so let's say now you have 7 more published (13,14,15,16,17,18,19)
> >
> > you then read 17,18,19 (and while that happens 5 more are published ...
> > 20,21,22,23,24)
> >
> > now when you read 22,23,24 you have to keep track of not only 22 as the
> > lowest offset read (so you scoop up 20 and 21) but also still remember 17
> > so you get 16,15,14,13
> >
> > so it can be done with some fancy logic to manage the indexes and offsets
> > (and persist them)
> >
> >
> > On Fri, Dec 6, 2013 at 6:44 PM, Otis Gospodnetic <[EMAIL PROTECTED]>
> > wrote:
> >
> >> Hi,
> >>
> >> On Fri, Dec 6, 2013 at 6:32 PM, Steven Parkes <[EMAIL PROTECTED]>
> >> wrote:
> >>
> >>> On Dec 6, 2013, at 2:03 PM, Otis Gospodnetic <[EMAIL PROTECTED]>
> >>> wrote:
> >>>
> >>>> but I think the problem is that each time we grab we could get some
> >>>> of the same messages we already processed
> >>>
> >>> Doesn't setting the fetchSize to "how far back we need to grab" handle
> >>> that?
> >>
> >>
> >> I *think* it doesn't, but I'm wrong every day.... N times, so....
> >>
> >> I think this is what would happen:
> >> 1) imagine 10 messages in the broker m1 - m10
> >> 2) consumer grabs last N (=3): m8, m9, m10
> >> 3) while it's doing that, and before the consumer polls for more
> >> messages, the producer publishes 2 more: m11 and m12
> >> 4) consumer now polls again. It asks the broker for the publisher offset
> >> and gets the answer: 12
> >> 5) good, says the consumer, let me then fetch everything after offset
> >> 12-3=9: m10, m11, m12
> >>
> >> Problem: consumer got m10 again, but it was already processed in 2).
> >>
> >> No?  Please correct me if I'm wrong anywhere.
> >>
> >> Thanks,
> >> Otis
> >> --
> >> Performance Monitoring * Log Analytics * Search Analytics
> >> Solr & Elasticsearch Support * http://sematext.com/
> >>
>
>
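The duplicate-fetch scenario Otis walks through above (m10 showing up twice) can be reproduced with a little offset arithmetic; lastN here is a hypothetical helper for illustration, not a Kafka API:

```scala
object OverlapDemo {
  // Offsets a naive "last N before the high watermark" fetch returns:
  // the inclusive range (hw - n, hw].
  def lastN(highWatermark: Long, n: Long): Seq[Long] =
    (highWatermark - n + 1) to highWatermark

  def main(args: Array[String]): Unit = {
    val first  = lastN(10L, 3L)   // step 2: m8, m9, m10
    val second = lastN(12L, 3L)   // step 5: m10, m11, m12
    // m10 is in both fetches -- exactly the duplicate Otis describes.
    println(first.intersect(second))   // the overlap: offset 10
  }
}
```

Remembering both ends of the consumed range (not just the last offset), as Steven suggests, is what lets the consumer exclude that overlap.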
