Map Reduce Theory Question, getting OutOfMemoryError while reducing
Berry, Matt 2012-06-28, 21:37
I have a MapReduce job that reads in several gigs of log files and separates the records based on who generated them. My MapReduce job looks like this:
InputFormat: NLineInputFormat
- Reads N lines of an input file, which is a list of URLs of log files to download
Mapper: <LongWritable, Text, WhoKey, LogRecord>
- (LongWritable, Text) comes from the input format. The Text is parsed into an array of URLs. Each log is downloaded and its records extracted
- WhoKey is just a multipart key that describes who caused a record to be logged
- LogRecord is the record that they logged, with all irrelevant information purged
Reducer: <WhoKey, LogRecord, WhoKey, AggregateRecords>
- The Reducer iterates through the LogRecords for the WhoKey, adds them to a LinkedList (AggregateRecords), and emits that to the output format
OutputFormat: <WhoKey, AggregateRecords>
- Creates a file for each WhoKey and writes the records into it
However, I'm running into a problem. When a single person generates an inordinate number of records, all of them have to be held in memory, causing my heap space to run out. I could increase the heap size, but that would not solve the problem, as they could just generate more records and break it again. I've spent a lot of time thinking about how I could alter my setup so that no more than N records are held in memory at a time, but I can't think of a way to do it.
Is there something seriously wrong with how I am processing this? Should I have structured the job in a different way that would avoid this scenario? Isn't the MapReduce framework designed to operate on large data sets? Shouldn't it be managing the heap better?
Stderr and Stack Trace:
12/06/28 12:10:55 INFO mapred.JobClient: map 100% reduce 67%
12/06/28 12:10:58 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:10:58 INFO mapred.JobClient: map 100% reduce 69%
12/06/28 12:11:01 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:01 INFO mapred.JobClient: map 100% reduce 71%
12/06/28 12:11:04 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:06 INFO mapred.JobClient: map 100% reduce 72%
12/06/28 12:11:07 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:11 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:11:15 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:31 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:35 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:41 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:46 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:51 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:15:56 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:01 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:06 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:12 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:22 INFO mapred.LocalJobRunner: reduce > reduce
12/06/28 12:16:44 WARN mapred.LocalJobRunner: job_local_0001
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at sun.reflect.GeneratedConstructorAccessor14.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)
    at java.lang.Class.newInstance0(Class.java:355)
    at java.lang.Class.newInstance(Class.java:308)
    at xxxxxx.xxxxxxxxxxx.xxxx.xxxxxxxx.xxxxxxxxxxxxxxxxxx.readFields(xxxxxxxxxxxxxxxxxx.java:41)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:67)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:40)
    at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:116)
    at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:163)
    at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:34)
    at xxxxxx.xxxxxxxxxxx.xxxx.xxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx.reduce(xxxxxxxxxxxxxxxxxxxxxxxxxxxxx.java:26)
    at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:216)
P.S. I have already used jmap to dump the heap, trimmed each object down to its bare minimum, and confirmed that there are no slow memory leaks.
Re: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Steve Lewis 2012-06-28, 22:43
It is NEVER a good idea to hold all the items in memory. After all, this is big data and you want it to scale. I do not see what stops you from reading one record, processing it, and writing it out without retaining it. It is OK to keep statistics while iterating through the values for a key and output them at the end, but holding all values for a key is almost never a good idea unless you can guarantee limits on how many there are.
Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
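Steve's point about processing one record at a time while keeping only statistics can be illustrated with a small plain-Java sketch. It deliberately does not use the Hadoop API: LogRecordSketch is a hypothetical stand-in for the thread's LogRecord, and the emit callback is an assumed stand-in for the reducer's Context.write(). The loop holds only the current record and a few counters, so memory use does not grow with the number of values for a key.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Consumer;

// Hypothetical stand-in for the thread's LogRecord: just a timestamp and a message.
final class LogRecordSketch {
    final long timestamp;
    final String message;

    LogRecordSketch(long timestamp, String message) {
        this.timestamp = timestamp;
        this.message = message;
    }
}

final class StreamingReduceSketch {
    // Handles every value for one key in a single pass. Each record is emitted as soon
    // as it is read, so only the current record and a few counters are kept.
    // `emit` is an assumed stand-in for the reducer's Context.write(), not Hadoop's API.
    static String reduce(String user, Iterator<LogRecordSketch> values, Consumer<String> emit) {
        long count = 0;
        long earliest = Long.MAX_VALUE;
        long latest = Long.MIN_VALUE;
        while (values.hasNext()) {
            LogRecordSketch r = values.next();
            emit.accept(user + "\t" + r.timestamp + "\t" + r.message);
            count++;
            earliest = Math.min(earliest, r.timestamp);
            latest = Math.max(latest, r.timestamp);
        }
        // Statistics are reported once, after the last value for this key.
        return user + " records=" + count + " earliest=" + earliest + " latest=" + latest;
    }

    public static void main(String[] args) {
        Iterator<LogRecordSketch> values = Arrays.asList(
                new LogRecordSketch(100L, "login"),
                new LogRecordSketch(105L, "read"),
                new LogRecordSketch(110L, "logout")).iterator();
        String summary = reduce("user42", values, System.out::println);
        System.out.println(summary);
    }
}
```

The same shape is what Matt reports using at the end of the thread: each record is emitted individually and only the aggregate information is carried across iterations.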
RE: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Berry, Matt 2012-06-28, 23:20
My end goal is to have all the records sorted chronologically, regardless of the source file. To present it formally:
Let there be X servers.
Let each server produce one chronological log file that records who operated on the server and when.
Let there be Y users.
Assume a given user can operate on any number of servers simultaneously.
Assume a given user can perform any number of operations a second.
My goal would be to have Y output files, each containing the records for only one user, sorted chronologically. Working backwards from the output:
In order for records to be written chronologically to the file:
- All records for a given user must arrive at the same reducer (or the file IO will mess with the order)
- All records arriving at a given reducer must be chronological with respect to a given user
In order for records to arrive at a reducer in chronological order with respect to a given user:
- The sorter must be set to sort by time and operate over all records for a user
In order for the sorter to operate over all records for a user:
- The grouper must be set to group by user, or not group at all (each record is a group)
In order for all records for a given user to arrive at the same reducer:
- The partitioner must be set to partition by user (i.e., user number mod number of partitions)
From this vantage point I see two possible ways to do this:
1. Set the key to be the user number and set the grouper to group by key. This results in all records for a user being aggregated (very large).
2. Set the key to be {user number, time} and set the grouper to group by key. This results in each record being emitted to the reducer one at a time (lots of overhead).
Neither of those seems very favorable. Is anyone aware of a different means to achieve that goal?
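The partitioning requirement in Matt's list can be sketched in plain Java. The assumption here is a composite key made of a user number and a timestamp; the partition function looks only at the user, so every timestamp for that user lands on the same reducer. The (hash & Integer.MAX_VALUE) % numPartitions form follows what Hadoop's default HashPartitioner does; the class and method names are hypothetical, and in a real job this logic would live in a Partitioner subclass.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class UserPartitionSketch {
    // Chooses a reducer from the user number alone. The timestamp half of the
    // hypothetical {user, time} key is deliberately ignored, so all of a user's
    // records reach the same partition. Masking with Integer.MAX_VALUE keeps the
    // result non-negative, as Hadoop's default HashPartitioner does.
    static int partitionFor(long userId, long timestamp, int numPartitions) {
        return (Long.hashCode(userId) & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Record which partitions each user's records were sent to.
        Map<Long, Set<Integer>> seen = new HashMap<>();
        for (long user = 1; user <= 5; user++) {
            for (long time = 0; time < 1_000; time += 7) {
                seen.computeIfAbsent(user, u -> new HashSet<>())
                        .add(partitionFor(user, time, numPartitions));
            }
        }
        // Each set should contain exactly one partition number.
        System.out.println(seen);
    }
}
```

For non-negative user numbers below 2^31, Long.hashCode returns the number itself, so this reduces to the "user number mod number of partitions" rule Matt describes.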
Re: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Steve Lewis 2012-06-29, 00:58
Use a custom partitioner and grouping comparator as described here: http://pkghosh.wordpress.com/2011/04/13/map-reduce-secondary-sort-does-it-all/
In effect, make the time part of the key for sorting, but not for grouping or partitioning. Also, you might look at frameworks like Pig.
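The secondary-sort pattern Steve describes can be simulated in plain Java to show how the two comparators divide the work. UserTimeKey, SORT, GROUP and run are hypothetical names; in a Hadoop job the equivalents would be registered with Job.setSortComparatorClass and Job.setGroupingComparatorClass, and the sort would happen on disk rather than in an in-memory list as it does in this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical composite key {user, time}; the thread's actual WhoKey is not shown.
final class UserTimeKey {
    final long user;
    final long time;

    UserTimeKey(long user, long time) {
        this.user = user;
        this.time = time;
    }
}

final class SecondarySortSketch {
    // Sort comparator: user first, then time, so each user's records are chronological.
    static final Comparator<UserTimeKey> SORT =
            Comparator.<UserTimeKey>comparingLong(k -> k.user).thenComparingLong(k -> k.time);

    // Grouping comparator: user only, so all of a user's times form ONE reduce group.
    static final Comparator<UserTimeKey> GROUP = Comparator.comparingLong(k -> k.user);

    // Simulates one reducer's input: sort the keys, then cut a new group wherever
    // GROUP reports a different user. Each group's times are handed over as an
    // iterator, mirroring the one-pass Iterable that reduce() receives.
    static void run(List<UserTimeKey> keys, BiConsumer<Long, Iterator<Long>> reduce) {
        List<UserTimeKey> sorted = new ArrayList<>(keys); // Hadoop sorts on disk instead
        sorted.sort(SORT);
        int start = 0;
        for (int i = 1; i <= sorted.size(); i++) {
            boolean groupEnds = i == sorted.size()
                    || GROUP.compare(sorted.get(start), sorted.get(i)) != 0;
            if (groupEnds) {
                Iterator<Long> times = sorted.subList(start, i).stream().map(k -> k.time).iterator();
                reduce.accept(sorted.get(start).user, times);
                start = i;
            }
        }
    }

    public static void main(String[] args) {
        List<UserTimeKey> keys = Arrays.asList(
                new UserTimeKey(2, 30), new UserTimeKey(1, 20),
                new UserTimeKey(2, 10), new UserTimeKey(1, 5));
        run(keys, (user, times) -> {
            StringBuilder line = new StringBuilder("user " + user + ":");
            while (times.hasNext()) {
                line.append(' ').append(times.next());
            }
            System.out.println(line); // prints "user 1: 5 20" then "user 2: 10 30"
        });
    }
}
```

Because the grouping comparator ignores the time, one reduce() call sees all of a user's records; because the sort comparator includes it, they arrive in chronological order. This gives the ordering of Matt's option 1 without its memory cost, provided the reducer writes each value as it is read instead of collecting them.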
Re: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Harsh J 2012-06-29, 13:52
Hey Matt,
As far as I can tell, Hadoop truly isn't at fault here.
If your issue is that you collect in a list before you store, you should focus on that and just avoid collecting completely. Why don't you serialize as you receive, if the incoming order is already taken care of? As far as I can tell, your AggregateRecords probably does nothing but serialize the stored LinkedList. So instead of using a LinkedList, or even a composed Writable such as AggregateRecords, just write the records out as you receive them via each .next(). Would this not work for you? You may batch a constant number of records to gain some write performance, but at least you won't use up your memory.
You can serialize as you receive by following this: http://wiki.apache.org/hadoop/FAQ#Can_I_write_create.2BAC8-write-to_hdfs_files_directly_from_map.2BAC8-reduce_tasks.3F
Harsh J
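Harsh's suggestion of writing records as they are received, optionally batching a constant number of them, might look like the following plain-Java sketch. BatchedRecordWriter is a hypothetical helper, not a Hadoop class; it writes to any java.io.PrintWriter, which in a real job might wrap an output stream opened as described in the linked FAQ. Memory use is bounded by the batch size rather than by the number of records a user generates.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Hadoop class): writes records as they arrive and keeps
// at most `batchSize` of them buffered, so memory is bounded by the batch size rather
// than by how many records a single user produced.
final class BatchedRecordWriter implements AutoCloseable {
    private final PrintWriter out;
    private final int batchSize;
    private final List<String> batch;
    private int flushes = 0;

    BatchedRecordWriter(PrintWriter out, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.out = out;
        this.batchSize = batchSize;
        this.batch = new ArrayList<>(batchSize);
    }

    // Called once per value, e.g. from inside the reducer's loop over values.
    void write(String record) {
        batch.add(record);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Pushes the buffered records to the underlying writer and empties the buffer.
    void flush() {
        if (batch.isEmpty()) {
            return;
        }
        for (String r : batch) {
            out.println(r);
        }
        out.flush(); // PrintWriter swallows IOExceptions; real code should check checkError()
        batch.clear();
        flushes++;
    }

    int flushCount() {
        return flushes;
    }

    @Override
    public void close() {
        flush(); // write any partial final batch
        out.close();
    }

    public static void main(String[] args) {
        StringWriter sink = new StringWriter();
        try (BatchedRecordWriter writer = new BatchedRecordWriter(new PrintWriter(sink), 2)) {
            for (int i = 1; i <= 5; i++) {
                writer.write("record " + i);
            }
        }
        System.out.print(sink);
    }
}
```

A batch size of 1 degenerates to writing every record immediately; larger values trade a fixed amount of memory for fewer flushes.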
RE: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Berry, Matt 2012-06-29, 17:06
I was actually quite curious as to how Hadoop was managing to get all of the records into the Iterable in the first place. I thought they were using a very specialized object that implements Iterable, but a heap dump shows they're likely just using a LinkedList. All I was doing was duplicating that object. Supposing I do as you suggest, am I in danger of having their list consume all the memory if a user decides to log 2x or 3x as much as they did this time?
~Matt
RE: Map Reduce Theory Question, getting OutOfMemoryError while reducing
GUOJUN Zhu 2012-06-29, 17:32
If you are referring to the Iterable in the reducer, it is special: the values are not all held in memory. Once the iterator passes a value, it is gone and you cannot recover it. There is no LinkedList behind it.
Zhu, Guojun
Modeling Sr Graduate
571-3824370
[EMAIL PROTECTED]
Financial Engineering
Freddie Mac
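The one-pass behaviour Guojun describes can be imitated with an iterator over serialized bytes. In this hypothetical sketch a byte array stands in for the sorted data the framework reads back from disk, and each next() deserializes exactly one record into the same reused Rec object. Hadoop's reduce-side value iterator likewise reuses its value instance between next() calls, so a value that must be kept has to be copied first; the class and field names here are illustrative only.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

final class StreamedValuesSketch {
    // Mutable record refilled on every next(), like a reused Writable value.
    static final class Rec {
        long time;
        String msg;
    }

    // Serializes (time, msg) pairs into bytes; the array stands in for a sorted
    // segment that the framework would read back from local disk.
    static byte[] serialize(long[] times, String[] msgs) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(times.length);
            for (int i = 0; i < times.length; i++) {
                out.writeLong(times[i]);
                out.writeUTF(msgs[i]);
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // One-pass iterator: each next() decodes exactly one record into the same Rec,
    // so memory use stays constant however many values the key has. Earlier values
    // are overwritten and cannot be revisited.
    static Iterator<Rec> values(byte[] data) {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int total = readCount(in);
        Rec reused = new Rec();
        return new Iterator<Rec>() {
            private int read = 0;

            @Override
            public boolean hasNext() {
                return read < total;
            }

            @Override
            public Rec next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                try {
                    reused.time = in.readLong();
                    reused.msg = in.readUTF();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                read++;
                return reused;
            }
        };
    }

    private static int readCount(DataInputStream in) {
        try {
            return in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new long[] {5, 20, 35}, new String[] {"login", "read", "logout"});
        Iterator<Rec> it = values(data);
        while (it.hasNext()) {
            Rec r = it.next();
            // Copy r's fields here if the value must outlive this step of the loop.
            System.out.println(r.time + " " + r.msg);
        }
    }
}
```

Storing the returned references in a list, as AggregateRecords did, would in this sketch yield many references to one object; copying each value avoids that, but copying every value for a key brings back the memory problem from the start of the thread.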
Re: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Harsh J 2012-06-30, 04:40
Guojun is right: the reduce() inputs are buffered and read off disk. You are in no danger there.
RE: Map Reduce Theory Question, getting OutOfMemoryError while reducing
Berry, Matt 2012-07-02, 16:00
Thanks everyone for the help. Emitting each record individually from the reducer is working well, and I can still aggregate the needed information as I go.