Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Flume, mail # user - flume-ng cassandra sink problem..


Copy link to this message
-
Re: flume-ng cassandra sink problem..
Nitin Pawar 2012-12-26, 17:52
from the README
you need to have following things in conf

The Sink expects several flume event headers to be present:

   - key - used (combined with src) to create the Cassandra row key. It
   should be generated by the application doing the logging
   - timestamp - timestamp of when the log occurred, not necessarily when
   the flume event is created
   - src - A logical source of the flume event. Could be host, but probably
   you will have many hosts for a source. A more likely candidate for source
   is the name of the application
   - host - the name of the host where the message was generated
   -

On Wed, Dec 26, 2012 at 11:09 PM, Priyanka jain
<[EMAIL PROTECTED]>wrote:

> HI,
>
> I am working on the POC of Cassandra flume integration. For that am
> using Cassandra sink plugin from *Github (flume-ng Cassandra sink plugin).
> *
> *And *
> *Flume-NG version-1.2.0*
> *Apache Cassandra Version :1.1.5*
> *I *have build the jar using maven and am using sink configuration as
> below in flume.conf.cassandra in conf directory...
>
> *agent.sources = avrosource*
>
> *agent.channels = channel1*
>
> *agent.sinks = cassandraSink*
>
> * *
>
> *#source defination*
>
> *agent.sources.avrosource.channels = channel1*
>
> *agent.sources.avrosource.type = exec*
>
> *agent.sources.avrosource.command = tail -f
> /home/user/priyanka/flume-ng/flnginput.txt*
>
> * *
>
> *#agent.sources.avrosource.type = avro*
>
> *#agent.sources.avrosource.channels = channel1*
>
> *#agent.sources.avrosource.bind =127.0.0.1*
>
> *#agent.sources.avrosource.port =41414*
>
> * *
>
> *#Flume header event*
>
> *agent.sources.avrosource.interceptors = addHost*
>
> *agent.sources.avrosource.interceptors.addHost.type > org.apache.flume.interceptor.HostInterceptor$Builder*
>
> *agent.sources.avrosource.interceptors.addHost.preserveExisting = false*
>
> *agent.sources.avrosource.interceptors.addHost.useIP = false*
>
> *agent.sources.avrosource.interceptors.addHost.hostHeader = host*
>
> *agent.sources.avrosource.interceptors = addTimestamp*
>
> *agent.sources.avrosource.interceptors.addTimestamp.type > org.apache.flume.interceptor.TimestampInterceptor$Builder*
>
> * *
>
> *# Cassandra flow*
>
> *agent.channels.channel1.type = FILE*
>
> *agent.channels.channel1.checkpointDir = file-channel1/check*
>
> *agent.channels.channel1.dataDirs = file-channel1/data*
>
> * *
>
> *agent.sinks.cassandraSink.channel = channel1*
>
> *agent.sinks.cassandraSink.type > com.btoddb.flume.sinks.cassandra.CassandraSink*
>
> *agent.sinks.cassandraSink.hosts = localhost*
>
> *agent.sinks.cassandraSink.port = 9160*
>
> *agent.sinks.cassandraSink.keyspace-name  = logs*
>
> *agent.sinks.cassandraSink.records-colfam  = records*
>
> *
> *
>
> *
> *
>
> *
> *
>
> *Am running this using the command :-*
>
> *
> *
>
> flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f
> /usr/lib/flume-ng-1.2/conf/flume.conf.cassandra
> -Dflume.root.logger=DEBUG,console
>
>
> Got the error while running :-
>
> * *
>
> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)]
> exception while processing in Cassandra Sink
>
> java.lang.IllegalArgumentException: Missing flume header attribute, 'key'
> - cannot process this event
>
> at
> com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)
>
> at
> com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)
>
> at
> org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
>
> at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
>
> atjava.lang.Thread.run(Thread.java:722)
>
> 2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor)
> [ERROR -
> org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
> to deliver event. Exception follows.
>
> org.apache.flume.EventDeliveryException: Failed to persist message
Nitin Pawar