-
Reflective instantiation of Mappers and Reducers
Kris Nuttycombe 2010-04-02, 19:05
Hi, all,
I'm new to Hadoop, and I'm having a hard time creating highly configurable Mapper and Reducer instances, because Hadoop seems to require that they be instantiated through reflection. I'm wondering whether I'm doing things wrong.
In my application I have a number of composable classes that can be used to generate mapper and reducer instances by the end-user of the library. Such composition can be done either in code or at runtime by interpretation of a script written in a custom DSL, and I'd like to avoid having separate mapper and reducer classes for the different types of construction; this doesn't seem like it should be something that's the responsibility of the map/reduce part of the library.
What I'm wondering is, is there any way to simply serialize a Mapper or Reducer object, and have the serialized instance copied, passed around and used everywhere instead of always having the Mapper and Reducer instantiated by reflection? This would greatly simplify library design in my case.
Thanks,
Kris
-
Re: Reflective instantiation of Mappers and Reducers
Owen O'Malley 2010-04-02, 21:10
On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
> What I'm wondering is, is there any way to simply serialize a Mapper
> or Reducer object, and have the serialized instance copied, passed
> around and used everywhere instead of always having the Mapper and
> Reducer instantiated by reflection? This would greatly simplify
> library design in my case.
Currently the best you can do is to make your Mapper or Reducer implement Configurable and use the values out of the configuration.
Take a look at MAPREDUCE-1183. It should be exactly what you are asking for when it gets implemented.
-- Owen
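For readers following along, here is a minimal sketch of the Configurable approach described above. The class name `PrefixFilterMapper` and the configuration key `example.filter.prefix` are hypothetical and chosen only for illustration. The sketch assumes that the framework, after creating the mapper reflectively, passes the job configuration to any class that implements `Configurable`.

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
import org.apache.hadoop.mapreduce.Mapper

// Hypothetical mapper that reads its behaviour from the job configuration
// instead of from constructor arguments.
class PrefixFilterMapper
    extends Mapper[LongWritable, Text, Text, IntWritable]
    with Configurable {

  private var conf: Configuration = _
  private var prefix: String = ""

  // Called with the job configuration once the instance has been created.
  override def setConf(c: Configuration): Unit = {
    conf = c
    // "example.filter.prefix" is an illustrative key, not a Hadoop setting.
    if (c != null) prefix = c.get("example.filter.prefix", "")
  }

  override def getConf: Configuration = conf

  // Emit (line, 1) only for lines that start with the configured prefix.
  override def map(key: LongWritable, value: Text,
                   context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit = {
    if (value.toString.startsWith(prefix)) context.write(value, new IntWritable(1))
  }
}
```

The job driver would set `example.filter.prefix` on the job's Configuration before submission. This only carries string-like settings, which is why it does not cover the composed-object case from the original question.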
-
Re: Reflective instantiation of Mappers and Reducers
Kris Nuttycombe 2010-04-02, 22:10
On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[EMAIL PROTECTED]> wrote:
>
> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>
>> What I'm wondering is, is there any way to simply serialize a Mapper
>> or Reducer object, and have the serialized instance copied, passed
>> around and used everywhere instead of always having the Mapper and
>> Reducer instantiated by reflection? This would greatly simplify
>> library design in my case.
>
> Currently the best you can do is to make your Mapper or Reducer implement
> Configurable and use the values out of the configuration.
>
> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
> when it gets implemented.
>
> -- Owen
>
Thanks for the reference to that ticket, Owen. In the meantime, I think I may have figured out a workaround. The following code (completely untested as of yet, but a starting point) provides base classes for an implementation based upon the distributed cache:

import org.apache.hadoop.conf._
import org.apache.hadoop.util._
import org.apache.hadoop.mapreduce._
import java.io._
import SerializingResourceToolRunner._

object SerializingResourceToolRunner {
  // Configuration key that holds the name of the serialized resource file.
  val serializedResourceName = "socialmedia.mr_tool.serfile"

  // Loan pattern: apply f to the resource and close it even if f throws.
  def using[A <: Closeable, B](resource: A)(f: A => B): B =
    try f(resource) finally resource.close()
}

class SerializingResourceToolRunner[T <: Serializable](tool: SerializingResourceTool[T]) {
  def runWithToolRunner(argv: Array[String]) = {
    // Separate any user-supplied "-files" value from the remaining arguments.
    // Arguments are accumulated in reverse, so restore their order at the end.
    def stripFileArg(i: Int, l: List[String], f: Option[String]): (List[String], Option[String]) = {
      if (i >= argv.length) (l.reverse, f)
      else if (argv(i) == "-files") stripFileArg(i + 2, l, Option(argv(i + 1)))
      else stripFileArg(i + 1, argv(i) :: l, f)
    }

    // Serialize the tool's resource to a local temp file.
    val tempFile = File.createTempFile("mr_tool", ".ser")
    using(new FileOutputStream(tempFile)) { f =>
      using(new ObjectOutputStream(f)) { out =>
        out.writeObject(tool.resource)
      }
    }

    val (args, filesArg) = stripFileArg(0, Nil, None)

    // Configured starts with a null Configuration unless one has been set.
    if (tool.getConf == null) tool.setConf(new Configuration())
    // Tasks look the file up by its bare name in their working directory.
    tool.getConf.set(serializedResourceName, tempFile.getName)

    // Append the temp file to -files so it is shipped via the distributed cache.
    val filesArgWithTempFile =
      filesArg.map(_ + "," + tempFile.getAbsolutePath).getOrElse(tempFile.getAbsolutePath)
    ToolRunner.run(tool, ("-files" :: filesArgWithTempFile :: args).toArray)
  }
}

trait Resources[T] {
  private var _resource: T = _
  def resource: T = _resource

  // Read the serialized resource back from the file named in the configuration.
  def init(conf: Configuration): Unit = {
    _resource = using(new FileInputStream(new File(conf.get(serializedResourceName)))) { f =>
      using(new ObjectInputStream(f)) { in =>
        in.readObject.asInstanceOf[T]
      }
    }
  }
}

abstract class SerializedResourceMapper[T, KI, VI, KO, VO]
    extends Mapper[KI, VI, KO, VO] with Resources[T] {
  override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    init(context.getConfiguration)
  }
}

abstract class SerializedResourceReducer[T, KI, VI, KO, VO]
    extends Reducer[KI, VI, KO, VO] with Resources[T] {
  override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
    super.setup(context)
    init(context.getConfiguration)
  }
}

abstract class SerializingResourceTool[T <: Serializable] extends Configured with Tool {
  def resource: T
}
-
Re: Reflective instantiation of Mappers and Reducers
Kris Nuttycombe 2010-04-02, 22:35
Or heck... I could just base-64 encode the serialized byte arrays and pass them as strings in the configuration. If it's going to be a hack, might as well go all the way.
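A minimal sketch of that Base64 idea, using only the JDK. The object name `Base64Codec` is hypothetical, and the sketch assumes a Java 8 or later runtime for `java.util.Base64`. It covers only the object-to-string round trip; storing the string in a Hadoop Configuration is left to the caller.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64

// Hypothetical helper: Java-serialize an object and wrap the bytes in Base64
// so the result can be stored as an ordinary string property.
object Base64Codec {
  def encode(value: java.io.Serializable): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  // The caller names the expected type; a mismatch surfaces as a
  // ClassCastException when the result is used.
  def decode[T](encoded: String): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(encoded)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

On the driver side the encoded string could be stored with something like `conf.set("example.resource", Base64Codec.encode(resource))` and decoded again in `setup`; the key name is illustrative. Base64 makes the stored value about a third larger than the raw serialized bytes, so this suits small resources better than large ones.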
-
Re: Reflective instantiation of Mappers and Reducers
Tom White 2010-04-05, 03:40
Have a look at org.apache.hadoop.io.Stringifier (and DefaultStringifier), which may be helpful too.
Cheers, Tom
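A minimal sketch of the DefaultStringifier route, assuming Hadoop is on the classpath. The object name `StringifierSketch` and the key `example.mapper.setting` are hypothetical. A `Text` value is used because Writable types are handled by Hadoop's default serialization setup; storing plain `java.io.Serializable` objects this way would, as far as I know, additionally require registering `org.apache.hadoop.io.serializer.JavaSerialization` under `io.serializations`.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{DefaultStringifier, Text}

object StringifierSketch {
  // Illustrative key name, not a Hadoop setting.
  val key = "example.mapper.setting"

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()

    // Driver side: serialize the value and store it as a string property.
    DefaultStringifier.store(conf, new Text("some-setting"), key)

    // Task side (for example in setup): restore the value from the configuration.
    val restored: Text = DefaultStringifier.load(conf, key, classOf[Text])
    println(restored)
  }
}
```

This is essentially the Base64-in-configuration idea with the encoding handled by the library.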