Re: Reflective instantiation of Mappers and Reducers
Have a look at org.apache.hadoop.io.Stringifier (and
DefaultStringifier), which may be helpful too.
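For example, something along these lines (an untested sketch; MyResource and
the "my.tool.resource" key are made-up names, and it assumes JavaSerialization
is registered so plain Serializable objects can be stringified):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.DefaultStringifier

case class MyResource(name: String) // case classes are Serializable

val conf = new Configuration
// DefaultStringifier needs a serialization for the class; for plain
// java.io.Serializable objects that is JavaSerialization.
conf.setStrings("io.serializations",
  "org.apache.hadoop.io.serializer.JavaSerialization",
  "org.apache.hadoop.io.serializer.WritableSerialization")

// Base64-encodes the serialized object into the configuration...
DefaultStringifier.store(conf, MyResource("foo"), "my.tool.resource")
// ...and reads it back out, e.g. in a task's setup().
val restored = DefaultStringifier.load(conf, "my.tool.resource", classOf[MyResource])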

Cheers,
Tom

On Fri, Apr 2, 2010 at 3:35 PM, Kris Nuttycombe
<[EMAIL PROTECTED]> wrote:
> Or heck... I could just base-64 encode the serialized byte arrays and
> pass them as strings in the configuration. If it's going to be a hack,
> might as well go all the way.
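> Roughly (an untested sketch; commons-codec's Base64 ships with Hadoop, and
> the "my.mapper.ser" key and MyMapper name are made up):
>
> import java.io._
> import org.apache.commons.codec.binary.Base64
>
> def encode(obj: Serializable): String = {
>   val bytes = new ByteArrayOutputStream
>   val out = new ObjectOutputStream(bytes)
>   try out.writeObject(obj) finally out.close()
>   new String(Base64.encodeBase64(bytes.toByteArray), "UTF-8")
> }
>
> def decode[T](s: String): T = {
>   val in = new ObjectInputStream(
>     new ByteArrayInputStream(Base64.decodeBase64(s.getBytes("UTF-8"))))
>   try in.readObject.asInstanceOf[T] finally in.close()
> }
>
> // driver side:  conf.set("my.mapper.ser", encode(myMapper))
> // task side:    val m = decode[MyMapper](conf.get("my.mapper.ser"))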
>
> On Fri, Apr 2, 2010 at 4:10 PM, Kris Nuttycombe
> <[EMAIL PROTECTED]> wrote:
>> On Fri, Apr 2, 2010 at 3:10 PM, Owen O'Malley <[EMAIL PROTECTED]> wrote:
>>>
>>> On Apr 2, 2010, at 12:05 PM, Kris Nuttycombe wrote:
>>>
>>>> What I'm wondering is, is there any way to simply serialize a Mapper
>>>> or Reducer object, and have the serialized instance copied, passed
>>>> around and used everywhere instead of always having the Mapper and
>>>> Reducer instantiated by reflection? This would greatly simplify
>>>> library design in my case.
>>>
>>> Currently the best you can do is to make your Mapper or Reducer implement
>>> Configurable and use the values out of the configuration.
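>>> For instance (a rough sketch; the class name and the "my.mapper.threshold"
>>> key are made up here):
>>>
>>> import org.apache.hadoop.conf.{Configurable, Configuration}
>>> import org.apache.hadoop.io.{IntWritable, LongWritable, Text}
>>> import org.apache.hadoop.mapreduce.Mapper
>>>
>>> class ThresholdMapper extends Mapper[LongWritable, Text, Text, IntWritable]
>>>     with Configurable {
>>>   private var conf: Configuration = _
>>>   private var threshold: Int = _
>>>
>>>   // ReflectionUtils.newInstance calls setConf on Configurable instances
>>>   // right after reflective construction, so parameters can be pulled
>>>   // out of the job configuration here.
>>>   override def setConf(c: Configuration): Unit = {
>>>     conf = c
>>>     threshold = c.getInt("my.mapper.threshold", 0)
>>>   }
>>>   override def getConf: Configuration = conf
>>>
>>>   override def map(key: LongWritable, value: Text,
>>>       context: Mapper[LongWritable, Text, Text, IntWritable]#Context): Unit =
>>>     if (value.getLength >= threshold)
>>>       context.write(value, new IntWritable(value.getLength))
>>> }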
>>>
>>> Take a look at MAPREDUCE-1183. It should be exactly what you are asking for
>>> when it gets implemented.
>>>
>>> -- Owen
>>>
>>
>> Thanks for the reference to that ticket, Owen. In the meantime, I
>> think I may have figured out a workaround. The following code
>> (completely untested as of yet, but a starting point) provides base
>> classes for an implementation based upon the distributed cache:
>>
>>
>> import org.apache.hadoop.conf._
>> import org.apache.hadoop.util._
>> import org.apache.hadoop.mapreduce._
>> import java.io._
>> import SerializingResourceToolRunner._
>>
>> object SerializingResourceToolRunner {
>>   val serializedResourceName = "socialmedia.mr_tool.serfile"
>>
>>   // Simple loan pattern: run the body, then close the resource.
>>   def using[A <: Closeable, B](resource: A)(body: A => B): B =
>>     try body(resource) finally resource.close()
>> }
>>
>> class SerializingResourceToolRunner[T <: Serializable](tool: SerializingResourceTool[T]) {
>>   def runWithToolRunner(argv: Array[String]) = {
>>     // Pull any existing -files argument out of argv so the serialized
>>     // resource file can be appended to it. Note the accumulator builds
>>     // the remaining arguments in reverse order.
>>     def stripFileArg(i: Int, l: List[String], f: Option[String]): (List[String], Option[String]) = {
>>       if (i >= argv.length) (l, f)
>>       else if (argv(i) == "-files") stripFileArg(i + 2, l, Some(argv(i + 1)))
>>       else stripFileArg(i + 1, argv(i) :: l, f)
>>     }
>>
>>     // Serialize the resource to a temp file that will ride along in
>>     // the distributed cache.
>>     val tempFile = File.createTempFile("mr_tool", ".ser")
>>     using(new FileOutputStream(tempFile)) {
>>       f => using(new ObjectOutputStream(f)) {
>>         out => out.writeObject(tool.resource)
>>       }
>>     }
>>
>>     val (args, filesArg) = stripFileArg(0, Nil, None)
>>
>>     // Record the cache file's base name so tasks can find it in their
>>     // working directory, then splice the file into the -files argument
>>     // that GenericOptionsParser (via ToolRunner) understands.
>>     if (tool.getConf == null) tool.setConf(new Configuration)
>>     tool.getConf.set(serializedResourceName, tempFile.getName)
>>     val filesArgWithTempFile =
>>       filesArg.map(_ + "," + tempFile.getAbsolutePath).getOrElse(tempFile.getAbsolutePath)
>>     ToolRunner.run(tool, ("-files" :: filesArgWithTempFile :: args.reverse).toArray)
>>   }
>> }
>>
>> trait Resources[T] {
>>   private var _resource: T = _
>>   def resource: T = _resource
>>
>>   // Deserialize the resource from the distributed-cache copy, which the
>>   // framework makes available in the task's working directory under the
>>   // file's base name.
>>   def init(conf: Configuration): Unit = {
>>     _resource = using(new FileInputStream(new File(conf.get(serializedResourceName)))) {
>>       f => using(new ObjectInputStream(f)) {
>>         in => in.readObject.asInstanceOf[T]
>>       }
>>     }
>>   }
>> }
>>
>> abstract class SerializedResourceMapper[T, KI, VI, KO, VO]
>>     extends Mapper[KI, VI, KO, VO] with Resources[T] {
>>   override def setup(context: Mapper[KI, VI, KO, VO]#Context): Unit = {
>>     super.setup(context)
>>     init(context.getConfiguration)
>>   }
>> }
>>
>> abstract class SerializedResourceReducer[T, KI, VI, KO, VO]
>>     extends Reducer[KI, VI, KO, VO] with Resources[T] {
>>   override def setup(context: Reducer[KI, VI, KO, VO]#Context): Unit = {
>>     super.setup(context)
>>     init(context.getConfiguration)
>>   }
>> }
>>
>> abstract class SerializingResourceTool[T <: Serializable] extends Configured with Tool {
>>   def resource: T
>> }
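>>
>> Usage would look something like this (also untested; all names are made up):
>>
>> import org.apache.hadoop.io.{LongWritable, Text}
>>
>> case class WordFilter(stopWords: Set[String]) // case classes are Serializable
>>
>> class FilterMapper extends SerializedResourceMapper[WordFilter,
>>     LongWritable, Text, Text, LongWritable] {
>>   override def map(key: LongWritable, value: Text,
>>       context: Mapper[LongWritable, Text, Text, LongWritable]#Context): Unit =
>>     // `resource` is the WordFilter deserialized by setup() on the task side.
>>     if (!resource.stopWords(value.toString)) context.write(value, key)
>> }
>>
>> class FilterTool(val resource: WordFilter)
>>     extends SerializingResourceTool[WordFilter] {
>>   def run(args: Array[String]): Int = {
>>     val job = new Job(getConf, "filter")
>>     job.setMapperClass(classOf[FilterMapper])
>>     // input/output paths and key/value types elided
>>     if (job.waitForCompletion(true)) 0 else 1
>>   }
>> }
>>
>> object FilterJob {
>>   def main(args: Array[String]): Unit = {
>>     val tool = new FilterTool(WordFilter(Set("the", "a", "an")))
>>     new SerializingResourceToolRunner(tool).runWithToolRunner(args)
>>   }
>> }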
>>
>