Flume user mailing list
Missing headers when using AVRO Sink/Source
Dear all,

I wrote a custom interceptor that inserts the timestamp header used by the HDFS sink (a minimal sketch of such an interceptor is included after the two-machine layout below).
First, I ran an example with a spooling directory source, a file channel and an HDFS sink on a single machine. It worked fine.
Then I split the flow across two machines; the configuration on each of them was:

FIRST MACHINE:
SPOOLING source(with my custom interceptor)
FILE channel
AVRO sink

SECOND MACHINE:
AVRO source
FILE channel
HDFS sink
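
For reference, a minimal sketch of an interceptor that populates the timestamp header with the event's epoch time in milliseconds is shown here. It is only an illustration of the Flume 1.x Interceptor API, not the DatetimeInterceptor referenced in the configuration further down (its source is not included in this message):

import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Illustrative only: stamps each event with a "timestamp" header
// (epoch millis), which the HDFS sink needs for %Y/%m/%d bucketing.
public class ExampleTimestampInterceptor implements Interceptor {

  @Override
  public void initialize() {
    // no state to set up
  }

  @Override
  public Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();
    if (!headers.containsKey("timestamp")) {
      headers.put("timestamp", Long.toString(System.currentTimeMillis()));
    }
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    for (Event event : events) {
      intercept(event);
    }
    return events;
  }

  @Override
  public void close() {
    // nothing to release
  }

  // Builder named in the agent configuration as FQCN$Builder.
  public static class Builder implements Interceptor.Builder {
    @Override
    public void configure(Context context) {
      // a real implementation would read options such as dateRegex/dateFormat here
    }

    @Override
    public Interceptor build() {
      return new ExampleTimestampInterceptor();
    }
  }
}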

Now, the error that I am getting on the second machine is:

ERROR hdfs.HDFSEventSink: process failed
java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.NumberFormatException: null
at java.lang.Long.parseLong(Long.java:375)
at java.lang.Long.valueOf(Long.java:525)
at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
... 5 more

It looks like the timestamp header is missing by the time the events reach the HDFS sink.
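
For what it's worth, the NumberFormatException: null above is what BucketPath produces when the "timestamp" header is absent: the %Y/%m/%d escapes are resolved from that header, and Long.valueOf(null) fails. A small standalone sketch that reproduces it (assuming the two-argument BucketPath.escapeString(String, Map) exposed by the Flume 1.x formatter):

import java.util.HashMap;
import java.util.Map;

import org.apache.flume.formatter.output.BucketPath;

// Illustrative only: resolving %Y/%m/%d requires a "timestamp"
// header (epoch millis) in the event's header map.
public class BucketPathDemo {
  public static void main(String[] args) {
    Map<String, String> headers = new HashMap<String, String>();

    // With the header present the path resolves normally:
    headers.put("timestamp", Long.toString(System.currentTimeMillis()));
    System.out.println(
        BucketPath.escapeString("/user/user/flume/%Y/%m/%d", headers));

    // Without it, this throws the same RuntimeException (caused by
    // NumberFormatException: null) seen in the stack trace above:
    headers.remove("timestamp");
    BucketPath.escapeString("/user/user/flume/%Y/%m/%d", headers);
  }
}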

FIRST MACHINE CONF:
tier1.sources.source1.type     = spooldir
tier1.sources.source1.spoolDir = /home/user/flume/data
tier1.sources.source1.batchSize = 1000
tier1.sources.source1.bufferMaxLines = 3000
tier1.sources.source1.fileHeader = true
tier1.sources.source1.fileSuffix=.COMPLETED
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
tier1.sources.source1.interceptors.it1.preserveExisting=true
tier1.sources.source1.interceptors.it1.dateRegex=\\d{4}-\\d{2}-\\d{2}
tier1.sources.source1.interceptors.it1.dateFormat=yyyy-MM-dd

tier1.channels.channel1.type   = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000

tier1.sinks.sink1.type = avro
tier1.sinks.sink1.hostname = 10.95.108.245
tier1.sinks.sink1.port = 4141
tier1.sinks.sink1.channel = channel1

SECOND MACHINE CONF:
tier1.sources.source1.type = avro
tier1.sources.source1.bind = 0.0.0.0
tier1.sources.source1.port = 4141
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type   = file
tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
tier1.channels.channel1.capacity = 10000
tier1.channels.channel1.transactionCapacity = 10000

tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.batchSize = 1000
tier1.sinks.sink1.hdfs.rollInterval = 5
tier1.sinks.sink1.hdfs.rollTimeout = 10
tier1.sinks.sink1.hdfs.rollCount = 0
tier1.sinks.sink1.hdfs.rollSize = 0
tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.writeFormat = Text
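
In case it helps while debugging, one possible workaround on the second agent would be to stamp events again at the Avro source with the built-in TimestampInterceptor, with preserveExisting = true so that an upstream header, if it does arrive, is kept. This is only a sketch; it masks rather than explains the missing header, and the resulting value would be the arrival time at the Avro source rather than the date parsed from the data:

tier1.sources.source1.interceptors = ts
tier1.sources.source1.interceptors.ts.type = timestamp
tier1.sources.source1.interceptors.ts.preserveExisting = true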

ZORAIDA HIDALGO SANCHEZ 2013-05-22, 12:54
Mike Percy 2013-05-22, 15:09