Hi, I'm trying to stream large messages from Kafka into Spark. Generally this has been working nicely, but I found one message (5.1 MB in size) which is clogging up my pipeline. I have these settings in server.properties:

fetch.message.max.bytes=10485760
replica.fetch.max.bytes=10485760
message.max.bytes=10485760
fetch.size=10485760
I'm not getting any obvious errors in the logs, and I can retrieve the large message with this command:

kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning
I noticed recently, after digging into this problem, that the kafkaServer.out log is complaining that the fetch.message.max.bytes parameter is not valid:

[2014-06-25 11:33:36,547] WARN Property fetch.message.max.bytes is not valid (kafka.utils.VerifiableProperties)
[2014-06-25 11:33:36,547] WARN Property fetch.size is not valid (kafka.utils.VerifiableProperties)

That seems like the most critical parameter for my needs. Kafka is apparently not recognizing it as a parameter, despite it being listed on the configuration page (https://kafka.apache.org/08/configuration.html). I'm using 0.8.1.1. Any ideas?
I set the properties in both the consumer.properties and the server.properties config files, then restarted Kafka; same problem. I'm following up on some of Guozhang's other suggestions now.
One thing I'm confused about (I should read the docs again) is which part of Kafka actually reads consumer.properties. If I'm using a different program (Spark Streaming) as the consumer, do any Kafka programs/services even read consumer.properties?
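If I'm reading things right, consumer.properties is just a sample config file; nothing picks it up unless you pass it explicitly to one of the stock command-line tools, something like the following (topic name made up, and I'm not certain the 0.8 console consumer takes this exact flag):

```
kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic \
  --consumer.config config/consumer.properties --from-beginning
```

Spark's consumer would then only see whatever properties are passed to it programmatically, not anything in that file.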
thanks On Fri, Jun 27, 2014 at 10:31 AM, Neha Narkhede <[EMAIL PROTECTED]> wrote:
thanks for the help. For others who happen upon this thread, the problem was indeed on the consumer side. Spark (0.9.1) needs a bit of help setting the Kafka properties for big messages.
// setup Kafka with manual parameters to allow big messages
// see spark/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala
import kafka.serializer.StringDecoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "zookeeper.connect" -> zkQuorum,
  "group.id" -> group,
  "zookeeper.connection.timeout.ms" -> "10000",
  "fetch.message.max.bytes" -> "10485760",  // 10MB
  "fetch.size" -> "10485760")               // not needed?

// topicpMap: Map[topic -> number of consumer threads], defined elsewhere
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicpMap, StorageLevel.MEMORY_AND_DISK_SER_2).map(_._2)
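To keep the config files straight after all this: fetch.message.max.bytes appears to be a consumer-side property in 0.8, which would explain the "not valid" WARN the broker logged when it sat in server.properties. A sketch of the split, using the same values as above:

```
# server.properties (broker side)
message.max.bytes=10485760         # max message size the broker will accept
replica.fetch.max.bytes=10485760   # should be >= message.max.bytes so replicas can fetch

# consumer side -- for Spark this goes in kafkaParams, not in a config file
fetch.message.max.bytes=10485760
```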
sorry about all the messages on this topic for those of you who aren't getting digests On Fri, Jun 27, 2014 at 10:43 AM, Louis Clark <[EMAIL PROTECTED]> wrote: