千家信息网

mapreduce中怎么合并小文件

发表于:2024-11-28 作者:千家信息网编辑
千家信息网最后更新 2024年11月28日,今天就跟大家聊聊有关mapreduce中怎么合并小文件,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。HDFS中PathFilter类在单个操作
千家信息网最后更新 2024年11月28日mapreduce中怎么合并小文件

今天就跟大家聊聊有关mapreduce中怎么合并小文件,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

HDFS中PathFilter类

在单个操作中处理一批文件,这是很常见的需求。比如说处理日志的 MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件。在一个表达式中使用通配符在匹配多个文件时比较方便的,无需列举每个文件和目录 来指定输入。hadoop为执行通配提供了两个FIleSystem方法:

1 public FileStatus[] globStatus(Path pathPattern) throw IOException

2 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throw IOException

globStatus()方法返回与路径想匹配的所有文件的FileStatus对象数组,并按路径排序。hadoop所支持的通配符与Unix bash相同。

第二个方法传了一个PathFilter对象作为参数,PathFilter可以进一步对匹配进行限制。PathFilter是一个接口,里面只有一个方法accept(Path path)。具体使用参考下面代码

package com.tv;import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FileUtil;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.PathFilter;import org.apache.hadoop.io.IOUtils;public class MergeSmallFilesToHDFS {        private static FileSystem fs = null;        private static FileSystem local = null;        public static class RegexExcludePathFilter implements PathFilter{                private final String regex;                public RegexExcludePathFilter(String regex) {                this.regex = regex;            }                public boolean accept(Path path) {                        // TODO Auto-generated method stub                        boolean flag = path.toString().matches(regex);                        //过滤 regex 格式的文件,只需 return !flag                return !flag;                }        }        public static class RegexAcceptPathFilter implements PathFilter {        private final String regex;                public RegexAcceptPathFilter(String regex) {            this.regex = regex;        }                public boolean accept(Path path) {                        // TODO Auto-generated method stub                        boolean flag = path.toString().matches(regex);                        //接受 regex 格式的文件,只需 return flag            return flag;                }        }        public static void list() throws IOException, URISyntaxException {                //读取配置文件        Configuration conf = new Configuration();        URI uri = new URI("hdfs://zbc:9000");        // FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统        fs = FileSystem.get(uri, conf);        // 获得本地文件系统        local = FileSystem.getLocal(conf);        //获取该目录下的所有子目录(日期名称)        FileStatus[] dirstatus = local.globStatus(new Path("C:/Users/zaish/Documents/学习/hadooop分析数据/tvdata/*"),new RegexExcludePathFilter("^.*svn$"));        Path[] dirs = FileUtil.stat2Paths(dirstatus);        FSDataOutputStream out = null;        FSDataInputStream in = null;        for (Path dir : dirs) {            String fileName = dir.getName().replace("-", "");//文件名称            //只接受日期目录下的.txt文件            FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));            // 获得日期目录下的所有文件            Path[] listedPaths = FileUtil.stat2Paths(localStatus);            //输出路径            Path block = new Path("hdfs://zbc:9000/middle/tv/"+ fileName + ".txt");            // 打开输出流            out = fs.create(block);                        for (Path p : listedPaths) {                in = local.open(p);// 打开输入流                IOUtils.copyBytes(in, out, 4096, false); // 复制数据                // 关闭输入流                in.close();            }            if (out != null) {                    // 关闭输出流                out.close();            }        }                }        public static void main(String[] args) throws Exception {                list();        }}

看完上述内容,你们对mapreduce中怎么合并小文件有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注行业资讯频道,感谢大家的支持。

0