Kafka >> mail # user >> java.lang.RuntimeException: Invalid magic byte 6


Re: java.lang.RuntimeException: Invalid magic byte 6
Just to confirm: is this project an integration of Kafka with Storm?
The page doesn't provide many details.

Thanks,
Navneet Sharma

On Mon, May 21, 2012 at 8:21 PM, Jun Rao <[EMAIL PROTECTED]> wrote:

> Seems like a wrong compression codec is being used. Currently, only codec 0
> (gzip) and 1 (snappy) are supported.
>
> Thanks,
>
> Jun
>
> On Mon, May 21, 2012 at 5:35 AM, Raymond Ng <[EMAIL PROTECTED]> wrote:
>
> > Hi all
> >
> > I'm trying to use the KafkaSpout example from
> > https://github.com/nathanmarz/storm-contrib/tree/master/storm-kafka but
> > got the following error when trying to consume the messages:
> >
> > 2012-05-21 12:52:27 worker [INFO] Worker 8ecb4136-8d07-4b08-ba99-faccbab6a28e for storm rolling-ippairs-kafka-17-1337599913 on 1e5d2733-ee7f-4519-91b5-e97f6105df31:6702 has finished loading
> > 2012-05-21 12:52:28 log [INFO] ########################################################
> > 2012-05-21 12:52:28 KafkaSpout [INFO] Fetched 20971522 bytes of messages from Kafka: stormworker03:0
> > 2012-05-21 12:52:28 log [INFO] ########################################################
> > 2012-05-21 12:52:28 util [ERROR] Async loop died!
> > java.lang.RuntimeException: Invalid magic byte 6
> >  at kafka.message.Message.compressionCodec(Message.scala:144)
> >  at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:112)
> >  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:138)
> >  at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:82)
> >  at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >  at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >  at kafka.javaapi.message.ByteBufferMessageSet$$anon$1.hasNext(ByteBufferMessageSet.scala:55)
> >  at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.fill(KafkaSpout.java:118)
> >  at com.detica.treidan.storm.spouts.kafka.KafkaSpout$PartitionManager.next(KafkaSpout.java:92)
> >  at com.detica.treidan.storm.spouts.kafka.KafkaSpout.nextTuple(KafkaSpout.java:215)
> >  at backtype.storm.daemon.task$fn__3465$fn__3514.invoke(task.clj:372)
> >  at backtype.storm.daemon.task$mk_task$iter__3386__3390$fn__3391$fn__3392$fn__3393.invoke(task.clj:247)
> >  at clojure.lang.AFn.applyToHelper(AFn.java:159)
> >  at clojure.lang.AFn.applyTo(AFn.java:151)
> >  at clojure.core$apply.invoke(core.clj:540)
> >  at backtype.storm.util$async_loop$fn__487.invoke(util.clj:271)
> >  at clojure.lang.AFn.run(AFn.java:24)
> >  at java.lang.Thread.run(Thread.java:636)
> > 2012-05-21 12:52:28 task [ERROR]
> >
> > I have changed KafkaConfig.fetchSizeBytes to 1024*1024*20 because of the
> > message size in my test data, and made the necessary local environment
> > config changes.
> >
> > I've left most of the KafkaSpout code unchanged apart from adding more
> > log messages to try to debug the issue.
> >
> > The following snippet is extracted from KafkaSpout; the error is raised
> > at the line    for(MessageAndOffset msg: msgs)   in the fill() method:
> >
> >
> >  //returns false if it's reached the end of current batch
> >  public EmitState next() {
> >   if(_waitingToEmit.isEmpty()) fill();
> >   MessageAndOffset toEmit = _waitingToEmit.pollFirst();
> >   if(toEmit==null) return EmitState.NO_EMITTED;
> >   List<Object> tup = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.message().payload()));
> >   _collector.emit(tup, new KafkaMessageId(_partition, actualOffset(toEmit)));
> >   LOG.debug("Emitting the following message : "+toEmit.message().toString());
> >   if(_waitingToEmit.size()>0) {
> >    return EmitState.EMITTED_MORE_LEFT;
> >   } else {
> >    return EmitState.EMITTED_END;
> >   }
> >  }
> >
> >  private void fill() {
> >   SimpleConsumer consumer = _partitions.getConsumer(_partition);
> >   int hostPartition = _partitions.getHostPartition(_partition);
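
The stack trace above fails inside Message.compressionCodec while the message set is being iterated, which suggests the bytes at the position being read do not start a valid message. Below is a minimal Java sketch for checking that by hand. It is not code from Kafka or storm-kafka: the class MagicByteCheck and the method inspect are hypothetical, and the layout it assumes for one message-set entry (4-byte size, 1-byte magic, an attributes byte only when magic is 1, then a 4-byte CRC and the payload) is my understanding of the 0.7-era wire format and should be verified against the Kafka version in use.

```java
import java.nio.ByteBuffer;

// Hypothetical debugging helper; not part of Kafka or storm-kafka.
final class MagicByteCheck {

    // ASSUMED layout of one message-set entry (0.7-era, big-endian):
    //   4-byte size | 1-byte magic | [1-byte attributes if magic == 1] | 4-byte CRC | payload
    static String inspect(ByteBuffer buf, int entryOffset) {
        // Need at least the size field plus the magic byte.
        if (buf.limit() - entryOffset < 5) {
            return "truncated entry at offset " + entryOffset;
        }
        int size = buf.getInt(entryOffset);      // absolute read, does not move position
        byte magic = buf.get(entryOffset + 4);
        switch (magic) {
            case 0:
                return "magic 0, no attributes byte, size " + size;
            case 1:
                if (buf.limit() - entryOffset < 6) {
                    return "truncated entry at offset " + entryOffset;
                }
                byte attributes = buf.get(entryOffset + 5);
                return "magic 1, attributes " + attributes + ", size " + size;
            default:
                // This is the situation reported in the thread.
                return "invalid magic byte " + magic + " at offset " + entryOffset;
        }
    }

    public static void main(String[] args) {
        // Build a fake entry whose magic byte is 6, as in the error message.
        ByteBuffer buf = ByteBuffer.allocate(10);
        buf.putInt(0, 6);
        buf.put(4, (byte) 6);
        System.out.println(inspect(buf, 0));
    }
}
```

Calling a helper like this on the fetched buffer before iterating it, and logging the offset that was requested, would show whether the first entry already looks malformed or whether the problem appears partway through the batch.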