Requirement:
The data is laid out as follows:
/data/input/news/old.dat
/data/input/news/current.dat
/data/input/news/20131001.txt
......
/data/input/news/20131030.txt
/data/input/news/20131031.txt
We only want to analyze the October news for now and skip everything else, so the input has to be filtered. The input path is the news root directory, /data/input/news/.
Approach:
Implement the filtering with a PathFilter.
Implementation:
Filter:
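import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

// Accepts only paths whose full string form matches the configured regex.
class TextPathFilter extends Configured implements PathFilter {
    @Override
    public boolean accept(Path path) {
        // Configured already stores the Configuration passed to setConf(),
        // so no extra field or getConf()/setConf() override is needed.
        Configuration conf = getConf();
        String regex = (conf == null) ? null : conf.get("org.test.filter.regex");
        if (regex == null) {
            return true; // no pattern configured: accept everything
        }
        return path.toString().matches(regex);
    }
}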
Usage:
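Configuration conf = new Configuration();
// path.toString() is the full path and String.matches() must match all of it.
// FileInputFormat also applies the filter to the input directory itself, so the
// pattern accepts both .../news and the 201310DD.txt files directly below it.
conf.set("org.test.filter.regex", ".*/news(/201310\\d{2}\\.txt)?");
........
TextInputFormat.setInputPathFilter(job, TextPathFilter.class);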
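Since the filter simply calls String.matches() on the full path string, a pattern can be checked outside Hadoop before submitting a job. The following is a minimal sketch under some assumptions: the class name OctoberRegexDemo, the pattern in REGEX, and the namenode address are made up for illustration and are not part of the original job. It also shows why a character class such as [1-12] does not express a numeric range: it matches a single character, '1' or '2'.

```java
import java.util.Arrays;
import java.util.List;

// Stand-alone check of a filter pattern; no Hadoop classes are involved.
class OctoberRegexDemo {
    // Assumed pattern: the input directory itself, or a file named
    // 201310DD.txt directly below it.
    static final String REGEX = ".*/news(/201310\\d{2}\\.txt)?";

    // A pattern built from character classes. [1-12] is the set {1, 2} and
    // [1-31] is the set {1, 2, 3}; each matches exactly one character.
    static final String CHAR_CLASS_REGEX = "2012[1-12][1-31].txt";

    // Mirrors what a PathFilter does when it calls path.toString().matches(regex).
    static boolean accepted(String fullPath, String regex) {
        return fullPath.matches(regex);
    }

    public static void main(String[] args) {
        // Hypothetical namenode address, for illustration only.
        String root = "hdfs://namenode:8020/data/input/news";
        List<String> paths = Arrays.asList(
                root,
                root + "/old.dat",
                root + "/current.dat",
                root + "/20131001.txt",
                root + "/20131031.txt",
                root + "/20130930.txt");
        for (String p : paths) {
            System.out.println(p + " -> " + accepted(p, REGEX));
        }
        // The character-class pattern rejects a real October file name.
        System.out.println(accepted("20131001.txt", CHAR_CLASS_REGEX));
    }
}
```

With this pattern the root directory and the October files are accepted, while old.dat, current.dat and files from other months are rejected.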
Other notes:
Versions before 0.21.0 fail with the following error:
Caused by: org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://your path
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:231)
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:248)
    at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:950)
    at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:967)
    at org.apache.hadoop.mapred.JobClient.access$500(JobClient.java:170)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:880)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1177)
    at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:833)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:476)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:506)
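Cause of the error:
The matching step returned null where it should have returned an empty FileStatus[]; the code in question is in FileSystem.java.
This bug has been fixed since version 0.21.0, so upgrading resolves it.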
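The difference between the two return values can be illustrated without Hadoop. In the sketch below, match and describe are hypothetical helpers, not Hadoop APIs, and the messages are only illustrative: a caller that treats null as "path does not exist" reports a misleading error whenever a filter merely rejects every candidate, whereas an empty array lets it tell the two cases apart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Illustration only: hypothetical helpers, not the Hadoop implementation.
class EmptyResultDemo {
    // Returns the candidates that pass the filter. When nothing passes,
    // the result is an empty array rather than null.
    static String[] match(String[] candidates, Predicate<String> filter) {
        List<String> kept = new ArrayList<>();
        for (String c : candidates) {
            if (filter.test(c)) {
                kept.add(c);
            }
        }
        return kept.toArray(new String[0]);
    }

    // A caller that distinguishes "nothing there" (null) from
    // "something there, but everything was filtered out" (empty array).
    static String describe(String[] result) {
        if (result == null) {
            return "input path does not exist";
        }
        if (result.length == 0) {
            return "input pattern matches 0 files";
        }
        return result.length + " file(s) matched";
    }

    public static void main(String[] args) {
        String[] files = {"old.dat", "current.dat", "20131001.txt"};
        System.out.println(describe(match(files, f -> f.endsWith(".txt"))));
        System.out.println(describe(match(files, f -> f.endsWith(".csv"))));
        System.out.println(describe(null));
    }
}
```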