Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # user >> re-writing old fetch request to work with 0.8 version


Copy link to this message
-
Re: re-writing old fetch request to work with 0.8 version
Ok so now it is looping through the messages fine, and outputting the
actual message payload:

  while (true) {
    //val fetchRequest = new FetchRequest("TEST", 0, offset, 1024)
    val fetchRequest = new FetchRequestBuilder().addFetch(topic, partition,
offset, 1024).build()

    val fetchResponse: FetchResponse = consumer1.fetch(fetchRequest)

    val messageSet = fetchResponse.messageSet(topic, 0).iterator.toBuffer
    println("consumed Message " +
Utils.readString(messageSet(0).message.payload, "UTF-8") )
    offset += 1

  }
Is there a way for it to not crash at the end?
*** Just to be clear, the idea is the run an embedded version in my web
application so I can verify the messages are being send and processed in
development, this isn't a production idea of mine :)

consumed Message test199

consumed Message test200

[error] (run-main-0) java.lang.IndexOutOfBoundsException: 0

java.lang.IndexOutOfBoundsException: 0

at
scala.collection.mutable.ResizableArray$class.apply(ResizableArray.scala:43)

at scala.collection.mutable.ArrayBuffer.apply(ArrayBuffer.scala:47)

at
com.debugging.jobs.KafkaEmbedded$delayedInit$body.apply(KafkaEmbedded.scala:92)

at scala.Function0$class.apply$mcV$sp(Function0.scala:40)

at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.App$$anonfun$main$1.apply(App.scala:71)

at scala.collection.immutable.List.foreach(List.scala:318)

at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)

at scala.App$class.main(App.scala:71)

at com.debugging.jobs.KafkaEmbedded$.main(KafkaEmbedded.scala:24)

at com.debugging.jobs.KafkaEmbedded.main(KafkaEmbedded.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
On Fri, Jun 13, 2014 at 4:51 PM, S Ahmed <[EMAIL PROTECTED]> wrote: