Flume, mail # user - ElasticSearchSink does not work

ElasticSearchSink does not work
shushuai zhu 2013-06-12, 02:37
I am new to Flume. I am trying to send data with the Flume client Perl API to a Flume Avro source and then through ElasticSearchSink to an ElasticSearch engine. I got the file_roll sink to work by writing the data to a file. However, I am running into an issue with ElasticSearchSink. The data does not get through to the ElasticSearch engine:
use Flume::Client;
my $ng_client = Flume::Client::Transceiver::Socket->new(host => 'host name', port => 41414);
my $ng_requestor = Flume::Client::Requestor::FlumeNG->new(client => $ng_client);
my ($result, $response) = $ng_requestor->request('appendBatch', [{ headers => {}, body => "hello, this is sent from perl (using FlumeNG)"}]);
print "$response\n";    # response will be 'OK' on success
I say this because the returned $response is undefined (again, the same code worked when the file_roll sink was used).
The ElasticSearch engine itself is working, since I could load data into it through LogStash's ElasticSearch output type.
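Since LogStash and the Flume sink may reach ElasticSearch over different ports, one quick thing to rule out is whether the host and port given in "hostNames" accept TCP connections from the machine running the agent. The following is a minimal sketch in Python, not part of the original setup; the function name port_open is hypothetical, and localhost:9300 is simply the value from the flume.conf in this message.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


if __name__ == "__main__":
    # Host and port are assumptions taken from the hostNames line in flume.conf.
    print(port_open("localhost", 9300))
```

A True result only shows that something is listening on that port. It says nothing about whether the cluster name or the client version used by the sink matches the server.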
The Flume agent was installed by downloading the tarball from Cloudera:
The flume.conf is as follows. I tried different values for "hostNames" (full host name, IP address, etc.) but do not see any output messages in flume.log. Could someone let me know what could be causing the issue?
# Define a memory channel called ch1 on agent1
agent1.channels = ch1
agent1.channels.ch1.type = memory
# Define an Avro source called avro-source1 on agent1, tell it where to bind, and connect it to channel ch1.
agent1.sources = avro-source1
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind =
agent1.sources.avro-source1.port = 41414
# Define a local file sink that simply logs all events it receives (this works well)
#agent1.sinks = localout
#agent1.sinks.localout.type = file_roll
#agent1.sinks.localout.sink.directory = /scratch/flume-ng/log
#agent1.sinks.localout.sink.rollInterval = 0
#agent1.sinks.localout.channel = ch1
# Define ElasticSearchSink sink (this does not work)
agent1.sinks = k1
agent1.sinks.k1.type = org.apache.flume.sink.elasticsearch.ElasticSearchSink
agent1.sinks.k1.hostNames = localhost:9300
agent1.sinks.k1.indexName = flume
agent1.sinks.k1.indexType = logs
agent1.sinks.k1.clusterName = elasticsearch
agent1.sinks.k1.batchSize = 2
agent1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.k1.channel = ch1
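When switching between commented-out sinks as in the configuration above, it is easy to leave a sink without a channel or pointing at a channel that is not declared. The sketch below, written in Python as a standalone helper, checks only that wiring: sinks use the singular "channel" key and sources use the plural "channels" key, as in the config above. The function names parse_props and check_wiring are hypothetical, and the check does not validate sink-specific settings such as hostNames or the serializer.

```python
def parse_props(text):
    """Parse simple key = value lines, skipping blanks and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def check_wiring(props, agent):
    """Return a list of channel wiring problems for the given agent name."""
    problems = []
    channels = set(props.get(f"{agent}.channels", "").split())
    # Each active sink must name exactly one declared channel.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch is None:
            problems.append(f"sink {sink} has no channel")
        elif ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")
    # Each active source must name at least one declared channel.
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channels")
        for ch in used:
            if ch not in channels:
                problems.append(f"source {source} uses undeclared channel {ch}")
    return problems


if __name__ == "__main__":
    # A trimmed copy of the wiring lines from the flume.conf above.
    sample = """
    agent1.channels = ch1
    agent1.sources = avro-source1
    agent1.sources.avro-source1.channels = ch1
    agent1.sinks = k1
    agent1.sinks.k1.channel = ch1
    """
    print(check_wiring(parse_props(sample), "agent1"))
```

An empty list means the source, channel, and sink are connected consistently, which rules out one class of mistakes but does not by itself explain why events fail to reach ElasticSearch.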