|
|
Hi: After reading/testing the code of reduce phase code in hadoop-0.23.0, I think i found some bugs, maybe helpful to you . 1) in the constructor of OnDiskMerger in MergeManager.java on line 472, *public OnDiskMerger(MergeManager<K, V> manager) { super(manager, Integer.MAX_VALUE, exceptionReporter); setName("OnDiskMerger - Thread to merge on-disk map-outputs"); setDaemon(true); }* the second parameter mergeFactor in constructor can't be *Integer.MAX_VALUE, *it should be io.sort.factor. if set mergeFactor to *Integer.MAX_VALUE* the OnDiskMerger will merge all files feed to it at a time , don't show respect to io.sort.factor parameter.
2) still in MergeManager.java on line 90, the data structure *Set<Path> onDiskMapOutputs = new TreeSet<Path>();* is incorrect, you should sort the on disk files by file length, not the uri of Path. So the files feed to OnDiskMerger is not sorted by length, hurt the overall performance.
3) still in MergeManager.java being from line 745
if (0 != onDiskBytes) { final int numInMemSegments = memDiskSegments.size(); diskSegments.addAll(0, memDiskSegments); memDiskSegments.clear(); // Pass mergePhase only if there is a going to be intermediate // merges. See comment where mergePhaseFinished is being set Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; RawKeyValueIterator diskMerge =* Merger.merge( job, fs, keyClass, valueClass, diskSegments, ioSortFactor, numInMemSegments, tmpDir, comparator, reporter, false, spilledRecordsCounter, null, thisPhase);* diskSegments.clear(); if (0 == finalSegments.size()) { return diskMerge; } finalSegments.add(new Segment<K,V>( new RawKVIteratorReader(diskMerge, onDiskBytes), true)); }
the above bold code which will merge files down to io.sort.factor, maybe have intermediate merge process . What's wrong with that you didn't pass in the *codec *parameter, the intermediate merge process will not compress the written file on disk, leading to huge performance degrade. 4) the implementation of Reader in IFile.java is(maybe) not very efficient, in previous hadoop version, each time read a buffer from the disk, no need to hit disk for each key value. I don't known why rewrite this codes. hopefully I don't miss anything. -- Best Regards Anty Rao
-
Re: Bugs in hadoop-0.23.0
Arun C Murthy 2012-01-18, 06:18
Thanks Anty!
Can you please go ahead and file jira(s) for them? Of course, we'd love a patch too... :)
Arun
On Jan 17, 2012, at 10:05 PM, Anty wrote:
> Hi: > After reading/testing the code of reduce phase code in hadoop-0.23.0, I > think i found some bugs, maybe helpful to you . > 1) in the constructor of OnDiskMerger in MergeManager.java on line 472, > *public OnDiskMerger(MergeManager<K, V> manager) { > super(manager, Integer.MAX_VALUE, exceptionReporter); > setName("OnDiskMerger - Thread to merge on-disk map-outputs"); > setDaemon(true); > }* > the second parameter mergeFactor in constructor can't be *Integer.MAX_VALUE, > *it should be io.sort.factor. if set mergeFactor to *Integer.MAX_VALUE* > the OnDiskMerger will merge all files feed to it at a time , don't show > respect to io.sort.factor parameter. > > 2) still in MergeManager.java on line 90, the data structure > *Set<Path> onDiskMapOutputs = new TreeSet<Path>();* > is incorrect, you should sort the on disk files by file length, not the uri > of Path. > So the files feed to OnDiskMerger is not sorted by length, hurt the overall > performance. > > 3) still in MergeManager.java being from line 745 > > if (0 != onDiskBytes) { > final int numInMemSegments = memDiskSegments.size(); > diskSegments.addAll(0, memDiskSegments); > memDiskSegments.clear(); > // Pass mergePhase only if there is a going to be intermediate > // merges. See comment where mergePhaseFinished is being set > Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; > RawKeyValueIterator diskMerge =* Merger.merge( > job, fs, keyClass, valueClass, diskSegments, > ioSortFactor, numInMemSegments, tmpDir, comparator, > reporter, false, spilledRecordsCounter, null, thisPhase);* > diskSegments.clear(); > if (0 == finalSegments.size()) { > return diskMerge; > } > finalSegments.add(new Segment<K,V>( > new RawKVIteratorReader(diskMerge, onDiskBytes), true)); > } > > the above bold code which will merge files down to io.sort.factor, maybe > have intermediate merge process . > What's wrong with that you didn't pass in the *codec *parameter, the > intermediate merge process will not compress > the written file on disk, leading to huge performance degrade. > > > 4) the implementation of Reader in IFile.java is(maybe) not very efficient, > in previous hadoop version, each time read a buffer from the disk, no need > to hit disk for each key value. I don't known why rewrite this codes. > > > hopefully I don't miss anything. > > > -- > Best Regards > Anty Rao
-
Re: Bugs in hadoop-0.23.0
Anty 2012-01-18, 06:25
Hi:Arun C Murthy I am glad to take a crack . I will file jiras from them and submit patches for above bugs.
On Wed, Jan 18, 2012 at 2:18 PM, Arun C Murthy <[EMAIL PROTECTED]> wrote:
> Thanks Anty! > > Can you please go ahead and file jira(s) for them? Of course, we'd love a > patch too... :) > > Arun > > On Jan 17, 2012, at 10:05 PM, Anty wrote: > > > Hi: > > After reading/testing the code of reduce phase code in hadoop-0.23.0, I > > think i found some bugs, maybe helpful to you . > > 1) in the constructor of OnDiskMerger in MergeManager.java on line 472, > > *public OnDiskMerger(MergeManager<K, V> manager) { > > super(manager, Integer.MAX_VALUE, exceptionReporter); > > setName("OnDiskMerger - Thread to merge on-disk map-outputs"); > > setDaemon(true); > > }* > > the second parameter mergeFactor in constructor can't be > *Integer.MAX_VALUE, > > *it should be io.sort.factor. if set mergeFactor to *Integer.MAX_VALUE* > > the OnDiskMerger will merge all files feed to it at a time , don't show > > respect to io.sort.factor parameter. > > > > 2) still in MergeManager.java on line 90, the data structure > > *Set<Path> onDiskMapOutputs = new TreeSet<Path>();* > > is incorrect, you should sort the on disk files by file length, not the > uri > > of Path. > > So the files feed to OnDiskMerger is not sorted by length, hurt the > overall > > performance. > > > > 3) still in MergeManager.java being from line 745 > > > > if (0 != onDiskBytes) { > > final int numInMemSegments = memDiskSegments.size(); > > diskSegments.addAll(0, memDiskSegments); > > memDiskSegments.clear(); > > // Pass mergePhase only if there is a going to be intermediate > > // merges. See comment where mergePhaseFinished is being set > > Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; > > RawKeyValueIterator diskMerge =* Merger.merge( > > job, fs, keyClass, valueClass, diskSegments, > > ioSortFactor, numInMemSegments, tmpDir, comparator, > > reporter, false, spilledRecordsCounter, null, thisPhase);* > > diskSegments.clear(); > > if (0 == finalSegments.size()) { > > return diskMerge; > > } > > finalSegments.add(new Segment<K,V>( > > new RawKVIteratorReader(diskMerge, onDiskBytes), true)); > > } > > > > the above bold code which will merge files down to io.sort.factor, maybe > > have intermediate merge process . > > What's wrong with that you didn't pass in the *codec *parameter, the > > intermediate merge process will not compress > > the written file on disk, leading to huge performance degrade. > > > > > > 4) the implementation of Reader in IFile.java is(maybe) not very > efficient, > > in previous hadoop version, each time read a buffer from the disk, no > need > > to hit disk for each key value. I don't known why rewrite this codes. > > > > > > hopefully I don't miss anything. > > > > > > -- > > Best Regards > > Anty Rao > > -- Best Regards Anty Rao
|
|