How to Kill a Running Yarn Application

Preface

Our jobs are scheduled through yarn, so we inevitably need the related commands to manage them. The simplest operations are listing the submitted applications and killing a specific running one.

1. Common Yarn Commands

The following commands are run from the command line on a host with the yarn client installed:

yarn application -list                                     # list applications
yarn application -status application_1436784252938_0022   # check the status of an application
yarn application -kill <applicationId>                     # kill an application
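
To kill every running application owned by a given user in one go, the list command can be combined with a small shell loop. This is only a minimal sketch: "someuser" is a placeholder, and the awk filter assumes the default -list output, whose fourth column is the submitting user:

# kill all RUNNING applications submitted by "someuser" (placeholder)
for app in $(yarn application -list -appStates RUNNING | awk '$4 == "someuser" {print $1}'); do
  yarn application -kill "$app"
done
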
2. REST API
  1. Send a PUT request

// Based on the Hutool utility library
String appId = "application_1612256232497_19182";
String requestUrl = "http://uat02:8088/ws/v1/cluster/apps/" + appId + "/state";
JSONObject param = JSONUtil.createObj();
param.putOpt("state", "KILLED");
// The ResourceManager REST API expects the target state as a JSON request body
String resultJson = HttpRequest.put(requestUrl)
        .header("Content-Type", "application/json")
        .body(param.toString())
        .execute().body();
JSONObject jsonObject = JSONUtil.parseObj(resultJson);
System.out.println(jsonObject.toJSONString(0));
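
For comparison, the same state change can be issued directly with curl; host, port, and application id below are the same placeholders used in the Java snippet:

curl -X PUT -H "Content-Type: application/json" \
  -d '{"state":"KILLED"}' \
  "http://uat02:8088/ws/v1/cluster/apps/application_1612256232497_19182/state"
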

Unfortunately, however, the request failed with the following error:

{"RemoteException":{"exception":"WebApplicationException","javaClassName":"javax.ws.rs.WebApplicationException"}}

At first I assumed it was a Kerberos permission problem, so I added the following:

1. Add the hadoop-common dependency to the project.
2. Authenticate with Kerberos as in the following example:
// Point the JVM at the Kerberos configuration
String confPath = "/tmp/krb5.conf";
System.setProperty("java.security.krb5.conf", confPath);
Configuration conf = new Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(conf);
// Log in with the principal's keytab
String keyPath = "/tmp/xx.keytab";
UserGroupInformation.loginUserFromKeytab("xx", keyPath);
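
Before digging into the code, it is also worth verifying the keytab itself from the shell; a quick sanity check, reusing the placeholder principal and keytab path above:

kinit -kt /tmp/xx.keytab xx   # obtain a ticket from the keytab
klist                         # confirm the TGT was granted
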

The root cause turned out to be a cluster-side switch. To enable Kerberos authentication for the HTTP web consoles, follow these steps: CM -> HDFS service -> search for and enable "Enable Kerberos Authentication for HTTP Web-Consoles", deploy the client configuration, and restart the HDFS and YARN services.

On our cluster that switch was not turned on, so the ResourceManager treats REST callers as unauthenticated and rejects state-changing requests. Killing a task over HTTP was therefore impossible no matter what we tried, and the only option left was to run yarn application -kill on a host where the yarn client is deployed.
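
Once that switch is enabled, the REST call must carry SPNEGO credentials. A sketch with curl, assuming curl was built with GSS/Kerberos support and reusing the placeholders above:

kinit -kt /tmp/xx.keytab xx
curl --negotiate -u : -X PUT \
  -H "Content-Type: application/json" \
  -d '{"state":"KILLED"}' \
  "http://uat02:8088/ws/v1/cluster/apps/application_1612256232497_19182/state"
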

3. YarnClient API

With hadoop yarn client version 2.7.1, Kerberos authentication always succeeded, yet the connection to yarn was refused every time, which left me baffled. Note: everything worked when debugging locally, but the packaged jar failed at runtime, even though krb5.conf and the keytab file were specified with absolute paths.

After an entire afternoon of fiddling I upgraded the jars to hadoop 3.0.0, tried again, and it finally worked. Keep these client dependencies on the same version as the production hadoop cluster; otherwise subtle issues can cause authentication to fail (a quick way to check the cluster's version follows the dependency block below).

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-client</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-yarn-api</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>3.0.0</version>
</dependency>
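
If you are unsure which version to pin, it can be read off any cluster node:

hadoop version   # the first output line shows the cluster's Hadoop version
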

With the dependencies in place, we can use the yarn api for all kinds of operations. A simple example:

if ("dev".equals(env)) {
krb5 = "/Users/tandemac/conf/krb5.conf";
keyTab = "/Users/tandemac/conf/app_prd.keytab";
hdfsConf = ConfigUtil.initConfiguration("/Users/tandemac/conf");
} else {
keyTab = "/data/kerberos/app_prd.keytab";
krb5 = "/etc/krb5.conf";
hdfsConf = ConfigUtil.initConfiguration("/etc/hadoop/conf");
}
ConfigUtil.initKerberosConf(hdfsConf, krb5, "app_prd", keyTab);
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(hdfsConf);
yarnClient.start();
List<ApplicationReport> yarnClientApplications = yarnClient.getApplications();
yarnClientApplications.stream()
.filter(app -> app.getYarnApplicationState().equals(YarnApplicationState.RUNNING))
.filter(app -> !app.getQueue().contains("priority")) //过滤含有高优先级队列的任务
.forEach(application -> {
String name = application.getName();
String queue = application.getQueue();
String yarnApplicationState = application.getYarnApplicationState().name();
int memory = application.getApplicationResourceUsageReport().getUsedResources().getMemory();
int virtualCores = application.getApplicationResourceUsageReport().getUsedResources().getVirtualCores();

log.info("Kill job : {}", sb.toString());
try {
yarnClient.killApplication(application.getApplicationId());
} catch (YarnException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

The ConfigUtil configuration helper class:

import java.io.File;
import java.io.IOException;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

@Slf4j
public class ConfigUtil {

    /**
     * Initialize the HDFS Configuration from a directory of *-site.xml files.
     *
     * @param confPath directory containing core-site.xml, hdfs-site.xml and yarn-site.xml
     * @return configuration
     */
    public static Configuration initConfiguration(String confPath) {
        Configuration configuration = new Configuration();
        configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        configuration.addResource(new Path(new File(confPath + File.separator + "core-site.xml").toURI()));
        configuration.addResource(new Path(new File(confPath + File.separator + "hdfs-site.xml").toURI()));
        configuration.addResource(new Path(new File(confPath + File.separator + "yarn-site.xml").toURI()));
        return configuration;
    }

    /**
     * Initialize the Kerberos environment and log in from a keytab.
     *
     * @param conf             Hadoop configuration to authenticate against
     * @param krb5ConfFilePath path to krb5.conf
     * @param kerberosUser     Kerberos principal
     * @param keytabFilePath   path to the principal's keytab
     */
    public static void initKerberosConf(Configuration conf, String krb5ConfFilePath, String kerberosUser, String keytabFilePath) {
        System.setProperty("java.security.krb5.conf", krb5ConfFilePath);
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
        System.setProperty("sun.security.krb5.debug", "false");
        try {
            UserGroupInformation.setConfiguration(conf);
            UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabFilePath);
        } catch (IOException e) {
            log.error("Kerberos login failed for user {}", kerberosUser, e);
        }
    }
}

References:

https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/ResourceManagerRest.html
https://docs.cloudera.com/cdp-private-cloud-base/7.1.5/security-kerberos-authentication/topics/cm-security-kerberos-enabling-step9-web-authentication.html
https://s905060.gitbooks.io/site-reliability-engineer-handbook/content/hadoop_how_to_kill_all_the_specified_user_job.html
https://stackoverflow.com/questions/63799812/how-to-kill-a-spark-application-using-yarn-resourcemanager-rest-api
https://zhuanlan.zhihu.com/p/100662052



