MapReduce, mail # user - Reflective instantiation of Mappers and Reducers


Re: Reflective instantiation of Mappers and Reducers
Tom White 2010-04-05, 03:40
Have a look at org.apache.hadoop.io.Stringifier (and
DefaultStringifier), which may be helpful too.
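Stringifier and the base-64 idea quoted below rest on the same move: turn a serializable object into a string that can be stored in the job configuration. Below is a minimal sketch in Scala. It assumes Java 8+ for `java.util.Base64` and uses a plain mutable map as a stand-in for `Configuration`. `SimpleStringifier` copies the `toString`/`fromString` shape of `Stringifier`. `JavaSerializationStringifier`, `ConfStore`, `Thresholds` and the key `example.thresholds` are hypothetical names used only for illustration. The real `DefaultStringifier` uses Hadoop's serialization framework rather than calling `ObjectOutputStream` directly.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.Base64
import scala.collection.mutable

// Same shape as org.apache.hadoop.io.Stringifier: object <-> String.
trait SimpleStringifier[T] {
  def toString(obj: T): String
  def fromString(str: String): T
}

// Java serialization followed by Base64, so the result is plain ASCII
// and safe to store as a configuration value.
class JavaSerializationStringifier[T <: java.io.Serializable] extends SimpleStringifier[T] {
  def toString(obj: T): String = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    Base64.getEncoder.encodeToString(bytes.toByteArray)
  }

  def fromString(str: String): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(Base64.getDecoder.decode(str)))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}

// Hypothetical helpers in the spirit of DefaultStringifier.store/load;
// the mutable map plays the role of the job Configuration.
object ConfStore {
  def store[T <: java.io.Serializable](conf: mutable.Map[String, String], item: T, key: String): Unit =
    conf(key) = new JavaSerializationStringifier[T].toString(item)

  def load[T <: java.io.Serializable](conf: mutable.Map[String, String], key: String): T =
    new JavaSerializationStringifier[T].fromString(conf(key))
}

// Example payload: case classes are Serializable.
case class Thresholds(min: Int, max: Int)
```

Base64 output is about a third larger than the serialized bytes, and the whole string is copied into every task's configuration. This approach therefore suits small objects better than large ones.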

Cheers,
Tom

On Fri, Apr 2, 2010 at 3:35 PM, Kris Nuttycombe
<[EMAIL PROTECTED]> wrote:
> Or heck... I could just base-64 encode the serialized byte arrays and
> pass them as strings in the configuration. If it's going to be a hack,
> might as well go all the way.
>
> On Fri, Apr 2, 2010 at 4:10 PM, Kris Nuttycombe
> <[EMAIL PROTECTED]> wrote:
>> On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[EMAIL PROTECTED]> wrote:
>>>
>>> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>>>
>>>> What I'm wondering is, is there any way to simply serialize a Mapper
>>>> or Reducer object, and have the serialized instance copied, passed
>>>> around and used everywhere instead of always having the Mapper and
>>>> Reducer instantiated by reflection? This would greatly simplify
>>>> library design in my case.
>>>
>>> Currently the best you can do is to make your Mapper or Reducer implement
>>> Configurable and use the values out of the configuration.
>>>
>>> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
>>> when it gets implemented.
>>>
>>> -- Owen
>>>
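Owen's suggestion works because of how the framework creates these objects. The job records only the mapper or reducer class. Each instance is then built through its no-argument constructor, and if it implements `Configurable`, it receives the configuration immediately afterwards. `ReflectionUtils.newInstance(Class, Configuration)` handles that last step. Below is a minimal, Hadoop-free sketch of the pattern. `Conf`, `ConfigurableLike`, `MinLengthFilter`, `Framework` and the key `example.min.length` are hypothetical stand-ins, not Hadoop API.

```scala
// Hypothetical stand-in for org.apache.hadoop.conf.Configuration.
final class Conf(values: Map[String, String]) {
  def getInt(key: String, default: Int): Int =
    values.get(key).map(_.toInt).getOrElse(default)
}

// Mirrors the setConf/getConf shape of org.apache.hadoop.conf.Configurable.
trait ConfigurableLike {
  def setConf(conf: Conf): Unit
  def getConf: Conf
}

// A "mapper" whose parameters come only from the configuration,
// because the framework always calls the no-argument constructor.
class MinLengthFilter extends ConfigurableLike {
  private var conf: Conf = _
  private var minLength = 1

  def setConf(c: Conf): Unit = {
    conf = c
    minLength = c.getInt("example.min.length", 1)
  }
  def getConf: Conf = conf

  def map(word: String): Option[String] =
    if (word.length >= minLength) Some(word) else None
}

object Framework {
  // Roughly what ReflectionUtils.newInstance(cls, conf) does:
  // reflective no-arg construction, then setConf for configurable objects.
  def newInstance[T](cls: Class[T], conf: Conf): T = {
    val obj = cls.getDeclaredConstructor().newInstance()
    obj match {
      case c: ConfigurableLike => c.setConf(conf)
      case _ => ()
    }
    obj
  }
}
```

Under this design, every parameter must travel as a string key/value pair. That is exactly the restriction the original question is trying to escape.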
>>
>> Thanks for the reference to that ticket, Owen. In the meantime, I
>> think I may have figured out a workaround. The following code
>> (completely untested as of yet, but a starting point) provides base
>> classes for an implementation based upon the distributed cache:
>>
>>
>> import org.apache.hadoop.conf._
>> import org.apache.hadoop.util._
>> import org.apache.hadoop.mapreduce._
>> import java.io._
>> import SerializingResourceToolRunner._
>>
>> object SerializingResourceToolRunner {
>>   // Configuration key holding the name of the cached file.
>>   val serializedResourceName = "socialmedia.mr_tool.serfile"
>>
>>   // Loan pattern: apply f to the resource, then always close it.
>>   def using[A <: Closeable, B](resource: A)(f: A => B): B =
>>     try f(resource) finally resource.close()
>> }
>>
>> class SerializingResourceToolRunner[T <: Serializable](tool: SerializingResourceTool[T]) {
>>   def runWithToolRunner(argv: Array[String]): Int = {
>>     // Remove any user-supplied "-files <list>" pair; return the other
>>     // arguments in their original order plus the list, if present.
>>     def stripFileArg(i: Int, l: List[String], f: Option[String]): (List[String], Option[String]) =
>>       if (i >= argv.length) (l.reverse, f)
>>       else if (argv(i) == "-files" && i + 1 < argv.length) stripFileArg(i + 2, l, Some(argv(i + 1)))
>>       else stripFileArg(i + 1, argv(i) :: l, f)
>>
>>     // Serialize the resource to a local temp file on the client.
>>     val tempFile = File.createTempFile("mr_tool", ".ser")
>>     tempFile.deleteOnExit()
>>     using(new FileOutputStream(tempFile)) { f =>
>>       using(new ObjectOutputStream(f)) { out => out.writeObject(tool.resource) }
>>     }
>>
>>     val (args, filesArg) = stripFileArg(0, Nil, None)
>>
>>     // Configured starts out with a null Configuration unless one was supplied.
>>     if (tool.getConf == null) tool.setConf(new Configuration)
>>     // -files links each shipped file into the task's working directory under
>>     // its own name, so tasks can open it by the bare file name.
>>     tool.getConf.set(serializedResourceName, tempFile.getName)
>>     val filesArgWithTempFile =
>>       filesArg.map(_ + "," + tempFile.getAbsolutePath).getOrElse(tempFile.getAbsolutePath)
>>     ToolRunner.run(tool, ("-files" :: filesArgWithTempFile :: args).toArray)
>>   }
>> }
>>
>> trait Resources[T] {
>>   private var _resource: T = _
>>   def resource: T = _resource
>>
>>   // Deserialize the resource from the cached file named in the configuration.
>>   def init(conf: Configuration): Unit = {
>>     _resource = using(new FileInputStream(new File(conf.get(serializedResourceName)))) { f =>
>>       using(new ObjectInputStream(f)) { in => in.readObject().asInstanceOf[T] }
>>     }
>>   }
>> }
>>
>> abstract class SerializedResourceMapper[T, KI, VI, KO, VO]
>>     extends Mapper[KI, VI, KO, VO] with Resources[T] {
>>   override protected def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
>>     super.setup(context)
>>     init(context.getConfiguration)
>>   }
>> }
>>
>> abstract class SerializedResourceReducer[T, KI, VI, KO, VO]
>>     extends Reducer[KI, VI, KO, VO] with Resources[T] {
>>   override protected def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
>>     super.setup(context)
>>     init(context.getConfiguration)
>>   }
>> }
>>
>> abstract class SerializingResourceTool[T <: Serializable] extends Configured with Tool {
>>   // The object to serialize on the client and restore in each task.
>>   def resource: T
>> }
>>
>
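The `-files` handling in `runWithToolRunner` is the part that can most easily be checked without a cluster. Building a list by prepending with `::` reverses the arguments unless the list is reversed at the end. A user-supplied `-files` list also has to be merged with the temp file rather than replaced. Below is a minimal sketch of that logic as a pure function. It assumes, as the code above does, that a later `-files` replaces an earlier one. `FilesArg`, `stripFiles` and `withExtraFile` are hypothetical names.

```scala
object FilesArg {
  // Removes every "-files <list>" pair from argv (keeping the last list seen)
  // and returns the remaining arguments in their original order.
  def stripFiles(argv: List[String]): (List[String], Option[String]) = {
    @annotation.tailrec
    def loop(rest: List[String], kept: List[String], files: Option[String]): (List[String], Option[String]) =
      rest match {
        case "-files" :: list :: tail => loop(tail, kept, Some(list))
        case arg :: tail              => loop(tail, arg :: kept, files)
        case Nil                      => (kept.reverse, files)
      }
    loop(argv, Nil, None)
  }

  // Emits a single "-files" option first, appending extraPath to any
  // user-supplied list instead of replacing it.
  def withExtraFile(argv: List[String], extraPath: String): List[String] = {
    val (args, files) = stripFiles(argv)
    "-files" :: files.fold(extraPath)(_ + "," + extraPath) :: args
  }
}
```

Emitting `-files` first matters because `GenericOptionsParser` stops looking for generic options at the first non-option argument.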