How the custom Key class can be used in Avro
Hi,
Currently I have an MR job that needs to use my own key class to support secondary sort. The original job uses the Avro String type as the mapper output key, in this format:
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, AvroKey<CharSequence>, AvroValue<OneAvroSpecificRecordObject>>
Right now, I need to change the mapper output key from a string to a custom key object, as I need to control a complex sorting order and support secondary sort in my MR job.
So I created a custom key class, PartitionKey, which contains 3 long values and 4 String values. This key class implements WritableComparable, and I also have my KeyComparator and KeyGroupComparator implementations ready.
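For reference, here is roughly what my PartitionKey looks like (the field names here are simplified placeholders; the real class has 3 longs and 4 Strings):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

public class PartitionKey implements WritableComparable<PartitionKey> {
    // placeholder field names; the real class has 3 longs and 4 Strings
    private long id1, id2, id3;
    private String s1, s2, s3, s4;

    public void write(DataOutput out) throws IOException {
        out.writeLong(id1);
        out.writeLong(id2);
        out.writeLong(id3);
        out.writeUTF(s1);
        out.writeUTF(s2);
        out.writeUTF(s3);
        out.writeUTF(s4);
    }

    public void readFields(DataInput in) throws IOException {
        id1 = in.readLong();
        id2 = in.readLong();
        id3 = in.readLong();
        s1 = in.readUTF();
        s2 = in.readUTF();
        s3 = in.readUTF();
        s4 = in.readUTF();
    }

    public int compareTo(PartitionKey o) {
        // natural order: compare the longs first, then the strings
        if (id1 != o.id1) return id1 < o.id1 ? -1 : 1;
        if (id2 != o.id2) return id2 < o.id2 ? -1 : 1;
        if (id3 != o.id3) return id3 < o.id3 ? -1 : 1;
        int c = s1.compareTo(o.s1);
        if (c != 0) return c;
        c = s2.compareTo(o.s2);
        if (c != 0) return c;
        c = s3.compareTo(o.s3);
        if (c != 0) return c;
        return s4.compareTo(o.s4);
    }
}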
So in this case, I want to change my mapper to the new signature:
public class MyMapper extends MapReduceBase implements Mapper<LongWritable, Text, AvroKey<PartitionKey>, AvroValue<OneAvroSpecificRecordObject>>
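In the driver, I hook the comparators in the usual way for the old mapred API (MyDriver here is just a placeholder name):

JobConf conf = new JobConf(MyDriver.class);
// full sort order over all 7 fields
conf.setOutputKeyComparatorClass(KeyComparator.class);
// grouping comparator for the secondary sort
conf.setOutputValueGroupingComparator(KeyGroupComparator.class);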
Here comes the problem: I don't know what kind of schema I can use in my driver class for this key.
Originally, the driver had the following line:
AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(Schema.create(Schema.Type.STRING), OneAvroSpecificRecordObject.SCHEMA$));
So my question is: what kind of schema should I use above to replace Schema.Type.STRING?
Here are some things I tried, and the errors I got:

1) I tried a union schema with 3 long types and 4 string types. It does NOT work, as a union cannot contain duplicate types.

2) Then I thought I needed to create an anonymous record schema, which should work for my case. So here is what I did. First, in the code, add the schema definition (see the sketch after this list for roughly what this schema string expands to):

String keySchema = "type........." // create a record schema with 3 long types and 4 string types

Then, generate the schema at runtime in my code:

AvroJob.setMapOutputSchema(conf, Pair.getPairSchema(new Schema.Parser().parse(keySchema), OneAvroSpecificRecordObject.SCHEMA$));

This works fine for the whole mapper stage, but the reducer part failed with the following error:

java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to PartitionKey

My reducer looks like this:

myReducer implements Reducer<AvroKey<PartitionKey>, AvroValue<OneAvroSpecificRecordObject>, NullWritable, NullWritable>

It looks like if I use an anonymous record schema, Avro will use GenericData$Record, which I cannot cast to the PartitionKey class I want.

3) Then I thought: do I have to generate a specific PartitionKey class from a new avsc file? I can do that, but the class generated by Avro won't implement WritableComparable, so I cannot use it as the mapper key.
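For attempt 2, the record schema string I build expands to something like this (again, the field names are just placeholders for illustration):

{
  "type": "record",
  "name": "PartitionKey",
  "fields": [
    {"name": "id1", "type": "long"},
    {"name": "id2", "type": "long"},
    {"name": "id3", "type": "long"},
    {"name": "s1", "type": "string"},
    {"name": "s2", "type": "string"},
    {"name": "s3", "type": "string"},
    {"name": "s4", "type": "string"}
  ]
}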
So I wonder: if I want to use a custom key that implements WritableComparable as my mapper output key, what schema should I use in Avro? I searched the Avro source code and didn't find any existing examples demonstrating this, and there are not many examples on the web that discuss it either. But in many cases we want our own custom key class implementation to use in an MR job. Does anyone know how to write the schema for this kind of class? Are any examples available?
Thanks
Yong    