[jira] [Updated] (KAFKA-807) LineMessageReader doesn't correctly parse the key separator

     [ https://issues.apache.org/jira/browse/KAFKA-807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-807:

    Resolution: Fixed
      Assignee: Dragos Manolescu
        Status: Resolved  (was: Patch Available)

Thanks for the patch. Committed the ConsoleProducer change to 0.8.
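
For readers unfamiliar with this class of bug, here is a minimal sketch of how a misspelled property key can silently disable a separator override. It is not the committed patch: the class SketchLineReader, the property name "key.separator", and the misspelling "key.seperator" are assumptions chosen for illustration, as is the guess that the typo sits in the lookup guard.

```scala
import java.util.Properties

// Hypothetical reader, loosely modelled on the issue description.
// The property names below are assumptions, not quoted from the patch.
class SketchLineReader(lookupKey: String) {
  var keySeparator: String = "\t" // default used when no override is found

  def init(props: Properties): Unit = {
    // If lookupKey is misspelled, this guard is false and the default stays.
    if (props.containsKey(lookupKey))
      keySeparator = props.getProperty("key.separator")
  }

  // Split "key<sep>value" into (key, value); a line without the separator has no key.
  def parse(line: String): (Option[String], String) =
    line.indexOf(keySeparator) match {
      case -1 => (None, line)
      case i  => (Some(line.substring(0, i)), line.substring(i + keySeparator.length))
    }
}

object SeparatorTypoDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("key.separator", ":")

    val buggy = new SketchLineReader("key.seperator") // misspelled guard
    buggy.init(props)
    val fixed = new SketchLineReader("key.separator") // corrected guard
    fixed.init(props)

    println(buggy.parse("user1:hello")) // prints (None,user1:hello)
    println(fixed.parse("user1:hello")) // prints (Some(user1),hello)
  }
}
```

In the sketch the misspelled guard raises no error; the reader simply keeps its default separator, which is why a bug like this is easy to miss.
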
> LineMessageReader doesn't correctly parse the key separator
> -----------------------------------------------------------
>                 Key: KAFKA-807
>                 URL: https://issues.apache.org/jira/browse/KAFKA-807
>             Project: Kafka
>          Issue Type: Bug
>          Components: tools
>    Affects Versions: 0.8
>            Reporter: Dragos Manolescu
>            Assignee: Dragos Manolescu
>            Priority: Trivial
>              Labels: patch, producer
>             Fix For: 0.8
> Typo in key name prevents extracting the key separator. The patch follows; what's the recommended way to submit patches?
> Index: core/src/main/scala/kafka/producer/ConsoleProducer.scala
> IDEA additional info:
> Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
> <+>UTF-8
> Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
> <+>/**\n * Licensed to the Apache Software Foundation (ASF) under one or more\n * contributor license agreements.  See the NOTICE file distributed with\n * this work for additional information regarding copyright ownership.\n * The ASF licenses this file to You under the Apache License, Version 2.0\n * (the \"License\"); you may not use this file except in compliance with\n * the License.  You may obtain a copy of the License at\n * \n *    http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\npackage kafka.producer\n\nimport scala.collection.JavaConversions._\nimport joptsimple._\nimport java.util.Properties\nimport java.io._\nimport kafka.common._\nimport kafka.message._\nimport kafka.serializer._\n\nobject ConsoleProducer { \n\n  def main(args: Array[String]) { \n    val parser = new OptionParser\n    val topicOpt = parser.accepts(\"topic\", \"REQUIRED: The topic id to produce messages to.\")\n                           .withRequiredArg\n                           .describedAs(\"topic\")\n                           .ofType(classOf[String])\n    val brokerListOpt = parser.accepts(\"broker-list\", \"REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.\")\n                           .withRequiredArg\n                           .describedAs(\"broker-list\")\n                           .ofType(classOf[String])\n    val syncOpt = parser.accepts(\"sync\", \"If set message send requests to the brokers are synchronously, one at a time as they arrive.\")\n    val compressOpt = parser.accepts(\"compress\", \"If set, messages batches are sent compressed\")\n    val batchSizeOpt = parser.accepts(\"batch-size\", \"Number of messages to 
send in a single batch if they are not being sent synchronously.\")\n                             .withRequiredArg\n                             .describedAs(\"size\")\n                             .ofType(classOf[java.lang.Integer])\n                             .defaultsTo(200)\n    val sendTimeoutOpt = parser.accepts(\"timeout\", \"If set and the producer is running in asynchronous mode, this gives the maximum amount of time\" + \n                                                   \" a message will queue awaiting suffient batch size. The value is given in ms.\")\n                               .withRequiredArg\n                               .describedAs(\"timeout_ms\")\n                               .ofType(classOf[java.lang.Long])\n                               .defaultsTo(1000)\n    val queueSizeOpt = parser.accepts(\"queue-size\", \"If set and the producer is running in asynchronous mode, this gives the maximum amount of \" + \n                                                   \" messages will queue awaiting suffient batch size.\")\n                               .withRequiredArg\n                               .describedAs(\"queue_size\")\n                               .ofType(classOf[java.lang.Long])\n                               .defaultsTo(10000)\n    val queueEnqueueTimeoutMsOpt = parser.accepts(\"queue-enqueuetimeout-ms\", \"Timeout for event enqueue\")\n                               .withRequiredArg\n                               .describedAs(\"queue enqueuetimeout ms\")\n                               .ofType(classOf[java.lang.Long])\n                               .defaultsTo(0)\n    val requestRequiredAcksOpt = parser.accepts(\"request-required-acks\", \"The required acks of the producer requests\")\n                               .withRequiredArg\n                               .describedAs(\"request required acks\")\n                               .ofType(classOf[java.lang.Integer])\n                               .defaultsTo(0)\n    val 
requestTimeoutMsOpt = parser.accepts(\"request-timeout-ms\", \"The ack timeout of the producer requests. Value must be non-negative and non-zero\")\n                               .withRequiredArg\n                               .describedAs(\"request timeout ms\")\n                               .ofType(classOf[java.lang.Integer])\n                               .defaultsTo(1500)\n    val valueEncoderOpt = parser.accepts(\"value-serializer\", \"The class name of the message encoder implementation to use for serializing values.\")\n                                 .withRequiredArg\n                                 .describedAs(\"encoder_class\")\n                                 .ofType(classOf[java.lang.String])\n