Kafka, mail # user - Unable to send and consume compressed events.


Lu Xuechao 2013-08-29, 08:40
Paul Mackles 2013-08-29, 10:48
Lu Xuechao 2013-08-29, 13:24
Lu Xuechao 2013-08-29, 13:29
Jun Rao 2013-08-29, 14:29
Lu Xuechao 2013-08-30, 00:41
Lu Xuechao 2013-08-30, 02:25
Jun Rao 2013-08-30, 03:57
Lu Xuechao 2013-08-30, 05:16
Lu Xuechao 2013-08-30, 06:30
Joe Stein 2013-08-30, 07:23
Lu Xuechao 2013-08-30, 09:03
Jun Rao 2013-08-30, 14:32
Re: Unable to send and consume compressed events.
Jay Kreps 2013-08-30, 16:32
This seems like more of a bug than a FAQ, no? We are swallowing the
exception...

-Jay
On Thu, Aug 29, 2013 at 11:30 PM, Lu Xuechao <[EMAIL PROTECTED]> wrote:

> Hi Jun,
>
> Thanks for your help. I finally found the reason by enabling producer-side
> DEBUG output: the snappy jar was not included in the classpath. I added
> it and it worked.
>
> Thanks again.
>
>
>
>
> On Fri, Aug 30, 2013 at 12:53 PM, Lu Xuechao <[EMAIL PROTECTED]> wrote:
>
> > No.
> >
> >
> > On Fri, Aug 30, 2013 at 11:57 AM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >
> >> These are the metadata requests. Do you see Producer requests from your
> >> client?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Thu, Aug 29, 2013 at 5:40 PM, Lu Xuechao <[EMAIL PROTECTED]> wrote:
> >>
> >> > After I sent 1,000 compressed events, I saw these messages in the
> >> > broker's log files:
> >> >
> >> > in kafka-request.log
> >> >
> >> > [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name:
> >> > TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics:
> >> > topic1 (kafka.network.RequestChannel$)
> >> > [2013-08-30 08:38:18,718] TRACE Completed request:Name:
> >> > TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics:
> >> > topic1 from client /127.0.0.1:64238
> >> > ;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1
> >> > (kafka.request.logger)
> >> >
> >> >
> >> > in server.log
> >> >
> >> > [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1.
> >> > (kafka.network.Processor)
> >> >
> >> >
> >> > Any ideas? Thanks.
> >> >
> >> >
> >> > On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <[EMAIL PROTECTED]> wrote:
> >> >
> >> > > Did you see any error in the producer log? Did the broker receive the
> >> > > produce request (you can look at the request log in the broker)?
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <[EMAIL PROTECTED]> wrote:
> >> > >
> >> > > > Let me post my test code here. I could see producer.send(data);
> >> > > > returned with no error.
> >> > > >
> >> > > > public class TestProducer extends Thread {
> >> > > >     private final Producer<String, String> producer;
> >> > > >
> >> > > >     private final int m_events;
> >> > > >     private final int m_threadNumber;
> >> > > >
> >> > > >     private static String msg = StringUtils.rightPad("", 1000, '*');
> >> > > >
> >> > > >     public TestProducer(int threadNumber, int events) {
> >> > > >         m_threadNumber = threadNumber;
> >> > > >         m_events = events;
> >> > > >
> >> > > >         Properties props = new Properties();
> >> > > >         props.put("serializer.class", KafkaProperties.p_serializer_class);
> >> > > >         props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
> >> > > >         props.put("partitioner.class", KafkaProperties.p_partitioner_class);
> >> > > >         props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
> >> > > >         props.put("request.required.acks", KafkaProperties.p_request_required_acks);
> >> > > >         props.put("producer.type", KafkaProperties.p_producer_type);
> >> > > >
> >> > > >         props.put("batch.num.messages", KafkaProperties.p_batch_num);
> >> > > >
> >> > > >         props.put("compression.codec", KafkaProperties.p_compression_codec);
> >> > > >
> >> > > >         ProducerConfig config = new ProducerConfig(props);
> >> > > >         producer = new Producer<String, String>(config);
> >> > > >     }
> >> > > >
> >> > > >     @Override
> >> > > >     public void run() {
> >> > > >         long start;
> >> > > >         long num = 0;
> >> > > >         System.out.println(new Date() + " - Message sent thread "
> >> > > >                 + m_threadNumber + " started.");
> >> > > >         while (true) {
> >> > > >             start = System.currentTimeMillis();
> >> > >
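
The root cause reported in this thread was a snappy jar missing from the
producer's classpath, with no visible error from producer.send(). As a rough
sketch of how a client could fail fast instead, the check below looks for the
codec class before the producer is created. The class name
SnappyClasspathCheck and the helper isClassAvailable are hypothetical names
made up for this illustration; org.xerial.snappy.Snappy is assumed to be the
entry class of the snappy-java jar in use.

```java
public class SnappyClasspathCheck {

    // Hypothetical helper: returns true if the named class can be located
    // on the classpath. Static initializers are not run (initialize=false),
    // so no native library is loaded by this check.
    static boolean isClassAvailable(String className) {
        try {
            Class.forName(className, false,
                    SnappyClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: this is the main class shipped in the snappy-java jar.
        String snappyClass = "org.xerial.snappy.Snappy";

        if (isClassAvailable(snappyClass)) {
            System.out.println("snappy found on the classpath");
        } else {
            System.err.println("snappy NOT found on the classpath; "
                    + "compressed sends may fail without a visible error");
        }
    }
}
```

Running this with the same classpath as the producer (before constructing it)
would surface the missing jar directly, rather than requiring DEBUG logging to
find it.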
 
Joe Stein 2013-08-30, 16:52
Joe Stein 2013-08-30, 16:57
Jun Rao 2013-08-31, 03:49