Flume >> mail # user >> Help with FlumeNG and interceptors please


Chris Neal 2012-08-02, 14:32
Re: Help with FlumeNG and interceptors please
I was able to answer my first question myself.  Hopefully this will help
someone else who might be wondering the same thing:

You can reference the headers set by interceptors in downstream agents, just
as you could in OG, by using %{name}, where name is the header key.  Like this:

Agent1:
# avro-udprodae09_1-source properties
#pegslog11.sources.avro-udprodae09_1-source.type = avro
#pegslog11.sources.avro-udprodae09_1-source.bind = pegslog11.domain.com
#pegslog11.sources.avro-udprodae09_1-source.port = 10000
pegslog11.sources.avro-udprodae09_1-source.type = exec
pegslog11.sources.avro-udprodae09_1-source.command = tail -F /path_to_file/MyFile.log
pegslog11.sources.avro-udprodae09_1-source.interceptors = path timestamp filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.key = path
pegslog11.sources.avro-udprodae09_1-source.interceptors.path.value = /path_to_file
pegslog11.sources.avro-udprodae09_1-source.interceptors.timestamp.type = timestamp
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.type = static
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.key = filename
pegslog11.sources.avro-udprodae09_1-source.interceptors.filename.value = MyFile.log

Agent2:
# hdfs-hadoopdn01-sink properties
hadoopdn01.sinks.hdfs-hadoopdn01-sink.type = hdfs
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.path = hdfs://hadoopnn01.domain.com/%{path}
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.filePrefix = %{filename}.%Y-%m-%d
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollInterval = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollSize = 2800072288
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollCount = 0
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.batchSize = 100
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.threadsPoolSize = 128
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.rollTimerPoolSize = 5
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.codeC = bzip2
hadoopdn01.sinks.hdfs-hadoopdn01-sink.hdfs.fileType = CompressedStream

This results in /path_to_file/MyFile.log being created in HDFS, as I wanted
it to. :)
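
For anyone wondering how the %{path}, %{filename} and %Y-%m-%d escapes end up
as a concrete HDFS name, here is a rough Python sketch of the idea.  It is
only an illustration, not Flume's actual code: the function name
expand_escapes is made up, the timestamp value is an example, unknown headers
are assumed to expand to an empty string, and dates are formatted in UTC to
keep the result predictable (the sink may well use a different time zone).

```python
import re
import time


def expand_escapes(template, headers):
    """Expand %{header} and date escapes (e.g. %Y-%m-%d) in a sink template.

    Hypothetical helper for illustration; not part of Flume.
    """
    def header_value(match):
        # Assumption: a missing header expands to an empty string.
        value = headers.get(match.group(1), "")
        # Escape literal '%' so strftime below does not reinterpret it.
        return value.replace("%", "%%")

    # Step 1: substitute %{name} with the value of the event header "name".
    expanded = re.sub(r"%\{([^}]+)\}", header_value, template)

    # Step 2: resolve date escapes from the "timestamp" header, which the
    # timestamp interceptor sets to milliseconds since the epoch.
    seconds = int(headers["timestamp"]) / 1000
    return time.strftime(expanded, time.gmtime(seconds))


# Headers as the three interceptors on Agent1 would set them
# (the timestamp value here is an example, not taken from a real event).
headers = {
    "path": "/path_to_file",
    "filename": "MyFile.log",
    "timestamp": "1343917920000",
}

print(expand_escapes("%{filename}.%Y-%m-%d", headers))
```

The date escapes only work because the timestamp interceptor is in the chain
on Agent1; the sketch above would fail with a KeyError without that header.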

I'm still not sure about the wire compression of the AvroSink to AvroSource
connection.  Maybe I'll dig into the code and see.

Chris

On Thu, Aug 2, 2012 at 9:32 AM, Chris Neal <[EMAIL PROTECTED]> wrote:

> Hello,
>
> I'm using flume 1.3.0-SNAPSHOT and have a question about interceptors.
>  I'm just upgrading from 0.9.4, and redoing my configurations to work with
> NG.  In OG, I used a decorator to tag each tailDir file with the path
> representing where the source file originated, so that once in HDFS, the
> directory structure would mirror the production system it came from.
>
> For example, my agent to collector looked like this:
>
> exec config pegslog11-udprodae09_1-agent 'tailDir("/mypath/myDir",
> "MyFile.log", startFromEnd=false)' '{ value("*tailDir*", "/mypath/myDir")
> => { gzip => autoDFOChain } }'
>
> then my collector to hdfs used the value of "*tailDir*" in the HDFS path:
>
> exec config univac_collector_01 'autoCollectorSource'
>  'collectorSink("hdfs://myNameNode/%{*tailDir*}/", "%{tailSrcFile}-",
> 3660000, raw)'
>
> I was thinking I could replicate this using interceptors in NG.  On my
> source, I define my interceptors as such:
>
> # avro-myagent-source properties
> myagent.sources.avro-udprodae09_1-source.type = avro
> myagent.sources.avro-udprodae09_1-source.bind = myagent.domain.com
> myagent.sources.avro-udprodae09_1-source.port = 10000
> myagent.sources.avro-udprodae09_1-source.interceptors = path
> myagent.sources.avro-udprodae09_1-source.interceptors.path.type = static
> myagent.sources.avro-udprodae09_1-source.interceptors.path.key = path
> myagent.sources.avro-udprodae09_1-source.interceptors.path.value = /mypath/myDir
>
> which I believe is correct. :)  My first question is how to use this
> static header variable within the hdfs "type" property of another agent?
>  My hdfs properties look like this:
>
> # hdfs-myagent2-sink properties
> myagent2.sinks.hdfs-myagent2-sink.type = hdfs