Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Flume, mail # user - Flume-ng Avro RPC and Python


Copy link to this message
-
Re: Flume-ng Avro RPC and Python
Brock Noland 2012-12-18, 21:16
Hi,

This is because Flume uses the NettyTransceiver and pyton avro only
supports HTTPTransciever.

This is not using avro, but you should be able to send JSON events to
the HTTPSource (http://flume.apache.org/FlumeUserGuide.html#http-source).

Brock

On Tue, Dec 18, 2012 at 3:13 PM, John Michaels <[EMAIL PROTECTED]> wrote:
> Hi,
>
> I'm have a flume-ng source listening on port 45454, and I attempt to use the
> following python script to send an event to the source, but receive the
> netty exception below in the flume logs.
>
> Has anyone had any success sending events via python? Can anyone suggest a
> workaround or maybe I'm doing something wrong?
>
> Thanks,
> John
>
>
> server_addr = ('localhost', 45454)
> PROTOCOL = protocol.parse(open("flume.avpr").read())
>
> def sendData():
>         client = ipc.HTTPTransceiver(server_addr[0], server_addr[1])
>         requestor = ipc.Requestor(PROTOCOL, client)
>
>         event = dict()
>         event['headers'] = {'table_name': 'foo', 'database': 'bar'}
> event['body'] = bytes('hello')
>         params = dict()
>         params['event'] = event        print("Result : " +
> requestor.request('append', params))
>         client.close()
> if __name__ == '__main__':
>         sendData()
>  18 Dec 2012 21:06:46,678 WARN  [New I/O server worker #1-5]
> (org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught:201)
> - Unexpected exception from downstream.
> org.apache.avro.AvroRuntimeException: Excessively large list allocation
> request detected: 539959368 items! Connection closed.        at
> org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decodePackHeader(NettyTransportCodec.java:167)
>         at
> org.apache.avro.ipc.NettyTransportCodec$NettyFrameDecoder.decode(NettyTransportCodec.java:139)
> at
> org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:282)
>         at
> org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:216)
> at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
>         at
> org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
> at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
>         at
> org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
> at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)

--
Apache MRUnit - Unit testing MapReduce - http://incubator.apache.org/mrunit/