千家信息网

Hadoop Outline的示例分析

发表于:2025-02-07 作者:千家信息网编辑
千家信息网最后更新 2025年02月07日,小编给大家分享一下Hadoop Outline的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!Hdfs Java API SampleRead by hadoop FsURL
千家信息网最后更新 2025年02月07日Hadoop Outline的示例分析

小编给大家分享一下Hadoop Outline的示例分析,希望大家阅读完这篇文章之后都有所收获,下面让我们一起去探讨吧!

Hdfs Java API Sample

  • Read by hadoop FsURLStreamHandlerFactory

  • Read/Write by hadoop DistributeFileSystem

package com.jinbao.hadoop.hdfs;import java.io.IOException;import java.io.InputStream;import java.io.OutputStream;import java.net.URI;import java.net.URL;import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;import org.apache.hadoop.fs.Path;/** *  *//** * @author cloudera * */public class HdfsClient {        static String sFileUrl = "hdfs://quickstart.cloudera/gis/gistool/README.md";        /**         * @param args         * @throws IOException          */        public static void main(String[] args) throws IOException {                if(args.length >= 2){                        String sUrl = sFileUrl;                        if(args[0].equalsIgnoreCase("-r-url")){                                sUrl = args[1];                                                                //test read by hadoop FsURLStreamHandlerFactory                                 readHdfsFileByDfsUrl(sUrl);                        }                        else if(args[0].equalsIgnoreCase("-r-file")){                                sUrl = args[1];                                //test read by hadoop dfsFile                                readHdfsFileByDfsFileApi(sUrl);                        }                        else if(args[0].equalsIgnoreCase("-w-file")){                                sUrl = args[1];                                //test read by hadoop dfsFile                                writeHdfsFileByDfsFileApi(sUrl);                        }                        else if(args[0].equalsIgnoreCase("-w-del")){                                sUrl = args[1];                                //test read by hadoop dfsFile                                deleteHdfsFileByDfsFileApi(sUrl);                        }                }                                        }        private static void deleteHdfsFileByDfsFileApi(String sUrl) {                Configuration conf = new Configuration();                try {                                                FileSystem fs = FileSystem.get(URI.create(sUrl),conf);                        Path path = new Path(sUrl);                        fs.delete(path,true);                                        } catch (IOException e) {                        e.printStackTrace();                }                finally{                }        }        private static void writeHdfsFileByDfsFileApi(String sUrl) {                                Configuration conf = new Configuration();                OutputStream out = null;                byte[] data = "Writing Test".getBytes();                 // Get a FSDataInputStream object                try {                        // Get a FSDataInputStream object, actually is HdfsDataInputSteam                        FileSystem fs = FileSystem.get(URI.create(sUrl),conf);                        Path path = new Path(sUrl);                        if(fs.exists(path)){                                out = fs.append(path);                                IOUtils.write(data, out);                        }                        else{                                out = fs.create(path);                                out.write(data);                                // flush buffer to OS                                out.flush();                                FSDataOutputStream fsout = FSDataOutputStream.class.cast(out);                                // Sync data to disk                                fsout.hsync();                                                                // call sync implicitly                                out.close();                        }                } catch (IOException e) {                        // TODO Auto-generated catch block                        e.printStackTrace();                }                finally{                        IOUtils.closeQuietly(out);                }                        }        public static void readHdfsFileByDfsUrl(String sUrl){                                URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());                                InputStream in = null;                try{                        URL url = new URL(sUrl);                        in = url.openStream();                        IOUtils.copy(in,System.out);                }                catch(IOException ioe){                        ioe.printStackTrace();                }                finally{                        IOUtils.closeQuietly(in);                }        }                private static void readHdfsFileByDfsFileApi(String sUrl) {                                Configuration conf = new Configuration();                                InputStream in = null;                                try{                        FileSystem fs = FileSystem.get(URI.create(sUrl),conf);                        //Get a FSDataInputStream object, actually HdfsDataInputStream                                         in = fs.open(new Path(sUrl));                        IOUtils.copy(in,System.out);                }                catch(IOException ioe){                        ioe.printStackTrace();                }                finally{                        IOUtils.closeQuietly(in);                }               }}

Data Flow on Read

1. FileSystem to get DistributeFileSystem

2. DistributeFileSystem通过和namenode调用,获得前面几个块位置,并返回datanode的地址,包括备用节点。

3. 客户端调用DistributeFileSystem获取FSDataInputStream,这个流对象连接距离最近的datanode,通过调用Read读取数据。

4. 如果当前块已经读完,则FSDataInputStream关闭这个块,并寻找下一个最佳的datanode并继续读取。

5. 读完最后一个块后,调用close方法关闭数据流。

容错处理

1. FSDataInputStream如果与datanode通信遇到错误,则尝试下一个最佳备用节点。另外,也会记住那个坏节点,以后不会在它上面读数据。并且,会通知namenode哪一个节点有问题。

Data Flow on Write

1. FileSystem to get DistributeFileSystem

2. DistributeFileSystem通过和namenode的RPC调用create()方法,创建文件原数据。如果已经存在,报出IOException.

3. 客户端调用DistributeFileSystem获取FSDataOutputStream,它封装了一个DFSOutputStream对象,来处理datanode和namenode通信。

4. 客户端开始写数据,则FSDataOutputStream将当前块数据分成一个个的数据包packet,并写入一个数据包队列(Data packet queue), 然后datastreamer来根据datanode列表,要求namenode分配合适的datanode,DataStreamer把这些datanode组成数据管线 (datanode pipeline),数目有dfs.replication决定。

5. 开始写数据,每写入一个包都将它备份到另一个确认队列(Data Ack queue),第一个被写入的节点,会把数据写入第二个节点,然后第三个。如果收到都写完的通知,则从确认队列中删除。

6. 写完一个块后,重复4-5,直到最后写完,调用close方法关闭数据流。

容错处理

FSDataOutputStream如果与datanode通信遇到错误

1. 关闭pipeline

2. 把确认队列的数据包,添加到数据包队列,以防止下游节点(downstream node)丢失数据。

3. 为存储正常的datanode的当前数据制定一个新的标示(identifier), 并把这个标示传递给namenode,以便namenode删除故障node的部分数据。

4. 把剩余的数据写入剩下的好的datanode。namenode会创建新的节点,来复制数据,以达到复本量。对于当前写过程,如果写入成功的节点达到dfs.replication.min就算成功,其他的由namenode进行复制。

复本的布局

照顾稳定性和负载均衡

Hadoop的默认布局策略是在运行客户端上放置第1个复本,如果客户端在cluster外,则在集群中随机选择一个节点.

第2个和第3个会随机选择另外一个相同Rack上的两个节点.

分布式复制 distcp

% hadoop distcp hdfs://namenode/foo hdfs://namenode2/foo

distcp使用map-reduce的作业来实现,非常适用于两个数据中心同步数据。

如果两个数据中心版本不一致,可以试用hftp协议,使得作业之运行在目标系统上

% hadoop distcp hftp://namenode:50070/foo hdfs://namenode2/foo

注:需要指定hftp端口 50070

为了使map平衡集群,可以参考N*20的设置:-m 20*N,N是节点总数.

balancer还没看。

归档工具Har

减小namenode的内存,适合管理小文件,它还是透明的,对map-reduce也是适用的。

%hadoop archive -archivename files.har /myfiles/ /my

不足:

har相当于tar功能,可以打包文件,不支持压缩。仅仅节省的namenode的内存。

一旦创建就不能修改,想添加和删除文件,必须重新建立har.

看完了这篇文章,相信你对"Hadoop Outline的示例分析"有了一定的了解,如果想了解更多相关知识,欢迎关注行业资讯频道,感谢各位的阅读!

数据 节点 客户 客户端 队列 文件 两个 复本 方法 处理 通信 示例 分析 成功 内存 对象 布局 数据中心 数据流 篇文章 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 如何打开服务器管理器界面 单位网络安全宣传计划 软件开发的技术保障 河南单招软件开发专业 雅虎服务器可以用英文吗 影院订票系统数据库 大专学历的计算机网络技术 网络安全宣传周过往行人美篇 齿轮离心式机软件开发厂家 老男孩培训网络安全目录 哈尔滨海上飞互联网科技有限公司 无法输入网络安全密匙 大学生手机网络安全知识竞赛 数据库结构怎么划分 上海阖联网络技术 网络安全挑战赛排名 群力软件开发电话 网络安全是什么主要组成部分 网络安全与管理项目实验指导书 梦幻手游6周年庆服务器 智能dns解析服务器 广西2u工控服务器机箱厂商 连接镜像服务器 水务行业服务器安全加固系统价格 关于数据库的题 基础 软件开发算物资吗 计算机网络技术对数学要求 网络安全网络舆情交流研讨 金融行业软件开发男的工资 监狱 软件开发
0