Kafka, mail # user - kafka.common.OffsetOutOfRangeException


Re: kafka.common.OffsetOutOfRangeException
Philip O'Toole 2013-11-19, 13:44
The Storm mailing list is probably a better place for this thread. If I understand the issue, it is not a ZK issue, nor a Kafka config issue. We run multiple topos draining the same topics all the time.

In any event, you just need to patch the Kafka Spout code to catch this exception, and ask Kafka for the earliest or latest offset (make that choice configurable), and reset with that. We did that here at Loggly.
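
For illustration only, the catch-and-reset logic described above could be sketched roughly as follows. This is not the actual Kafka Spout patch. Every name in it (`PartitionLog`, `ResetPolicy`, `InMemoryLog`, `FetchResult`, `OffsetRecovery`) is hypothetical, the exception class is a local stand-in for `kafka.common.OffsetOutOfRangeException`, and the toy log uses simple message indexes as offsets so that the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for kafka.common.OffsetOutOfRangeException (assumption:
// the real spout would catch the Kafka class instead).
class OffsetOutOfRangeException extends RuntimeException {
    OffsetOutOfRangeException(String message) { super(message); }
}

// Hypothetical config switch: which end of the log to reset to.
enum ResetPolicy { EARLIEST, LATEST }

// Hypothetical wrapper around whatever Kafka client the spout uses.
interface PartitionLog {
    List<String> fetch(long offset); // may throw OffsetOutOfRangeException
    long earliestOffset();
    long latestOffset();
}

// Toy in-memory partition, present only to make the sketch runnable.
// Valid offsets run from firstOffset to firstOffset + size, inclusive.
class InMemoryLog implements PartitionLog {
    private final long firstOffset;
    private final List<String> messages;

    InMemoryLog(long firstOffset, List<String> messages) {
        this.firstOffset = firstOffset;
        this.messages = new ArrayList<>(messages);
    }

    public long earliestOffset() { return firstOffset; }
    public long latestOffset() { return firstOffset + messages.size(); }

    public List<String> fetch(long offset) {
        if (offset < earliestOffset() || offset > latestOffset()) {
            throw new OffsetOutOfRangeException("invalid offset " + offset);
        }
        int start = (int) (offset - firstOffset);
        return new ArrayList<>(messages.subList(start, messages.size()));
    }
}

// The offset that was actually used, plus the messages read from it.
class FetchResult {
    final long offsetUsed;
    final List<String> messages;

    FetchResult(long offsetUsed, List<String> messages) {
        this.offsetUsed = offsetUsed;
        this.messages = messages;
    }
}

class OffsetRecovery {
    // Try the stored offset first. If the broker rejects it, ask for the
    // earliest or latest offset (per the policy) and re-issue the fetch once.
    // A failure on the retry is deliberately left to propagate.
    static FetchResult fetchWithReset(PartitionLog log, long storedOffset,
                                      ResetPolicy policy) {
        try {
            return new FetchResult(storedOffset, log.fetch(storedOffset));
        } catch (OffsetOutOfRangeException e) {
            long resetOffset = (policy == ResetPolicy.EARLIEST)
                    ? log.earliestOffset()
                    : log.latestOffset();
            return new FetchResult(resetOffset, log.fetch(resetOffset));
        }
    }
}
```

With an `InMemoryLog` whose earliest retained offset is 100, calling `OffsetRecovery.fetchWithReset(log, 5, ResetPolicy.EARLIEST)` fetches from offset 100 instead of failing, while `ResetPolicy.LATEST` skips to the end of the log and returns no messages. Returning the offset that was actually used lets the caller log the reset and store the new position.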

I don't have a patch to hand, but to be honest, if you want to have any hope of running Kafka and Storm in production you should attempt to code the patch yourself. It will teach you stuff you absolutely need to understand about these two pieces of software, and offset management is particularly important.

It's not a difficult change though. Just build and instrument the code to start. You'll thank me when you hit your next offset-related issue.

Philip

On Nov 18, 2013, at 11:10 PM, Oleg Ruchovets <[EMAIL PROTECTED]> wrote:

> Hi Philip.
>
>   It looks like this is our case:
> https://github.com/nathanmarz/storm-contrib/pull/15
>
> It is interesting that the issue is still open (after more than 1 year), so
> I am curious how people are able to work in production without the ability
> to deploy another topology.
> Can the community please share whether this patch resolves the issue, and
> who is using it in production?
>
> Also, a question: should I change the ZooKeeper or Kafka configuration to
> resolve the issue? If yes, please share what should be changed.
>
> Thanks
> Oleg.
>
>
>
> On Tue, Nov 19, 2013 at 11:51 AM, Philip O'Toole <[EMAIL PROTECTED]> wrote:
>
>> Don't get scared, this is perfectly normal and easily fixed. :-) The second
>> topology attempted to fetch messages from an offset in Kafka that does not
>> exist. This could happen due to Kafka retention policies (messages
>> deleted) or a bug in your code. Your code needs to catch this exception,
>> ask Kafka for the earliest or latest offset (take your pick), and then
>> re-issue the fetch using the returned offset.
>>
>> Are you using a separate path in ZK for the second topology? Is it of a
>> completely different nature than the first?
>>
>> Philip
>>
>>
>>
>>
>> On Mon, Nov 18, 2013 at 7:40 PM, Oleg Ruchovets <[EMAIL PROTECTED]> wrote:
>>
>>> We are working with Kafka (0.7.2) + Storm.
>>>   1) We deployed the 1st topology, which subscribes to a Kafka topic, and
>>> it has been working fine for a couple of weeks already.
>>>   2) Yesterday we deployed a 2nd topology, which subscribes to the same
>>> Kafka topic, but the 2nd topology immediately failed with an exception:
>>>
>>> *What can cause such behavior, and how can we resolve the issue?*
>>>
>>>
>>> java.lang.RuntimeException: kafka.common.OffsetOutOfRangeException
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87)
>>>     at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:58)
>>>     at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:62)
>>>     at backtype.storm.daemon.executor$fn__4050$fn__4059$fn__4106.invoke(executor.clj:658)
>>>     at backtype.storm.util$async_loop$fn__465.invoke(util.clj:377)
>>>     at clojure.lang.AFn.run(AFn.java:24)
>>>     at java.lang.Thread.run(Thread.java:662)
>>> Caused by: kafka.common.OffsetOutOfRangeException
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)
>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
>>>     at java.lang.Class.newInstance0(Class.java:355)