Re: KeyRangePartitioner
The partitioner doesn't change the sort order used in MapReduce; that's the
Comparator's job. The partitioner just changes how the various keys emitted
get bucketed and distributed to the different reduce tasks. And because
TabletServers don't split rows, we don't want partitions to fall mid-row.
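
A minimal sketch of that distinction in plain Java against the Hadoop and
Accumulo APIs (the RowOnlyPartitioner class and its name are illustrative,
not from this thread): it buckets purely on the row portion of the Key, so
a single row never straddles two reduce tasks, while the order in which each
reducer sees its keys still comes from the job's sort Comparator.

import org.apache.accumulo.core.data.Key;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Partitioner;

// Illustrative only: partitions on the row, never on the full key, so all
// entries of a row land in the same reduce task. Sort order within each
// reducer's input is still controlled by the Comparator, not by this class.
public class RowOnlyPartitioner extends Partitioner<Key,Writable> {
  @Override
  public int getPartition(Key key, Writable value, int numPartitions) {
    // Mask the sign bit so the bucket index is always non-negative.
    return (key.getRow().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}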
On Mon, Jun 10, 2013 at 4:55 PM, David Medinets <[EMAIL PROTECTED]> wrote:

> Is there any reason why changing the RangePartitioner so that it
> understands Accumulo Key objects is bad? It seems like the only significant
> change is passing key.getRow() into the binarySearch call in findPartition.
> With this change, it seems that the sort phase of MapReduce sorts Accumulo
> keys properly.
>
> Have I overlooked something?
>
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> package org.apache.accumulo.core.client.mapreduce.lib.partition;
>
> import java.io.BufferedReader;
> import java.io.FileNotFoundException;
> import java.io.FileReader;
> import java.io.IOException;
> import java.net.URI;
> import java.util.Arrays;
> import java.util.Scanner;
> import java.util.TreeSet;
>
> import org.apache.commons.codec.binary.Base64;
> import org.apache.hadoop.conf.Configurable;
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.filecache.DistributedCache;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.io.Writable;
> import org.apache.hadoop.mapreduce.JobContext;
> import org.apache.hadoop.mapreduce.Partitioner;
>
> import org.apache.accumulo.core.data.Key;
>
> /**
>  * Hadoop partitioner that uses ranges, and optionally sub-bins based on
>  * hashing.
>  */
> public class KeyRangePartitioner extends Partitioner<Key,Writable>
> implements Configurable {
>   // Reuses RangePartitioner's name for the config keys, so the split file
>   // and sub-bin settings are shared with jobs configured via RangePartitioner.
>   private static final String PREFIX = RangePartitioner.class.getName();
>   private static final String CUTFILE_KEY = PREFIX + ".cutFile";
>   private static final String NUM_SUBBINS = PREFIX + ".subBins";
>
>   private Configuration conf;
>
>   @Override
>   public int getPartition(Key key, Writable value, int numPartitions) {
>     try {
>       return findPartition(key, getCutPoints(), getNumSubBins());
>     } catch (IOException e) {
>       throw new RuntimeException(e);
>     }
>   }
>
>   int findPartition(Key key, Text[] array, int numSubBins) {
>     // find the bin for the range, and guarantee it is positive;
>     // binarySearch returns (-(insertion point) - 1) when the row is not an
>     // exact match, so (index + 1) * -1 recovers the insertion point
>     int index = Arrays.binarySearch(array, key.getRow());
>     index = index < 0 ? (index + 1) * -1 : index;
>
>     // both conditions work with numSubBins == 1, but this check is to
>     // avoid hashing, when we don't need to, for speed
>     if (numSubBins < 2)
>       return index;
>     return (key.toString().hashCode() & Integer.MAX_VALUE) % numSubBins + index * numSubBins;
>   }
>
>   private int _numSubBins = 0;
>
>   private synchronized int getNumSubBins() {
>     if (_numSubBins < 1) {
>       // get number of sub-bins and guarantee it is positive
>       _numSubBins = Math.max(1, getConf().getInt(NUM_SUBBINS, 1));
>     }
>     return _numSubBins;
>   }
>
>   private Text[] cutPointArray = null;
>
>   private synchronized Text[] getCutPoints() throws IOException {
>     if (cutPointArray == null) {
>       String cutFileName = conf.get(CUTFILE_KEY);
>       Path[] cf = DistributedCache.getLocalCacheFiles(conf);
>
>       if (cf != null) {
>         for (Path path : cf) {
>           if (path.toUri().getPath().endsWith(cutFileName.substring(cutFileName.lastIndexOf('/')))) {
>             TreeSet<Text> cutPoints = new TreeSet<Text>();
>             Scanner in = new Scanner(new BufferedReader(new FileReader(path.toString())));
>             try {
>               while (in.hasNextLine())
>                 cutPoints.add(new Text(Base64.decodeBase64(in.nextLine().getBytes())));
>             } finally {
>               in.close();
>             }
>             cutPointArray = cutPoints.toArray(new Text[cutPoints.size()]);
>             break;
>           }
>         }
>       }
>       if (cutPointArray == null)
>         throw new FileNotFoundException(cutFileName + " not found in distributed cache");
>     }
>     return cutPointArray;
>   }
>
>   @Override
>   public Configuration getConf() {
>     return conf;
>   }
>
>   @Override
>   public void setConf(Configuration conf) {
>     this.conf = conf;
>   }
>
>   /**
>    * Sets the hdfs file name to use, containing a newline separated list of
>    * Base64 encoded split points that represent ranges for partitioning
>    */
>   public static void setSplitFile(JobContext job, String file) {
>     URI uri = new Path(file).toUri();
>     DistributedCache.addCacheFile(uri, job.getConfiguration());
>     job.getConfiguration().set(CUTFILE_KEY, uri.getPath());
>   }
>
>   /**
>    * Sets the number of random sub-bins per range
>    */
>   public static void setNumSubBins(JobContext job, int num) {
>     job.getConfiguration().setInt(NUM_SUBBINS, num);
>   }
> }
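
For context, a rough sketch of how a job might wire this partitioner in,
mirroring the way RangePartitioner is normally configured (the class name,
file path, and reducer count below are illustrative): the split file is a
newline-separated list of Base64-encoded row cut points, shipped to the
workers via the DistributedCache.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SubmitSortJob {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = new Job(conf, "key-range-sort");
    job.setPartitionerClass(KeyRangePartitioner.class);
    // One reduce task per range: a file with N cut points defines N + 1 ranges.
    job.setNumReduceTasks(5); // e.g. a split file containing 4 cut points
    KeyRangePartitioner.setSplitFile(job, "/user/hadoop/splits.txt"); // illustrative path
    KeyRangePartitioner.setNumSubBins(job, 1);
    // Mapper, reducer, and input/output formats would be configured here too.
  }
}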