Flume >> mail # user >> Help with FlumeNG and interceptors please


Re: Help with FlumeNG and interceptors please
I was able to answer my first question myself.  Hopefully this will help
someone else who might be wondering the same thing:

You can reference the headers set by interceptors in downstream agents, much
as you could in OG, by using %{name} with the header's key.  Like this:

Agent1:
# avro-udprodae09_1-source properties
#pegslog11.sources.avro-udprodae09_1-source.type = avro
#pegslog11.sources.avro-udprodae09_1-source.bind = pegslog11.domain.com
#pegslog11.sources.avro-udprodae09_1-source.port = 10000
pegslog11.sources.avro-udprodae09_1-source.type = exec
pegslog11.sources.avro-udprodae09_1-source.command = tail -F /path_to_file/MyFile.log
pegslog11.sources.avro-udprodae09_1-source.interceptors = path timestamp filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.key = path
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.value = /path_to_file
pegslog11.sources.avro-udprodae09_1-source.interceptors.timestamp.type = timestamp
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.key = filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.value = MyFile.log

Agent2:
# hdfs-hadoopdn01-sink properties
hadoopdn01.sinks.hdfs-hadoopdn01-sink.type = hdfs
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.path = hdfs://hadoopnn01.domain.com/%{path}
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.filePrefix = %{filename}.%Y-%m-%d
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollInterval = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollSize = 2800072288
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollCount = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.batchSize = 100
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.threadsPoolSize = 128
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollTimerPoolSize = 5
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.codeC = bzip2
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.fileType = CompressedStream

This results in /path_to_file/MyFile.log being created in HDFS, as I wanted
it to. :)
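
For anyone who wants to see how the escapes resolve, here is a rough sketch in Python. It is not Flume's code, only an imitation of the substitution: %{name} is looked up in the event headers, and %Y, %m and %d are derived from the "timestamp" header (epoch milliseconds, as set by the timestamp interceptor). The function name expand, the sample timestamp value, the use of UTC, and the empty string for a missing header are all assumptions made for the illustration.

```python
import re
from datetime import datetime, timezone

# Matches either a %{header} reference or one of the date escapes %Y, %m, %d.
ESCAPE = re.compile(r"%\{([^}]+)\}|%([Ymd])")

def expand(template, headers):
    """Illustrative only: resolve %{header} and %Y/%m/%d escapes in a template."""
    def repl(match):
        if match.group(1) is not None:
            # Header reference: substitute the header value (assumed "" if absent).
            return headers.get(match.group(1), "")
        # Date escape: computed from the 'timestamp' header (epoch milliseconds).
        # UTC is an assumption here, chosen to keep the example deterministic.
        ts = datetime.fromtimestamp(int(headers["timestamp"]) / 1000, tz=timezone.utc)
        return ts.strftime("%" + match.group(2))
    return ESCAPE.sub(repl, template)

# Headers as the three interceptors on Agent1 would set them
# (the timestamp value is a made-up sample: 2012-08-02 12:00:00 UTC).
headers = {
    "path": "/path_to_file",
    "filename": "MyFile.log",
    "timestamp": "1343908800000",
}

print(expand("hdfs://hadoopnn01.domain.com/%{path}", headers))
# hdfs://hadoopnn01.domain.com//path_to_file
print(expand("%{filename}.%Y-%m-%d", headers))
# MyFile.log.2012-08-02
```

The doubled slash in the first result comes from the path value itself starting with "/"; it did not cause a problem in my setup.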

I'm still not sure about the wire compression of the AvroSink to AvroSource
connection.  Maybe I'll dig into the code and see.

Chris

On Thu, Aug 2, 2012 at 9:32 AM, Chris Neal <[EMAIL PROTECTED]> wrote:

> Hello,
>
> I'm using flume 1.3.0-SNAPSHOT and have a question about interceptors.
>  I'm just upgrading from 0.9.4, and redoing my configurations to work with
> NG.  In OG, I used a decorator to tag each tailDir file with the path
> representing where the source file originated, so that once in HDFS, the
> directory structure would mirror the production system it came from.
>
> For example, my agent to collector looked like this:
>
> exec config pegslog11-udprodae09_1-agent 'tailDir("/mypath/myDir",
> "MyFile.log", startFromEnd=false)' '{ value("*tailDir*", "/mypath/myDir")
> => { gzip => autoDFOChain } }'
>
> then my collector to hdfs used the value of "*tailDir*" in the HDFS path:
>
> exec config univac_collector_01 'autoCollectorSource'
>  'collectorSink("hdfs://myNameNode/%{*tailDir*}/", "%{tailSrcFile}-",
> 3660000, raw)'
>
> I was thinking I could replicate this using interceptors in NG.  On my
> source, I define my interceptors as such:
>
> # avro-myagent-source properties
> myagent.sources.avro-udprodae09_1-source.type = avro
> myagent.sources.avro-udprodae09_1-source.bind = myagent.domain.com
> myagent.sources.avro-udprodae09_1-source.port = 10000
> myagent.sources.avro-udprodae09_1-source.interceptors = path
> myagent.sources.avro-udprodae09_1-source.interceptors.path.type = static
> myagent.sources.avro-udprodae09_1-source.interceptors.path.key = path
> myagent.sources.avro-udprodae09_1-source.interceptors.path.value = /mypath/myDir
>
> which I believe is correct. :)  My first question is how to use this
> static header variable within the hdfs "type" property of another agent?
>  My hdfs properties look like this:
>
> # hdfs-myagent2-sink properties
> myagent2.sinks.hdfs-myagent2-sink.type = hdfs