MapReduce >> mail # user >> Why the reducer's input group count is higher than my GroupComparator implementation


Re: Why the reducer's input group count is higher than my GroupComparator implementation
Did you override the partitioner as well?
2013/10/29 java8964 java8964 <[EMAIL PROTECTED]>

> Hi, I have a strange question about the secondary sort implementation
> in my MR job.
> I need to support secondary sort in one of my MR jobs, so I implemented a
> custom WritableComparable as follows:
>
> public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
>     String type;
>     long id1;
>     String id2;
>     String id3;
>     String id4;
>     long timestamp1;
>     long timestamp2;
> }
>
> Then I implemented the following methods for this class:
>
> public int compareTo(); // sorts on all attributes listed above, with the last 2 timestamps descending
> public int hashCode(); // generates the hash code from all attributes above
> public boolean equals(); // uses all the attributes for the equality check
> public void write(DataOutput out) // serializes all the attributes listed above
> public void readFields(DataInput in) // deserializes all the attributes listed above
>
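The write/readFields pair described above can be exercised without a cluster. Below is a minimal sketch in plain Java, assuming strings are written with writeUTF and longs with writeLong. KeyFields and SerializationSketch are hypothetical names used only for illustration; the real MyPartitionKey would implement Hadoop's WritableComparable instead of standing alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for MyPartitionKey (no Hadoop dependency).
final class KeyFields {
    String type = "";
    long id1;
    String id2 = "", id3 = "", id4 = "";
    long timestamp1, timestamp2;

    // Serialize every field in a fixed order.
    void write(DataOutput out) throws IOException {
        out.writeUTF(type);
        out.writeLong(id1);
        out.writeUTF(id2);
        out.writeUTF(id3);
        out.writeUTF(id4);
        out.writeLong(timestamp1);
        out.writeLong(timestamp2);
    }

    // Deserialize in exactly the same order as write().
    void readFields(DataInput in) throws IOException {
        type = in.readUTF();
        id1 = in.readLong();
        id2 = in.readUTF();
        id3 = in.readUTF();
        id4 = in.readUTF();
        timestamp1 = in.readLong();
        timestamp2 = in.readLong();
    }
}

final class SerializationSketch {
    // Writes the key to a byte buffer and reads it back into a fresh instance.
    static KeyFields roundTrip(KeyFields key) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            key.write(new DataOutputStream(buffer));
            KeyFields copy = new KeyFields();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        KeyFields key = new KeyFields();
        key.type = "A";
        key.id1 = 42L;
        key.timestamp1 = 1000L;
        KeyFields copy = roundTrip(key);
        System.out.println(copy.type + " " + copy.id1 + " " + copy.timestamp1);
    }
}
```

A round trip like this is a cheap way to catch a field that is written but not read back, or read in a different order.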
> For partitioning and grouping of my keys, I want the following logic:
> depending on the type, the data is partitioned either by year or by day of
> timestamp1.
>
> For the sort order, I want the data sorted by (type, id1, id2, id3, id4),
> then reverse-sorted by (timestamp1, timestamp2).
>
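The sort order just described can be written as a plain comparator. This is only a sketch under assumptions: SortKey and SortOrderSketch are hypothetical names, the fields mirror MyPartitionKey, and no Hadoop classes are involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for MyPartitionKey with the same fields.
final class SortKey {
    final String type;
    final long id1;
    final String id2, id3, id4;
    final long timestamp1, timestamp2;

    SortKey(String type, long id1, String id2, String id3, String id4,
            long timestamp1, long timestamp2) {
        this.type = type;
        this.id1 = id1;
        this.id2 = id2;
        this.id3 = id3;
        this.id4 = id4;
        this.timestamp1 = timestamp1;
        this.timestamp2 = timestamp2;
    }
}

final class SortOrderSketch {
    // Ascending on (type, id1, id2, id3, id4), then descending on both timestamps.
    static final Comparator<SortKey> SORT_ORDER = (a, b) -> {
        int c = a.type.compareTo(b.type);
        if (c != 0) return c;
        c = Long.compare(a.id1, b.id1);
        if (c != 0) return c;
        c = a.id2.compareTo(b.id2);
        if (c != 0) return c;
        c = a.id3.compareTo(b.id3);
        if (c != 0) return c;
        c = a.id4.compareTo(b.id4);
        if (c != 0) return c;
        c = Long.compare(b.timestamp1, a.timestamp1); // arguments swapped: descending
        if (c != 0) return c;
        return Long.compare(b.timestamp2, a.timestamp2); // arguments swapped: descending
    };

    public static void main(String[] args) {
        List<SortKey> keys = new ArrayList<>(Arrays.asList(
                new SortKey("A", 1L, "x", "y", "z", 100L, 5L),
                new SortKey("A", 1L, "x", "y", "z", 200L, 5L),
                new SortKey("A", 0L, "x", "y", "z", 50L, 5L)));
        Collections.sort(keys, SORT_ORDER);
        for (SortKey k : keys) {
            System.out.println(k.type + " " + k.id1 + " " + k.timestamp1);
        }
    }
}
```

Note that with this order, two keys of the same type and the same day are adjacent only if all of their id fields also match.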
> I implemented my KeyComparator using the sort order described above,
> and my Partitioner and GroupComparator using the partitioning and grouping
> logic described above.
>
> Here is the pseudocode for the Partitioner and GroupComparator:
>
> public class MyPartitioner implements Partitioner<MyPartitionKey, Value> {
>     @Override
>     public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
>         int hashCode = key.getActivityType().name().hashCode();
>         StringBuilder sb = new StringBuilder();
>         for (String subPartitionValue : key.getPartitionValue()) {
>             sb.append(subPartitionValue);
>         }
>         // Mask the sign bit rather than using Math.abs, which stays negative for Integer.MIN_VALUE
>         return ((hashCode * 127 + sb.toString().hashCode()) & Integer.MAX_VALUE) % numPartitions;
>     }
>
>     @Override
>     public void configure(JobConf job) {
>     }
> }
>
> // The key's getPartitionValue() method returns a string array derived from
> // timestamp1: either {YYYY} or {YYYY, MM, DD}.
>
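The original message does not show getPartitionValue itself. A minimal sketch of what such a method might look like follows, assuming timestamp1 is epoch milliseconds interpreted in UTC and that a boolean flag stands in for the per-type yearly/daily decision. PartitionValueSketch, partitionValue, and the daily parameter are all hypothetical.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Locale;

final class PartitionValueSketch {
    // Returns {YYYY} for yearly types and {YYYY, MM, DD} for daily types.
    // Assumption: the timestamp is epoch milliseconds and dates are taken in UTC.
    static String[] partitionValue(long timestamp1Millis, boolean daily) {
        LocalDate date = Instant.ofEpochMilli(timestamp1Millis)
                .atZone(ZoneOffset.UTC)
                .toLocalDate();
        String year = String.format(Locale.ROOT, "%04d", date.getYear());
        if (!daily) {
            return new String[] { year };
        }
        return new String[] {
            year,
            String.format(Locale.ROOT, "%02d", date.getMonthValue()),
            String.format(Locale.ROOT, "%02d", date.getDayOfMonth())
        };
    }

    public static void main(String[] args) {
        long ts = LocalDate.of(2013, 10, 29)
                .atStartOfDay(ZoneOffset.UTC)
                .toInstant()
                .toEpochMilli();
        System.out.println(Arrays.toString(partitionValue(ts, false)));
        System.out.println(Arrays.toString(partitionValue(ts, true)));
    }
}
```

Zero-padding the month and day keeps the concatenated string unambiguous, which matters because both the partitioner and the group comparator join the array elements without a separator.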
> For GroupComparator:
>
>     public static class MyGroupComparator extends WritableComparator {
>         protected MyGroupComparator() {
>             super(MyPartitionKey.class, true);
>         }
>
>         @Override
>         public int compare(WritableComparable w1, WritableComparable w2) {
>             MyPartitionKey key1 = (MyPartitionKey) w1;
>             MyPartitionKey key2 = (MyPartitionKey) w2;
>             int cmp = key1.type.compareTo(key2.type);
>             // different type, send to different group
>             if (cmp != 0)
>                 return cmp;
>
>             // for the same type, both keys should have the same partition value array length
>             String[] partitionValue1 = key1.getPartitionValue();
>             String[] partitionValue2 = key2.getPartitionValue();
>             assert partitionValue1.length == partitionValue2.length;
>             StringBuilder sb1 = new StringBuilder();
>             StringBuilder sb2 = new StringBuilder();
>             for (String subValue : partitionValue1) {
>                 sb1.append(subValue);
>             }
>             for (String subValue : partitionValue2) {
>                 sb2.append(subValue);
>             }
>             return sb1.toString().compareTo(sb2.toString());
>         }
>     }
>
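A grouping comparator can be sanity-checked outside Hadoop by sorting a few sample keys and counting how often the grouping comparator reports a change between neighbours. The sketch below relies on the fact that the reducer only compares each key with the one immediately before it in sort order, so a new group starts at every such change. MiniKey, GroupCountSketch, and the sample data are hypothetical and reduced to the fields needed for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

// Hypothetical reduced key: 'day' stands in for the concatenated getPartitionValue() result.
final class MiniKey {
    final String type;
    final long id1;
    final String day;

    MiniKey(String type, long id1, String day) {
        this.type = type;
        this.id1 = id1;
        this.day = day;
    }
}

final class GroupCountSketch {
    // Sort order with ids before the date: type, id1, then day descending.
    static final Comparator<MiniKey> SORT_IDS_FIRST = (a, b) -> {
        int c = a.type.compareTo(b.type);
        if (c != 0) return c;
        c = Long.compare(a.id1, b.id1);
        if (c != 0) return c;
        return b.day.compareTo(a.day);
    };

    // Alternative sort order with the grouping fields first: type, day descending, then id1.
    static final Comparator<MiniKey> SORT_DAY_FIRST = (a, b) -> {
        int c = a.type.compareTo(b.type);
        if (c != 0) return c;
        c = b.day.compareTo(a.day);
        if (c != 0) return c;
        return Long.compare(a.id1, b.id1);
    };

    // Grouping: type and day only.
    static final Comparator<MiniKey> GROUP = (a, b) -> {
        int c = a.type.compareTo(b.type);
        return c != 0 ? c : a.day.compareTo(b.day);
    };

    // Sorts a copy of the keys, then starts a new group whenever the
    // grouping comparator says the current key differs from the previous one.
    static int countGroups(List<MiniKey> keys, Comparator<MiniKey> sort, Comparator<MiniKey> group) {
        List<MiniKey> sorted = new ArrayList<>(keys);
        Collections.sort(sorted, sort);
        int groups = 0;
        MiniKey previous = null;
        for (MiniKey key : sorted) {
            if (previous == null || group.compare(previous, key) != 0) {
                groups++;
            }
            previous = key;
        }
        return groups;
    }

    // One type, two ids, two days: two distinct (type, day) combinations.
    static List<MiniKey> sampleKeys() {
        return Arrays.asList(
                new MiniKey("A", 1L, "20131028"),
                new MiniKey("A", 1L, "20131029"),
                new MiniKey("A", 2L, "20131028"),
                new MiniKey("A", 2L, "20131029"));
    }

    public static void main(String[] args) {
        System.out.println(countGroups(sampleKeys(), SORT_IDS_FIRST, GROUP));
        System.out.println(countGroups(sampleKeys(), SORT_DAY_FIRST, GROUP));
    }
}
```

In this sample, sorting by id before day interleaves the two days, so the scan sees more group boundaries than there are distinct (type, day) pairs; sorting by the grouping fields first keeps each pair contiguous.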
> Now, here is the strange problem I don't understand. I tested my MR
> job. In the test data I have 7 types of data: 3 of them partitioned
> yearly, 4 of them partitioned daily. For the 4 types partitioned daily,
> there are 2 days of data for each type. So I expected the input group
> count of the reducer to be 11, which is 4 x 2 + 3 = 11. In
> fact, if I don't use this custom MyPartitionKey and just use Text as the key
> type, with "type + YYYY" for yearly dataset, "type + YYYYMMDD" for daily