Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka >> mail # dev >> [jira] [Commented] (KAFKA-270)  sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html


Copy link to this message
-
[jira] [Commented] (KAFKA-270)  sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html

    [ https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13820938#comment-13820938 ]

wangxu(alvin) commented on KAFKA-270:
-------------------------------------

can also try closing the producer.
I would get the below error in broker console without closing producer. If I close the producer, the error will never appear.
[2013-11-05 14:07:34,097] ERROR Closing socket for /192.168.30.114 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
        at sun.nio.ch.IOUtil.read(IOUtil.java:171)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
        at kafka.utils.Utils$.read(Utils.scala:538)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:311)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:662)

>  sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-270
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.7
>         Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux
> ext3 file system with raid10
>            Reporter: Praveen Ramachandra
>              Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using default config values for producer, consumer and broker
> which are very good starting points, as they should yield sufficient
> throughput.
> Appreciate if you point what settings/changes-in-code needs to be done
> to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please look below for exception and error messages from the server
> BTW: I am running server, zookeeper, producer, consumer on the same host.
> ====Consumer Code=====
>        long startTime = System.currentTimeMillis();
>        long endTime = startTime + runDuration*1000l;
>        Properties props = new Properties();
>        props.put("zk.connect", "localhost:2181");
>        props.put("groupid", subscriptionName); // to support multiple
> subscribers
>        props.put("zk.sessiontimeout.ms", "400");
>        props.put("zk.synctime.ms", "200");
>        props.put("autocommit.interval.ms", "1000");
>        consConfig =  new ConsumerConfig(props);
>        consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topicName, new Integer(1)); // has the topic
> to which to subscribe to
>        Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream<Message> stream =  consumerMap.get(topicName).get(0);
>        ConsumerIterator<Message> it = stream.iterator();
>        while(System.currentTimeMillis() <= endTime )
>        {
>            it.next(); // discard data
>            consumeMsgCount.incrementAndGet();
>        }
> ====End consumer CODE============================
> =====Producer CODE========================
>        props.put("serializer.class", "kafka.serializer.StringEncoder");
>        props.put("zk.connect", "localhost:2181");
>            // Use random partitioner. Don't need the key type. Just

This message was sent by Atlassian JIRA
(v6.1#6144)