
MiniYARNCluster MiniDFSCluster Kerberos


/**

1. Download the Hadoop source code, run yum install cmake zlib-devel, and finally run
   mvn clean package -Pdist,native -DskipTests -Dtar to build the native libraries.

2. Before running the JUnit tests, specify the VM argument
   -Djava.library.path=/usr/hadoop{version}/hadoop-dist/target/hadoop-{version}/lib/native

3. Run mvn clean install -Pnative -Dcontainer-executor.conf.dir=/{project.path}/bin -DskipTests
   to build container-executor.

   An example container-executor.cfg exists in {project.path}/bin (a minimal sketch follows
   this comment block).

   The build produces the container-executor executable; copy it to {project.path}/bin, then:

   sudo chown root:yourusername {project.path}/bin/container-executor   (yourusername is tjj in this example)

   sudo chmod 4550 {project.path}/bin/container-executor

4. To run the test job in YARN (conf.set("mapreduce.framework.name", "yarn")), edit
   META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider in the
   hadoop-mapreduce-client-common-{version}.jar on your classpath so that it reads:

   #org.apache.hadoop.mapred.LocalClientProtocolProvider
   org.apache.hadoop.mapred.YarnClientProtocolProvider


*/
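
/*
 * A minimal container-executor.cfg sketch for step 3 above. The group name, banned
 * users and minimum user id are placeholder values and must match your environment:
 *
 *   yarn.nodemanager.linux-container-executor.group=tjj
 *   banned.users=hdfs,yarn,mapred,bin
 *   min.user.id=500
 *   allowed.system.users=
 */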

import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY;

import static org.apache.hadoop.hdfs.DFSConfigKeys.*;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY;

import static org.junit.Assert.*;


import java.io.*;

import java.util.ArrayList;

import java.util.List;

import java.util.Properties;


import org.apache.commons.io.FileUtils;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.*;

import org.apache.hadoop.hdfs.HdfsConfiguration;

import org.apache.hadoop.hdfs.MiniDFSCluster;

import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferTestCase;

import org.apache.hadoop.http.HttpConfig;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.minikdc.MiniKdc;

import org.apache.hadoop.security.SecurityUtil;

import org.apache.hadoop.security.UserGroupInformation;

import org.apache.hadoop.security.ssl.KeyStoreTestUtil;

import org.apache.hadoop.yarn.conf.YarnConfiguration;

import org.apache.hadoop.yarn.server.MiniYARNCluster;

import org.junit.*;


public class TestClusterWithKerberos {



private static File baseDir;

private static String hdfsPrincipal;

private static MiniKdc kdc;

private static String keytab;

private static String spnegoPrincipal;

private MiniYARNCluster yarnCluster;

private MiniDFSCluster cluster;



@BeforeClass

public static void initKdc() throws Exception {

baseDir = new File(System.getProperty("test.build.dir", "target/test-dir"),

SaslDataTransferTestCase.class.getSimpleName());

FileUtil.fullyDelete(baseDir);

assertTrue(baseDir.mkdirs());


Properties kdcConf = MiniKdc.createConf();

kdc = new MiniKdc(kdcConf, baseDir);

kdc.start();

UserGroupInformation ugi = UserGroupInformation.createRemoteUser("tjj");

UserGroupInformation.setLoginUser(ugi);

String userName = UserGroupInformation.getLoginUser().getShortUserName();

File keytabFile = new File(baseDir, userName + ".keytab");

keytab = keytabFile.getAbsolutePath();

kdc.createPrincipal(keytabFile, userName + "/localhost", "HTTP/localhost");

hdfsPrincipal = userName + "/localhost@" + kdc.getRealm();

spnegoPrincipal = "HTTP/localhost@" + kdc.getRealm();

System.out.println("keytab "+keytab+"hdfsPrincipal "+hdfsPrincipal);

}


@AfterClass

public static void shutdownKdc() {

if (kdc != null) {

kdc.stop();

}

FileUtil.fullyDelete(baseDir);

}






private void startCluster(HdfsConfiguration conf) throws IOException {

cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();

cluster.waitActive();

yarnCluster = new MiniYARNCluster("MiniClusterStartsWithCountJobTest", // test name

1, // number of node managers

1, // number of local dirs per node manager

1); // number of log dirs per node manager

yarnCluster.init(conf);




yarnCluster.start();

yarnCluster.getConfig().writeXml(new FileOutputStream(new File("conf.Xml")));

}


@Test

public void testWithMiniCluster() throws Exception {


HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");

YarnConfiguration yarnConf = createYarnSecureConfig();

clusterConf.addResource(yarnConf);

startCluster(clusterConf);


Configuration conf = new Configuration();

conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));

String IN_DIR = "testing/wordcount/input";

String OUT_DIR = "testing/wordcount/output";

String DATA_FILE = "sample.txt";


FileSystem fs = FileSystem.get(conf);

Path inDir = new Path(IN_DIR);

Path outDir = new Path(OUT_DIR);


fs.delete(inDir, true);

fs.delete(outDir, true);


// create the input data files

List<String> content = new ArrayList<String>();

content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");

writeHDFSContent(fs, inDir, DATA_FILE, content);


// set up the job, submit the job and wait for it complete

Job job = Job.getInstance(conf);


job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

job.setMapperClass(BasicWordCount.TokenizerMapper.class);

job.setReducerClass(BasicWordCount.IntSumReducer.class);

FileInputFormat.addInputPath(job, inDir);

FileOutputFormat.setOutputPath(job, outDir);

job.waitForCompletion(true);

assertTrue(job.isSuccessful());


// now check that the output is as expected

List<String> results = getJobResults(fs, outDir, 11);


assertTrue(results.contains("She\t1"));

assertTrue(results.contains("sells\t2"));



// clean up after test case

fs.delete(inDir, true);

fs.delete(outDir, true);

}

/* @Test

public void wordcount() throws Exception {


HdfsConfiguration clusterConf = createSecureConfig("authentication,integrity,privacy");

YarnConfiguration yarnConf = createYarnSecureConfig();

clusterConf.addResource(yarnConf);

startCluster(clusterConf);


Configuration conf = new Configuration();

conf.addResource(FileUtils.openInputStream(new File("conf.Xml")));

String IN_DIR = "testing/wordcount/input";

String OUT_DIR = "testing/wordcount/output";

String DATA_FILE = "sample.txt";


FileSystem fs = FileSystem.get(conf);

Path inDir = new Path(IN_DIR);

Path outDir = new Path(OUT_DIR);


fs.delete(inDir, true);

fs.delete(outDir, true);


// create the input data files

List content = new ArrayList();

content.add("She sells seashells at the seashore, and she sells nuts in the mountain.");

writeHDFSContent(fs, inDir, DATA_FILE, content);

String[] args = new String[]{IN_DIR,OUT_DIR};

int exitCode = ToolRunner.run(conf,new WordCount(), args);

fs.delete(inDir, true);

fs.delete(outDir, true);

}*/

private void writeHDFSContent(FileSystem fs, Path dir, String fileName, List<String> content) throws IOException {

Path newFilePath = new Path(dir, fileName);

FSDataOutputStream out = fs.create(newFilePath);

for (String line : content) {

out.writeBytes(line);

}

out.close();

}


protected List<String> getJobResults(FileSystem fs, Path outDir, int numLines) throws Exception {

List<String> results = new ArrayList<String>();

FileStatus[] fileStatus = fs.listStatus(outDir);

for (FileStatus file : fileStatus) {

String name = file.getPath().getName();

if (name.contains("part-r-00000")) {

Path filePath = new Path(outDir + "/" + name);

BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath)));

for (int i = 0; i < numLines; i++) {

String line = reader.readLine();

if (line == null) {

fail("Results are not what was expected");

}

System.out.println("line info: "+line);

results.add(line);

}

assertNull(reader.readLine());

reader.close();

}

}

return results;

}


private HdfsConfiguration createSecureConfig(String dataTransferProtection) throws Exception {

HdfsConfiguration conf = new HdfsConfiguration();

SecurityUtil.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.KERBEROS, conf);

conf.set(DFS_NAMENODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);

conf.set(DFS_NAMENODE_KEYTAB_FILE_KEY, keytab);

conf.set(DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, hdfsPrincipal);

conf.set(DFS_DATANODE_KEYTAB_FILE_KEY, keytab);

conf.set(DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY, spnegoPrincipal);

conf.setBoolean(DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);

conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, dataTransferProtection);

conf.set(DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());

conf.set(DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");

conf.set(DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");

conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SASL_KEY, 10);

conf.set(DFS_ENCRYPT_DATA_TRANSFER_KEY, "true");//https://issues.apache.org/jira/browse/HDFS-7431

String keystoresDir = baseDir.getAbsolutePath();

String sslConfDir = KeyStoreTestUtil.getClasspathDir(this.getClass());

KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, conf, false);

return conf;

}

private YarnConfiguration createYarnSecureConfig(){


YarnConfiguration conf = new YarnConfiguration();



//yarn secure config

conf.set("yarn.resourcemanager.keytab", keytab);

conf.set("yarn.resourcemanager.principal", hdfsPrincipal);


conf.set("yarn.nodemanager.keytab", keytab);

conf.set("yarn.nodemanager.principal", hdfsPrincipal);

// conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor");

conf.set("yarn.nodemanager.container-executor.class", "org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor");

conf.set("yarn.nodemanager.linux-container-executor.path", "/container/container-executor");

conf.set("mapreduce.jobhistory.keytab", keytab);

conf.set("mapreduce.jobhistory.principal", hdfsPrincipal);

conf.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");//https://issues.apache.org/jira/browse/YARN-1289


//enable security


conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, "true");

//yarn

conf.set("mapreduce.framework.name", "yarn"); //http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn use Yarn runner

return conf;

}

}
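
The test references a BasicWordCount helper class that is not included in the listing above. A minimal sketch of what such a class might look like (the standard word-count mapper and reducer; the class and nested-class names are assumed from the test code), placed in its own BasicWordCount.java:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class BasicWordCount {

    // Splits each input line into whitespace-separated tokens and emits (word, 1).
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Sums the per-word counts emitted by the mapper.
    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
}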



A Stack Overflow answer (linked below) describes the same ClientProtocolProvider fix:

I have run into similar issues today. In my case I was building an über jar, where some dependency (I have not found the culprit yet) was bringing in a META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider with the contents:


org.apache.hadoop.mapred.LocalClientProtocolProvider

I provided my own in the project (e.g. put it on the classpath) with the following:


org.apache.hadoop.mapred.YarnClientProtocolProvider

and the correct one is picked up. I suspect you are seeing similar. To fix, please create the file described above, and put it on the classpath. If I find the culprit Jar, I will update the answer.



http://stackoverflow.com/questions/26567223/java-io-ioexception-cannot-initialize-cluster-in-hadoop2-with-yarn


After the change, META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider in hadoop-mapreduce-client-common-2.6.0.jar reads:

#

# Licensed under the Apache License, Version 2.0 (the "License");

# you may not use this file except in compliance with the License.

# You may obtain a copy of the License at

#

# http://www.apache.org/licenses/LICENSE-2.0

#

# Unless required by applicable law or agreed to in writing, software

# distributed under the License is distributed on an "AS IS" BASIS,

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

# See the License for the specific language governing permissions and

# limitations under the License.

#

#org.apache.hadoop.mapred.LocalClientProtocolProvider

org.apache.hadoop.mapred.YarnClientProtocolProvider

