Hadoop, mail # user - How does MergeQueue.merge actually sort <K,V> from different segments ??


Re: How does MergeQueue.merge actually sort <K,V> from different segments ??
Ted Yu 2010-06-20, 15:04
Please note this:

private static class MergeQueue<K extends Object, V extends Object>
  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {

A priority queue is used to accomplish the sorting.
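
As a rough illustration of why extending a priority queue yields sorted output (a sketch, not the Hadoop code): if the queue orders segments by the key each one is currently positioned at, then repeatedly taking the head and advancing that segment emits all keys in order, provided each segment is itself already sorted. The example below uses java.util.PriorityQueue rather than Hadoop's own PriorityQueue class, works on plain String keys, and the names KWayMergeSketch, Cursor, and merge are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical illustration of a k-way merge; not Hadoop's MergeQueue. */
final class KWayMergeSketch {

  /** A cursor over one sorted run, playing the role of a segment. */
  private static final class Cursor {
    private final Iterator<String> rest;
    private String current;

    Cursor(List<String> sortedRun) {
      this.rest = sortedRun.iterator();
    }

    /** Advances to the next key; returns false when the run is exhausted. */
    boolean advance() {
      if (!rest.hasNext()) {
        return false;
      }
      current = rest.next();
      return true;
    }
  }

  /** Merges runs that are each already sorted into one sorted list. */
  static List<String> merge(List<List<String>> sortedRuns) {
    // Order cursors by the key they currently point at.
    PriorityQueue<Cursor> queue =
        new PriorityQueue<>((a, b) -> a.current.compareTo(b.current));
    for (List<String> run : sortedRuns) {
      Cursor c = new Cursor(run);
      if (c.advance()) {
        queue.add(c);
      }
    }

    List<String> merged = new ArrayList<>();
    while (!queue.isEmpty()) {
      Cursor smallest = queue.poll();  // cursor holding the smallest current key
      merged.add(smallest.current);
      if (smallest.advance()) {
        queue.add(smallest);           // re-insert so it is ordered by its new key
      }
    }
    return merged;
  }
}
```

Calling KWayMergeSketch.merge with several sorted lists returns a single sorted list. No run is ever sorted again; the queue only compares the head key of each run.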

On Fri, Jun 18, 2010 at 8:14 PM, elton sky <[EMAIL PROTECTED]> wrote:

> Hello everyone,
>
> I am going through the MapReduce source code. In MergeQueue.merge, I can
> only see that the SEGMENTS are combined and sorted by length into a list
> for merging. However, I could not find the code that sorts the (key, value)
> pairs in the segments by key...
>
> Here is the function:
>
> RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
> .
> .
> .
>
>         //if we have lesser number of segments remaining, then just return the
>         //iterator, else do another single level merge
>         if (numSegments <= factor) {
>           // Reset totalBytesProcessed to track the progress of the final merge.
>           // This is considered the progress of the reducePhase, the 3rd phase
>           // of reduce task. Currently totalBytesProcessed is not used in sort
>           // phase of reduce task(i.e. when intermediate merges happen).
>           totalBytesProcessed = startBytes;
>
>           //calculate the length of the remaining segments. Required for
>           //calculating the merge progress
>           long totalBytes = 0;
>           for (int i = 0; i < segmentsToMerge.size(); i++) {
>             totalBytes += segmentsToMerge.get(i).getLength();
>           }
>           if (totalBytes != 0) //being paranoid
>             progPerByte = 1.0f / (float)totalBytes;
>
>           if (totalBytes != 0)
>             mergeProgress.set(totalBytesProcessed * progPerByte);
>           else
>             mergeProgress.set(1.0f); // Last pass and no segments left - we're done
>
>           LOG.info("Down to the last merge-pass, with " + numSegments +
>                    " segments left of total size: " + totalBytes + " bytes");
>           return this;
>         } else {
>           LOG.info("Merging " + segmentsToMerge.size() +
>                    " intermediate segments out of a total of " +
>                    (segments.size()+segmentsToMerge.size()));
>
>           //we want to spread the creation of temp files on multiple disks if
>           //available under the space constraints
>           long approxOutputSize = 0;
>           for (Segment<K, V> s : segmentsToMerge) {
>             approxOutputSize += s.getLength() +
>                                 ChecksumFileSystem.getApproxChkSumLength(
>                                 s.getLength());
>           }
>           Path tmpFilename =
>             new Path(tmpDir, "intermediate").suffix("." + passNo);
>
>           Path outputFile =  lDirAlloc.getLocalPathForWrite(
>                                               tmpFilename.toString(),
>                                               approxOutputSize, conf);
>
>           Writer<K, V> writer =
>             new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
>                              writesCounter);
>           *writeFile(this, writer, reporter, conf);*
>           writer.close();
>
>           //we finished one single level merge; now clean up the priority
>           //queue
>           this.close();
>
>           // Add the newly create segment to the list of segments to be merged
>           Segment<K, V> tempSegment =
>             new Segment<K, V>(conf, fs, outputFile, codec, false);
>           segments.add(tempSegment);
>           numSegments = segments.size();
>           Collections.sort(segments, segmentComparator);