Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Plain View
Flume >> mail # user >> Help with FlumeNG and interceptors please

Copy link to this message
Help with FlumeNG and interceptors please

I'm using flume 1.3.0-SNAPSHOT and have a question about interceptors.  I'm
just upgrading from 0.9.4, and redoing my configurations to work with NG.
 In OG, I used a decorator to tag each tailDir file with the path
representing where the source file originated, so that once in HDFS, the
directory structure would mirror the production system it came from.

For example, my agent to collector looked like this:

exec config pegslog11-udprodae09_1-agent 'tailDir("/mypath/myDir",
"MyFile.log", startFromEnd=false)' '{ value("*tailDir*", "/mypath/myDir")
=> { gzip => autoDFOChain } }'

then my collector to hdfs used the value of "*tailDir*" in the HDFS path:

exec config univac_collector_01 'autoCollectorSource'
 'collectorSink("hdfs://myNameNode/%{*tailDir*}/", "%{tailSrcFile}-",
3660000, raw)'

I was thinking I could replicate this using interceptors in NG.  On my
source, I define my interceptors as such:

# avro-myagent-source properties
myagent.sources.avro-udprodae09_1-source.type = avro
myagent.sources.avro-udprodae09_1-source.bind = myagent.domain.com
myagent.sources.avro-udprodae09_1-source.port = 10000
myagent.sources.avro-udprodae09_1-source.interceptors = path
myagent.sources.avro-udprodae09_1-source.interceptors.path.type = static
myagent.sources.avro-udprodae09_1-source.interceptors.path.key = path
myagent.sources.avro-udprodae09_1-source.interceptors.path.value =*

which I believe is correct. :)  My first question is how to use this static
header variable within the hdfs "type" property of another agent?  My hdfs
properties look like this:

# hdfs-myagent2-sink properties
myagent2.sinks.hdfs-myagent2-sink.type = hdfs
myagent2.sinks.hdfs-myagent2-sink.path = hdfs://namenode.domain.com/*[somehow
put "path" here]*
myagent2.sinks.hdfs-myagent2-sink.filePrefix = filename
myagent2.sinks.hdfs-myagent2-sink.rollInterval = 0
myagent2.sinks.hdfs-myagent2-sink.rollSize = 67108864
myagent2.sinks.hdfs-myagent2-sink.rollCount = 0
myagent2.sinks.hdfs-myagent2-sink.batchSize = 100
myagent2.sinks.hdfs-myagent2-sink.codeC = bzip2
myagent2.sinks.hdfs-myagent2-sink.fileType = CompressedStream

My second question is about line compression between agents.  In OG, I
would use the "gzip" tag to compress prior to sending data from an agent to
a collector.  How do I do something similar in NG?  Does avro-sink to
avro-source do it automagically for me behind the scenes perhaps?

Thanks so much for your time and help!
Chris Neal 2012-08-08, 20:45