Home | About | Sematext search-lucene.com search-hadoop.com
 Search Hadoop and all its subprojects:

Switch to Threaded View
Flume >> mail # user >> flume-ng cassandra sink problem..


Copy link to this message
-
flume-ng cassandra sink problem..
HI,

I am working on the POC of Cassandra flume integration. For that am
using Cassandra sink plugin from *Github (flume-ng Cassandra sink plugin).*
*And *
*Flume-NG version-1.2.0*
*Apache Cassandra Version :1.1.5*
*I *have build the jar using maven and am using sink configuration as below
in flume.conf.cassandra in conf directory...

*agent.sources = avrosource*

*agent.channels = channel1*

*agent.sinks = cassandraSink*

* *

*#source defination*

*agent.sources.avrosource.channels = channel1*

*agent.sources.avrosource.type = exec*

*agent.sources.avrosource.command = tail -f
/home/user/priyanka/flume-ng/flnginput.txt*

* *

*#agent.sources.avrosource.type = avro*

*#agent.sources.avrosource.channels = channel1*

*#agent.sources.avrosource.bind =127.0.0.1*

*#agent.sources.avrosource.port =41414*

* *

*#Flume header event*

*agent.sources.avrosource.interceptors = addHost*

*agent.sources.avrosource.interceptors.addHost.type org.apache.flume.interceptor.HostInterceptor$Builder*

*agent.sources.avrosource.interceptors.addHost.preserveExisting = false*

*agent.sources.avrosource.interceptors.addHost.useIP = false*

*agent.sources.avrosource.interceptors.addHost.hostHeader = host*

*agent.sources.avrosource.interceptors = addTimestamp*

*agent.sources.avrosource.interceptors.addTimestamp.type org.apache.flume.interceptor.TimestampInterceptor$Builder*

* *

*# Cassandra flow*

*agent.channels.channel1.type = FILE*

*agent.channels.channel1.checkpointDir = file-channel1/check*

*agent.channels.channel1.dataDirs = file-channel1/data*

* *

*agent.sinks.cassandraSink.channel = channel1*

*agent.sinks.cassandraSink.type com.btoddb.flume.sinks.cassandra.CassandraSink*

*agent.sinks.cassandraSink.hosts = localhost*

*agent.sinks.cassandraSink.port = 9160*

*agent.sinks.cassandraSink.keyspace-name  = logs*

*agent.sinks.cassandraSink.records-colfam  = records*

*
*

*
*

*
*

*Am running this using the command :-*

*
*

flume-ng agent -n agent -c /usr/lib/flume-ng-1.2/conf/ -f
/usr/lib/flume-ng-1.2/conf/flume.conf.cassandra
-Dflume.root.logger=DEBUG,console
Got the error while running :-

* *

2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[ERROR -
com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:193)]
exception while processing in Cassandra Sink

java.lang.IllegalArgumentException: Missing flume header attribute, 'key' -
cannot process this event

at
com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)

at
com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)

at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

atjava.lang.Thread.run(Thread.java:722)

2012-12-21 14:37:07,743 (SinkRunner-PollingRunner-DefaultSinkProcessor)
[ERROR -
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable
to deliver event. Exception follows.

org.apache.flume.EventDeliveryException: Failed to persist message

at
com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:194)

at
org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)

at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)

atjava.lang.Thread.run(Thread.java:722)

Caused by: java.lang.IllegalArgumentException: Missing flume header
attribute, 'key' - cannot process this event

at
com.btoddb.flume.sinks.cassandra.CassandraSinkRepository.saveToCassandra(CassandraSinkRepository.java:125)

at
com.btoddb.flume.sinks.cassandra.CassandraSink.process(CassandraSink.java:166)

        ... 3 more

I got one solution as Key is Src+Key but am not getting how to configure it.
So can any one please help me out to solve this problem.
So