Flume, mail # user - Extra information being delivered via Flume

DSuiter RDX 2013-10-10, 18:56
Hi all,

We set up a pipeline to get rsyslog input from a remote server over TCP,
using rsyslog's remote TCP forwarding. The data is sent from the server to
a syslogTCP source, passed through a memory channel to an Avro sink, and
delivered to an Avro source, which feeds an HDFS sink through a second
memory channel. Data moves from source to destination fine, but the output
in HDFS is messy. I realize some of the extra content is the Avro schema
definition, but there are also Severity and Facility markers, plus extra
timestamps, that do not appear in /var/log/messages on the original server.

Can anyone help us eliminate these extra fields? The extra information is
not useful to us, so if we could get the output down to what shows up in
/var/log/messages, that would simplify the next task of sorting the data
in MapReduce.
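For context on where the Severity and Facility markers likely come from: lines forwarded by rsyslog normally begin with a `<PRI>` field, followed by a timestamp and hostname. A syslog parser splits those off into event headers and keeps the remainder as the body. The PRI value encodes facility times 8 plus severity, so `<14>` would give Facility 1 and Severity 6, which matches the sample below. Here is a minimal Python sketch of that split. It assumes the classic RFC 3164 layout. The regex, the function name `split_syslog_line`, and the `timestamp_text` key are hypothetical illustrations, not Flume's own parser.

```python
import re

# Simplified RFC 3164 layout: "<PRI>Mmm dd hh:mm:ss host message".
# Illustrative assumption only; Flume's syslog source has its own parser.
SYSLOG_RE = re.compile(
    r"^<(?P<pri>\d{1,3})>"
    r"(?P<ts>[A-Z][a-z]{2} [ \d]\d \d{2}:\d{2}:\d{2}) "
    r"(?P<host>\S+) "
    r"(?P<msg>.*)$"
)


def split_syslog_line(line):
    """Split a raw forwarded syslog line into (headers, body)."""
    m = SYSLOG_RE.match(line)
    if m is None:
        raise ValueError("not an RFC 3164 line: %r" % line)
    pri = int(m.group("pri"))
    headers = {
        "Facility": str(pri // 8),  # facility code = PRI divided by 8
        "Severity": str(pri % 8),   # severity code = PRI modulo 8
        "host": m.group("host"),
        # Hypothetical key: the sample shows Flume storing epoch millis
        # under "timestamp" instead of the raw text.
        "timestamp_text": m.group("ts"),
    }
    return headers, m.group("msg")


if __name__ == "__main__":
    raw = ("<14>Oct 10 11:33:42 server001 RT: Ticket XXXXXX created "
           "in queue 'General' by info")
    headers, body = split_syslog_line(raw)
    print(headers)
    print(body)
```

Only the body corresponds to the message text that follows the hostname in /var/log/messages. Everything else is metadata lifted out of the line.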

Here is the agent recipe, and a scrubbed sample of the data we are getting.

RT_syslog.sources = syslogTCP_RT_Tier1_Source avro_RT_Tier2_Source
RT_syslog.sinks = avro_RT_Tier1_Sink HDFS_RT_Tier2_Sink
RT_syslog.channels = memory_RT_Tier1_Channel memory_RT_Tier2_Channel

# sources
RT_syslog.sources.syslogTCP_RT_Tier1_Source.type = syslogtcp
RT_syslog.sources.syslogTCP_RT_Tier1_Source.host =
RT_syslog.sources.syslogTCP_RT_Tier1_Source.port = 5140
RT_syslog.sources.syslogTCP_RT_Tier1_Source.channels = memory_RT_Tier1_Channel

# channels
RT_syslog.channels.memory_RT_Tier1_Channel.type = memory
RT_syslog.channels.memory_RT_Tier1_Channel.capacity = 1500
RT_syslog.channels.memory_RT_Tier1_Channel.transactionCapacity = 1500

# sinks
RT_syslog.sinks.avro_RT_Tier1_Sink.type = avro
RT_syslog.sinks.avro_RT_Tier1_Sink.hostname =
RT_syslog.sinks.avro_RT_Tier1_Sink.port = 5141
RT_syslog.sinks.avro_RT_Tier1_Sink.batch-size = 1500
RT_syslog.sinks.avro_RT_Tier1_Sink.channel = memory_RT_Tier1_Channel

# sources
RT_syslog.sources.avro_RT_Tier2_Source.type = avro
RT_syslog.sources.avro_RT_Tier2_Source.bind =
RT_syslog.sources.avro_RT_Tier2_Source.port = 5141
RT_syslog.sources.avro_RT_Tier2_Source.channels = memory_RT_Tier2_Channel

# channels
RT_syslog.channels.memory_RT_Tier2_Channel.type = memory
RT_syslog.channels.memory_RT_Tier2_Channel.capacity = 15000
RT_syslog.channels.memory_RT_Tier2_Channel.transactionCapacity = 15000

# sinks
RT_syslog.sinks.HDFS_RT_Tier2_Sink.type = hdfs
RT_syslog.sinks.HDFS_RT_Tier2_Sink.channel = memory_RT_Tier2_Channel
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.path = /user/flume/RT_syslog
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.fileSuffix = .avro
RT_syslog.sinks.HDFS_RT_Tier2_Sink.serializer = avro_event
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.fileType = DataStream
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.rollInterval = 86400
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.rollSize = 134217728
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.batchSize = 15000
RT_syslog.sinks.HDFS_RT_Tier2_Sink.hdfs.rollCount = 0

Data we are getting in HDFS:

u'headers': {u'timestamp': u'1381256530000', u'host': u'server001',
u'Severity': u'6', u'Facility': u'1'}}
{u'body': "RT: Ticket XXXXXX created in queue 'General' by info

What that looks like in its original form:

Oct 10 11:33:42 server001 RT: Ticket XXXXXX created in queue 'General' by
info (/opt/rt4/sbin/../lib/RT/Ticket.pm:694)
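Going the other direction, each record in HDFS still carries enough to rebuild a /var/log/messages-style line: the `timestamp` header (epoch milliseconds), the `host` header, and the body. Here is a rough Python sketch. It assumes each record has already been decoded into a dict shaped like the sample above. `to_messages_line` is a hypothetical helper. It renders times in UTC by default, whereas the original server logs in local time. It simply ignores Severity and Facility.

```python
from datetime import datetime, timezone


def to_messages_line(event, tz=timezone.utc):
    """Rebuild a /var/log/messages-style line from a decoded Flume event.

    Assumes `event` looks like:
    {"headers": {"timestamp": "<epoch millis>", "host": "..."}, "body": ...}
    Severity/Facility headers, if present, are ignored.
    """
    headers = event["headers"]
    millis = int(headers["timestamp"])
    dt = datetime.fromtimestamp(millis / 1000.0, tz=tz)
    # syslog pads single-digit days with a space ("Oct  8"), not a zero
    stamp = "%s %2d %s" % (dt.strftime("%b"), dt.day, dt.strftime("%H:%M:%S"))
    body = event["body"]
    if isinstance(body, bytes):  # the Avro body is stored as raw bytes
        body = body.decode("utf-8", errors="replace")
    return "%s %s %s" % (stamp, headers["host"], body)


if __name__ == "__main__":
    record = {
        "headers": {"timestamp": "1381256530000", "host": "server001",
                    "Severity": "6", "Facility": "1"},
        "body": b"RT: Ticket XXXXXX created in queue 'General' by info",
    }
    print(to_messages_line(record))
```

To match the server's clock, pass its timezone as `tz`. This only reshapes the records after the fact. It does not change what Flume writes.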

*Devin Suiter*
Jr. Data Solutions Software Engineer
100 Sandusky Street | 2nd Floor | Pittsburgh, PA 15212
Google Voice: 412-256-8556 | www.rdx.com