hadoop方法将输出发送到多个目录
问题内容:
我的MapReduce
工作按日期处理数据,需要将输出写入特定的文件夹结构。当前的期望是生成如下结构:
2013
01
02
..
2012
01
02
..
等等
在任何时候,我最多只能获得12个月的数据,因此,我正在使用MultipleOutputs
类在驱动程序中使用以下函数创建12个输出:
public void createOutputs(){
Calendar c = Calendar.getInstance();
String monthStr, pathStr;
// Create multiple outputs for last 12 months
// TODO make 12 configurable
for(int i = 0; i < 12; ++i ){
//Get month and add 1 as month is 0 based index
int month = c.get(Calendar.MONTH)+1;
//Add leading 0
monthStr = month > 10 ? "" + month : "0" + month ;
// Generate path string in the format 2013/03/etl
pathStr = c.get(Calendar.YEAR) + "" + monthStr + "etl";
// Add the named output
MultipleOutputs.addNamedOutput(config, pathStr );
// Move to previous month
c.add(Calendar.MONTH, -1);
}
}
在reducer中,我添加了一个清理功能,以将生成的输出移动到适当的目录。
protected void cleanup(Context context) throws IOException, InterruptedException {
// Custom function to recursively process data
moveFiles (FileSystem.get(new Configuration()), new Path("/MyOutputPath"));
}
问题:在将输出从_temporary目录移动到输出目录之前,reducer的清除功能正在执行。因此,由于所有数据仍位于_temporary目录中,因此上述函数在执行时看不到任何输出。
对我而言,实现所需功能的最佳方法是什么?赞赏任何见解。
思考以下内容:
- 有没有一种使用自定义输出提交器的方法?
- 连锁另一个工作更好还是为此而矫kill过正?
- 有没有我不知道的更简单的选择。
这是来自cleanup
函数的文件结构的示例日志:
MyMapReduce: filepath:hdfs://localhost:8020/dev/test
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_logs/history/job_201310301015_0224_1383763613843_371979_HtmlEtl
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/201307etl-r-00000
MyMapReduce: filepath:hdfs://localhost:8020/dev/test/_temporary/_attempt_201310301015_0224_r_000000_0/part-r-00000
问题答案:
您不需要第二份工作。我目前正在使用MultipleOutputs在我的一个程序中创建大量输出目录。尽管有30个以上的目录,但我只能使用几个MultipleOutputs对象。这是因为您可以在写入时设置输出目录,因此只能在需要时确定输出目录。如果要以不同的格式输出,则实际上只需要一个以上的namedOutput(例如,一种具有键:Text.class,值:Text.class,另一种具有键:Text.class和Value:IntWritable.class)
建立:
MultipleOutputs.addNamedOutput(job, "Output", TextOutputFormat.class, Text.class, Text.class);
减速器的设置:
mout = new MultipleOutputs<Text, Text>(context);
在减速器中调用mout:
String key; //set to whatever output key will be
String value; //set to whatever output value will be
String outputFileName; //set to absolute path to file where this should write
mout.write("Output",new Text(key),new Text(value),outputFileName);
您可以用一段代码在编码时确定目录。例如,说您想按月份和年份指定目录:
int year;//extract year from data
int month;//extract month from data
String baseFileName; //parent directory to all outputs from this job
String outputFileName = baseFileName + "/" + year + "/" + month;
mout.write("Output",new Text(key),new Text(value),outputFileName);
希望这可以帮助。
编辑:以上示例的输出文件结构:
Base
2013
01
02
03
...
2012
01
...
...