Commit 12cc45a7 authored by 谢森

<dev> Flink on YARN high-availability test demo

parent 7c154ae6
# maven ignore
target
*.jar
*.war
*.zip
*.tar
*.tar.gz
release
sources
lib
plugins
modules
# eclipse ignore
*.class
.settings/
.project
.classpath
# idea ignore
.idea/
*.ipr
*.iml
*.iws
# temp ignore
*.log
*.cache
*.diff
*.patch
*.tmp
logs
# system ignore
.DS_Store
Thumbs.db
**/*.jar
**/*.class
.sincedb
kafka0.10.x/src/docs/*
# Package Files #
*.ear
dependency-reduced-pom.xml
.vertx/
smartdata-streamx
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink-ha-test</artifactId>
<version>1.0-SNAPSHOT</version>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<properties>
<hadoop.version>2.6.0</hadoop.version>
<hadoop.release>cdh5.16.2</hadoop.release>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}-${hadoop.release}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}-${hadoop.release}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}-${hadoop.release}</version>
</dependency>
<!--<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-mapper-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
<exclusion>
<artifactId>jackson-core-asl</artifactId>
<groupId>org.codehaus.jackson</groupId>
</exclusion>
</exclusions>
</dependency>-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>FlinkYarnHaTest</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<!-- Code-scanning (PMD) plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.13.0</version>
<configuration>
<rulesets>
<ruleset>rulesets/java/ali-comment.xml</ruleset>
<ruleset>rulesets/java/ali-concurrent.xml</ruleset>
<ruleset>rulesets/java/ali-constant.xml</ruleset>
<ruleset>rulesets/java/ali-exception.xml</ruleset>
<ruleset>rulesets/java/ali-flowcontrol.xml</ruleset>
<ruleset>rulesets/java/ali-naming.xml</ruleset>
<ruleset>rulesets/java/ali-oop.xml</ruleset>
<ruleset>rulesets/java/ali-orm.xml</ruleset>
<ruleset>rulesets/java/ali-other.xml</ruleset>
<ruleset>rulesets/java/ali-set.xml</ruleset>
<ruleset>rulesets/vm/ali-other.xml</ruleset>
</rulesets>
<printFailingErrors>true</printFailingErrors>
<excludeFromFailureFile>exclude-pmd.properties</excludeFromFailureFile>
</configuration>
<executions>
<execution>
<id>validate</id>
<phase>validate</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>com.alibaba.p3c</groupId>
<artifactId>p3c-pmd</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-pmd-plugin</artifactId>
<version>3.13.0</version>
<type>maven-plugin</type>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</project>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import java.io.IOException;
import java.util.List;
import java.util.Properties;
/**
* @author xiese
* @Description Flink on YARN HA test: Kerberos login followed by YARN cluster queries
* @Email xiesen310@163.com
* @Date 2020/10/14 19:19
*/
public class FlinkYarnHaTest {
public static void main(String[] args) throws Exception {
if (args.length < 1) {
throw new RuntimeException("Missing parameter: please pass the configuration file path");
}
Properties prop = LoadConfig.getProperties(args[0]);
String yarnConfDir = prop.getProperty(HadoopSecurityUtil.YARN_CONF_DIR);
String keytabFile = prop.getProperty(HadoopSecurityUtil.ZORK_KEYTAB_FILE_KEY);
String kerberosPrincipal = prop.getProperty(HadoopSecurityUtil.ZORK_KERBEROS_PRINCIPAL);
String krb5Conf = prop.getProperty(HadoopSecurityUtil.HADOOP_SECURITY_KRB_5_CONF);
String username = prop.getProperty(HadoopSecurityUtil.USER_NAME);
String authentication = prop.getProperty(HadoopSecurityUtil.HADOOP_SECURITY_AUTHENTICATION);
if (!ParamUtil.check(yarnConfDir, keytabFile, kerberosPrincipal, krb5Conf, username)) {
throw new RuntimeException("Please check the parameters: [" + HadoopSecurityUtil.YARN_CONF_DIR
+ "], [" + HadoopSecurityUtil.ZORK_KEYTAB_FILE_KEY
+ "], [" + HadoopSecurityUtil.ZORK_KERBEROS_PRINCIPAL
+ "], [" + HadoopSecurityUtil.HADOOP_SECURITY_KRB_5_CONF
+ "], [" + HadoopSecurityUtil.USER_NAME
+ "], [" + HadoopSecurityUtil.HADOOP_SECURITY_AUTHENTICATION + "]");
}
Configuration yarnConf = LoadConfig.getYarnConf(yarnConfDir);
yarnConf.set(HadoopSecurityUtil.ZORK_KEYTAB_FILE_KEY, keytabFile);
yarnConf.set(HadoopSecurityUtil.ZORK_KERBEROS_PRINCIPAL, kerberosPrincipal);
yarnConf.set(HadoopSecurityUtil.HADOOP_SECURITY_KRB_5_CONF, krb5Conf);
yarnConf.set(HadoopSecurityUtil.USER_NAME, username);
yarnConf.set(HadoopSecurityUtil.HADOOP_SECURITY_AUTHENTICATION, authentication);
HadoopSecurityUtil.login(yarnConf);
getYarnClusterInfo(yarnConf);
}
/**
* Query YARN cluster information.
*
* @param yarnConf YARN client configuration
* @throws YarnException
* @throws IOException
*/
private static void getYarnClusterInfo(Configuration yarnConf) throws YarnException, IOException {
// create a YarnClient to talk to the ResourceManager
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
// the client must be started before it can be used
yarnClient.start();
// cluster-wide metrics
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
System.out.println("Number of NodeManagers: " + clusterMetrics.getNumNodeManagers());
// status of every running node
List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(NodeState.RUNNING);
clusterNodeReports.forEach(node -> System.out.println("Node information: " + node));
// all queues defined in the cluster
List<QueueInfo> allQueues = yarnClient.getAllQueues();
allQueues.forEach(queue -> System.out.println("Cluster queue: " + queue));
yarnClient.stop();
}
}
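Beyond cluster metrics, an HA check usually also verifies that applications survive a ResourceManager failover. Below is a minimal sketch of such a follow-up query, assuming the same Kerberos login has already been performed; the class and method names are illustrative additions, not part of the committed demo.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import java.util.EnumSet;
import java.util.List;
/**
* Illustrative helper: lists applications currently in the RUNNING state,
* e.g. before and after killing the active ResourceManager.
*/
public class YarnApplicationLister {
public static void listRunningApplications(Configuration yarnConf) throws Exception {
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
try {
// only applications in the RUNNING state
List<ApplicationReport> apps = yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
for (ApplicationReport app : apps) {
System.out.println(app.getApplicationId() + " " + app.getName() + " tracking URL: " + app.getTrackingUrl());
}
} finally {
yarnClient.stop();
}
}
}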
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
/**
* @author xiese
* @Description HadoopSecurityUtil HADOOP_USER_NAME=hdfs
* @Email xiesen310@163.com
* @Date 2020/10/15 11:35
*/
public class HadoopSecurityUtil {
/**
* Path to the keytab file
*/
public static final String ZORK_KEYTAB_FILE_KEY = "keytab.file";
/**
* Kerberos principal, e.g. hdfs/cdh70150@ZORKDATA.COM
*/
public static final String ZORK_KERBEROS_PRINCIPAL = "kerberos.principal";
/**
* Path to krb5.conf, e.g. "D:\\tmp\\tmp\\yarn-conf\\krb5.conf"
*/
public static final String HADOOP_SECURITY_KRB_5_CONF = "hadoop.security.krb5.conf";
/**
* User name, e.g. hdfs
*/
public static final String USER_NAME = "user.name";
/**
* YARN configuration directory
*/
public static final String YARN_CONF_DIR = "yarn.conf.dir";
/**
* Authentication mode, "kerberos" for a secured cluster
*/
public static final String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
public static void login(Configuration kConfig) throws IOException {
// nothing to do unless both the keytab and the principal are configured
if (kConfig.get(ZORK_KEYTAB_FILE_KEY) == null || kConfig.get(ZORK_KERBEROS_PRINCIPAL) == null) {
return;
}
System.setProperty("java.security.krb5.conf", kConfig.get(HADOOP_SECURITY_KRB_5_CONF));
System.setProperty("HADOOP_USER_NAME", kConfig.get(USER_NAME));
System.setProperty("user.name", kConfig.get(USER_NAME));
UserGroupInformation.setConfiguration(kConfig);
UserGroupInformation.loginUserFromKeytab(kConfig.get(ZORK_KERBEROS_PRINCIPAL), kConfig.get(ZORK_KEYTAB_FILE_KEY));
}
}
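To confirm that the keytab login above actually took effect before any YARN call, the logged-in UGI can be inspected. A minimal sketch using only standard UserGroupInformation accessors; the class name LoginCheck is an illustrative addition.
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
/**
* Illustrative helper: prints the identity and Kerberos state of the current login.
*/
public class LoginCheck {
public static void printLoginState() throws IOException {
UserGroupInformation ugi = UserGroupInformation.getLoginUser();
System.out.println("Login user: " + ugi.getUserName());
System.out.println("Security enabled: " + UserGroupInformation.isSecurityEnabled());
System.out.println("Has Kerberos credentials: " + ugi.hasKerberosCredentials());
System.out.println("Authentication method: " + ugi.getAuthenticationMethod());
}
}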
import org.apache.hadoop.conf.Configuration;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.regex.Pattern;
/**
* @author xiese
* @Description Loads configuration files
* @Email xiesen310@163.com
* @Date 2020/10/15 13:13
*/
public class LoadConfig {
private static final String SEPARATOR_SLASH = "/";
/**
* Build a YARN Configuration from every *.xml file in a directory.
*
* @param yarnConfDir directory containing the YARN client XML files
* @return the assembled Configuration
* @throws Exception
*/
public static Configuration getYarnConf(String yarnConfDir) throws Exception {
Configuration yarnConf = new Configuration();
try {
File dir = new File(yarnConfDir);
if (dir.exists() && dir.isDirectory()) {
// register every *.xml file in the directory as a configuration resource
File[] xmlFileList = dir.listFiles((dir1, name) -> name.endsWith(".xml"));
if (xmlFileList != null) {
for (File xmlFile : xmlFileList) {
yarnConf.addResource(xmlFile.toURI().toURL());
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return yarnConf;
}
public static Properties getProperties(String path) {
Properties prop = new Properties();
// try-with-resources closes the stream automatically
try (InputStream stream = new FileInputStream(new File(getResourcePath(path)))) {
prop.load(stream);
} catch (IOException e) {
System.err.println(String.format("Failed to read configuration file %s", path));
e.printStackTrace();
}
return prop;
}
public static String getResourcePath(String name) {
// normalize platform-specific separators (e.g. Windows "\") to forward slashes
if (!SEPARATOR_SLASH.equals(File.separator)) {
return name.replaceAll(Pattern.quote(File.separator), SEPARATOR_SLASH);
}
return name;
}
}
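A short usage sketch tying the two loaders together; the HA keys printed at the end (yarn.resourcemanager.ha.enabled, yarn.resourcemanager.ha.rm-ids) are standard YARN properties and serve only as an illustration, as does the class name.
import org.apache.hadoop.conf.Configuration;
import java.util.Properties;
/**
* Illustrative usage of LoadConfig (not part of the committed demo).
*/
public class LoadConfigExample {
public static void main(String[] args) throws Exception {
// load the demo's own properties, then the YARN client XML configuration
Properties prop = LoadConfig.getProperties("conf/config.properties");
Configuration yarnConf = LoadConfig.getYarnConf(prop.getProperty("yarn.conf.dir"));
// inspect the HA settings picked up from yarn-site.xml
System.out.println("HA enabled: " + yarnConf.get("yarn.resourcemanager.ha.enabled"));
System.out.println("RM ids: " + yarnConf.get("yarn.resourcemanager.ha.rm-ids"));
}
}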
import java.lang.reflect.Array;
import java.util.Collection;
import java.util.Map;
/**
* @author xiese
* @Description Null/empty checks for input parameters
* @Email xiesen310@163.com
* @Date 2020/10/15 15:45
*/
public class ParamUtil {
public static boolean check(Object... args) {
boolean flag = true;
if (args != null) {
for (Object param : args) {
// null values are always invalid
if (param == null) {
flag = false;
break;
}
// strings must also be non-blank
if (param instanceof String && param.toString().trim().length() == 0) {
flag = false;
break;
}
// empty arrays, collections and maps count as missing
if (param.getClass().isArray() && Array.getLength(param) == 0) {
flag = false;
break;
}
if (param instanceof Collection && ((Collection<?>) param).isEmpty()) {
flag = false;
break;
}
if (param instanceof Map && ((Map<?, ?>) param).isEmpty()) {
flag = false;
break;
}
}
} else {
flag = false;
}
return flag;
}
}
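A few illustrative calls showing exactly what check rejects (a hypothetical snippet, not part of the commit):
import java.util.Collections;
public class ParamUtilExample {
public static void main(String[] args) {
System.out.println(ParamUtil.check("hdfs", "krb5.conf")); // true
System.out.println(ParamUtil.check("hdfs", null)); // false: null argument
System.out.println(ParamUtil.check("   ")); // false: blank string
System.out.println(ParamUtil.check(Collections.emptyList())); // false: empty collection
System.out.println(ParamUtil.check((Object[]) null)); // false: no argument array at all
}
}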
#!/usr/bin/env bash
export basepath=$(cd `dirname $0`; pwd)
DEPLOY_PATH=${basepath%/*}
if [ ! -d "$DEPLOY_PATH/logs" ]; then
mkdir -p $DEPLOY_PATH/logs
fi
CONF_DIR=config.properties
# expand $CONF_DIR and append output to a log file rather than to the logs directory (the log file name here is illustrative)
nohup java -cp $DEPLOY_PATH/lib/smartdata-exec.war FlinkYarnHaTest $DEPLOY_PATH/conf/$CONF_DIR >> $DEPLOY_PATH/logs/flink-yarn-ha-test.log 2>&1 &
yarn.conf.dir=D:\\tmp\\tmp\\yarn-conf
keytab.file=D:\\tmp\\tmp\\yarn-conf\\hdfs150.keytab
kerberos.principal=hdfs/cdh70150@ZORKDATA.COM
hadoop.security.krb5.conf=D:\\tmp\\tmp\\yarn-conf\\krb5.conf
user.name=hdfs
hadoop.security.authentication=kerberos
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Define some default values that can be overridden by system properties
# Define the log output level and output targets (console, file)
log4j.rootLogger=DEBUG,STDOUT
# Define the console STDOUT appender
log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender
log4j.appender.STDOUT.Target=System.out
# Send log output to the console
log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout
log4j.appender.STDOUT.layout.ConversionPattern=%d %p [%c] - %m%n
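With the slf4j-log4j12 binding declared in the pom, application classes log through this configuration via the SLF4J API. A minimal sketch; the class name is illustrative.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LoggingExample {
private static final Logger LOG = LoggerFactory.getLogger(LoggingExample.class);
public static void main(String[] args) {
// routed through the log4j.properties above; visible because rootLogger is DEBUG
LOG.debug("debug message");
LOG.info("info message");
}
}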