摘要:本文主要向大家介绍了JAVA语言提交spark任务到yarn平台的配置讲解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。
本文主要向大家介绍了JAVA语言提交spark任务到yarn平台的配置讲解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。
一、背景
采用spark的方式处理,所以需要将spark的功能集成到代码,采用yarn客户端的方式管理spark任务。不需要将cdh的一些配置文件放到resource路径下,只需要配置一些配置即可,非常方便
二、任务管理架构
三、接口
1、任务提交
1. /**
2. *提交任务到yarn集群
3. *
4. *@paramconditions
5. *yarn集群,spark,hdfs具体信息,参数等
6. *@returnappid
7. */
8. publicStringsubmitSpark(YarnSubmitConditionsconditions){
9. logger.info("初始化sparkonyarn参数");
10.
11. //初始化yarn客户端
12. logger.info("初始化sparkonyarn客户端");
13. Listargs=Lists.newArrayList("--jar",conditions.getApplicationJar(),"--class",
14. conditions.getMainClass());
15. if(conditions.getOtherArgs()!=null&&conditions.getOtherArgs().size()>0){
16. for(Strings:conditions.getOtherArgs()){
17. args.add("--arg");
18. args.add(org.apache.commons.lang.StringUtils.join(newString[]{s},","));
19. }
20. }
21.
22. //identifythatyouwillbeusingSparkasYARNmode
23. System.setProperty("SPARK_YARN_MODE","true");
24. SparkConfsparkConf=newSparkConf();
25. if(org.apache.commons.lang.StringUtils.isNotEmpty(conditions.getJobName())){
26. sparkConf.setAppName(conditions.getJobName());
27. }
28.
29. sparkConf.set("spark.yarn.jars",conditions.getSparkYarnJars());
30. if(conditions.getAdditionalJars()!=null&&conditions.getAdditionalJars().length>0){
31. sparkConf.set("spark.jars",org.apache.commons.lang.StringUtils.join(conditions.getAdditionalJars(),","));
32. }
33.
34. if(conditions.getFiles()!=null&&conditions.getFiles().length>0){
35. sparkConf.set("spark.files",org.apache.commons.lang.StringUtils.join(conditions.getFiles(),","));
36. }
37. for(Map.Entrye:conditions.getSparkProperties().entrySet()){
38. sparkConf.set(e.getKey().toString(),e.getValue().toString());
39. }
40.
41. //添加这个参数,不然spark会一直请求0.0.0.0:8030,一直重试
42. sparkConf.set("yarn.resourcemanager.hostname",conditions.getYarnResourcemanagerAddress().split(":")[0]);
43. //设置为true,不删除缓存的jar包,因为现在提交yarn任务是使用的代码配置,没有配置文件,删除缓存的jar包有问题,
44. sparkConf.set("spark.yarn.preserve.staging.files","true");
45.
46. //初始化yarn的配置
47. Configurationcf=newConfiguration();
48. Stringos=System.getProperty("os.name");
49. booleancross_platform=false;
50. if(os.contains("Windows")){
51. cross_platform=true;
52. }
53. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务
54. //设置yarn资源,不然会使用localhost:8032
55. cf.set("yarn.resourcemanager.address",conditions.getYarnResourcemanagerAddress());
56. //设置namenode的地址,不然jar包会分发,非常恶心
57. cf.set("fs.defaultFS",conditions.getSparkFsDefaultFS());
58.
59. ClientArgumentscArgs=newClientArguments(args.toArray(newString[args.size()]));
60. Clientclient=newClient(cArgs,cf,sparkConf);
61. logger.info("提交任务,任务名称:"+conditions.getJobName());
62.
63. try{
64.
65. ApplicationIdappId=client.submitApplication();
66.
67. returnappId.toString();
68.
69. }catch(Exceptione){
70. logger.error("提交spark任务失败",e);
71. returnnull;
72. }finally{
73. if(client!=null){
74. client.stop();
75. }
76. }
77. }
2、任务进度获取
1. /**
2. *停止spark任务
3. *
4. *@paramyarnResourcemanagerAddress
5. *yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址
6. *@paramappIdStr
7. *需要取消的任务id
8. */
9. publicvoidkillJob(StringyarnResourcemanagerAddress,StringappIdStr){
10. logger.info("取消spark任务,任务id:"+appIdStr);
11. //初始化yarn的配置
12. Configurationcf=newConfiguration();
13. Stringos=System.getProperty("os.name");
14. booleancross_platform=false;
15. if(os.contains("Windows")){
16. cross_platform=true;
17. }
18. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务
19. //设置yarn资源,不然会使用localhost:8032
20. cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);
21.
22. //创建yarn的客户端,此类中有杀死任务的方法
23. YarnClientyarnClient=YarnClient.createYarnClient();
24. //初始化yarn的客户端
25. yarnClient.init(cf);
26. //yarn客户端启动
27. yarnClient.start();
28. try{
29. //根据应用id,杀死应用
30. yarnClient.killApplication(getAppId(appIdStr));
31. }catch(Exceptione){
32. logger.error("取消spark任务失败",e);
33. }
34. //关闭yarn客户端
35. yarnClient.stop();
36.
37. }
3、任务取消
1. /**
2. *获取spark任务状态
3. *
4. *
5. *@paramyarnResourcemanagerAddress
6. *yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址
7. *@paramappIdStr
8. *需要取消的任务id
9. */
10. publicSparkTaskStategetStatus(StringyarnResourcemanagerAddress,StringappIdStr){
11. logger.info("获取任务状态启动,任务id:"+appIdStr);
12. //初始化yarn的配置
13. Configurationcf=newConfiguration();
14. Stringos=System.getProperty("os.name");
15. booleancross_platform=false;
16. if(os.contains("Windows")){
17. cross_platform=true;
18. }
19. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务
20. //设置yarn资源,不然会使用localhost:8032
21. cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);
22. logger.info("获取任务状态,任务id:"+appIdStr);
23.
24. SparkTaskStatetaskState=newSparkTaskState();
25. //设置任务id
26. taskState.setAppId(appIdStr);
27. YarnClientyarnClient=YarnClient.createYarnClient();
28. //初始化yarn的客户端
29. yarnClient.init(cf);
30. //yarn客户端启动
31. yarnClient.start();
32. ApplicationReportreport=null;
33. try{
34. report=yarnClient.getApplicationReport(getAppId(appIdStr));
35. }catch(Exceptione){
36. logger.error("获取spark任务状态失败");
37. }
38.
39. if(report!=null){
40. YarnApplicationStatestate=report.getYarnApplicationState();
41. taskState.setState(state.name());
42. //任务执行进度
43. floatprogress=report.getProgress();
44. taskState.setProgress(progress);
45. //最终状态
46. FinalApplicationStatusstatus=report.getFinalApplicationStatus();
47. taskState.setFinalStatus(status.name());
48. }else{
49. taskState.setState(ConstParam.SPARK_FAILED);
50. taskState.setProgress(0.0f);
51. taskState.setFinalStatus(ConstParam.SPARK_FAILED);
52. }
53.
54. //关闭yarn客户端
55. yarnClient.stop();
56. logger.info("获取任务状态结束,任务状态:"+JSON.toJSONString(taskState));
57. returntaskState;
58. }
四、yarn参数调节
1、可分配给容器的物理内存数量,一个nodemanage分配的内存,如果机器内存是128g,尽量分配2/3
yarn.nodemanager.resource.memory-mb:80g
2、可以为容器分配的虚拟 CPU 内核的数量。该参数在 CDH 4.4 以前版本中无效。一个nodemanage分配的核数。如果机器是64和,尽量分配2/3.
yarn.nodemanager.resource.cpu-vcores:40
3、Java 进程堆栈内存的最大大小(以字节为单位)。已传递到 Java -Xmx。
ResourceManager 的 Java 堆栈大小(字节)
ResourceManager Default Group
B千字节兆字节吉字节
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号