Flume, mail # user - Missing headers when using AVRO Sink/Source


Re: Missing headers when using AVRO Sink/Source
Mike Percy 2013-05-22, 15:09
FYI there is a stock timestamp interceptor, if you want to use that.
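
For example, wiring it into the spooling directory source from the configuration quoted below would look roughly like this (the stock interceptor simply stamps each event with the current time, in epoch milliseconds, under the "timestamp" header):

tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = timestamp
# keep a timestamp header that is already present on the event
tier1.sources.source1.interceptors.it1.preserveExisting = true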

Mike

On May 22, 2013, at 3:20 AM, ZORAIDA HIDALGO SANCHEZ <[EMAIL PROTECTED]> wrote:

> Dear all,
>
> I made a custom interceptor in order to insert the timestamp header that is used by the HDFS sink.
> Firstly, I ran an example using a SPOOLING dir source, a FILE channel and an HDFS sink. It worked fine.
> Secondly, I changed the configuration to use two machines; the conf on each of them was:
>
> FIRST MACHINE:
> SPOOLING source(with my custom interceptor)
> FILE channel
> AVRO sink
>
> SECOND MACHINE:
> AVRO source
> FILE channel
> HDFS sink
>
> Now, the error that I am getting (on the second machine) is:
>
> ERROR hdfs.HDFSEventSink: process failed
> java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
> at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
> at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:375)
> at java.lang.Long.valueOf(Long.java:525)
> at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
> ... 5 more
>
> It looks like the header is missing.
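>
> The %Y/%m/%d escaping in the HDFS sink path is resolved by BucketPath from the "timestamp" event header, parsed as epoch milliseconds, which is why Long.parseLong fails on null when that header is absent. A minimal sketch of the logic a custom interceptor needs in order to set that header (the actual com.pdi.koios.flume.interceptors.DatetimeInterceptor source is not included in this message, so class and field names below are illustrative, with the regex and date format hard-coded to the values from the configuration below):
>
> package com.example.flume;
>
> import java.text.ParseException;
> import java.text.SimpleDateFormat;
> import java.util.List;
> import java.util.Map;
> import java.util.regex.Matcher;
> import java.util.regex.Pattern;
>
> import org.apache.flume.Context;
> import org.apache.flume.Event;
> import org.apache.flume.interceptor.Interceptor;
>
> // Illustrative sketch only, not the real DatetimeInterceptor.
> public class DatetimeInterceptorSketch implements Interceptor {
>
>   private final Pattern dateRegex = Pattern.compile("\\d{4}-\\d{2}-\\d{2}");
>   private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
>   private final boolean preserveExisting = true;
>
>   @Override
>   public void initialize() {}
>
>   @Override
>   public Event intercept(Event event) {
>     Map<String, String> headers = event.getHeaders();
>     if (preserveExisting && headers.containsKey("timestamp")) {
>       return event; // keep a header that was set upstream
>     }
>     // BucketPath expects the "timestamp" header as epoch millis, so the date
>     // extracted from the event body has to be converted before it is stored.
>     Matcher m = dateRegex.matcher(new String(event.getBody()));
>     if (m.find()) {
>       try {
>         headers.put("timestamp", Long.toString(dateFormat.parse(m.group()).getTime()));
>       } catch (ParseException ignored) {
>         // leave the event untouched if the date cannot be parsed
>       }
>     }
>     return event;
>   }
>
>   @Override
>   public List<Event> intercept(List<Event> events) {
>     for (Event e : events) {
>       intercept(e);
>     }
>     return events;
>   }
>
>   @Override
>   public void close() {}
>
>   public static class Builder implements Interceptor.Builder {
>     @Override
>     public Interceptor build() {
>       return new DatetimeInterceptorSketch();
>     }
>
>     @Override
>     public void configure(Context context) {}
>   }
> }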
>
> FIRST MACHINE CONF:
> tier1.sources.source1.type     = spooldir
> tier1.sources.source1.spoolDir = /home/user/flume/data
> tier1.sources.source1.batchSize = 1000
> tier1.sources.source1.bufferMaxLines = 3000
> tier1.sources.source1.fileHeader = true
> tier1.sources.source1.fileSuffix=.COMPLETED
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.interceptors = it1
> tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
> tier1.sources.source1.interceptors.it1.preserveExisting=true
> tier1.sources.source1.interceptors.it1.dateRegex=\\d{4}-\\d{2}-\\d{2}
> tier1.sources.source1.interceptors.it1.dateFormat=yyyy-MM-dd
>
> tier1.channels.channel1.type   = file
> tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
> tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 10000
>
> tier1.sinks.sink1.type = avro
> tier1.sinks.sink1.hostname = 10.95.108.245
> tier1.sinks.sink1.port = 4141
> tier1.sinks.sink1.channel = channel1
>
> SECOND MACHINE CONF:
> tier1.sources.source1.type = avro
> tier1.sources.source1.bind = 0.0.0.0
> tier1.sources.source1.port = 4141
> tier1.sources.source1.channels = channel1
>
> tier1.channels.channel1.type   = file
> tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
> tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 10000
>
> tier1.sinks.sink1.type = hdfs
> tier1.sinks.sink1.channel = channel1
> tier1.sinks.sink1.hdfs.batchSize = 1000
> tier1.sinks.sink1.hdfs.rollInterval = 5
> tier1.sinks.sink1.hdfs.rollTimeout = 10
> tier1.sinks.sink1.hdfs.rollCount = 0
> tier1.sinks.sink1.hdfs.rollSize = 0
> tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
> tier1.sinks.sink1.hdfs.fileType = DataStream
> tier1.sinks.sink1.writeFormat = Text
>
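> One way to check whether the header actually survives the Avro hop (independent of the custom interceptor) is to temporarily point the channel on the second machine at a logger sink, which prints each event's headers and body to the agent log (this assumes the usual tier1.sources/sinks/channels declarations exist elsewhere in the file):
>
> # temporary debug sink on the second machine, in place of the hdfs sink
> tier1.sinks = debug1
> tier1.sinks.debug1.type = logger
> tier1.sinks.debug1.channel = channel1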
>