Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Kafka, mail # dev - having problem with 0.8 gzip compression


Copy link to this message
-
Re: having problem with 0.8 gzip compression
Jun Rao 2013-07-09, 04:07
Did you start the consumer before the producer? Be default, the consumer
gets only the new data?

Thanks,

Jun
On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
[EMAIL PROTECTED]> wrote:

> I am testing with Kafka 0.8 beta and having problem of receiving message in
> consumer.  There is no error so does anyone have any insights.  When I
> commented out the "compression.code" everything works fine.
>
> My producer:
> public class TestKafka08Prod {
>
>     public static void main(String [] args) {
>
>         Producer<Integer, String> producer = null;
>         try {
>             Properties props = new Properties();
>             props.put("metadata.broker.list", "localhost:9092");
>             props.put("serializer.class",
> "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("request.required.acks","1");
>             props.put("compression.codec", "gzip");
>             ProducerConfig config = new ProducerConfig(props);
>             producer = new Producer<Integer, String>(config);
>             int j=0;
>             for(int i=0; i<10; i++) {
>                 KeyedMessage<Integer, String> data = new
> KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> "+System.currentTimeMillis());
>                 producer.send(data);
>
>             }
>
>         } catch (Exception e) {
>             System.out.println("Error happened: ");
>             e.printStackTrace();
>         } finally {
>             if(null != null) {
>                 producer.close();
>             }
>
>             System.out.println("Ened of Sending");
>         }
>
>         System.exit(0);
>     }
> }
>
>
> My consumer:
>
> public class TestKafka08Consumer {
>     public static void main(String [] args) throws UnknownHostException,
> SocketException {
>
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>         props.put("group.id", "test08ConsumerId");
>         props.put("zk.sessiontimeout.ms", "4000");
>         props.put("zk.synctime.ms", "2000");
>         props.put("autocommit.interval.ms", "1000");
>
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
>         ConsumerConnector consumerConnector =
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>
>         String topic = "test-topic";
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>         int counter=0;
>         while(it.hasNext()) {
>             try {
>                 String fromPlatform = new String(it.next().message());
>                 System.out.println("The messages: "+fromPlatform);
>             } catch(Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         System.out.println("SystemOut");
>     }
> }
>
>
> Thanks
>