Home | About | Sematext search-lucene.com search-hadoop.com
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> re-writing old fetch request to work with 0.8 version


Copy link to this message
-
Re: re-writing old fetch request to work with 0.8 version
Ok so now it is looping through the messages fine, and outputting the
actual message payload:

  while (true) {
    //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition,
offset, 1024).build()

    val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)

    val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
    println("consumed Message " +
Utils.readString(messageSet(0).message.payload, "UTF-8") )
    offset += 1

  }
Is there a way for it to not crash at the end?
*** Just to be clear, the idea is the run an embedded version in my web
application so I can verify the messages are being send and processed in
development, this isn't a production idea of mine :)

consumed Message test199

consumed Message test200

[error] (run-main-0) java.lang.IndexOutOfBoundsException: 0

java.lang.IndexOutOfBoundsException: 0

at
scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)

at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)

at
com.debugging.jobs.KafkaEmbedded$delayedInit$body.apply(KafkaEmbedded.scala:92)

at scala.Function0$class.apply$mcV$sp(Function0.scala:40)

at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.collection.immutable.List.foreach(List.scala:318)

at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)

at scala.App$class.main(App.scala:71)

at com.debugging.jobs.KafkaEmbedded$.main(KafkaEmbedded.scala:24)

at com.debugging.jobs.KafkaEmbedded.main(KafkaEmbedded.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
On Fri, Jun 13, 2014 at 4:51 PM, S Ahmed <[EMAIL PROTECTED]> wrote:
 
NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB