-
Could pig dynamic change the reduce number according the mapper task number ?
Jeff Zhang 2009-11-12, 08:12
Hi all,
Often I run the same script on different data sets, sometimes small and sometimes large, and data sets of different sizes require different numbers of reducers. I know that the default number of reducers is 1, and that users can change it in a script with the PARALLEL keyword.
But I don't want to have to change the number of reducers in the script every time I run it. So I have an idea: could Pig provide an API that lets users set the ratio between map tasks and reduce tasks (and perhaps a new keyword in Pig Latin to set that ratio)?
For example, if I set the ratio to 2:1 and a job has 100 map tasks, it would get 50 reduce tasks accordingly.
I think this would be convenient for Pig users.
Jeff Zhang
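A minimal Java sketch of the proposed ratio, for illustration only: `ReducerRatio` and `reducersFor` are hypothetical names, not an existing Pig or Hadoop API. It assumes the ratio is given as two positive integers (map tasks : reduce tasks), rounds down, and never returns fewer than one reducer.

```java
/**
 * Hypothetical helper (not a Pig or Hadoop API): derives a reduce-task count
 * from the number of map tasks and a maps:reduces ratio such as 2:1.
 */
final class ReducerRatio {

    private ReducerRatio() {}

    /**
     * Returns floor(mapTasks * reducePart / mapPart), clamped to [1, Integer.MAX_VALUE].
     *
     * @param mapTasks   number of map tasks (input splits), must be >= 0
     * @param mapPart    left-hand side of the ratio, e.g. 2 in "2:1"
     * @param reducePart right-hand side of the ratio, e.g. 1 in "2:1"
     */
    static int reducersFor(int mapTasks, int mapPart, int reducePart) {
        if (mapTasks < 0 || mapPart <= 0 || reducePart <= 0) {
            throw new IllegalArgumentException("need mapTasks >= 0 and positive ratio parts");
        }
        // long arithmetic so mapTasks * reducePart cannot overflow an int
        long reducers = (long) mapTasks * reducePart / mapPart;
        // A job that has a reduce phase needs at least one reduce task.
        return (int) Math.min(Integer.MAX_VALUE, Math.max(1L, reducers));
    }
}
```

With the 2:1 ratio from the example, `ReducerRatio.reducersFor(100, 2, 1)` returns 50, and a job with a single split still gets one reducer.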
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Alan Gates 2009-11-12, 16:25
I agree that it would be very useful to have a dynamic number of reducers. However, I'm not sure how to accomplish it. MapReduce requires that we set the number of reducers up front in JobConf, when we submit the job. But we don't know the number of maps until getSplits is called after job submission. I don't think MR will allow us to set the number of reducers once the job is started.
Others have suggested that we use the file size to determine the number of reducers. We cannot always assume the inputs are HDFS files (they could come from HBase or something else). Also, different storage formats (text, sequence files, Zebra) would need different ratios of bytes to reducers, since they store data at different compression rates. Maybe this could still work for the HDFS case only, assuming the user understands the compression ratios and can therefore set the input size per reducer accordingly. But I'm not sure this will be simple enough to be useful.
Thoughts?
Alan.
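The file-size idea above can be sketched as follows, under explicitly assumed conditions: the input size is known only for file-based inputs such as HDFS, the user supplies a compression ratio (logical bytes per on-disk byte) for each storage format, and anything else falls back to a fixed reducer count. `SizeBasedReducers`, `estimate`, and the format keys are hypothetical names, not Pig APIs.

```java
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of a size-based reducer estimate; not a Pig or Hadoop API.
 */
final class SizeBasedReducers {

    private SizeBasedReducers() {}

    /**
     * @param inputBytes       on-disk input size, or empty when the source has no
     *                         file size (for example, a table in HBase)
     * @param format           storage format key, e.g. "text" (assumed keys)
     * @param compressionRatio user-supplied logical-to-on-disk size ratio per format
     * @param bytesPerReducer  logical bytes one reduce task should handle
     * @param fallback         reducer count to use when no estimate is possible
     */
    static int estimate(OptionalLong inputBytes, String format,
                        Map<String, Double> compressionRatio,
                        long bytesPerReducer, int fallback) {
        Double ratio = compressionRatio.get(format);
        if (!inputBytes.isPresent() || ratio == null || bytesPerReducer <= 0) {
            // Size unknown, no ratio for this format, or no sensible target.
            return fallback;
        }
        double logicalBytes = inputBytes.getAsLong() * ratio;
        // Round up so the average input per reducer stays at or below
        // bytesPerReducer, and never return fewer than one reducer.
        return (int) Math.max(1.0, Math.ceil(logicalBytes / bytesPerReducer));
    }
}
```

For instance, 1,000,000,000 on-disk bytes in a format given a ratio of 10, with 256,000,000 bytes per reducer, yields 40 reducers; the same input with no known size returns the fallback.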
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
David 2009-11-12, 17:11
I posted a similar question a week or two ago but got no responses.
Here's what I've done - I've taken to breaking my computations into stages and then computing an estimated parallelism very simply. It may not be optimal but it is certainly better than allocating 600 nodes for data that will fit in 1.
Roughly speaking, I:
* sum the sizes of all the bzip-compressed data files (sumcompressed)
* multiply by 10, assuming a 10:1 compression ratio
* multiply by 2 to account for the 100% overhead of Java objects
* divide by the amount of RAM allocated to each reduce task via -Dmapred.job.reduce.memory.mb
* add 1 and round down, so the result is always at least 1
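The estimate in the list above, written out as a small Java function for illustration. It mirrors the expression `int(($sum * 10 * 2) / ($memory * 1.0E6) + 1)` in the datasize.pl script, including the `+ 1` before truncation; the 10:1 compression ratio and 2x object overhead are the assumptions stated in the list, and `ParallelismEstimate` is a hypothetical name.

```java
/**
 * Hypothetical sketch of the datasize.pl parallelism estimate:
 * (compressed bytes * compression ratio * object overhead) / per-reducer RAM, plus 1.
 */
final class ParallelismEstimate {

    static final double COMPRESSION_RATIO = 10.0; // assumed 10:1 bzip2 ratio
    static final double OBJECT_OVERHEAD = 2.0;    // assumed 100% Java object overhead

    private ParallelismEstimate() {}

    /**
     * @param compressedBytes sum of the compressed input file sizes
     * @param reduceMemoryMb  RAM per reduce task in MB (1 MB = 1.0E6 bytes, as in the script)
     */
    static int parallelism(long compressedBytes, int reduceMemoryMb) {
        double expandedBytes = compressedBytes * COMPRESSION_RATIO * OBJECT_OVERHEAD;
        double bytesPerReducer = reduceMemoryMb * 1.0e6;
        // Adding 1 before truncating toward zero guarantees at least one reducer.
        return (int) (expandedBytes / bytesPerReducer + 1);
    }
}
```

For 1,000,000,000 bytes of bzip2 input and 1536 MB per reducer, this gives 20,000,000,000 / 1,536,000,000 + 1 ≈ 14.02, truncated to 14.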
So far, it seems to work reasonably well, but your mileage may vary. For instance, adding a scalar parameter to fine-tune the amount of memory might be useful.
Here's the script I wrote to do the computations. I call it from within my Pig-in-a-Blanket wrappers for Paths to Success analysis.
Examples:
    # Get summary statistics for all subdirs under dir, reporting in GB
    ./datasize.pl --memory=1536 --units=gb /user/ciemo/pathstosuccess/revamp/paths.summary.daily/*/all-all/query-success-section-url.bz2

    # Compute the parallel nodes required for all data below this path
    ./datasize.pl --memory=1536 --units=gb --parallelonly --nosubdirs --nopath /user/ciemo/pathstosuccess/revamp/paths.summary.daily/*/all-all/query-success-section-url.bz2

    #!/usr/bin/perl
    use Getopt::Long;

    my $subdirs = 1;
    my $nosubdirs = 0;
    my $nopath = 0;
    my $nototal = 0;
    my $sumonly = 0;
    my $units = '';
    my $unitscalar = 1.0;
    my $unitformat = '%d';
    my $memory = 512;          # MB of RAM per reduce task
    my $parallelonly = 0;

    GetOptions(
        "subdirs"      => \$subdirs,
        "nosubdirs"    => \$nosubdirs,
        "nopath"       => \$nopath,
        "nototal"      => \$nototal,
        "sumonly"      => \$sumonly,
        "parallelonly" => \$parallelonly,
        "units=s"      => \$units,
        "memory=i"     => \$memory,
    );

    # Scale factor and print format for the requested units (decimal units)
    if ($units =~ /^kb$/i) { $unitscalar = 1.0E3;  $unitformat = '%0.3f'; }
    if ($units =~ /^mb$/i) { $unitscalar = 1.0E6;  $unitformat = '%0.3f'; }
    if ($units =~ /^gb$/i) { $unitscalar = 1.0E9;  $unitformat = '%0.3f'; }
    if ($units =~ /^tb$/i) { $unitscalar = 1.0E12; $unitformat = '%0.3f'; }
    if ($units =~ /^$/i)   { $unitscalar = 1.0;    $unitformat = '%d'; }

    if ($nosubdirs) { $subdirs = 0; }

    # Stop here instead of running `hadoop fs -lsr` without a path
    my $rootpath = $ARGV[0] or die "No path specified.\n";

    open(FILES, qq{hadoop fs -lsr $rootpath |}) or die "Cannot run hadoop fs -lsr: $!\n";
    while (<FILES>) {
        $line = $_;
        s/\r*\n*$//;    # strip the line ending

        # Columns of `hadoop fs -lsr`: permissions, replication, user, group, size, date, time, path
        my ($perms, $repl, $user, $group, $bytes, $date, $time, $path) = split(/ +/);

        if ($perms =~ /^d/) { next; }            # skip directories
        if ($path =~ /\/_temporary/) { next; }   # skip temporary job output

        # Strip the trailing part-NNNNN file name to get the containing directory
        my $leadpath = $path;
        $leadpath =~ s/\/part-[0-9]+[^\/]*$//;

        # Totals for everything under the root path
        $counts{$rootpath} ++;
        $sums{$rootpath} += $bytes;
        $sumssq{$rootpath} += $bytes * $bytes;
        $mins{$rootpath} = $bytes if !defined $mins{$rootpath} || $bytes < $mins{$rootpath};
        $maxs{$rootpath} = $bytes if !defined $maxs{$rootpath} || $bytes > $maxs{$rootpath};

        if (not defined $paths{$leadpath}) { $paths{$leadpath} = []; }
        push @{$paths{$leadpath}}, $bytes;

        # Totals for each containing directory
        $counts{$leadpath} ++;
        $sums{$leadpath} += $bytes;
        $sumssq{$leadpath} += $bytes * $bytes;
        $mins{$leadpath} = $bytes if !defined $mins{$leadpath} || $bytes < $mins{$leadpath};
        $maxs{$leadpath} = $bytes if !defined $maxs{$leadpath} || $bytes > $maxs{$leadpath};
    }
    foreach my $path ( ((not $nototal) ? $rootpath : ()), (($subdirs) ? (sort keys %paths) : ()) ) {
        my $sum = $sums{$path};
        my $count = $counts{$path};
        my $sumsq = $sumssq{$path};
        my $min = $mins{$path};
        my $max = $maxs{$path};

        next unless $count;              # no files found: avoid division by zero

        my $mean = $sum / $count;
        my $meansq = $sumsq / $count;
        my $var = $meansq - $mean*$mean;
        $var = 0 if $var < 0;            # guard against tiny negative values from rounding
        my $stdev = sqrt($var);
        my $sterr = $stdev / sqrt($count);

        # (compressed bytes * 10:1 compression * 2x Java overhead) / per-reducer RAM, plus 1
        my $parallelism = int(($sum * 10 * 2) / ($memory * 1.0E6) + 1);

        if (not $nopath) { print $path . "\t"; }

        # Print exactly one of: the sum, the parallelism, or the full statistics row
        if ($sumonly) {
            print join("\t", sprintf($unitformat, $sum / $unitscalar)), "\n";
        } elsif ($parallelonly) {
            print join("\t", sprintf('%d', $parallelism)), "\n";
        } else {
            print join("\t",
                $count,
                sprintf($unitformat, $min / $unitscalar),
                sprintf($unitformat, $max / $unitscalar),
                sprintf($unitformat, $mean / $unitscalar),
                sprintf($unitformat, $sum / $unitscalar),
                sprintf('%d', $parallelism),
            ), "\n";
        }
    }
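The statistics in the script come from one pass over the file listing, using the identity Var(x) = E[x^2] - E[x]^2. A minimal Java equivalent, as a hypothetical `SizeStats` class: like the script, it computes the population (not sample) variance, and it clamps tiny negative results caused by floating-point rounding. For very large or nearly equal values this formula can lose precision; Welford's algorithm is the usual alternative.

```java
/**
 * Hypothetical sketch of the per-directory statistics: count, sum, sum of
 * squares, min and max, from which mean, variance, stdev and stderr follow.
 */
final class SizeStats {
    private long count;
    private double sum;
    private double sumSq;
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    /** Records one file size, as the script does for each listing line. */
    void add(long bytes) {
        count++;
        sum += bytes;
        sumSq += (double) bytes * bytes;
        min = Math.min(min, bytes);
        max = Math.max(max, bytes);
    }

    /** Mean size; NaN if nothing was added. */
    double mean() { return sum / count; }

    /** Population variance E[x^2] - E[x]^2, clamped at 0 against rounding error. */
    double variance() {
        double m = mean();
        return Math.max(0.0, sumSq / count - m * m);
    }

    double stdev() { return Math.sqrt(variance()); }

    /** Standard error of the mean: stdev / sqrt(count). */
    double stderr() { return stdev() / Math.sqrt(count); }

    long count() { return count; }
    long min() { return min; }
    long max() { return max; }
}
```

Adding sizes 1, 2, 3 and 4 gives a mean of 2.5 and a variance of 30/4 - 2.5^2 = 1.25.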
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Benjamin Reed 2009-11-12, 18:47
Actually, I believe that the same conf object that goes to the InputFormat is also the one that gets sent to the JobTracker, so if we update the number of reducers in that conf object, it should also change the number of reduces sent to the JobTracker.
Ben
-
RE: Could pig dynamic change the reduce number according the mapper task number ?
Santhosh Srinivasan 2009-11-12, 18:57
I was hoping that the cost-based optimizer being developed by Ashutosh and Dmitriy would address this issue.
Santhosh
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Alan Gates 2009-11-12, 20:53
On Nov 12, 2009, at 10:47 AM, Benjamin Reed wrote:
> actually i believe that the same conf object that goes to the InputFormat is also the one that gets sent to the jobtracker, so if we update the reducers in that conf object it should also change the number of reduces that get sent to the jobtracker.
No, it's a copy. Changes made in it don't end up affecting the job.
Alan.
-
RE: Could pig dynamic change the reduce number according the mapper task number ?
Dmitriy Ryaboy 2009-11-12, 21:27
We are taking roughly the same route that Ciemo (David Ciemiewicz) described, but our decisions will only be as good as our statistics. Currently the stats object contains the file size but no compression information, so bz-compressed files will look artificially small. Perhaps we should specify "uncompressed file size" in the spec.
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Scott Carey 2009-11-12, 22:49
Is it possible to have a script at least use the default value configured in Hadoop? Or is there already a way to do that? It won't be optimal, but it will probably be better than 1.
Also, having too many reducers used to be a big performance problem, but Hadoop is becoming much less sensitive to that over time, especially after the shuffle refactoring in 0.21: http://issues.apache.org/jira/browse/MAPREDUCE-318
So in the future, over-estimating the number of reducers will likely be a better idea than under-estimating it.
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Alan Gates 2009-11-13, 17:23
On Nov 12, 2009, at 2:49 PM, Scott Carey wrote:
> Is it possible to have a script at least use the default configured Hadoop value? Or is there a way to do that already?
If the user doesn't specify a parallelism, Pig doesn't set the number of reduce tasks in JobConf, which means the job picks up the cluster's default. Unless cluster administrators change it, the cluster default is 1.
Alan.
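The default behaviour described above can be modelled in a few lines of Java. This is not Pig's code: `ReducerDefaults` and `effectiveReducers` are hypothetical, and a plain `Map` stands in for the cluster configuration. It relies on the Hadoop property `mapred.reduce.tasks`, whose shipped default is 1.

```java
import java.util.Map;
import java.util.OptionalInt;

/**
 * Hypothetical model (not Pig's implementation): an explicit PARALLEL value
 * wins; otherwise nothing is set on the job, so the cluster's configured
 * mapred.reduce.tasks applies, falling back to Hadoop's default of 1.
 */
final class ReducerDefaults {

    static final String REDUCE_TASKS_KEY = "mapred.reduce.tasks";

    private ReducerDefaults() {}

    /**
     * @param parallel      value from a PARALLEL clause, if the script has one
     * @param clusterConfig stand-in for the cluster's site configuration
     */
    static int effectiveReducers(OptionalInt parallel, Map<String, String> clusterConfig) {
        if (parallel.isPresent()) {
            return parallel.getAsInt();
        }
        String configured = clusterConfig.get(REDUCE_TASKS_KEY);
        // No cluster override: Hadoop's shipped default for mapred.reduce.tasks is 1.
        return configured == null ? 1 : Integer.parseInt(configured.trim());
    }
}
```

In this model, a script without PARALLEL on an unmodified cluster gets one reducer, which is why the thread is looking for a dynamic alternative.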
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Jeff Zhang 2009-11-27, 07:38
Owen O'Malley suggested that we can control the number of reducers in the InputFormat. I have tried it, and it works.
Jeff Zhang
-
Re: Could pig dynamic change the reduce number according the mapper task number ?
Jeff Zhang 2009-11-27, 07:41
Attached is my sample code (this InputFormat generates 1 reduce task for every 5 map tasks):
    import java.io.IOException;

    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class MyInputFormat extends TextInputFormat {

        @Override
        public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
            InputSplit[] splits = super.getSplits(job, numSplits);

            // One reduce task per 5 map tasks (one map task per split), minimum 1
            int reducerNum = splits.length / 5;
            if (reducerNum == 0) {
                reducerNum = 1;
            }

            // Set on the JobConf passed to getSplits, before the splits are returned
            job.setNumReduceTasks(reducerNum);
            return splits;
        }
    }

Once Pig integrates InputFormat into LoadFunc (PIG-966), it will be possible to change the number of reduce tasks dynamically.
Jeff Zhang