Re: java.lang.RuntimeException: Invalid magic byte 6
Just to confirm: is this project an integration of Kafka with Storm?
The page doesn't provide much detail.

Thanks,
Navneet Sharma

On Mon, May 21, 2012 at 8:21 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> It seems that the wrong compression codec is being used. Currently, only
> codec 0 (no compression) and codec 1 (gzip) are supported.
>
> Thanks,
>
> Jun
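
The exception in the trace below is raised while the message header is being read, so it may help to see how the magic byte and the codec bits could be decoded. This is a minimal sketch, assuming the Kafka 0.7 message layout (1-byte magic, then a 1-byte attributes field only when magic is 1, then a 4-byte CRC and the payload) and a two-bit codec mask. `MessageHeaderSketch` and `codecOf` are hypothetical names for illustration, not Kafka's API.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of Kafka: reads the codec id from one message header. */
final class MessageHeaderSketch {
    // Assumption: the low two bits of the attributes byte carry the codec id.
    static final int CODEC_MASK = 0x03;

    /**
     * Returns the codec id (assumed: 0 = no compression, 1 = gzip),
     * or throws if the magic byte is neither 0 nor 1.
     * The buffer must start at the magic byte, i.e. just after the length prefix.
     */
    static int codecOf(ByteBuffer message) {
        byte magic = message.get(message.position());
        switch (magic) {
            case 0:
                // Magic 0 has no attributes byte, so the payload is uncompressed.
                return 0;
            case 1:
                return message.get(message.position() + 1) & CODEC_MASK;
            default:
                throw new RuntimeException("Invalid magic byte " + magic);
        }
    }

    public static void main(String[] args) {
        // Well-formed header: magic 1, attributes 1 (gzip), then four CRC bytes.
        ByteBuffer ok = ByteBuffer.wrap(new byte[] {1, 1, 0, 0, 0, 0});
        System.out.println("codec = " + codecOf(ok));

        // Bytes that do not start on a message boundary can yield any magic value.
        ByteBuffer bad = ByteBuffer.wrap(new byte[] {6, 0, 0, 0, 0, 0});
        try {
            codecOf(bad);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under these assumptions, a magic byte of 6 means the bytes being parsed are not a valid message header at all. One thing worth checking in that case is whether the fetch started at an offset that falls on a message boundary.
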
>
> On Mon, May 21, 2012 at 5:35 AM, Raymond Ng <[EMAIL PROTECTED]> wrote:
>
> > Hi all
> >
> > I'm trying to use the KafkaSpout example from
> > https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka but got
> > the following error when trying to consume the messages:
> >
> > 2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e for storm rolling-ippairs-kafka-17-1337599913 on 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
> > 2012-05-21 12:52:28 log [INFO] ########################################################
> > 2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages from Kafka: stormworker03:0
> > 2012-05-21 12:52:28 log [INFO] ########################################################
> > 2012-05-21 12:52:28 util [ERROR] Async loop died!
> > java.lang.RuntimeException: Invalid magic byte 6
> >  at kafka.message.Message.compressionCodec(Message.scala:144)
> >  at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
> >  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
> >  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
> >  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >  at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
> >  at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
> >  at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
> >  at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
> >  at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
> >  at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
> >  at clojure.lang.AFn.applyToHelper(AFn.java:159)
> >  at clojure.lang.AFn.applyTo(AFn.java:151)
> >  at clojure.core$apply.invoke(core.clj:540)
> >  at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
> >  at clojure.lang.AFn.run(AFn.java:24)
> >  at java.lang.Thread.run(Thread.java:636)
> > 2012-05-21 12:52:28 task [ERROR]
> >
> > I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 because of the
> > message size in my test data, and made the necessary local environmental
> > config changes.
> >
> > I've left most of the KafkaSpout code unchanged apart from adding more
> > log messages to try to debug the issue.
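
To make the fetch-size arithmetic concrete, here is a minimal sketch of what 1024*1024*20 works out to and how one might check that a large message fits in a single fetch. It assumes a per-message overhead of 10 bytes (4-byte length prefix, 1-byte magic, 1-byte attributes, 4-byte CRC), which is my reading of the Kafka 0.7 format with magic byte 1. `FetchSizeSketch` and `fitsInOneFetch` are hypothetical names, not part of storm-kafka or Kafka.

```java
/** Hypothetical helper, not part of storm-kafka: sanity-checks a fetch size. */
final class FetchSizeSketch {
    // 20 MiB, the value assigned to KafkaConfig.fetchSizeBytes in this thread.
    static final int FETCH_SIZE_BYTES = 1024 * 1024 * 20;

    // Assumed overhead per message: 4 (length) + 1 (magic) + 1 (attributes) + 4 (CRC).
    static final int HEADER_OVERHEAD_BYTES = 10;

    /** True when one message with the given payload size fits entirely in one fetch. */
    static boolean fitsInOneFetch(long payloadBytes, int fetchSizeBytes) {
        return payloadBytes + HEADER_OVERHEAD_BYTES <= fetchSizeBytes;
    }

    public static void main(String[] args) {
        System.out.println("fetch size in bytes: " + FETCH_SIZE_BYTES);
        // A 5 MiB payload fits comfortably under these assumptions.
        System.out.println(fitsInOneFetch(5L * 1024 * 1024, FETCH_SIZE_BYTES));
        // A payload equal to the fetch size does not, because of the header bytes.
        System.out.println(fitsInOneFetch(FETCH_SIZE_BYTES, FETCH_SIZE_BYTES));
    }
}
```

If the largest test message, plus its header, is larger than the fetch size, the consumer would only ever receive part of it, so the fetch size probably needs some headroom above the largest payload.
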
> >
> > The following snippet is extracted from KafkaSpout; the error is raised at
> > the line    for(MessageAndOffset msg: msgs)   in the fill() method.
> >
> >
> >  //returns false if it's reached the end of current batch
> >  public EmitState next() {
> >   if(_waitingToEmit.isEmpty()) fill();
> >   MessageAndOffset toEmit = _waitingToEmit.pollFirst();
> >   if(toEmit==null) return EmitState.NO_EMITTED;
> >   List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
> >   _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
> >   LOG.debug("Emitting the following message : "+toEmit.message().toString());
> >   if(_waitingToEmit.size()>0) {
> >    return EmitState.EMITTED_MORE_LEFT;
> >   } else {
> >    return EmitState.EMITTED_END;
> >   }
> >  }
> >
> >  private void fill() {
> >   SimpleConsumer consumer = _partitions.getConsumer(_partition);
> >   int hostPartition = _partitions.getHostPartition(_partition);