MapReduce >> mail # user >> Why the reducer's input group count is higher than my GroupComparator implementation


Re: Why the reducer's input group count is higher than my GroupComparator implementation
Did you override the partitioner as well?
On 2013/10/29, java8964 java8964 <[EMAIL PROTECTED]> wrote:

> Hi, I have a strange question related to the secondary sort implementation
> in my MR job.
> I currently need to support a secondary sort in one of my MR jobs, so I
> implemented a custom WritableComparable like the following:
>
> public class MyPartitionKey implements WritableComparable<MyPartitionKey> {
>     String type;
>     long id1;
>     String id2;
>     String id3;
>     String id4;
>     long timestamp1;
>     long timestamp2;
> }
>
> Then I implemented the following methods for this class:
>
> public int compareTo(MyPartitionKey other); // sorts on all attributes above, the last 2 timestamps descending
> public int hashCode(); // hash code generated from all attributes above
> public boolean equals(Object obj); // equality check on all attributes above
> public void write(DataOutput out) throws IOException; // serializes all attributes above
> public void readFields(DataInput in) throws IOException; // deserializes all attributes above
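The `write`/`readFields` pair only works if both methods handle the same fields in the same order. A minimal sketch of that pairing, written against plain `java.io` so it runs without Hadoop on the classpath; the class `KeyFields` and the helper `roundTrip` are hypothetical, and `writeUTF` assumes none of the string fields is null.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, Hadoop-free stand-in for MyPartitionKey: the same fields, plus
// write()/readFields() methods shaped like Hadoop's Writable contract.
class KeyFields {
    String type;
    long id1;
    String id2;
    String id3;
    String id4;
    long timestamp1;
    long timestamp2;

    // Serializes every field in a fixed order; writeUTF assumes non-null strings.
    void write(DataOutput out) throws IOException {
        out.writeUTF(type);
        out.writeLong(id1);
        out.writeUTF(id2);
        out.writeUTF(id3);
        out.writeUTF(id4);
        out.writeLong(timestamp1);
        out.writeLong(timestamp2);
    }

    // Reads the fields back in exactly the order write() emitted them.
    void readFields(DataInput in) throws IOException {
        type = in.readUTF();
        id1 = in.readLong();
        id2 = in.readUTF();
        id3 = in.readUTF();
        id4 = in.readUTF();
        timestamp1 = in.readLong();
        timestamp2 = in.readLong();
    }

    // Serializes a key to bytes and reads the bytes into a fresh instance.
    static KeyFields roundTrip(KeyFields key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            key.write(out);
            out.flush();
            KeyFields copy = new KeyFields();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the real job these two methods would live on `MyPartitionKey` itself, which implements `WritableComparable`.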
>
> For partitioning and grouping of my keys, I want the following logic:
> depending on the type, the data is partitioned either by the year or by the
> day of timestamp1.
>
> For the sort order, I want the data sorted by (type, id1, id2, id3, id4),
> then in reverse order by (timestamp1, timestamp2).
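One way to express that order in `compareTo`, as a hedged sketch: `SortKey` is a hypothetical stand-in for `MyPartitionKey` that implements plain `Comparable` instead of Hadoop's `WritableComparable`, and the descending timestamps come from swapping the arguments to `Long.compare`.

```java
// Hypothetical, Hadoop-free stand-in for MyPartitionKey's sort order.
class SortKey implements Comparable<SortKey> {
    final String type;
    final long id1;
    final String id2;
    final String id3;
    final String id4;
    final long timestamp1;
    final long timestamp2;

    SortKey(String type, long id1, String id2, String id3, String id4,
            long timestamp1, long timestamp2) {
        this.type = type;
        this.id1 = id1;
        this.id2 = id2;
        this.id3 = id3;
        this.id4 = id4;
        this.timestamp1 = timestamp1;
        this.timestamp2 = timestamp2;
    }

    // Ascending on (type, id1, id2, id3, id4), then descending on (timestamp1, timestamp2).
    @Override
    public int compareTo(SortKey o) {
        int cmp = type.compareTo(o.type);
        if (cmp != 0) return cmp;
        cmp = Long.compare(id1, o.id1);
        if (cmp != 0) return cmp;
        cmp = id2.compareTo(o.id2);
        if (cmp != 0) return cmp;
        cmp = id3.compareTo(o.id3);
        if (cmp != 0) return cmp;
        cmp = id4.compareTo(o.id4);
        if (cmp != 0) return cmp;
        // Arguments swapped so that later timestamps sort first.
        cmp = Long.compare(o.timestamp1, timestamp1);
        if (cmp != 0) return cmp;
        return Long.compare(o.timestamp2, timestamp2);
    }
}
```

Swapping the arguments, rather than negating a difference of two `long` timestamps, avoids the overflow that subtraction can cause. With this order, two keys that differ only in `timestamp1` put the later timestamp first, while a smaller `id1` still sorts ahead regardless of its timestamps.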
>
> I implemented my KeyComparator using the sort order listed above, and my
> Partitioner and GroupComparator using the partitioning logic listed above.
>
> Here is the pseudocode of the Partitioner and GroupComparator:
>
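> import org.apache.hadoop.mapred.JobConf;
> import org.apache.hadoop.mapred.Partitioner;
>
> public class MyPartitioner implements Partitioner<MyPartitionKey, Value> {
>     @Override
>     public int getPartition(MyPartitionKey key, Value value, int numPartitions) {
>         int hashCode = key.getActivityType().name().hashCode();
>         StringBuilder sb = new StringBuilder();
>         for (String subPartitionValue : key.getPartitionValue()) {
>             sb.append(subPartitionValue);
>         }
>         // Mask the sign bit instead of calling Math.abs(), which returns a
>         // negative number for Integer.MIN_VALUE.
>         return ((hashCode * 127 + sb.toString().hashCode()) & Integer.MAX_VALUE)
>                 % numPartitions;
>     }
>
>     @Override
>     public void configure(JobConf job) {
>         // no configuration needed
>     }
> }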
>
> // The key's getPartitionValue() method returns a string array of either
> // {YYYY} or {YYYY, MM, DD} of timestamp1.
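A sketch of how the partition value and the partition index could be computed, under assumptions the post does not state: `timestamp1` is taken to be epoch milliseconds interpreted in UTC, a hypothetical `dailyTypes` set decides which types partition daily, and the `type` string stands in for `getActivityType().name()`. `PartitionSketch` and its methods are illustrative names, not the poster's code. The index masks the sign bit because `Math.abs(Integer.MIN_VALUE)` is still negative.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.Locale;
import java.util.Set;

// Hypothetical helpers mirroring getPartitionValue() and getPartition().
final class PartitionSketch {
    private PartitionSketch() {}

    // Returns {YYYY} for yearly types or {YYYY, MM, DD} for daily types,
    // assuming timestamp1 is epoch milliseconds in UTC.
    static String[] partitionValue(String type, long timestamp1, Set<String> dailyTypes) {
        LocalDate date = Instant.ofEpochMilli(timestamp1).atZone(ZoneOffset.UTC).toLocalDate();
        String yyyy = String.format(Locale.ROOT, "%04d", date.getYear());
        if (!dailyTypes.contains(type)) {
            return new String[] {yyyy};
        }
        return new String[] {
            yyyy,
            String.format(Locale.ROOT, "%02d", date.getMonthValue()),
            String.format(Locale.ROOT, "%02d", date.getDayOfMonth())
        };
    }

    // Same hash combination as MyPartitioner, with the sign bit masked off so
    // the result always falls in [0, numPartitions).
    static int partition(String type, String[] partitionValue, int numPartitions) {
        String joined = String.join("", partitionValue);
        int hash = type.hashCode() * 127 + joined.hashCode();
        return (hash & Integer.MAX_VALUE) % numPartitions;
    }
}
```

For example, with `timestamp1 = 1382918400000L` (2013-10-28T00:00Z), a daily type yields {"2013", "10", "28"} and a yearly type yields {"2013"}.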
>
> For the GroupComparator:
>
>     public static class MyGroupComparator extends WritableComparator {
>         protected MyGroupComparator() {
>             super(MyPartitionKey.class, true);
>         }
>
>         @Override
>         public int compare(WritableComparable w1, WritableComparable w2) {
>             MyPartitionKey key1 = (MyPartitionKey) w1;
>             MyPartitionKey key2 = (MyPartitionKey) w2;
>             int cmp = key1.type.compareTo(key2.type);
>             // different type, send to different group
>             if (cmp != 0)
>                 return cmp;
>
>             // for the same type, the partition value arrays have the same length
>             String[] partitionValue1 = key1.getPartitionValue();
>             String[] partitionValue2 = key2.getPartitionValue();
>             assert partitionValue1.length == partitionValue2.length;
>             StringBuilder sb1 = new StringBuilder();
>             StringBuilder sb2 = new StringBuilder();
>             for (String subValue : partitionValue1) {
>                 sb1.append(subValue);
>             }
>             for (String subValue : partitionValue2) {
>                 sb2.append(subValue);
>             }
>             return sb1.toString().compareTo(sb2.toString());
>         }
>     }
>
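On the reduce side, Hadoop does not compare every pair of keys with the grouping comparator: it reads the keys in sort order and starts a new group whenever the next key does not compare equal to the current one. The sketch below models only that loop, with hypothetical Hadoop-free types (`GroupCountSketch`, `Rec`, `countGroups`), so the effect of a sort order and a grouping rule on the group count can be checked in isolation; it says nothing about the job's real data.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical model of reduce-side grouping; not Hadoop code.
final class GroupCountSketch {
    private GroupCountSketch() {}

    // Minimal key: a type, one id, and a day string standing in for the partition value.
    static final class Rec {
        final String type;
        final long id;
        final String day;

        Rec(String type, long id, String day) {
            this.type = type;
            this.id = id;
            this.day = day;
        }
    }

    // Sort order with the id ahead of the date: (type, id, day descending).
    static final Comparator<Rec> SORT_ID_THEN_DAY = (a, b) -> {
        int cmp = a.type.compareTo(b.type);
        if (cmp != 0) return cmp;
        cmp = Long.compare(a.id, b.id);
        if (cmp != 0) return cmp;
        return b.day.compareTo(a.day); // later day first
    };

    // Sort order with the date ahead of the id: (type, day descending, id).
    static final Comparator<Rec> SORT_DAY_THEN_ID = (a, b) -> {
        int cmp = a.type.compareTo(b.type);
        if (cmp != 0) return cmp;
        cmp = b.day.compareTo(a.day); // later day first
        if (cmp != 0) return cmp;
        return Long.compare(a.id, b.id);
    };

    // Grouping on (type, day) only, in the spirit of MyGroupComparator.
    static final Comparator<Rec> GROUP_TYPE_DAY = (a, b) -> {
        int cmp = a.type.compareTo(b.type);
        return cmp != 0 ? cmp : a.day.compareTo(b.day);
    };

    // Sorts the keys, then walks them once, opening a new group whenever the
    // grouping comparator says the next key does not match the current group.
    static <K> int countGroups(List<K> keys, Comparator<? super K> sortOrder,
                               Comparator<? super K> grouping) {
        List<K> sorted = new ArrayList<>(keys);
        sorted.sort(sortOrder);
        int groups = 0;
        K groupKey = null;
        for (K key : sorted) {
            if (groupKey == null || grouping.compare(groupKey, key) != 0) {
                groups++;
                groupKey = key; // earlier groups are never revisited
            }
        }
        return groups;
    }
}
```

For example, for the keys (click, 1, 20131027), (click, 1, 20131028), (click, 2, 20131027) and (click, 2, 20131028), `countGroups` returns 4 with `SORT_ID_THEN_DAY` but 2 with `SORT_DAY_THEN_ID`, although both calls use the same grouping comparator.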
> Now here is the strange problem I don't understand. I tested my MR job
> with test data that I know contains 7 types of data: 3 of them are
> partitioned yearly and 4 of them daily, and each of the 4 daily types has
> 2 days of data. So I expected the reducer's input group count to be 11,
> which is 4 x 2 + 3 = 11. In fact, if I don't use this custom MyPartitionKey
> and just use Text as the key
> type, with "type + YYYY" for yearly dataset, "type + YYYYMMDD" for daily