Java源码示例:org.apache.hadoop.mapred.MapRunnable

示例1
/**
 * Drives a mapper written against the old {@code org.apache.hadoop.mapred} API.
 *
 * <p>The job configuration is first updated with the old-API input split taken
 * from {@code input}. The input and output are then wrapped in the old-API
 * adapters ({@code OldRecordReader} and {@code OldOutputCollector}) and handed,
 * together with the reporter, to the {@code MapRunnable} whose class is
 * configured on the job. When the runner returns without throwing, progress is
 * set to {@code 1.0f} and a status update is issued.
 *
 * @param job job configuration; updated with the split and used to instantiate
 *     the map runner
 * @param reporter task reporter; passed to the runner as a {@code Reporter} and
 *     marked complete afterwards
 * @param input legacy MR input that supplies the old-API split and backs the
 *     record reader
 * @param output key/value writer that backs the output collector
 * @throws IOException declared by this method; nothing is caught here, so it
 *     propagates from the calls made below
 * @throws InterruptedException declared by this method; likewise not caught here
 */
void runOldMapper(final JobConf job,
                  final MRTaskReporter reporter,
                  final MRInputLegacy input,
                  final KeyValueWriter output)
    throws IOException, InterruptedException {

  // The input is deliberately not initialized here, although initialization sets
  // parameters the processor may rely on (relevant only for MRInput).
  // TODO use new method in MRInput to get required info

  // Make the split visible through the job configuration before anything that
  // reads the configuration is constructed.
  updateJobWithSplit(job, input.getOldInputSplit());

  // Adapt the input and output to the old-API reader/collector types.
  RecordReader recordReader = new OldRecordReader(input);
  OutputCollector outputCollector = new OldOutputCollector(output);

  // Instantiate the job's configured runner reflectively, passing the job along.
  MapRunnable mapRunner =
      (MapRunnable) ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  mapRunner.run(recordReader, outputCollector, (Reporter) reporter);

  // Reached only when run() did not throw: mark the map as fully done and
  // issue a status update.
  reporter.setProgress(1.0f);
  statusUpdate();
}
 
示例2
/**
 * Drives a mapper written against the old {@code org.apache.hadoop.mapred} API.
 *
 * <p>The job configuration is first updated with the old-API input split taken
 * from {@code input}. The input and output are then wrapped in the old-API
 * adapters ({@code OldRecordReader} and {@code OldOutputCollector}) and handed,
 * together with the reporter, to the {@code MapRunnable} whose class is
 * configured on the job. When the runner returns without throwing, progress is
 * set to {@code 1.0f} and a status update is issued.
 *
 * @param job job configuration; updated with the split and used to instantiate
 *     the map runner
 * @param reporter task reporter; passed to the runner as a {@code Reporter} and
 *     marked complete afterwards
 * @param input legacy MR input that supplies the old-API split and backs the
 *     record reader
 * @param output key/value writer that backs the output collector
 * @throws IOException declared by this method; nothing is caught here, so it
 *     propagates from the calls made below
 * @throws InterruptedException declared by this method; likewise not caught here
 */
void runOldMapper(final JobConf job,
                  final MRTaskReporter reporter,
                  final MRInputLegacy input,
                  final KeyValueWriter output)
    throws IOException, InterruptedException {

  // The input is deliberately not initialized here, although initialization sets
  // parameters the processor may rely on (relevant only for MRInput).
  // TODO use new method in MRInput to get required info

  // Make the split visible through the job configuration before anything that
  // reads the configuration is constructed.
  updateJobWithSplit(job, input.getOldInputSplit());

  // Adapt the input and output to the old-API reader/collector types.
  RecordReader recordReader = new OldRecordReader(input);
  OutputCollector outputCollector = new OldOutputCollector(output);

  // Instantiate the job's configured runner reflectively, passing the job along.
  MapRunnable mapRunner =
      (MapRunnable) ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  mapRunner.run(recordReader, outputCollector, (Reporter) reporter);

  // Reached only when run() did not throw: mark the map as fully done and
  // issue a status update.
  reporter.setProgress(1.0f);
  statusUpdate();
}