JAVA语言提交spark任务到yarn平台的配置讲解
小标 2018-12-04 来源 : 阅读 2621 评论 0

摘要:本文主要向大家介绍了JAVA语言提交spark任务到yarn平台的配置讲解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。

本文主要向大家介绍了JAVA语言提交spark任务到yarn平台的配置讲解,通过具体的内容向大家展示,希望对大家学习JAVA语言有所帮助。


一、背景


采用spark的方式处理,所以需要将spark的功能集成到代码,采用yarn客户端的方式管理spark任务。不需要将cdh的一些配置文件放到resource路径下,只需要配置一些配置即可,非常方便


二、任务管理架构



三、接口


1、任务提交


1. /**


2. *提交任务到yarn集群


3. *


4. *@paramconditions


5. *yarn集群,spark,hdfs具体信息,参数等


6. *@returnappid


7. */


8. publicStringsubmitSpark(YarnSubmitConditionsconditions){


9. logger.info("初始化sparkonyarn参数");


10.


11. //初始化yarn客户端


12. logger.info("初始化sparkonyarn客户端");


13. Listargs=Lists.newArrayList("--jar",conditions.getApplicationJar(),"--class",


14. conditions.getMainClass());


15. if(conditions.getOtherArgs()!=null&&conditions.getOtherArgs().size()>0){


16. for(Strings:conditions.getOtherArgs()){


17. args.add("--arg");


18. args.add(org.apache.commons.lang.StringUtils.join(newString[]{s},","));


19. }


20. }


21.


22. //identifythatyouwillbeusingSparkasYARNmode


23. System.setProperty("SPARK_YARN_MODE","true");


24. SparkConfsparkConf=newSparkConf();


25. if(org.apache.commons.lang.StringUtils.isNotEmpty(conditions.getJobName())){


26. sparkConf.setAppName(conditions.getJobName());


27. }


28.


29. sparkConf.set("spark.yarn.jars",conditions.getSparkYarnJars());


30. if(conditions.getAdditionalJars()!=null&&conditions.getAdditionalJars().length>0){


31. sparkConf.set("spark.jars",org.apache.commons.lang.StringUtils.join(conditions.getAdditionalJars(),","));


32. }


33.


34. if(conditions.getFiles()!=null&&conditions.getFiles().length>0){


35. sparkConf.set("spark.files",org.apache.commons.lang.StringUtils.join(conditions.getFiles(),","));


36. }


37. for(Map.Entrye:conditions.getSparkProperties().entrySet()){


38. sparkConf.set(e.getKey().toString(),e.getValue().toString());


39. }


40.


41. //添加这个参数,不然spark会一直请求0.0.0.0:8030,一直重试


42. sparkConf.set("yarn.resourcemanager.hostname",conditions.getYarnResourcemanagerAddress().split(":")[0]);


43. //设置为true,不删除缓存的jar包,因为现在提交yarn任务是使用的代码配置,没有配置文件,删除缓存的jar包有问题,


44. sparkConf.set("spark.yarn.preserve.staging.files","true");


45.


46. //初始化yarn的配置


47. Configurationcf=newConfiguration();


48. Stringos=System.getProperty("os.name");


49. booleancross_platform=false;


50. if(os.contains("Windows")){


51. cross_platform=true;


52. }


53. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务


54. //设置yarn资源,不然会使用localhost:8032


55. cf.set("yarn.resourcemanager.address",conditions.getYarnResourcemanagerAddress());


56. //设置namenode的地址,不然jar包会分发,非常恶心


57. cf.set("fs.defaultFS",conditions.getSparkFsDefaultFS());


58.


59. ClientArgumentscArgs=newClientArguments(args.toArray(newString[args.size()]));


60. Clientclient=newClient(cArgs,cf,sparkConf);


61. logger.info("提交任务,任务名称:"+conditions.getJobName());


62.


63. try{


64.


65. ApplicationIdappId=client.submitApplication();


66.


67. returnappId.toString();


68.


69. }catch(Exceptione){


70. logger.error("提交spark任务失败",e);


71. returnnull;


72. }finally{


73. if(client!=null){


74. client.stop();


75. }


76. }


77. }


2、任务进度获取


1. /**


2. *停止spark任务


3. *


4. *@paramyarnResourcemanagerAddress


5. *yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址


6. *@paramappIdStr


7. *需要取消的任务id


8. */


9. publicvoidkillJob(StringyarnResourcemanagerAddress,StringappIdStr){


10. logger.info("取消spark任务,任务id:"+appIdStr);


11. //初始化yarn的配置


12. Configurationcf=newConfiguration();


13. Stringos=System.getProperty("os.name");


14. booleancross_platform=false;


15. if(os.contains("Windows")){


16. cross_platform=true;


17. }


18. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务


19. //设置yarn资源,不然会使用localhost:8032


20. cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);


21.


22. //创建yarn的客户端,此类中有杀死任务的方法


23. YarnClientyarnClient=YarnClient.createYarnClient();


24. //初始化yarn的客户端


25. yarnClient.init(cf);


26. //yarn客户端启动


27. yarnClient.start();


28. try{


29. //根据应用id,杀死应用


30. yarnClient.killApplication(getAppId(appIdStr));


31. }catch(Exceptione){


32. logger.error("取消spark任务失败",e);


33. }


34. //关闭yarn客户端


35. yarnClient.stop();


36.


37. }


3、任务取消


1. /**


2. *获取spark任务状态


3. *


4. *


5. *@paramyarnResourcemanagerAddress


6. *yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址


7. *@paramappIdStr


8. *需要取消的任务id


9. */


10. publicSparkTaskStategetStatus(StringyarnResourcemanagerAddress,StringappIdStr){


11. logger.info("获取任务状态启动,任务id:"+appIdStr);


12. //初始化yarn的配置


13. Configurationcf=newConfiguration();


14. Stringos=System.getProperty("os.name");


15. booleancross_platform=false;


16. if(os.contains("Windows")){


17. cross_platform=true;


18. }


19. cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务


20. //设置yarn资源,不然会使用localhost:8032


21. cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);


22. logger.info("获取任务状态,任务id:"+appIdStr);


23.


24. SparkTaskStatetaskState=newSparkTaskState();


25. //设置任务id


26. taskState.setAppId(appIdStr);


27. YarnClientyarnClient=YarnClient.createYarnClient();


28. //初始化yarn的客户端


29. yarnClient.init(cf);


30. //yarn客户端启动


31. yarnClient.start();


32. ApplicationReportreport=null;


33. try{


34. report=yarnClient.getApplicationReport(getAppId(appIdStr));


35. }catch(Exceptione){


36. logger.error("获取spark任务状态失败");


37. }


38.


39. if(report!=null){


40. YarnApplicationStatestate=report.getYarnApplicationState();


41. taskState.setState(state.name());


42. //任务执行进度


43. floatprogress=report.getProgress();


44. taskState.setProgress(progress);


45. //最终状态


46. FinalApplicationStatusstatus=report.getFinalApplicationStatus();


47. taskState.setFinalStatus(status.name());


48. }else{


49. taskState.setState(ConstParam.SPARK_FAILED);


50. taskState.setProgress(0.0f);


51. taskState.setFinalStatus(ConstParam.SPARK_FAILED);


52. }


53.


54. //关闭yarn客户端


55. yarnClient.stop();


56. logger.info("获取任务状态结束,任务状态:"+JSON.toJSONString(taskState));


57. returntaskState;


58. }


四、yarn参数调节


1、可分配给容器的物理内存数量,一个nodemanage分配的内存,如果机器内存是128g,尽量分配2/3


yarn.nodemanager.resource.memory-mb:80g


2、可以为容器分配的虚拟 CPU 内核的数量。该参数在 CDH 4.4 以前版本中无效。一个nodemanage分配的核数。如果机器是64和,尽量分配2/3.


yarn.nodemanager.resource.cpu-vcores:40


3、Java 进程堆栈内存的最大大小(以字节为单位)。已传递到 Java -Xmx。


ResourceManager 的 Java 堆栈大小(字节)


ResourceManager Default Group


B千字节兆字节吉字节


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注编程语言JAVA频道!


本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 1 不喜欢 | 1
看完这篇文章有何感觉?已经有2人表态,50%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程