Flume, mail # user - ElasticSearchSink does not work

ElasticSearchSink does not work
shushuai zhu 2013-06-12, 02:37
Hi,
 
I am new to Flume. I am trying to send data with the Flume::Client Perl API to a Flume Avro source, which should pass it through an ElasticSearchSink to an ElasticSearch engine. The file_roll sink works: the data is written to a file. With the ElasticSearchSink, however, the data does not reach the ElasticSearch engine. I send it with this code:
 
use strict;
use warnings;
use Flume::Client;
# Connect to the Avro source (the port must match avro-source1.port in flume.conf)
my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
# Send one event as a batch; the response should be 'OK' on success
my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
print defined $response ? "$response\n" : "response not defined\n";
 
The returned $response is not defined (the same code works when the file_roll sink is used).
 
The ElasticSearch engine itself is working: I can load data into it with Logstash's ElasticSearch output type.
 
I installed the Flume agent from the Cloudera tarball:
 
http://archive.cloudera.com/cdh4/cdh/4/flume-ng-1.3.0-cdh4.3.0.tar.gz
 
My flume.conf is below. I have tried several values for "hostNames" (fully qualified host name, IP address, etc.), and I see no output messages in flume.log. Could someone tell me what might cause this?
 
Thanks.
 
Shushuai
 
 
 
# Define a memory channel called ch1 on agent1
agent1.channels = ch1
agent1.channels.ch1.type = memory
 
# Define an Avro source called avro-source1 on agent1 and tell it to bind to 0.0.0.0:41414. Connect it to channel ch1.
agent1.sources = avro-source1
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
 
# Define a local file sink that simply logs all events it receives (this works well)
#agent1.sinks = localout
#agent1.sinks.localout.type = file_roll
#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
#agent1.sinks.localout.sink.rollInterval = 0
#agent1.sinks.localout.channel = ch1
 
# Define ElasticSearchSink sink (this does not work)
agent1.sinks = k1
agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.k1.hostNames = localhost:9300
agent1.sinks.k1.indexName = flume
agent1.sinks.k1.indexType = logs
agent1.sinks.k1.clusterName = elasticsearch
agent1.sinks.k1.batchSize = 2
agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.k1.channel = ch1
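Since several "hostNames" values were tried, one way to rule out plain network problems is a raw TCP check against each host:port entry (the ElasticSearch transport port, 9300 in the config above). The following is a minimal sketch, assuming only the core IO::Socket::INET module; the helper name port_open is hypothetical, and defaulting a missing port to 9300 is an assumption. A successful connection only shows that something is listening on that port; it says nothing about whether clusterName or the ElasticSearch version match what the sink expects.

```perl
use strict;
use warnings;
use IO::Socket::INET;

# Hypothetical helper: returns 1 if a TCP connection to host:port succeeds
# within the timeout (seconds), 0 otherwise. It does not speak the
# ElasticSearch protocol; it only opens and closes a socket.
sub port_open {
    my ($host, $port, $timeout) = @_;
    my $sock = IO::Socket::INET->new(
        PeerAddr => $host,
        PeerPort => $port,
        Proto    => 'tcp',
        Timeout  => $timeout // 3,
    );
    return 0 unless $sock;
    $sock->close;
    return 1;
}

# Same format as agent1.sinks.k1.hostNames: comma-separated host:port pairs
my $host_names = shift(@ARGV) // 'localhost:9300';

for my $entry (split /\s*,\s*/, $host_names) {
    my ($host, $port) = split /:/, $entry, 2;
    $port //= 9300;    # assumption: fall back to the transport port
    my $status = port_open($host, $port) ? 'reachable' : 'NOT reachable';
    print "$host:$port is $status\n";
}
```

Run it as, for example, perl check_es.pl 'localhost:9300,10.0.0.5:9300' (the script name and addresses are placeholders) on the machine where the Flume agent runs, so the check sees the same network path as the sink.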