|
|
+
Chris Neal 2012-08-02, 14:32
-
Re: Help with FlumeNG and interceptors pleaseChris Neal 2012-08-08, 20:45
I was able to answer my first question myself. Hopefully this will help
someone else who might be wondering the same thing: You can reference interceptors in down stream agents just like you could in OG, but using %{name}. Like this: Agent1: # avro-udprodae09_1-source properties #pegslog11.sources.avro-udprodae09_1-source.type = avro #pegslog11.sources.avro-udprodae09_1-source.bind = pegslog11.domain.com #pegslog11.sources.avro-udprodae09_1-source.port = 10000 pegslog11.sources.avro-udprodae09_1-source.type = exec pegslog11.sources.avro-udprodae09_1-source.command = tail -F /path_to_file/MyFile.log pegslog11.sources.avro-udprodae09_1-source.interceptors = path timestamp filename pegslog11.sources.avro-udprodae09_1-source.interceptors.path.type = static pegslog11.sources.avro-udprodae09_1-source.interceptors.path.key = path pegslog11.sources.avro-udprodae09_1-source.interceptors.path.value /path_to_file pegslog11.sources.avro-udprodae09_1-source.interceptors.timestamp.type timestamp pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.type static pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.key filename pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.value MyFile.log Agent2: # hdfs-hadoopdn01-sink properties hadoopdn01.sinks.hdfs-hadoopdn01-sink.type = hdfs hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.path = hdfs:// hadoopnn01.domain.com/%{path} hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.filePrefix = %{filename}.%Y-%m-%d hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollInterval = 0 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollSize = 2800072288 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollCount = 0 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.batchSize = 100 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.threadsPoolSize = 128 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollTimerPoolSize = 5 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.codeC = bzip2 hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.fileType = CompressedStream This results in /path_to_file/MyFile.log being created in HDFS, as I wanted it to. :) I'm still not sure about the wire compression of the AvroSink to AvroSource connection. Maybe I'll dig into the code and see. Chris On Thu, Aug 2, 2012 at 9:32 AM, Chris Neal <[EMAIL PROTECTED]> wrote: > Hello, > > I'm using flume 1.3.0-SNAPSHOT and have a question about interceptors. > I'm just upgrading from 0.9.4, and redoing my configurations to work with > NG. In OG, I used a decorator to tag each tailDir file with the path > representing where the source file originated, so that once in HDFS, the > directory structure would mirror the production system it came from. > > For example, my agent to collector looked like this: > > exec config pegslog11-udprodae09_1-agent 'tailDir("/mypath/myDir", > "MyFile.log", startFromEnd=false)' '{ value("*tailDir*", "/mypath/myDir") > => { gzip => autoDFOChain } }' > > then my collector to hdfs used the value of "*tailDir*" in the HDFS path: > > exec config univac_collector_01 'autoCollectorSource' > 'collectorSink("hdfs://myNameNode/%{*tailDir*}/", "%{tailSrcFile}-", > 3660000, raw)' > > I was thinking I could replicate this using interceptors in NG. On my > source, I define my interceptors as such: > > # avro-myagent-source properties > myagent.sources.avro-udprodae09_1-source.type = avro > myagent.sources.avro-udprodae09_1-source.bind = myagent.domain.com > myagent.sources.avro-udprodae09_1-source.port = 10000 > myagent.sources.avro-udprodae09_1-source.interceptors = path > myagent.sources.avro-udprodae09_1-source.interceptors.path.type = static > myagent.sources.avro-udprodae09_1-source.interceptors.path.key = path > myagent.sources.avro-udprodae09_1-source.interceptors.path.value =* > /mypath/myDir* > > which I believe is correct. :) My first question is how to use this > static header variable within the hdfs "type" property of another agent? > My hdfs properties look like this: > > # hdfs-myagent2-sink properties > myagent2.sinks.hdfs-myagent2-sink.type = hdfs |