Flume >> mail # user >> Splitting the event flow in Flume


Splitting the event flow in Flume
Hello everyone,

My setup is the following: I am pulling xml messages from RabbitMQ via a
RabbitMQ Flume Source. Attached to this Source is an Interceptor which
parses the xml into csv, and the csv message is then written to a csv
file by the Sink. Both the Interceptor and the Sink are my own custom
implementations.

A simple sketch of this would be:
rabbitmq -> source -> interceptor -> channel -> sink -> file.csv

This works fine!

Now I need to figure out how to also dump the raw xml content to an xml
file, in addition to the parsed csv content.

I have come up with three approaches. I would like some advice on whether
each of them is possible and which one is best.

1. Pulling the xml twice from RabbitMQ

rabbitmq -> source1 -> interceptor -> channel -> sink -> file.csv
              -> source2 -> channel -> sink -> file.xml

2. Pulling the xml once but generating both a csv and an xml from the
Interceptor.

rabbitmq -> source1 -> interceptor -> channel -> sink -> file.csv
                                   -> channel -> sink -> file.xml

3. Pulling the xml once, fanning out from the source, and placing the
Interceptor just before the sink.

rabbitmq -> source1 -> channel -> interceptor -> sink -> file.csv
                              -> channel -> sink -> file.xml
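
For reference, the fan-out in option 2 could be wired roughly as in the sketch below. It assumes Flume's replicating channel selector and keeps the interceptor on the source. The channel names (memch-csv, memch-xml), the sink names (csv-sink, xml-sink) and the two sink classes are hypothetical placeholders, not names from my actual setup. As far as I understand, source interceptors run before the event is copied to the channels, so with this wiring both channels would receive the event as it leaves the interceptor, and the interceptor would have to keep the raw xml available for the xml sink.

```
# Sketch of option 2: one source, interceptor on the source, two channels, two sinks.
# Channel names, sink names and the sink classes below are hypothetical placeholders.
agent1.sources = rabbitmq-source1
agent1.channels = memch-csv memch-xml
agent1.sinks = csv-sink xml-sink

agent1.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
agent1.sources.rabbitmq-source1.hostname = localhost
agent1.sources.rabbitmq-source1.queuename = hello
# Listing two channels fans each event out; replicating is Flume's default selector.
agent1.sources.rabbitmq-source1.channels = memch-csv memch-xml
agent1.sources.rabbitmq-source1.selector.type = replicating
# Interceptors are declared on the source.
agent1.sources.rabbitmq-source1.interceptors = interceptor1
agent1.sources.rabbitmq-source1.interceptors.interceptor1.type = resilient.flume.MyInterceptor$Builder

agent1.channels.memch-csv.type = memory
agent1.channels.memch-xml.type = memory

# Each sink drains exactly one channel (note the singular "channel" key).
agent1.sinks.csv-sink.type = resilient.flume.MyCsvSink
agent1.sinks.csv-sink.channel = memch-csv
agent1.sinks.xml-sink.type = resilient.flume.MyXmlSink
agent1.sinks.xml-sink.channel = memch-xml
```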

In my opinion option 3 would be the best since it doesn't require pulling
the xml twice from RabbitMQ and doesn't require any change in the code that
I wrote. The problem is that I'm not sure it is possible. I tried the
following config file without success:

agent1.sources = rabbitmq-source1
agent1.channels = memch1
agent1.sinks = Console

agent1.sources.rabbitmq-source1.channels = memch1
agent1.sources.rabbitmq-source1.type = org.apache.flume.source.rabbitmq.RabbitMQSource
agent1.sources.rabbitmq-source1.hostname = localhost
agent1.sources.rabbitmq-source1.queuename = hello

agent1.sinks.Console.interceptors = interceptor1
agent1.sinks.Console.interceptors.interceptor1.type = resilient.flume.MyInterceptor$Builder
agent1.sinks.Console.channel = memch1
agent1.sinks.Console.type = logger

agent1.channels.memch1.type = memory

Am I doing something wrong, or is option 3 not possible at all?

Thanks,
Alex