Kafka >> mail # user >> Retrieve most-recent-n messages from kafka topic


Shane Moriah 2013-07-19, 07:46
Re: Retrieve most-recent-n messages from kafka topic
There is no index-based access to messages in 0.7 like there is in 0.8.
You have to start from a known good offset and iterate through the messages.

What's your use case? Running a job periodically that reads the latest N
messages from the queue? Is it impractical to run from the last known
offset and only keep the last N?
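
The "run from the last known offset and only keep the last N" idea could look
roughly like the sketch below. It is not Kafka client code: `fetch_from` is a
hypothetical stand-in for whatever loop reads messages sequentially from a
known good offset, and here it simply walks an in-memory list. The point is
only that a bounded deque keeps memory at N entries no matter how far behind
the starting offset is.

```python
from collections import deque
from typing import Iterable, Iterator, List, Tuple


def fetch_from(log: List[bytes], start: int) -> Iterator[Tuple[int, bytes]]:
    """Hypothetical stand-in for a sequential read from a known good position.

    A real consumer would issue fetch requests here; this version just
    iterates over an in-memory list so the example is self-contained.
    """
    for position in range(start, len(log)):
        yield position, log[position]


def last_n(messages: Iterable[Tuple[int, bytes]], n: int) -> List[Tuple[int, bytes]]:
    """Consume the whole stream but retain only the n most recent entries."""
    # A deque with maxlen discards the oldest entry on every append once full.
    window = deque(messages, maxlen=n)
    return list(window)


if __name__ == "__main__":
    demo_log = [f"m{i}".encode() for i in range(10)]
    for position, payload in last_n(fetch_from(demo_log, 2), 3):
        print(position, payload)
```

The cost is that every message between the starting offset and the head of the
log still has to be read, so this is only practical if the job runs often
enough that the gap stays small.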

On 7/19/13 3:45 AM, Shane Moriah wrote:
> We're running Kafka 0.7 and I'm hitting some issues trying to access the
> newest n messages in a topic (or at least in a broker/partition combo) and
> wondering if my use case just isn't supported or if I'm missing something.
>   What I'd like to be able to do is get the most recent offset from a
> broker/partition combo, subtract an amount of bytes roughly equivalent to
> messages_desired*bytes_per_message and then issue a FetchRequest with that
> offset and amount of bytes.
>
> I gathered from this
> post<http://mail-archives.apache.org/mod_mbox/kafka-users/201212.mbox/%3CCCF8F23D.5E4A%[EMAIL PROTECTED]%3E>
> that
> I need to use the Simple Consumer in order to do offset manipulation beyond
> the start from beginning and start from end options.  And I saw from this
> post<http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201209.mbox/%[EMAIL PROTECTED]%3E>
> that
> the offsets returned by getOffsetsBefore are really only the major
> checkpoints when files are rolled over, every 500MB by default.  I also
> found that if I take an offset returned from getOffsetsBefore and subtract
> a fixed value, say 100KB, and submit that offset with a FetchRequest I get
> a kafka.common.InvalidMessageSizeException, presumably since my computed
> offset didn't align with a real message offset.
>
> As far as I can tell, this leaves me only able to find the most recent
> milestone offset, perhaps up to 500MB behind current data, and extract a
> batch from that point forward. Is there any other way that I'm missing
> here? The two things that seem to be lacking are access to the most recent
> offset and the ability to rollback from that offset by a fixed amount of
> bytes or messages without triggering the InvalidMessageSizeException.
>
> Thanks,
> Shane
>
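
To illustrate why subtracting a fixed number of bytes from an offset fails, here
is a toy model of a log addressed by byte offset. It assumes each message is
stored as a 4-byte big-endian length followed by the payload; the real 0.7
on-disk format has more header fields, and `append` and `read_from` are
hypothetical helpers, not Kafka APIs. Reading from a message boundary works,
while reading from an arbitrary byte position misinterprets payload bytes as a
length, which is analogous to the InvalidMessageSizeException described above.

```python
import struct
from typing import Iterator, Tuple


def append(log: bytearray, payload: bytes) -> int:
    """Append one length-prefixed message and return its byte offset."""
    offset = len(log)
    log += struct.pack(">I", len(payload)) + payload
    return offset


def read_from(log: bytearray, offset: int) -> Iterator[Tuple[int, bytes]]:
    """Yield (offset, payload) pairs, assuming offset is a message boundary."""
    while offset < len(log):
        if offset + 4 > len(log):
            raise ValueError(f"truncated length prefix at offset {offset}")
        (size,) = struct.unpack_from(">I", log, offset)
        end = offset + 4 + size
        if end > len(log):
            # A misaligned offset usually lands here: the "length" is garbage.
            raise ValueError(f"invalid message size {size} at offset {offset}")
        yield offset, bytes(log[offset + 4:end])
        offset = end


if __name__ == "__main__":
    log = bytearray()
    offsets = [append(log, p) for p in (b"hello", b"kafka", b"topic")]
    print(offsets)
    print(list(read_from(log, offsets[1])))
    try:
        list(read_from(log, offsets[1] + 2))  # not a message boundary
    except ValueError as err:
        print("rejected:", err)
```

In this model the only safe way to reach a later message is to start at a known
boundary and walk forward, which matches the advice in the reply.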
 