Help with FlumeNG and interceptors please
Chris Neal 2012-08-02, 14:32
Hello,
I'm using Flume 1.3.0-SNAPSHOT and have a question about interceptors. I'm upgrading from 0.9.4 and redoing my configurations to work with NG. In OG, I used a decorator to tag each tailDir file with the path the source file came from, so that once in HDFS, the directory structure would mirror the production system it originated on.
For example, my agent to collector looked like this:
exec config pegslog11-udprodae09_1-agent 'tailDir("/mypath/myDir", "MyFile.log", startFromEnd=false)' '{ value("*tailDir*", "/mypath/myDir") => { gzip => autoDFOChain } }'
then my collector to hdfs used the value of "*tailDir*" in the HDFS path:
exec config univac_collector_01 'autoCollectorSource' 'collectorSink("hdfs://myNameNode/%{*tailDir*}/", "%{tailSrcFile}-", 3660000, raw)'
I was thinking I could replicate this using interceptors in NG. On my source, I define my interceptors as such:
# avro-myagent-source properties
myagent.sources.avro-udprodae09_1-source.type = avro
myagent.sources.avro-udprodae09_1-source.bind = myagent.domain.com
myagent.sources.avro-udprodae09_1-source.port = 10000
myagent.sources.avro-udprodae09_1-source.interceptors = path
myagent.sources.avro-udprodae09_1-source.interceptors.path.type = static
myagent.sources.avro-udprodae09_1-source.interceptors.path.key = path
myagent.sources.avro-udprodae09_1-source.interceptors.path.value = /mypath/myDir
which I believe is correct. :) My first question is: how do I use this static header within the properties of an hdfs-type sink on another agent? My hdfs properties look like this:
# hdfs-myagent2-sink properties
myagent2.sinks.hdfs-myagent2-sink.type = hdfs
myagent2.sinks.hdfs-myagent2-sink.path = hdfs://namenode.domain.com/[somehow put "path" here]
myagent2.sinks.hdfs-myagent2-sink.filePrefix = filename
myagent2.sinks.hdfs-myagent2-sink.rollInterval = 0
myagent2.sinks.hdfs-myagent2-sink.rollSize = 67108864
myagent2.sinks.hdfs-myagent2-sink.rollCount = 0
myagent2.sinks.hdfs-myagent2-sink.batchSize = 100
myagent2.sinks.hdfs-myagent2-sink.codeC = bzip2
myagent2.sinks.hdfs-myagent2-sink.fileType = CompressedStream
My second question is about line compression between agents. In OG, I would use the "gzip" tag to compress prior to sending data from an agent to a collector. How do I do something similar in NG? Does avro-sink to avro-source do it automagically for me behind the scenes perhaps?
Thanks so much for your time and help!
Re: Help with FlumeNG and interceptors please
Chris Neal 2012-08-08, 20:45
I was able to answer my first question myself. Hopefully this will help someone else who might be wondering the same thing:
You can reference the headers set by interceptors in downstream agents, much as you could in OG, by writing %{name}, where name is the header key. Like this:
Agent1:
# avro-udprodae09_1-source properties
#pegslog11.sources.avro-udprodae09_1-source.type = avro
#pegslog11.sources.avro-udprodae09_1-source.bind = pegslog11.domain.com
#pegslog11.sources.avro-udprodae09_1-source.port = 10000
pegslog11.sources.avro-udprodae09_1-source.type = exec
pegslog11.sources.avro-udprodae09_1-source.command = tail -F /path_to_file/MyFile.log
pegslog11.sources.avro-udprodae09_1-source.interceptors = path timestamp filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.key = path
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.value = /path_to_file
pegslog11.sources.avro-udprodae09_1-source.interceptors.timestamp.type = timestamp
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.key = filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.value = MyFile.log
Agent2:
# hdfs-hadoopdn01-sink properties
hadoopdn01.sinks.hdfs-hadoopdn01-sink.type = hdfs
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.path = hdfs://hadoopnn01.domain.com/%{path}
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.filePrefix = %{filename}.%Y-%m-%d
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollInterval = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollSize = 2800072288
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollCount = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.batchSize = 100
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.threadsPoolSize = 128
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollTimerPoolSize = 5
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.codeC = bzip2
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.fileType = CompressedStream
This results in /path_to_file/MyFile.log being created in HDFS, as I wanted it to. :)
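The two fragments above leave out the hop between the agents. Here is a minimal sketch of that wiring, assuming an Avro sink on the first agent, an Avro source on the second, and a memory channel on each. The names avro-out, avro-in and mem-ch, the host name, the bind address and the port are placeholders for illustration, not taken from the real configuration:

```properties
# Agent 1 (pegslog11): exec source -> memory channel -> Avro sink
# avro-out and mem-ch are hypothetical component names.
pegslog11.sources = avro-udprodae09_1-source
pegslog11.channels = mem-ch
pegslog11.sinks = avro-out

pegslog11.channels.mem-ch.type = memory

pegslog11.sources.avro-udprodae09_1-source.channels = mem-ch

pegslog11.sinks.avro-out.type = avro
# Placeholder host/port: point these at the second agent's Avro source.
pegslog11.sinks.avro-out.hostname = hadoopdn01.domain.com
pegslog11.sinks.avro-out.port = 10000
pegslog11.sinks.avro-out.channel = mem-ch

# Agent 2 (hadoopdn01): Avro source -> memory channel -> HDFS sink
# avro-in and mem-ch are hypothetical component names.
hadoopdn01.sources = avro-in
hadoopdn01.channels = mem-ch
hadoopdn01.sinks = hdfs-hadoopdn01-sink

hadoopdn01.channels.mem-ch.type = memory

hadoopdn01.sources.avro-in.type = avro
hadoopdn01.sources.avro-in.bind = 0.0.0.0
hadoopdn01.sources.avro-in.port = 10000
hadoopdn01.sources.avro-in.channels = mem-ch

hadoopdn01.sinks.hdfs-hadoopdn01-sink.channel = mem-ch
```

The headers added by the interceptors travel with each event across the Avro hop, which is why %{path} and %{filename} can be resolved by the HDFS sink on the second agent.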
I'm still not sure about the wire compression of the AvroSink to AvroSource connection. Maybe I'll dig into the code and see.
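A note for anyone reading this later: as far as I know, the 1.3.0-SNAPSHOT discussed here has no such option, but the user guide for later Flume releases documents a compression-type setting on both the Avro sink and the Avro source, plus a compression-level on the sink. A sketch under that assumption, with avro-out and avro-in as hypothetical component names; check the user guide for your version before relying on it:

```properties
# Sending agent: compress event batches before they go over the wire.
# avro-out is a hypothetical sink name.
pegslog11.sinks.avro-out.compression-type = deflate
pegslog11.sinks.avro-out.compression-level = 6

# Receiving agent: the source's compression-type has to match the sink's.
# avro-in is a hypothetical source name.
hadoopdn01.sources.avro-in.compression-type = deflate
```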
Chris