千家信息网

HDFS中怎么实现本地文件上传

发表于:2025-01-19 作者:千家信息网编辑
千家信息网最后更新 2025年01月19日,本篇文章给大家分享的是有关HDFS中怎么实现本地文件上传,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。public synchroniz
千家信息网最后更新 2025年01月19日HDFS中怎么实现本地文件上传

本篇文章给大家分享的是有关HDFS中怎么实现本地文件上传,小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着小编一起来看看吧。

public synchronized void write(byte b[], int off, int len)

throws IOException {

if (closed) { //校验是否关闭了,关闭了自然不应该再写入数据了

throw new IOException("Stream closed");

}

while (len > 0) { //这里的len就是指源缓冲区剩下的未写完的数据长度,单位byte

int remaining = BUFFER_SIZE - pos; //目的缓冲区里可以写的字节数

int toWrite = Math.min(remaining, len); //跟需要写的字节数比较,取较小值作为真正要写入的字节数

System.arraycopy(b, off, outBuf, pos, toWrite); //开始复制来作为写入到目的缓冲区操作

pos += toWrite; //更新目的缓冲区位置指针

off += toWrite; //更新源缓冲区位置指针

len -= toWrite; //更新源缓冲区剩下的内容长度

filePos += toWrite; //计算整个文件的总的已经写入的长度(包括缓冲区里的内容)

if ((bytesWrittenToBlock + pos >= BLOCK_SIZE) ||

(pos == BUFFER_SIZE)) {

flush(); //这里是2个条件引起flush,一个是总长度(已写+缓存)超过一个块大小,

//第2个就是目的缓冲区已经满了,都么空间写入了,自然需要flush了。

}

}

}

//友情提醒,这里的前半段写入是能写多少写多少,写完了再判断!

为啥有2个判断条件?想必很多人对缓冲区满了很好理解,因为都没剩余空间了

而对bytesWrittenToBlock + pos >= BLOCK_SIZE可能不是很清楚

这是因为一个Block写满了就要另起炉灶,重新开一个Block.

flush()函数暂时不解释,后面再解释!

---

public synchronized void write(int b) throws IOException {

if (closed) {//仍然是校验是否关闭

throw new IOException("Stream closed");

}

if ((bytesWrittenToBlock + pos == BLOCK_SIZE) ||

(pos >= BUFFER_SIZE)) {

flush();

}//仍然是2个条件的校验

outBuf[pos++] = (byte) b;

filePos++;//这2句的意义在于真正的写入到目的缓冲区里

不过为啥不把这2段调一下顺序更好理解?果然思维独特!

}

---

public synchronized void flush() throws IOException {

if (closed) {

throw new IOException("Stream closed");

}//检验是否关闭,老规矩

if (bytesWrittenToBlock + pos >= BLOCK_SIZE) {

flushData(BLOCK_SIZE - bytesWrittenToBlock);

}//如果需要新起1个Block的话,就把剩下的不足字节数先写上

if (bytesWrittenToBlock == BLOCK_SIZE) {

endBlock();//然后关闭当前块,新起一块

}

flushData(pos);//对当前块继续写剩下的

}

---

继续看别的函数

在看别的函数之前,首先希望读者先建立一个0.1.0中文件的存储机制。

在读取本地文件上传到HDFS中,文件流是这样的。

本地文件--->本地内存缓冲区Buffer--->本地文件--->上传到远程HDFS系统。

而本地内存缓冲区Buffer--->本地文件就是flushData做的事情,请再复习下flush函数,然后再接下来分析flushData.

PS:看代码比写代码累,看代码是了解别人的思维,写代码是把自己的思维实现起来。。。

private synchronized void flushData(int maxPos) throws IOException {

int workingPos = Math.min(pos, maxPos);//计算要写入的字节数,真是多此一举。

if (workingPos > 0) {//如果确实需要写的话

//

// To the local block backup, write just the bytes

//

backupStream.write(outBuf, 0, workingPos);//写入到本地文件

//注意,请认真阅读backupStream的初始化过程,是一个本地文件。

//也就是说计划把内存缓冲区里的内容写到本地文件中,写完一个block再发送给HDFS.

//聪明的读者应该想到最后一个block的大小是<=blockSize的。

// Track position

//

bytesWrittenToBlock += workingPos;//更新写入到block块的字节数,

//尤其要强调,当一个块结束后,这个变量就会重置为0,你懂的。

System.arraycopy(outBuf, workingPos, outBuf, 0, pos - workingPos);

//字节前挪移到偏移量为0的位置,方便后面IO操作,你懂得,不解释。

pos -= workingPos;//相关变量都需要更新

}

}

---------------

接下来到了比较核心的函数endBlock(); 意思是关闭当前块,新起一块,下面来看看具体的代码!

private synchronized void endBlock() throws IOException {

//

// Done with local copy

//

backupStream.close();//关闭本地文件系统的临时文件

//

// Send it to datanode//准备发送给datanode了。

//

boolean mustRecover = true;//定义一个哨兵变量

while (mustRecover) {//需要读取当前文件时

nextBlockOutputStream();

因为这个函数到后面才分析,所以提把背景知识补充好,这个函数

主要是初始化了一对IO流句柄,这个流是当前shell和远程datanode

之间的TCP连接,这对IO流句柄就是 blockStream + blockReplyStream,

分别对应着输出流和输入流,输出流用来输出文件头和文件内容,输入流是

用来读取响应。

InputStream in = new FileInputStream(backupFile);//既然第一行关闭了写,

现在就可以开始读了

try {

byte buf[] = new byte[BUFFER_SIZE];//还是局部的IO缓冲区

int bytesRead = in.read(buf);//从本地文件中读取内容

while (bytesRead > 0) {//大于0?

blockStream.writeLong((long) bytesRead);//写入字节数

blockStream.write(buf, 0, bytesRead);//写入缓冲区的内容

bytesRead = in.read(buf);//继续从本地文件中读取

}

internalClose();//跟NameNode和DataNode的交互,表示关闭

mustRecover = false;//表示任务结束

} catch (IOException ie) {

handleSocketException(ie);

} finally {

in.close();//关闭当前文件的输入流

}

}

//

// Delete local backup, start new one

//下面4行是从新建立起本地文件系统的文件缓冲系统,不解释

backupFile.delete();

backupFile = newBackupFile();

backupStream = new FileOutputStream(backupFile);

bytesWrittenToBlock = 0;

}

在阅读以上代码之后,我个人认为如果用C语言来写这段逻辑的话,我会直接调用sendfile来实现文件传输。

当然JAVA的API滞后性以及OS当时或许都不提供这种方式吧,反正现在的内核都提供了。

---------------------------------------

那么接下来分析的是函数:nextBlockOutputStream()

private synchronized void nextBlockOutputStream() throws IOException {

boolean retry = false;//不解释

long start = System.currentTimeMillis();//当前开始时间

do {

retry = false;//重置为false

long localstart = System.currentTimeMillis();//当前开始时间

boolean blockComplete = false;//标注块是否OK

LocatedBlock lb = null; //初始化为null

while (! blockComplete) {//如果未结束

if (firstTime) {//如果是第一次开启一个文件

lb = namenode.create(src.toString(), clientName.toString(), localName, overwrite);//创建一个文件

} else {

lb = namenode.addBlock(src.toString(), localName);

}//增加一个block

if (lb == null) {//如果找不到

try {

Thread.sleep(400);//就沉睡400毫秒

if (System.currentTimeMillis() - localstart > 5000) {

LOG.info("Waiting to find new output block node for " + (System.currentTimeMillis() - start) + "ms");

}

} catch (InterruptedException ie) {

}

} else {

blockComplete = true;//设置blockComplete为true.解释为找到了一个block

}

}

block = lb.getBlock();//从lb中获取block的信息

DatanodeInfo nodes[] = lb.getLocations();//从lb中获取block要存储的DataNode数组

//

// Connect to first DataNode in the list. Abort if this fails.

//请注意上面这句的意思:连接第一个数据节点,

//为啥?数据传输采用计算机组成原理的菊花链模式

InetSocketAddress target = DataNode.createSocketAddr(nodes[0].getName().toString());//解析

try {

s = new Socket();

s.connect(target, READ_TIMEOUT);//连接第一个DataNode

s.setSoTimeout(READ_TIMEOUT);//设置读取时间

} catch (IOException ie) {//异常这里就不分析了

// Connection failed. Let's wait a little bit and retry

try {

if (System.currentTimeMillis() - start > 5000) {

LOG.info("Waiting to find target node: " + target);

}

Thread.sleep(6000);

} catch (InterruptedException iex) {

}

if (firstTime) {

namenode.abandonFileInProgress(src.toString());

} else {

namenode.abandonBlock(block, src.toString());

}

retry = true;

continue;

}

//此时已经成功连接到了远程DataNode节点,bingo!

// Xmit header info to datanode

//

DataOutputStream out = new DataOutputStream(new BufferedOutputStream(s.getOutputStream()));

//获取输出流句柄

out.write(OP_WRITE_BLOCK);//输出行为标识

out.writeBoolean(false);//false?

block.write(out);//写入block信息,注意:是把从namenode获取到的block写给DataNode

out.writeInt(nodes.length);//这一样和下面这一行是为了写入所有存储及备份的DataNode

for (int i = 0; i < nodes.length; i++) {

nodes[i].write(out);//不解释

}

out.write(CHUNKED_ENCODING);//写CHUNKED_ENCODING

bytesWrittenToBlock = 0;//重置为0

blockStream = out;//把句柄赋值给类的局部变量供后续使用

blockReplyStream = new DataInputStream(new BufferedInputStream(s.getInputStream()));//同理,不解释

} while (retry);

firstTime = false;//firstTime在至少有一个块信息返回后就为false

===================================================

接下来要分析的函数是

private synchronized void internalClose() throws IOException {

blockStream.writeLong(0);//表明长度结束了

blockStream.flush();//把缓冲内容全部输出。

long complete = blockReplyStream.readLong();//读取响应

if (complete != WRITE_COMPLETE) {//如果不是结束

LOG.info("Did not receive WRITE_COMPLETE flag: " + complete);

throw new IOException("Did not receive WRITE_COMPLETE_FLAG: " + complete);

}

LocatedBlock lb = new LocatedBlock();//创建一个新对象

lb.readFields(blockReplyStream);//根据响应流来赋值

namenode.reportWrittenBlock(lb);//向namenode报告写入成功

s.close();//关闭此流

s = null;

}

================

最后就是close函数

public synchronized void close() throws IOException {

if (closed) {

throw new IOException("Stream closed");

}//校验是否关闭了

flush();//尽可能的输出内容

if (filePos == 0 || bytesWrittenToBlock != 0) {

try {

endBlock();//结束一个块

} catch (IOException e) {

namenode.abandonFileInProgress(src.toString());//抛弃此file

throw e;

}

}

backupStream.close();//关闭流

backupFile.delete();//删除文件

if (s != null) {

s.close();//不解释

s = null;

}

super.close();

long localstart = System.currentTimeMillis();

boolean fileComplete = false;

while (! fileComplete) {//循环报告文件写完了

fileComplete = namenode.complete(src.toString(), clientName.toString());

if (!fileComplete) {

try {

Thread.sleep(400);

if (System.currentTimeMillis() - localstart > 5000) {

LOG.info("Could not complete file, retrying...");

}

} catch (InterruptedException ie) {

}

}

}

closed = true;

}

以上就是HDFS中怎么实现本地文件上传,小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注行业资讯频道。

0