NEW: Monitor These Apps!
elasticsearch, apache solr, apache hbase, hadoop, redis, casssandra, amazon cloudwatch, mysql, memcached, apache kafka, apache zookeeper, apache storm, ubuntu, centOS, red hat, debian, puppet labs, java, senseiDB

Flume >> mail # user >> Flume - Avro source to avro sink


Flume - Avro source to avro sink
Hello,

I am trying to configure a two-node workflow:

Avro source --> memory channel --> Avro sink --> (next node) Avro source --> memory channel --> HDFS sink

# agent1 on node1
 agent1.sources = avroSource
 agent1.channels = ch1
 agent1.sinks = avroSink

# agent2 on node2
 agent2.sources = avroSource2
 agent2.channels = ch2
 agent2.sinks = hdfsSink

# first source - avro
 agent1.sources.avroSource.type = avro
 agent1.sources.avroSource.bind = 0.0.0.0
 agent1.sources.avroSource.port = 41414
 agent1.sources.avroSource.channels = ch1

# first sink - avro
 agent1.sinks.avroSink.type = avro
 agent1.sinks.avroSink.hostname = 0.0.0.0
 agent1.sinks.avroSink.port = 41415
 agent1.sinks.avroSink.channel = ch1

# second source - avro
 agent2.sources.avroSource2.type = avro
 agent2.sources.avroSource2.bind = node2 ip
 agent2.sources.avroSource2.port = 41415
 agent2.sources.avroSource2.channel = ch2

# second sink - hdfs
 agent2.sinks.hdfsSink.type = hdfs
 agent2.sinks.hdfsSink.channel = ch2
 agent2.sinks.hdfsSink.hdfs.writeFormat = Text
 agent2.sinks.hdfsSink.hdfs.filePrefix = testing
 agent2.sinks.hdfsSink.hdfs.path = hdfs://node2:9000/flume/

# channels
 agent1.channels.ch1.type = memory
 agent1.channels.ch1.capacity = 1000
 agent2.channels.ch2.type = memory
 agent2.channels.ch2.capacity = 1000
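
For reference, a small Java sketch of a config sanity check. FlumeConfigCheck is a hypothetical helper, not something Flume ships. It loads a config like the one above with java.util.Properties and flags two things worth looking at: a source wired with the singular "channel" key (Flume sources take "channels", as agent1's source does, while sinks take a single "channel"), and an Avro sink whose hostname is 0.0.0.0, which is a bind-all address for listeners rather than the address of the downstream node. It assumes keys of the form <agent>.<sources|sinks>.<name>.<property> and ignores everything else.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical helper, not part of Flume: scans a Flume-style properties
 * config for two wiring mistakes that are easy to miss by eye.
 */
public class FlumeConfigCheck {

    /** Returns one warning per suspicious key, in alphabetical order. */
    public static List<String> check(String configText) {
        Properties props = new Properties();
        try {
            // java.util.Properties skips '#' comments and leading whitespace,
            // so an agent config can be pasted in unchanged.
            props.load(new StringReader(configText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> warnings = new ArrayList<>();
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key).trim();
            // Only inspect keys shaped like <agent>.<kind>.<name>.<property>
            String[] parts = key.split("\\.");
            if (parts.length != 4) {
                continue;
            }
            String kind = parts[1];
            String property = parts[3];
            if (kind.equals("sources") && property.equals("channel")) {
                // A source can feed several channels, so its key is plural.
                warnings.add(key + ": sources are wired with 'channels', not 'channel'");
            }
            if (kind.equals("sinks") && property.equals("hostname") && value.equals("0.0.0.0")) {
                // 0.0.0.0 means "all local interfaces" when binding; it does not name a remote node.
                warnings.add(key + ": 0.0.0.0 is a bind-all address, not the downstream host");
            }
        }
        warnings.sort(null); // natural order keeps the output stable
        return warnings;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                " agent1.sinks.avroSink.type = avro",
                " agent1.sinks.avroSink.hostname = 0.0.0.0",
                " agent1.sinks.avroSink.port = 41415",
                " agent2.sources.avroSource2.channel = ch2",
                " agent2.sinks.hdfsSink.hdfs.writeFormat = Text");
        check(sample).forEach(System.out::println);
    }
}
```

Run on the sample in main, it prints one warning for agent1.sinks.avroSink.hostname and one for agent2.sources.avroSource2.channel; the five-part hdfs.* keys are skipped.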

I am getting a connection error on the sink port (41415). Could someone please check whether I have connected the sink on node1 to the source on node2 properly? Here is the log from node1:

13/03/24 04:32:16 INFO source.AvroSource: Starting Avro source avroSource: { bindAddress: 0.0.0.0, port: 41414 }...
13/03/24 04:32:16 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SINK, name: avroSink, registered successfully.
13/03/24 04:32:16 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: avroSink started
13/03/24 04:32:16 INFO sink.AvroSink: Avro sink avroSink: Building RpcClient with hostname: 0.0.0.0, port: 41415
13/03/24 04:32:16 WARN sink.AvroSink: Unable to create avro client using hostname: 0.0.0.0, port: 41415
org.apache.flume.FlumeException: NettyAvroRpcClient { host: 0.0.0.0, port: 41415 }: RPC connection error
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:117)
        at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:93)
        at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:507)
        at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:88)
        at org.apache.flume.sink.AvroSink.createConnection(AvroSink.java:182)
        at org.apache.flume.sink.AvroSink.start(AvroSink.java:242)
        at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:46)
        at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:236)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:328)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:161)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:109)
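
The WARN line shows the Avro sink on node1 trying to reach 0.0.0.0:41415 and failing with "RPC connection error". The only agent configured to listen on 41415 is agent2 on node2. One way to test reachability independently of Flume is a plain TCP probe, sketched below in Java as an assumed diagnostic (PortProbe and isListening are hypothetical names, not Flume APIs). On typical Linux hosts a connect to 0.0.0.0 is treated as a connect to the local machine, so probing 0.0.0.0 from node1 tests node1 itself, while probing node2's address tests the path the sink is meant to use.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/**
 * Hypothetical diagnostic, not part of Flume: attempts the same kind of plain
 * TCP connection the Avro sink's RPC client needs before it can send events.
 */
public class PortProbe {

    /** Returns true if a TCP connection to host:port is accepted within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host could not be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        // Usage: java PortProbe <host> <port>, e.g. run on node1 with node2's address.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 41415;
        boolean open = isListening(host, port, 2000);
        System.out.println(host + ":" + port + (open ? " accepts connections" : " does not accept connections"));
    }
}
```

For example, running java PortProbe <node2 address> 41415 on node1 after agent2 has started should report that the port accepts connections if avroSource2 is bound and reachable; if it does not, the Avro sink will not be able to connect either.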