千家信息网

java如何使用多线程读取超大文件

发表于:2024-09-24 作者:千家信息网编辑
千家信息网最后更新 2024年09月24日,这篇文章主要介绍了java如何使用多线程读取超大文件,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。基本思路如下:1.计算出文件总大小2
千家信息网最后更新 2024年09月24日java如何使用多线程读取超大文件

这篇文章主要介绍了java如何使用多线程读取超大文件,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

基本思路如下:

1.计算出文件总大小

2.分段处理,计算出每个线程读取文件的开始与结束位置

(文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置

使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起 始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1

3.启动线程,每个线程从开始位置读取到结束位置为止

代码如下:

读文件工具类

import java.io.*;import java.nio.ByteBuffer;import java.nio.channels.FileChannel;import java.util.Observable; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午3:12 * 读取文件 */public class ReadFile extends Observable { private int bufSize = 1024; // 换行符 private byte key = "\n".getBytes()[0]; // 当前行数 private long lineNum = 0; // 文件编码,默认为gb2312 private String encode = "gb2312"; // 具体业务逻辑监听器 private ReaderFileListener readerListener; public void setEncode(String encode) { this.encode = encode; } public void setReaderListener(ReaderFileListener readerListener) { this.readerListener = readerListener; } /** * 获取准确开始位置 * @param file * @param position * @return * @throws Exception */ public long getStartNum(File file, long position) throws Exception { long startNum = position; FileChannel fcin = new RandomAccessFile(file, "r").getChannel(); fcin.position(position); try { int cache = 1024; ByteBuffer rBuffer = ByteBuffer.allocate(cache); // 每次读取的内容 byte[] bs = new byte[cache]; // 缓存 byte[] tempBs = new byte[0]; String line = ""; while (fcin.read(rBuffer) != -1) { int rSize = rBuffer.position(); rBuffer.rewind(); rBuffer.get(bs); rBuffer.clear(); byte[] newStrByte = bs; // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面 if (null != tempBs) { int tL = tempBs.length; newStrByte = new byte[rSize + tL]; System.arraycopy(tempBs, 0, newStrByte, 0, tL); System.arraycopy(bs, 0, newStrByte, tL, rSize); } // 获取开始位置之后的第一个换行符 int endIndex = indexOf(newStrByte, 0); if (endIndex != -1) { return startNum + endIndex; } tempBs = substring(newStrByte, 0, newStrByte.length); startNum += 1024; } } catch (Exception e) { e.printStackTrace(); } finally { fcin.close(); } return position; } /** * 从设置的开始位置读取文件,一直到结束为止。如果 end设置为负数,刚读取到文件末尾 * @param fullPath * @param start * @param end * @throws Exception */ public void readFileByLine(String fullPath, long start, long end) throws Exception { File fin = new File(fullPath); if (fin.exists()) { FileChannel fcin = new RandomAccessFile(fin, "r").getChannel(); fcin.position(start); try { ByteBuffer rBuffer = ByteBuffer.allocate(bufSize); // 每次读取的内容 byte[] bs = new byte[bufSize]; // 缓存 byte[] tempBs = new byte[0]; String line = ""; // 当前读取文件位置 long nowCur = start; while (fcin.read(rBuffer) != -1) { nowCur += bufSize; int rSize = rBuffer.position(); rBuffer.rewind(); rBuffer.get(bs); rBuffer.clear(); byte[] newStrByte = bs; // 如果发现有上次未读完的缓存,则将它加到当前读取的内容前面 if (null != tempBs) { int tL = tempBs.length; newStrByte = new byte[rSize + tL]; System.arraycopy(tempBs, 0, newStrByte, 0, tL); System.arraycopy(bs, 0, newStrByte, tL, rSize); } // 是否已经读到最后一位 boolean isEnd = false; // 如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置 if (end > 0 && nowCur > end) { // 缓存长度 - 当前已经读取位数 - 最后位数 int l = newStrByte.length - (int) (nowCur - end); newStrByte = substring(newStrByte, 0, l); isEnd = true; } int fromIndex = 0; int endIndex = 0; // 每次读一行内容,以 key(默认为\n) 作为结束符 while ((endIndex = indexOf(newStrByte, fromIndex)) != -1) { byte[] bLine = substring(newStrByte, fromIndex, endIndex); line = new String(bLine, 0, bLine.length, encode); lineNum++; // 输出一行内容,处理方式由调用方提供 readerListener.outLine(line.trim(), lineNum, false); fromIndex = endIndex + 1; } // 将未读取完成的内容放到缓存中 tempBs = substring(newStrByte, fromIndex, newStrByte.length); if (isEnd) { break; } } // 将剩下的最后内容作为一行,输出,并指明这是最后一行 String lineStr = new String(tempBs, 0, tempBs.length, encode); readerListener.outLine(lineStr.trim(), lineNum, true); } catch (Exception e) { e.printStackTrace(); } finally { fcin.close(); } } else { throw new FileNotFoundException("没有找到文件:" + fullPath); } // 通知观察者,当前工作已经完成 setChanged(); notifyObservers(start+"-"+end); } /** * 查找一个byte[]从指定位置之后的一个换行符位置 * * @param src * @param fromIndex * @return * @throws Exception */ private int indexOf(byte[] src, int fromIndex) throws Exception { for (int i = fromIndex; i < src.length; i++) { if (src[i] == key) { return i; } } return -1; } /** * 从指定开始位置读取一个byte[]直到指定结束位置为止生成一个全新的byte[] * * @param src * @param fromIndex * @param endIndex * @return * @throws Exception */ private byte[] substring(byte[] src, int fromIndex, int endIndex) throws Exception { int size = endIndex - fromIndex; byte[] ret = new byte[size]; System.arraycopy(src, fromIndex, ret, 0, size); return ret; } }

读文件线程

/** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-2 * Time: 下午4:50 * To change this template use File | Settings | File Templates. */public class ReadFileThread extends Thread { private ReaderFileListener processPoiDataListeners; private String filePath; private long start; private long end; public ReadFileThread(ReaderFileListener processPoiDataListeners,long start,long end,String file) { this.setName(this.getName()+"-ReadFileThread"); this.start = start; this.end = end; this.filePath = file; this.processPoiDataListeners = processPoiDataListeners; } @Override public void run() { ReadFile readFile = new ReadFile(); readFile.setReaderListener(processPoiDataListeners); readFile.setEncode(processPoiDataListeners.getEncode());// readFile.addObserver(); try { readFile.readFileByLine(filePath, start, end + 1); } catch (Exception e) { e.printStackTrace(); } }}

具体业务逻辑监听

/** * Created with Okey * User: Okey * Date: 13-3-14 * Time: 下午3:19 * NIO逐行读数据回调方法 */public abstract class ReaderFileListener { // 一次读取行数,默认为500 private int readColNum = 500; private String encode; private List list = new ArrayList(); /** * 设置一次读取行数 * @param readColNum */ protected void setReadColNum(int readColNum) { this.readColNum = readColNum; } public String getEncode() { return encode; } public void setEncode(String encode) { this.encode = encode; } /** * 每读取到一行数据,添加到缓存中 * @param lineStr 读取到的数据 * @param lineNum 行号 * @param over 是否读取完成 * @throws Exception */ public void outLine(String lineStr, long lineNum, boolean over) throws Exception { if(null != lineStr) list.add(lineStr); if (!over && (lineNum % readColNum == 0)) { output(list); list.clear(); } else if (over) { output(list); list.clear(); } } /** * 批量输出 * * @param stringList * @throws Exception */ public abstract void output(List stringList) throws Exception; }

线程调度

import java.io.File;import java.io.FileInputStream;import java.io.IOException; /** * Created with IntelliJ IDEA. * User: okey * Date: 14-4-1 * Time: 下午6:03 * To change this template use File | Settings | File Templates. */public class BuildData { public static void main(String[] args) throws Exception { File file = new File("E:\\1396341974289.csv"); FileInputStream fis = null; try { ReadFile readFile = new ReadFile(); fis = new FileInputStream(file); int available = fis.available(); int maxThreadNum = 50; // 线程粗略开始位置 int i = available / maxThreadNum; for (int j = 0; j < maxThreadNum; j++) { // 计算精确开始位置 long startNum = j == 0 ? 0 : readFile.getStartNum(file, i * j); long endNum = j + 1 < maxThreadNum ? readFile.getStartNum(file, i * (j + 1)) : -2; // 具体监听实现 ProcessDataByPostgisListeners listeners = new ProcessDataByPostgisListeners("gbk"); new ReadFileThread(listeners, startNum, endNum, file.getPath()).start(); } } catch (IOException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } }}

现在就可以尽情的调整maxThreadNum来享受风一般的速度吧!

感谢你能够认真阅读完这篇文章,希望小编分享的"java如何使用多线程读取超大文件"这篇文章对大家有帮助,同时也希望大家多多支持,关注行业资讯频道,更多相关知识等着你来学习!

位置 文件 线程 内容 缓存 一行 换行符 篇文章 位数 数据 监听 起始 输出 业务 同时 大小 逻辑 处理 粗略 精确 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 网络安全诗朗诵 网络安全宣传活动目的 微软起初有多少软件开发人员 请简述网络安全的技术架构 滨州专业软件开发报价 西安软件开发工资涨幅 数据库显示2005错误啥意思 c 更新数据库字段 门户网站网络安全检查总结 魔兽世界正式服最大服务器 电子网络安全活动记事 大学网络安全与信息化办公室 国家网络安全周提出了四招几式 腾讯应用服务器有哪些 网络安全考试在哪报名 网络棋牌软件开发公司电话 服务器安全操作指南 厦门勇士网络技术公司 数据挖掘数据库应用 手机数据连接就无服务器 赤壁市扬尖网络技术工作室 设计数据库并创建相关数据 什么是数据库事务日志 网信办让公司做网络安全评价 大学生网络安全技术论文名字 网络安全诗报报告 南川服务器运维 拒绝服务属于网络安全问题吗 网络安全考试在哪报名 亿品众合互联网科技
0