Flume >> mail # user >> Missing headers when using AVRO Sink/Source


Re: Missing headers when using AVRO Sink/Source
FYI there is a stock timestamp interceptor, if you want to use that.

Mike
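
For reference, Mike's suggestion would be wired into the spooling-directory source roughly like this (a sketch reusing the component names from the configuration quoted below; `timestamp` is the built-in alias for `org.apache.flume.interceptor.TimestampInterceptor$Builder`):

```properties
# Replace the custom interceptor with the stock timestamp interceptor
# on the first machine's spooling source.
tier1.sources.source1.interceptors = it1
tier1.sources.source1.interceptors.it1.type = timestamp
# Keep a timestamp header if one is already present on the event.
tier1.sources.source1.interceptors.it1.preserveExisting = true
```

Alternatively, on Flume 1.3+ the HDFS sink can fall back to the local clock for bucketing via `tier1.sinks.sink1.hdfs.useLocalTimeStamp = true`, at the cost of bucketing by arrival time rather than event time.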

On May 22, 2013, at 3:20 AM, ZORAIDA HIDALGO SANCHEZ <[EMAIL PROTECTED]> wrote:

> Dear all,
>
> I made a custom interceptor in order to insert the timestamp header that is used by the HDFS sink.
> Firstly, I ran an example using a SPOOLING dir source, a FILE channel and an HDFS sink. It worked fine.
> Secondly, I changed the configuration to use two machines; the conf on each of them was:
>
> FIRST MACHINE:
> SPOOLING source(with my custom interceptor)
> FILE channel
> AVRO sink
>
> SECOND MACHINE:
> AVRO source
> FILE channel
> HDFS sink
>
> Now, the error that I am getting (on the second machine) is:
>
> ERROR hdfs.HDFSEventSink: process failed
> java.lang.RuntimeException: Flume wasn't able to parse timestamp header in the event to resolve time based bucketing. Please check that you're correctly populating timestamp header (for example using TimestampInterceptor source interceptor).
> at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:160)
> at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:343)
> at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:392)
> at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
> at java.lang.Thread.run(Thread.java:662)
> Caused by: java.lang.NumberFormatException: null
> at java.lang.Long.parseLong(Long.java:375)
> at java.lang.Long.valueOf(Long.java:525)
> at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:158)
> ... 5 more
>
> It looks like the header is missing.
>
> FIRST MACHINE CONF:
> tier1.sources.source1.type     = spooldir
> tier1.sources.source1.spoolDir = /home/user/flume/data
> tier1.sources.source1.batchSize = 1000
> tier1.sources.source1.bufferMaxLines = 3000
> tier1.sources.source1.fileHeader = true
> tier1.sources.source1.fileSuffix=.COMPLETED
> tier1.sources.source1.channels = channel1
> tier1.sources.source1.interceptors = it1
> tier1.sources.source1.interceptors.it1.type = com.pdi.koios.flume.interceptors.DatetimeInterceptor$Builder
> tier1.sources.source1.interceptors.it1.preserveExisting=true
> tier1.sources.source1.interceptors.it1.dateRegex=\\d{4}-\\d{2}-\\d{2}
> tier1.sources.source1.interceptors.it1.dateFormat=yyyy-MM-dd
>
> tier1.channels.channel1.type   = file
> tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
> tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 10000
>
> tier1.sinks.sink1.type = avro
> tier1.sinks.sink1.hostname = 10.95.108.245
> tier1.sinks.sink1.port = 4141
> tier1.sinks.sink1.channel = channel1
>
> SECOND MACHINE CONF:
> tier1.sources.source1.type = avro
> tier1.sources.source1.bind = 0.0.0.0
> tier1.sources.source1.port = 4141
> tier1.sources.source1.channels = channel1
>
> tier1.channels.channel1.type   = file
> tier1.channels.channel1.checkpointDir = /home/user/flume/channelcheckpoint
> tier1.channels.channel1.dataDirs = /home/user/flume/channeldata
> tier1.channels.channel1.capacity = 10000
> tier1.channels.channel1.transactionCapacity = 10000
>
> tier1.sinks.sink1.type = hdfs
> tier1.sinks.sink1.channel = channel1
> tier1.sinks.sink1.hdfs.batchSize = 1000
> tier1.sinks.sink1.hdfs.rollInterval = 5
> tier1.sinks.sink1.hdfs.rollTimeout = 10
> tier1.sinks.sink1.hdfs.rollCount = 0
> tier1.sinks.sink1.hdfs.rollSize = 0
> tier1.sinks.sink1.hdfs.path = /user/user/flume/%Y/%m/%d
> tier1.sinks.sink1.hdfs.fileType = DataStream
> tier1.sinks.sink1.writeFormat = Text
>
>
> This message is intended exclusively for its addressee. We only send and receive email on the basis of the terms set out at:
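
The stack trace above matches what `BucketPath.replaceShorthand` does: it looks up the `timestamp` header and parses it as a millisecond epoch, so a missing header surfaces as `NumberFormatException: null`. A self-contained sketch of the failure mode and of what any interceptor must guarantee before events reach the HDFS sink (simplified stand-ins, not the actual Flume classes):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampHeaderDemo {

    // What an interceptor must do: put a millisecond epoch string
    // under the "timestamp" key before the event reaches the sink.
    static void stampTimestamp(Map<String, String> headers, long nowMillis) {
        if (!headers.containsKey("timestamp")) {
            headers.put("timestamp", Long.toString(nowMillis));
        }
    }

    // Simplified stand-in for BucketPath's timestamp lookup, which is
    // used to expand %Y/%m/%d in hdfs.path. A missing header means
    // Long.valueOf(null), which throws NumberFormatException.
    static long resolveTimestamp(Map<String, String> headers) {
        return Long.valueOf(headers.get("timestamp"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = new HashMap<>();
        try {
            resolveTimestamp(headers);
        } catch (NumberFormatException e) {
            System.out.println("without header: NumberFormatException");
        }
        stampTimestamp(headers, 1369216800000L);
        System.out.println("with header: " + resolveTimestamp(headers));
    }
}
```

So if the header survives the Avro hop (Avro sink/source do forward event headers), the sink resolves the path; if the custom interceptor never sets it, the lookup fails exactly as in the trace above.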