Flume NG replaying events when the source is idle
Sagar Mehta 2013-02-27, 19:37
Hi Guys,

I'm using Flume NG and it is working pretty well, except for a weird
situation I observed lately. In essence, I'm using an exec source to do
tail -F on a logfile, feeding two HDFS sinks through File channels.
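
So the pipeline looks like this:

tail -F /opt/jetty/logFile.log -> source1 (exec) -> channel1 (file) -> sink1 (HDFS, namenode3001)
                                                 -> channel2 (file) -> sink2 (HDFS, namenode5001)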

However, I have observed that when the source [the logfile of a Jetty-based
collector] is idle - that is, no new events are pushed to the logfile -
Flume NG seems to replay the same set of events.

For example, collector110 received no events for two consecutive hours, and
below are the corresponding Flume-written files at the HDFS sink:

hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1400/collector110*
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:20 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 14:50 /ngpipes-raw-logs/2013-02-27/1400/collector110.ngpipes.sac.ngmoco.com.1361976653432.gz

hadoop@jobtracker301:/home/hadoop/sagar$ hls /ngpipes-raw-logs/2013-02-27/1500/collector110*
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:20 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
-rw-r--r--   3 hadoop supergroup        441 2013-02-27 15:50 /ngpipes-raw-logs/2013-02-27/1500/collector110.ngpipes.sac.ngmoco.com.1361980254338.gz

hadoop@jobtracker301:/home/hadoop/sagar$ md5sum *
c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361974853210.gz
c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361976653432.gz
c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361978454123.gz
c7360ef5c8deaee3ce9f4c92e9d9be63  collector110.ngpipes.sac.ngmoco.com.1361980254338.gz

As you can see above, the md5sums of all four files are identical.
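
(hls above is shorthand for hadoop fs -ls. To double-check the comparison,
you can pull the files out of HDFS and checksum them locally, along these
lines:)

hadoop fs -get '/ngpipes-raw-logs/2013-02-27/1400/collector110*' .
hadoop fs -get '/ngpipes-raw-logs/2013-02-27/1500/collector110*' .
md5sum collector110*.gz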

I'm using a File channel, which has checkpoints, so I'm not sure what is
going on. By the way, it looks like the timestamps of successive replays are
exactly 30 minutes apart.
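
For reference, the exec source does have settings for restarting the wrapped
command (per the Flume user guide). They are not set in my config below, so
the documented defaults should apply:

# exec source restart settings - not in my config; documented defaults shown
collector110.sources.source1.restart = false
collector110.sources.source1.restartThrottle = 10000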

Is this a known bug, or am I missing something?

Below is my Flume config file:

smehta@collector110:/opt/flume/conf$ cat hdfs.conf
# An agent with one exec source, two HDFS sinks (one per cluster),
# and file channels connecting the source to the sinks

# Name the components on this agent
collector110.sources = source1
collector110.sinks = sink1 sink2
collector110.channels = channel1 channel2

# Configure the source
collector110.sources.source1.type = exec
collector110.sources.source1.command = tail -F /opt/jetty/logFile.log

# Configure the interceptors
collector110.sources.source1.interceptors = TimestampInterceptor HostInterceptor

# We use the Timestamp interceptor to get timestamps of when Flume receives events.
# This is used for figuring out the bucket to which an event goes.
collector110.sources.source1.interceptors.TimestampInterceptor.type = timestamp

# We use the Host interceptor to populate the host header with the fully
# qualified domain name of the collector.
# That way we know which file in the sink represents which collector.
collector110.sources.source1.interceptors.HostInterceptor.type = org.apache.flume.interceptor.HostInterceptor$Builder
collector110.sources.source1.interceptors.HostInterceptor.preserveExisting = false
collector110.sources.source1.interceptors.HostInterceptor.useIP = false
collector110.sources.source1.interceptors.HostInterceptor.hostHeader = host
# Configure the first sink

collector110.sinks.sink1.type = hdfs

# Configure the bucketing
collector110.sinks.sink1.hdfs.path = hdfs://namenode3001.ngpipes.milp.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00

# Prefix the file with the source host so that we know where the events in the file came from
collector110.sinks.sink1.hdfs.filePrefix = %{host}

# We roll the Flume output file based on time interval - currently every 5 minutes
collector110.sinks.sink1.hdfs.rollSize = 0
collector110.sinks.sink1.hdfs.rollCount = 0
collector110.sinks.sink1.hdfs.rollInterval = 300

# gzip compression related settings
collector110.sinks.sink1.hdfs.codeC = gzip
collector110.sinks.sink1.hdfs.fileType = CompressedStream
collector110.sinks.sink1.hdfs.fileSuffix = .gz

# Configure the second sink

collector110.sinks.sink2.type = hdfs

# Configure the bucketing
collector110.sinks.sink2.hdfs.path = hdfs://namenode5001.ngpipes.sac.ngmoco.com:9000/ngpipes-raw-logs/%Y-%m-%d/%H00

# Prefix the file with the source host so that we know where the events in the file came from
collector110.sinks.sink2.hdfs.filePrefix = %{host}

# We roll the Flume output file based on time interval - currently every 5 minutes
collector110.sinks.sink2.hdfs.rollSize = 0
collector110.sinks.sink2.hdfs.rollCount = 0
collector110.sinks.sink2.hdfs.rollInterval = 300

# gzip compression related settings
collector110.sinks.sink2.hdfs.codeC = gzip
collector110.sinks.sink2.hdfs.fileType = CompressedStream
collector110.sinks.sink2.hdfs.fileSuffix = .gz

# Configure the channel that connects the source to the sink

# Use a channel which buffers events in filesystem
collector110.channels.channel1.type = file
collector110.channels.channel1.checkpointDir = /data/flume_data/channel1/checkpoint
collector110.channels.channel1.dataDirs = /data/flume_data/channel1/data

# Use a channel which buffers events in filesystem
collector110.channels.channel2.type = file
collector110.channels.channel2.checkpointDir = /data/flume_data/channel2/checkpoint
collector110.channels.channel2.dataDirs = /data/flume_data/channel2/data

# Bind the source and sinks to the channels configured above
collector110.sources.source1.channels = channel1 channel2
collector110.sinks.sink1.channel = channel1
collector110.sinks.sink2.channel = channel2
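
For completeness: a config like this is typically run with the stock flume-ng
script, along these lines (the exact command here is a sketch):

flume-ng agent --conf /opt/flume/conf --conf-file /opt/flume/conf/hdfs.conf --name collector110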

Sagar