Kafka >> mail # dev >> [jira] [Updated] (KAFKA-807) LineMessageReader doesn't correctly parse the key separator


[jira] [Updated] (KAFKA-807) LineMessageReader doesn't correctly parse the key separator

     [ https://issues.apache.org/jira/browse/KAFKA-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-807:
--------------------------

    Resolution: Fixed
      Assignee: Dragos Manolescu
        Status: Resolved  (was: Patch Available)

Thanks for the patch. Committed the change to ConsoleProducer to 0.8.
                
> LineMessageReader doesn't correctly parse the key separator
> -----------------------------------------------------------
>
>                 Key: KAFKA-807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-807
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.8
>            Reporter: Dragos Manolescu
>            Assignee: Dragos Manolescu
>            Priority: Trivial
>              Labels: patch, producer
>             Fix For: 0.8
>
>
> A typo in the key name prevents LineMessageReader from extracting the key separator. The patch follows; what's the recommended way to submit patches?
> Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
> <+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or more\n * contributor license agreements.  See the NOTICE file distributed with\n * this work for additional information regarding copyright ownership.\n * The ASF licenses this file to You under the Apache License, Version 2.0\n * (the \"License\"); you may not use this file except in compliance with\n * the License.  You may obtain a copy of the License at\n * \n *    http://www.apache.org/licenses/LICENSE-2.0n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\npackage kafka.producer\n\nimport scala.collection.JavaConversions._\nimport joptsimple._\nimport java.util.Properties\nimport java.io._\nimport kafka.common._\nimport kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n  def main(args: Array[String]) { \n    val parser = new OptionParser\n    val topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce messages to.\")\n                           .withRequiredArg\n                           .describedAs(\"topic\")\n                           .ofType(classOf[String])\n    val brokerListOpt = parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.\")\n                           .withRequiredArg\n                           .describedAs(\"broker-list\")\n                           .ofType(classOf[String])\n    val syncOpt = parser.accepts(\"sync\", \"If set message send requests to the brokers are synchronously, one at a time as they arrive.\")\n    val compressOpt = parser.accepts(\"compress\", \"If set, messages batches are sent compressed\")\n    val batchSizeOpt = parser.accepts(\"batch-size\", \"Number of messages to 
send in a single batch if they are not being sent synchronously.\")\n                             .withRequiredArg\n                             .describedAs(\"size\")\n                             .ofType(classOf[java.lang.Integer])\n                             .defaultsTo(200)\n    val sendTimeoutOpt = parser.accepts(\"timeout\", \"If set and the producer is running in asynchronous mode, this gives the maximum amount of time\" + \n                                                   \" a message will queue awaiting suffient batch size. The value is given in ms.\")\n                               .withRequiredArg\n                               .describedAs(\"timeout_ms\")\n                               .ofType(classOf[java.lang.Long])\n                               .defaultsTo(1000)\n    val queueSizeOpt = parser.accepts(\"queue-size\", \"If set and the producer is running in asynchronous mode, this gives the maximum amount of \" + \n                                                   \" messages will queue awaiting suffient batch size.\")\n                               .withRequiredArg\n                               .describedAs(\"queue_size\")\n                               .ofType(classOf[java.lang.Long])\n                               .defaultsTo(10000)\n    val queueEnqueueTimeoutMsOpt = parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n                               .withRequiredArg\n                               .describedAs(\"queue enqueuetimeout ms\")\n                               .ofType(classOf[java.lang.Long])\n                               .defaultsTo(0)\n    val requestRequiredAcksOpt = parser.accepts(\"request-required-acks\", \"The required acks of the producer requests\")\n                               .withRequiredArg\n                               .describedAs(\"request required acks\")\n                               .ofType(classOf[java.lang.Integer])\n                               .defaultsTo(0)\n    val 
requestTimeoutMsOpt = parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer requests. Value must be non-negative and non-zero\")\n                               .withRequiredArg\n                               .describedAs(\"request timeout ms\")\n                               .ofType(classOf[java.lang.Integer])\n                               .defaultsTo(1500)\n    val valueEncoderOpt = parser.accepts(\"value-serializer\", \"The class name of the message encoder implementation to use for serializing values.\")\n                                 .withRequiredArg\n                                 .describedAs(\"encoder_class\")\n                                 .ofType(classOf[java.lang.String])\n                      