Kafka >> mail # user >> Retrieve most-recent-n messages from kafka topic


Re: Retrieve most-recent-n messages from kafka topic
There is no index-based access to messages in 0.7 like there is in 0.8.
You have to start from a known good offset and iterate through the messages.

What's your use case? Running a job periodically that reads the latest N
messages from the queue? Is it impractical to run from the last known
offset and only keep the last N?
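The keep-last-N approach needs no Kafka-specific machinery at all; here is a minimal Python sketch, where the message stream is a hypothetical stand-in for a fetch loop that starts at the last known good offset:

```python
from collections import deque

def last_n(stream, n):
    # A bounded deque evicts its oldest entry on overflow, so after the
    # whole stream is consumed only the newest n messages remain.
    buf = deque(maxlen=n)
    for msg in stream:
        buf.append(msg)
    return list(buf)

# Hypothetical messages standing in for a consumer fetch loop.
print(last_n((f"msg-{i}" for i in range(10)), 3))  # → ['msg-7', 'msg-8', 'msg-9']
```

Memory stays bounded at N messages no matter how far behind the starting offset is, at the cost of reading (and discarding) everything in between.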

On 7/19/13 3:45 AM, Shane Moriah wrote:
> We're running Kafka 0.7 and I'm hitting some issues trying to access the
> newest n messages in a topic (or at least in a broker/partition combo) and
> wondering if my use case just isn't supported or if I'm missing something.
>   What I'd like to be able to do is get the most recent offset from a
> broker/partition combo, subtract an amount of bytes roughly equivalent to
> messages_desired*bytes_per_message and then issue a FetchRequest with that
> offset and amount of bytes.
>
> I gathered from this post
> <http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3CCCF8F23D.5E4A%[EMAIL PROTECTED]%3E>
> that I need to use the Simple Consumer in order to do offset manipulation
> beyond the start from beginning and start from end options.  And I saw from
> this post
> <http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%[EMAIL PROTECTED]%3E>
> that the offsets returned by getOffsetsBefore are really only the major
> checkpoints when files are rolled over, every 500MB by default.  I also
> found that if I take an offset returned from getOffsetsBefore and subtract
> a fixed value, say 100KB, and submit that offset with a FetchRequest I get
> a kafka.common.InvalidMessageSizeException, presumably since my computed
> offset didn't align with a real message offset.
>
> As far as I can tell, this leaves me only able to find the most recent
> milestone offset, perhaps up to 500MB behind current data, and extract a
> batch from that point forward. Is there any other way that I'm missing
> here? The two things that seem to be lacking are access to the most recent
> offset and the ability to rollback from that offset by a fixed amount of
> bytes or messages without triggering the InvalidMessageSizeException.
>
> Thanks,
> Shane
>
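The InvalidMessageSizeException described above is expected under 0.7's model, where an offset is a raw byte position in the log and a fetch must begin exactly on a message boundary. A toy simulation (hypothetical message sizes, no Kafka involved) shows why subtracting a fixed byte count from a known offset almost never lands on one:

```python
# Toy 0.7-style log: five messages with explicit (hypothetical) byte sizes.
sizes = [512, 1024, 300, 2048, 700]

# Valid fetch offsets are the byte positions where messages begin.
offsets = [0]
for s in sizes:
    offsets.append(offsets[-1] + s)
log_end = offsets.pop()              # 4584 bytes in the log
print(offsets)                       # → [0, 512, 1536, 1836, 3884]

# Subtracting a fixed amount from the latest offset misses the boundaries,
# which is what triggers InvalidMessageSizeException on fetch.
guess = log_end - 1000               # 3584
print(guess in offsets)              # → False

# Safe resume points can only be found by scanning forward over real
# message sizes; to read the newest n messages you need the n-th
# boundary from the end.
n = 2
print(offsets[-n])                   # → 1836: start of the last two messages
```

Since the broker does not expose those per-message boundaries in 0.7, scanning forward from a getOffsetsBefore checkpoint and discarding all but the tail is effectively the only option.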