Re: How does MergeQueue.merge actually sort <K,V> from different segments ??
Please note this:

private static class MergeQueue<K extends Object, V extends Object>
  extends PriorityQueue<Segment<K, V>> implements RawKeyValueIterator {

A priority queue is used to accomplish the sorting. Each segment is already sorted by key (that sorting happens on the map side), so the merge only has to keep picking the segment whose current key is smallest, and that is exactly what the priority queue does.
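
For intuition, here is a minimal, self-contained sketch of that k-way merge. It is not Hadoop's actual Merger code: MiniSegment and its head()/advance() methods are made up for illustration, but the idea is the same as MergeQueue's, where every input is already sorted and the queue always hands back the input with the smallest current key.

import java.util.*;

// Minimal sketch: k-way merge of pre-sorted inputs with a PriorityQueue.
// MiniSegment and its methods are illustrative, not Hadoop's API.
public class KWayMergeSketch {

  // A "segment" is just an iterator over keys that are already sorted.
  static class MiniSegment {
    private final Iterator<String> it;
    private String current;

    MiniSegment(List<String> sortedKeys) {
      it = sortedKeys.iterator();
      current = it.hasNext() ? it.next() : null;
    }

    String head() { return current; }                     // smallest remaining key
    boolean isEmpty() { return current == null; }
    void advance() { current = it.hasNext() ? it.next() : null; }
  }

  public static void main(String[] args) {
    // Each segment is sorted on its own, like map outputs after the map-side sort.
    List<MiniSegment> segments = Arrays.asList(
        new MiniSegment(Arrays.asList("apple", "melon", "pear")),
        new MiniSegment(Arrays.asList("banana", "kiwi")),
        new MiniSegment(Arrays.asList("cherry", "lemon", "orange")));

    // The priority queue orders segments by their current head key.
    PriorityQueue<MiniSegment> queue =
        new PriorityQueue<>(Comparator.comparing(MiniSegment::head));
    for (MiniSegment s : segments) {
      if (!s.isEmpty()) queue.add(s);
    }

    // Repeatedly emit the globally smallest key, advance that segment, and
    // re-insert it so the heap re-orders it by its new head key. No extra
    // sort of the keys is ever needed.
    while (!queue.isEmpty()) {
      MiniSegment s = queue.poll();
      System.out.println(s.head());
      s.advance();
      if (!s.isEmpty()) queue.add(s);
    }
    // Prints: apple banana cherry kiwi lemon melon orange pear
  }
}

Hadoop's MergeQueue works along these lines, except that the heap holds Segment objects, the ordering comes from the raw-key comparator, and the merged stream is consumed through the RawKeyValueIterator interface (next()/getKey()/getValue()).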

On Fri, Jun 18, 2010 at 8:14 PM, elton sky <[EMAIL PROTECTED]> wrote:

> Hello everyone,
>
> I am going through the source code of MapReduce. In MergeQueue.merge I can
> only see the segments being combined and sorted by length into a list for
> the merge. However, I cannot find the procedure that sorts the (key, value)
> pairs in the segments by key...
>
> Here is the function:
>
> RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
>     .
>     .
>     .
>
>       //if we have lesser number of segments remaining, then just return the
>       //iterator, else do another single level merge
>       if (numSegments <= factor) {
>         // Reset totalBytesProcessed to track the progress of the final merge.
>         // This is considered the progress of the reducePhase, the 3rd phase
>         // of reduce task. Currently totalBytesProcessed is not used in sort
>         // phase of reduce task(i.e. when intermediate merges happen).
>         totalBytesProcessed = startBytes;
>
>         //calculate the length of the remaining segments. Required for
>         //calculating the merge progress
>         long totalBytes = 0;
>         for (int i = 0; i < segmentsToMerge.size(); i++) {
>           totalBytes += segmentsToMerge.get(i).getLength();
>         }
>         if (totalBytes != 0) //being paranoid
>           progPerByte = 1.0f / (float)totalBytes;
>
>         if (totalBytes != 0)
>           mergeProgress.set(totalBytesProcessed * progPerByte);
>         else
>           mergeProgress.set(1.0f); // Last pass and no segments left - we're done
>
>         LOG.info("Down to the last merge-pass, with " + numSegments +
>                  " segments left of total size: " + totalBytes + " bytes");
>         return this;
>       } else {
>         LOG.info("Merging " + segmentsToMerge.size() +
>                  " intermediate segments out of a total of " +
>                  (segments.size() + segmentsToMerge.size()));
>
>         //we want to spread the creation of temp files on multiple disks if
>         //available under the space constraints
>         long approxOutputSize = 0;
>         for (Segment<K, V> s : segmentsToMerge) {
>           approxOutputSize += s.getLength() +
>                               ChecksumFileSystem.getApproxChkSumLength(
>                                   s.getLength());
>         }
>         Path tmpFilename =
>           new Path(tmpDir, "intermediate").suffix("." + passNo);
>
>         Path outputFile = lDirAlloc.getLocalPathForWrite(
>                                             tmpFilename.toString(),
>                                             approxOutputSize, conf);
>
>         Writer<K, V> writer =
>           new Writer<K, V>(conf, fs, outputFile, keyClass, valueClass, codec,
>                            writesCounter);
>         writeFile(this, writer, reporter, conf);
>         writer.close();
>
>         //we finished one single level merge; now clean up the
>         //priority queue
>         this.close();
>
>         // Add the newly create segment to the list of segments to be merged
>         Segment<K, V> tempSegment =
>           new Segment<K, V>(conf, fs, outputFile, codec, false);
>         segments.add(tempSegment);
>         numSegments = segments.size();
>         Collections.sort(segments, segmentComparator);
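
To tie this back to the snippet: Collections.sort(segments, segmentComparator) only orders whole segments by their size, to decide which ones get merged in the next pass; it never touches the individual keys. The keys come out in sorted order only when the MergeQueue iterator is drained, for example by the writeFile(this, writer, reporter, conf) call above. Below is a tiny, self-contained sketch of that segment-level ordering; FakeSegment and its fields are illustrative stand-ins, not Hadoop's Segment or segmentComparator.

import java.util.*;

public class SegmentSortSketch {

  // Stand-in for a map-output segment: only its on-disk length matters here.
  static class FakeSegment {
    final String name;
    final long length;

    FakeSegment(String name, long length) {
      this.name = name;
      this.length = length;
    }

    @Override
    public String toString() { return name + "(" + length + ")"; }
  }

  public static void main(String[] args) {
    List<FakeSegment> segments = new ArrayList<>(Arrays.asList(
        new FakeSegment("seg-a", 4096),
        new FakeSegment("seg-b", 1024),
        new FakeSegment("seg-c", 2048)));

    // Orders whole segments by size only; the (key, value) pairs inside
    // each segment are left exactly as they were.
    Comparator<FakeSegment> bySize = Comparator.comparingLong(s -> s.length);
    Collections.sort(segments, bySize);

    System.out.println(segments); // [seg-b(1024), seg-c(2048), seg-a(4096)]
  }
}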