千家信息网

hbase怎么在不同版本hdfs集群之间转移数据

发表于:2024-09-22 作者:千家信息网编辑
千家信息网最后更新 2024年09月22日,这篇文章主要讲解了"hbase怎么在不同版本hdfs集群之间转移数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"hbase怎么在不同版本hdfs集群
千家信息网最后更新 2024年09月22日hbase怎么在不同版本hdfs集群之间转移数据

这篇文章主要讲解了"hbase怎么在不同版本hdfs集群之间转移数据",文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习"hbase怎么在不同版本hdfs集群之间转移数据"吧!

很多人会有这样一个需求:将一个hdfs集群上的数据写入另一个hdfs集群所在的hbase数据库。通常情况下两个hdfs集群的版本差距并不大,这样的程序会很容易写。但有时会跨大版本。比如作者所在的厂子,数据都在基于hadoop0.19.2版本修改的hdfs集群上,要将这样的数据导入版本为0.20.2+的hdfs集群,就不能使用同一个hadoop jar包来完成了。如何实现呢?

最简单的办法就是把src集群的数据导到本地,然后起另一个进程将本地数据传到des集群上去。
不过这有几个问题:

  • 效率降低

  • 占用本地磁盘空间

  • 不能应付实时导数据需求

  • 两个进程需要协调,复杂度增加


更好的办法是在同一个进程内一边读src数据,一边写des集群。不过这相当于在同一个进程空间内加载两个版本的hadoop jar包,这就需要在程序中使用两个classloader来实现。
以下代码可以实现classloader加载自定义的jar包,并生成需要的Configuration对象:

Java代码

  1. URL[] jarUrls = new URL[1];

  2. jarUrls[0]=new File(des_jar_path).toURI().toURL();

  3. ClassLoader jarloader = new URLClassLoader(jarUrls, null);

  4. Class Proxy = Class.forName("yourclass", true, jarloader);

  5. Configuration conf = (Configuration)Proxy.newInstance();

URL[] jarUrls = new URL[1];

jarUrls[0]=new File(des_jar_path).toURI().toURL();

ClassLoader jarloader = new URLClassLoader(jarUrls, null);

Class Proxy = Class.forName("yourclass", true, jarloader);

Configuration conf = (Configuration)Proxy.newInstance();



但是由于在生成HTable对象时,需要使用这个conf对象,而加载这个conf对象的代码本身是由默认的classloader加载的,也就是0.19.2的jar包。所以在以上代码最后一行所强制转换的Configuration对象仍然是0.19.2版本的。那怎么办呢?
琢磨了一会,发现如果要实现以上功能,必须将生成HTable对象,以及以后的所有hbase操作都使用这个新的classloader,因此这个新的classloader必须加载除了0.19.2的jar包外所有需要用到的jar包,然后把所有操作都封装进去。在外面用反射来调用。
这样的话,通常构造函数都不为空了,因此需要用到Constructor来构造一个自定义的构造函数
代码段如下:

Java代码

  1. main.java

  2. void init(){

  3. ClassLoader jarloader = generateJarLoader();

  4. Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

  5. Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});

  6. Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

  7. proxy = con.newInstance(new Object[]{path, tablename, autoflush});

  8. }

  9. void put(){

  10. ...

  11. while((line = getLine()) != null) {

  12. proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));

  13. Method addPut = proxy.getClass().getMethod("addPut",

  14. new Class[]{String.class, String.class, String.class});

  15. addPut.invoke(proxy, new Object[]{field, column, encode});

  16. proxy.getClass().getMethod("putLine").invoke(proxy);

  17. }

  18. }

  19. ClassLoader generateJarLoader() throws IOException {

  20. String libPath = System.getProperty("java.ext.dirs");

  21. FileFilter filter = new FileFilter() {

  22. @Override

  23. public boolean accept(File pathname) {

  24. if(pathname.getName().startsWith("hadoop-0.19.2"))

  25. return false;

  26. else

  27. return pathname.getName().endsWith(".jar");

  28. }

  29. };

  30. File[] jars = new File(libPath).listFiles(filter);

  31. URL[] jarUrls = new URL[jars.length+1];

  32. int k = 0;

  33. for (int i = 0; i < jars.length; i++) {

  34. jarUrls[k++] = jars[i].toURI().toURL();

  35. }

  36. jarUrls[k] = new File("hadoop-0.20.205.jar")

  37. ClassLoader jarloader = new URLClassLoader(jarUrls, null);

  38. return jarloader;

  39. }

main.java

void init(){

ClassLoader jarloader = generateJarLoader();

Class Proxy = Class.forName("test.writer.hbasewriter.HBaseProxy", true, jarloader);

Constructor con = Proxy.getConstructor(new Class[]{String.class, String.class, boolean.class});

Boolean autoflush = param.getBoolValue(ParamsKey.HbaseWriter.autoFlush, true);

proxy = con.newInstance(new Object[]{path, tablename, autoflush});

}

void put(){

...

while((line = getLine()) != null) {

proxy.getClass().getMethod("generatePut",String.class).invoke(proxy, line.getField(rowkey));

Method addPut = proxy.getClass().getMethod("addPut",

new Class[]{String.class, String.class, String.class});

addPut.invoke(proxy, new Object[]{field, column, encode});

proxy.getClass().getMethod("putLine").invoke(proxy);

}

}


ClassLoader generateJarLoader() throws IOException {

String libPath = System.getProperty("java.ext.dirs");

FileFilter filter = new FileFilter() {

@Override

public boolean accept(File pathname) {

if(pathname.getName().startsWith("hadoop-0.19.2"))

return false;

else

return pathname.getName().endsWith(".jar");

}

};

File[] jars = new File(libPath).listFiles(filter);

URL[] jarUrls = new URL[jars.length+1];

int k = 0;

for (int i = 0; i < jars.length; i++) {

jarUrls[k++] = jars[i].toURI().toURL();

}

jarUrls[k] = new File("hadoop-0.20.205.jar")

ClassLoader jarloader = new URLClassLoader(jarUrls, null);

return jarloader;

}

Java代码

  1. HBaseProxy.java

  2. public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)

  3. throws IOException{

  4. Configuration conf = new Configuration();

  5. conf.addResource(new Path(hbase_conf));

  6. config = new Configuration(conf);

  7. htable = new HTable(config, tableName);

  8. admin = new HBaseAdmin(config);

  9. htable.setAutoFlush(autoflush);

  10. }

  11. public void addPut(String field, String column, String encode) throws IOException {

  12. try {

  13. p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

  14. field.getBytes(encode));

  15. } catch (UnsupportedEncodingException e) {

  16. p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

  17. field.getBytes());

  18. }

  19. }

  20. public void generatePut(String rowkey){

  21. p = new Put(rowkey.getBytes());

  22. }

  23. public void putLine() throws IOException{

  24. htable.put(p);

  25. }

HBaseProxy.java

public HBaseProxy(String hbase_conf, String tableName, boolean autoflush)

throws IOException{

Configuration conf = new Configuration();

conf.addResource(new Path(hbase_conf));

config = new Configuration(conf);

htable = new HTable(config, tableName);

admin = new HBaseAdmin(config);

htable.setAutoFlush(autoflush);

}

public void addPut(String field, String column, String encode) throws IOException {

try {

p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

field.getBytes(encode));

} catch (UnsupportedEncodingException e) {

p.add(column.split(":")[0].getBytes(), column.split(":")[1].getBytes(),

field.getBytes());

}

}

public void generatePut(String rowkey){

p = new Put(rowkey.getBytes());

}

public void putLine() throws IOException{

htable.put(p);

}


总之,在同一个进程中加载多个classloader时一定要注意,classloader A所加载的对象是不能转换成classloader B的对象的,当然也不能使用。两个空间的相互调用只能用java的基本类型或是反射。

感谢各位的阅读,以上就是"hbase怎么在不同版本hdfs集群之间转移数据"的内容了,经过本文的学习后,相信大家对hbase怎么在不同版本hdfs集群之间转移数据这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是,小编将为大家推送更多相关知识点的文章,欢迎关注!

0