×

apachespark官网

apachespark官网(如何使用spark将程序提交任务到yarn-Spark-about云开发)

admin admin 发表于2023-12-24 12:35:02 浏览26 评论0

抢沙发发表评论

大家好,关于apachespark官网很多朋友都还不太明白,不过没关系,因为今天小编就来为大家分享关于如何使用spark将程序提交任务到yarn-Spark-about云开发的知识点,相信应该可以解决大家的一些困惑和问题,如果碰巧可以解决您的问题,还望关注下本站哦,希望对各位有所帮助!

本文目录

如何使用spark将程序提交任务到yarn-Spark-about云开发

使用脚本提交1.使用spark脚本提交到yarn,首先需要将spark所在的主机和hadoop集群之间hosts相互配置(也就是把spark主机的ip和主机名配置到hadoop所有节点的/etc/hosts里面,再把集群所有节点的ip和主机名配置到spark所在主机的/etc/hosts里面)。2.然后需要把hadoop目录etc/hadoop下面的*-sit.xml复制到${SPARK_HOME}的conf下面.3.确保hadoop集群配置了 HADOOP_CONF_DIR or YARN_CONF_DIR 1.yarn-standalone方式提交到yarn在${SPARK_HOME}下面执行:SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \ ./bin/spark-class org.apache.spark.deploy.yarn.Client \ --jar ./examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \ --class org.apache.spark.examples.SparkPi \ --args yarn-standalone \ --num-workers 3 \ --master-memory 2g \ --worker-memory 2g \ --worker-cores 1复制代码2. yarn-client 方式提交到yarn在${SPARK_HOME}下面执行:SPARK_JAR=./assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar \SPARK_YARN_APP_JAR=examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar \./bin/run-example org.apache.spark.examples.SparkPi yarn-client复制代码二、使用程序提交1.必须使用linux主机提交任务,使用windows提交到linux hadoop集群会报org.apache.hadoop.util.Shell$ExitCodeException: /bin/bash: 第 0 行: fg: 无任务控制复制代码错误。hadoop2.2.0不支持windows提交到linux hadoop集群,网上搜索发现这是hadoop的bug。2.提交任务的主机和hadoop集群主机名需要在hosts相互配置。3.因为使用程序提交是使用yarn-client方式,所以必须像上面脚本那样设置环境变量SPARK_JAR 和 SPARK_YARN_APP_JAR比如我的设置为向提交任务主机~/.bashrc里面添加:export SPARK_JAR=file:///home/ndyc/software/sparkTest/lib/spark-assembly-0.9.0-incubating-hadoop2.2.0.jar export SPARK_YARN_APP_JAR=file:///home/ndyc/software/sparkTest/ndspark-0.0.1.jar复制代码file:// 表明是本地文件,如果使用hdfs上的文件将file://替换为hdfs://主机名:端口号。建议使用hdfs来引用 spark-assembly-0.9.0-incubating-hadoop2.2.0.jar,因为这个文件比较大,如果使用file://每次提交任务都需要上传这个jar到各个集群,很慢。其中SPARK_JAR是${SPARK_HOME}/assembly/target/scala-2.10.4/spark-assembly-0.9.0-incubating-hadoop2.2.0.jarSPARK_YARN_APP_JAR是自己程序打的jar包,包含自己的测试程序。4.程序中加入hadoop、yarn、依赖。注意,如果引入了hbase依赖,需要这样配置《dependency》 《groupId》org.apache.hbase《/groupId》 《artifactId》hbase-thrift《/artifactId》 《version》${hbase.version}《/version》 《exclusions》 《exclusion》 《groupId》org.apache.hadoop《/groupId》 《artifactId》hadoop-mapreduce-client-jobclient《/artifactId》 《/exclusion》 《exclusion》 《groupId》org.apache.hadoop《/groupId》 《artifactId》hadoop-client《/artifactId》 《/exclusion》 《/exclusions》 《/dependency》复制代码然后再加入《dependency》 《groupId》org.ow2.asm《/groupId》 《artifactId》asm-all《/artifactId》 《version》4.0《/version》 《/dependency》复制代码否则会报错:IncompatibleClassChangeError has interface org.objectweb.asm.ClassVisitor as super class复制代码异常是因为Hbase jar hadoop-mapreduce-client-jobclient.jar里面使用到了asm3.1 而spark需要的是asm-all-4.0.jar5. hadoop conf下的*-site.xml需要复制到提交主机的classpath下,或者说maven项目resources下面。6.编写程序代码示例:package com.sdyc.ndspark.sys;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import scala.Tuple2;import java.util.ArrayList;import java.util.List;/*** Created with IntelliJ IDEA.* User: zarchary* Date: 14-1-19* Time: 下午6:23* To change this template use File | Settings | File Templates.*/public class ListTest {public static void main(String args) throws Exception { SparkConf sparkConf = new SparkConf(); sparkConf.setAppName("listTest"); //使用yarn模式提交 sparkConf.setMaster("yarn-client");JavaSparkContext sc = new JavaSparkContext(sparkConf);List《String》 listA = new ArrayList《String》();listA.add("a"); listA.add("a"); listA.add("b"); listA.add("b"); listA.add("b"); listA.add("c"); listA.add("d");JavaRDD《String》 letterA = sc.parallelize(listA);JavaPairRDD《String, Integer》 letterB = letterA.map(new PairFunction《String, String, Integer》() { @Override public Tuple2《String, Integer》 call(String s) throws Exception { return new Tuple2《String, Integer》(s, 1); } });letterB = letterB.reduceByKey(new Function2《Integer, Integer, Integer》() { public Integer call(Integer i1, Integer i2) { return i1 + i2; } });//颠倒顺序 JavaPairRDD《Integer, String》 letterC = letterB.map(new PairFunction《Tuple2《String, Integer》, Integer, String》() { @Override public Tuple2《Integer, String》 call(Tuple2《String, Integer》 stringIntegerTuple2) throws Exception { return new Tuple2《Integer, String》(stringIntegerTuple2._2, stringIntegerTuple2._1); } });JavaPairRDD《Integer, List《String》》 letterD = letterC.groupByKey();// //false说明是降序 JavaPairRDD《Integer, List《String》》 letterE = letterD.sortByKey(false);System.out.println("========" + letterE.collect());System.exit(0); }}复制代码代码中master设置为yar-client表明了是使用提交到yarn.关于spark需要依赖的jar的配置可以参考我的博客spark安装和远程调用。以上弄完之后就可以运行程序了。运行后会看到yarn的ui界面出现:正在执行的过程中会发现hadoop yarn 有的nodemanage会有下面这个进程:13247 org.apache.spark.deploy.yarn.WorkerLauncher复制代码这是spark的工作进程。如果接收到异常为:WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory复制代码出现这个错误是因为提交任务的节点不能和spark工作节点交互,因为提交完任务后提交任务节点上会起一个进程,展示任务进度,大多端口为4044,工作节点需要反馈进度给该该端口,所以如果主机名或者IP在hosts中配置不正确,就会报 WARN YarnClientClusterScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory错误。 所以请检查主机名和IP是否配置正确。我自己的理解为,程序提交任务到yarn后,会上传SPARK_JAR和SPARK_YARN_APP_JAR到hadoop节点, yarn根据任务情况来分配资源,在nodemanage节点上来启动org.apache.spark.deploy.yarn.WorkerLauncher工作节点来执行spark任务,执行完成后退出。

Spark通信框架Spark Network Common

一直以来,基于Akka实现的RPC通信框架是Spark引以为豪的主要特性,也是与Hadoop等分布式计算框架对比过程中一大亮点,但是时代和技术都在演化,从Spark1.3.1版本开始, 为了解决大块数据(如Shuffle)的传输问题 ,Spark引入了Netty通信框架,到了1.6.0版本,Netty完全取代了Akka,承担Spark内部所有的RPC通信以及数据流传输。

JAVA IO也经历了几次演化,从最早的BIO(阻塞式/非阻塞IO),到1.4版本的NIO(IO复用),到1.7版本的NIO2.0/AIO(异步IO)。 基于早期BIO来实现高并发网络服务器都是依赖多线程来实现,但是线程开销较大,BIO的瓶颈明显,NIO的出现解决了这一大难题, 基于IO复用解决了IO高并发 。 但是NIO有也有几个缺点:

因为这几个原因,促使了很多JAVA-IO通信框架的出现,Netty就是其中一员,它也因为高度的稳定性,功能性,性能等特性,成为Java开发的首选

首先是NIO的上层封装,Netty提供了NioEventLoopGroup / NioSocketChannel / NioServerSocketChannel的组合来完成实际IO操作,继而在此之上实现数据流Pipeline以及EventLoop线程池等功能。

另外它又重写了NIO,JDK-NIO底层是基于Epoll的LT模式来实现,而Netty是基于Epoll的ET模式实现的一组IO操作EpollEventLoopGroup / EpollSocketChannel / EpollServerSocketChannel Netty对两种实现进行完美的封装,可以根据业务的需求来选择不同的实现

从Akka出现背景来说,它是基于Actor的RPC通信系统,它的核心概念也是Message,它是基于协程的,性能不容置疑;基于scala的偏函数,易用性也没有话说,但是它毕竟只是RPC通信,无法适用大的package/stream的数据传输,这也是Spark早期引入Netty的原因。

首先不容置疑的是Akka可以做到的,Netty也可以做到,但是Netty可以做到,Akka却无法做到。原因是啥?在软件栈中,Akka相比Netty要Higher一点,它专门针对RPC做了很多事情,而Netty相比更加基础一点, 可以为不同的应用层通信协议(RPC,FTP,HTTP等)提供支持 ,在早期的Akka版本,底层的NIO通信就是用的Netty。

其次一个优雅的工程师是不会允许一个系统中容纳两套通信框架!最后,虽然Netty没有Akka协程级的性能优势,但是Netty内部高效的Reactor线程模型,无锁化的串行设计,高效的序列化,零拷贝,内存池等特性也保证了Netty不会存在性能问题。 那么Spark是怎么用Netty来取代Akka呢?一句话,利用偏函数的特性,基于Netty“仿造”出一个简约版本的Actor模型。

对于Network通信,不管传输的是序列化后的对象还是文件,在网络上表现的都是字节流。在传统IO中,字节流表示为Stream;在NIO中,字节流表示为ByteBuffer;在Netty中字节流表示为ByteBuff或FileRegion;在Spark中,针对Byte也做了一层包装,支持对Byte和文件流进行处理,即ManagedBuffer; ManagedBuffer包含了三个函数createInputStream(),nioByteBuffer(),convertToNetty()来对Buffer进行“类型转换”,分别获取stream,ByteBuffer,ByteBuff或FileRegion;NioManagedBuffer / NettyManagedBuffer / FileSegmentManagedBuffer也是针对性提供了具体的实现。

更好的理解ManagedBuffer :比如Shuffle BlockManager模块需要在内存中维护本地executor生成的shuffle-map输出的文件引用,从而可以提供给shuffleFetch进行远程读取,此时文件表示为FileSegmentManagedBuffer,shuffleFetch远程调用FileSegmentManagedBuffer.nioByteBuffer / createInputStream函数从文件中读取为Bytes,并进行后面的网络传输。如果已经在内存中bytes就更好理解了,比如将一个字符数组表示为NettyManagedBuffer。

协议是应用层通信的基础,它提供了应用层通信的数据表示,以及编码和解码的能力。在Spark Network Common中,继承AKKA中的定义,将协议命名为Message,它继承Encodable,提供了encode的能力。

Message根据请求响应可以划分为RequestMessage和ResponseMessage两种;对于Response,根据处理结果,可以划分为Failure和Success两种类型;根据功能的不同,主要划分为Stream,ChunkFetch,Rpc。

Server构建在Netty之上,它提供两种模型NIO和Epoll,可以通过参数(spark..io.mode)进行配置,最基础的module就是shuffle,不同的IOMode选型,对应了Netty底层不同的实现,Server的Init过程中,最重要的步骤就是根据不同的IOModel完成EventLoop和Pipeline的构造

其中,MessageEncoder/Decoder针对网络包到Message的编码和解码,而最为核心就TransportRequestHandler,它封装了对所有请求/响应的处理;

TransportChannelHandler内部实现也很简单,它封装了responseHandler和requestHandler,当从Netty中读取一条Message以后,根据判断路由给相应的responseHandler和requestHandler。

Sever提供的RPC,ChunkFecth,Stream的功能都是依赖TransportRequestHandler来实现的;从原理上来说,RPC与ChunkFecth / Stream还是有很大不同的,其中RPC对于TransportRequestHandler来说是功能依赖,而ChunkFecth / Stream对于TransportRequestHandler来说只是数据依赖。

怎么理解?即TransportRequestHandler已经提供了ChunkFecth / Stream的实现,只需要在构造的时候,向TransportRequestHandler提供一个streamManager,告诉RequestHandler从哪里可以读取到Chunk或者Stream。而RPC需要向TransportRequestHandler注册一个rpcHandler,针对每个RPC接口进行功能实现,同时RPC与ChunkFecth / Stream都会有同一个streamManager的依赖,因此注入到TransportRequestHandler中的streamManager也是依赖rpcHandler来实现,即rpcHandler中提供了RPC功能实现和streamManager的数据依赖。

Server是通过监听一个端口,注入rpcHandler和streamManager从而对外提供RPC,ChunkFecth,Stream的服务,而Client即为一个客户端类,通过该类,可以将一个streamId / chunkIndex对应的ChunkFetch请求,streamId对应的Stream请求,以及一个RPC数据包对应的RPC请求发送到服务端,并监听和处理来自服务端的响应;其中最重要的两个类即为TransportClient和TransportResponseHandler分别为上述的“客户端类”和“监听和处理来自服务端的响应"。

那么TransportClient和TransportResponseHandler是怎么配合一起完成Client的工作呢? 由TransportClient将用户的RPC,ChunkFecth,Stream的请求进行打包并发送到Server端,同时将用户提供的回调函数注册到TransportResponseHandler,TransportResponseHandler是TransportChannelHandler的一部分,在TransportChannelHandler接收到数据包,并判断为响应包以后,将包数据路由到TransportResponseHandler中,在TransportResponseHandler中通过注册的回调函数,将响应包的数据返回给客户端

无论是BlockTransfer还是ShuffleFetch都需要跨executor的数据传输,在每一个executor里面都需要运行一个Server线程(后面也会分析到,对于Shuffle也可能是一个独立的ShuffleServer进程存在)来提供对Block数据的远程读写服务

在每个Executor里面,都有一个BlockManager模块,它提供了对当前Executor所有的Block的“本地管理”,并对进程内其他模块暴露getBlockData(blockId: BlockId): ManagedBuffer的Block读取接口,但是这里GetBlockData仅仅是提供本地的管理功能,对于跨远程的Block传输,则由NettyBlockTransferService提供服务。

NettyBlockTransferService本身即是Server,为其他其他远程Executor提供Block的读取功能,同时它即为Client,为本地其他模块暴露fetchBlocks的接口,支持通过host/port拉取任何Executor上的一组的Blocks。 源码位置 spark-core: org.apache.spark.network.netty

NettyBlockTransferService作为一个Server,与Executor或Driver里面其他的服务一样,在进程启动时,由SparkEnv初始化构造并启动服务,在整个运行时的一部分。

一个Server的构造依赖RpcHandler提供RPC的功能注入以及提供streamManager的数据注入。对于NettyBlockTransferService,该RpcHandler即为NettyBlockRpcServer,在构造的过程中,需要与本地的BlockManager进行管理,从而支持对外提供本地BlockMananger中管理的数据

RpcHandler提供RPC的功能注入 在这里还是属于比较“简陋的”,毕竟他是属于数据传输模块,Server中提供的chunkFetch和stream已经足够满足他的功能需要,那现在问题就是怎么从streamManager中读取数据来提供给chunkFetch和stream进行使用呢? 就是NettyBlockRpcServer作为RpcHandler提供的一个Rpc接口之一:OpenBlocks,它接受由Client提供一个Blockids列表,Server根据该BlockIds从BlockManager获取到相应的数据并注册到streamManager中,同时返回一个StreamID,后续Client即可以使用该StreamID发起ChunkFetch的操作。

从NettyBlockTransferService作为一个Server,我们基本可以推测NettyBlockTransferService作为一个Client支持fetchBlocks的功能的基本方法:

同时,为了提高服务端稳定性,针对fetchBlocks操作NettyBlockTransferService提供了非重试版本和重试版本的BlockFetcher,分别为OneForOneBlockFetcher和RetryingBlockFetcher,通过参数(spark..io.maxRetries)进行配置,默认是重试3次

在Spark,Block有各种类型,可以是ShuffleBlock,也可以是BroadcastBlock等等,对于ShuffleBlock的Fetch,除了由Executor内部的NettyBlockTransferService提供服务以外,也可以由外部的ShuffleService来充当Server的功能,并由专门的ExternalShuffleClient来与其进行交互,从而获取到相应Block数据。功能的原理和实现,基本一致,但是问题来了, 为什么需要一个专门的ShuffleService服务呢? 主要原因还是为了做到任务隔离,即减轻因为fetch带来对Executor的压力,让其专心的进行数据的计算。

在目前Spark中,也提供了这样的一个AuxiliaryService:YarnShuffleService,但是对于Spark不是必须的,如果你考虑到需要“ 通过减轻因为fetch带来对Executor的压力 ”,那么就可以尝试尝试。

同时,如果启用了外部的ShuffleService,对于shuffleClient也不是使用上面的NettyBlockTransferService,而是专门的ExternalShuffleClient,功能逻辑基本一致!

Akka的通信模型是基于Actor,一个Actor可以理解为一个Service服务对象,它可以针对相应的RPC请求进行处理,如下所示,定义了一个最为基本的Actor:

Actor内部只有唯一一个变量(当然也可以理解为函数了),即Receive,它为一个偏函数,通过case语句可以针对Any信息可以进行相应的处理,这里Any消息在实际项目中就是消息包。

另外一个很重要的概念就是ActorSystem,它是一个Actor的容器,多个Actor可以通过name-》Actor的注册到Actor中,在ActorSystem中可以根据请求不同将请求路由给相应的Actor。ActorSystem和一组Actor构成一个完整的Server端,此时客户端通过host:port与ActorSystem建立连接,通过指定name就可以相应的Actor进行通信,这里客户端就是ActorRef。所有Akka整个RPC通信系列是由Actor,ActorRef,ActorSystem组成。

Spark基于这个思想在上述的Network的基础上实现一套自己的RPC Actor模型,从而取代Akka。其中RpcEndpoint对应Actor,RpcEndpointRef对应ActorRef,RpcEnv即对应了ActorSystem。

RpcEndpoint与Actor一样,不同RPC Server可以根据业务需要指定相应receive/receiveAndReply的实现,在Spark内部现在有N多个这样的Actor,比如Executor就是一个Actor,它处理来自Driver的LaunchTask/KillTask等消息。

RpcEnv相对于ActorSystem:

RpcEndpointRef即为与相应Endpoint通信的引用,它对外暴露了send/ask等接口,实现将一个Message发送到Endpoint中。

这就是新版本的RPC框架的基本功能,它的实现基本上与Akka无缝对接,业务的迁移的功能很小,目前基本上都全部迁移完了。

RpcEnv不仅从外部接口与Akka基本一致,在内部的实现上,也基本差不多,都是按照MailBox的设计思路来实现的;

RpcEnv即充当着Server,同时也为Client内部实现。 当作为Server ,RpcEnv会初始化一个Server,并注册NettyRpcHandler。RpcHandler的receive接口负责对每一个请求进行处理,一般情况下,简单业务可以在RpcHandler直接完成请求的处理,但是考虑一个RpcEnv的Server上会挂载了很多个RpcEndpoint,每个RpcEndpoint的RPC请求频率不可控,因此需要对一定的分发机制和队列来维护这些请求,其中Dispatcher为分发器,InBox即为请求队列;

在将RpcEndpoint注册到RpcEnv过程中,也间接的将RpcEnv注册到Dispatcher分发器中,Dispatcher针对每个RpcEndpoint维护一个InBox,在Dispatcher维持一个线程池(线程池大小默认为系统可用的核数,当然也可以通过spark.rpc.netty.dispatcher.numThreads进行配置),线程针对每个InBox里面的请求进行处理。当然实际的处理过程是由RpcEndpoint来完成。

其次RpcEnv也完成Client的功能实现 ,RpcEndpointRef是以RpcEndpoint为单位,即如果一个进程需要和远程机器上N个RpcEndpoint服务进行通信,就对应N个RpcEndpointRef(后端的实际的网络连接是公用,这个是TransportClient内部提供了连接池来实现的),当调用一个RpcEndpointRef的ask/send等接口时候,会将把“消息内容+RpcEndpointRef+本地地址”一起打包为一个RequestMessage,交由RpcEnv进行发送。注意这里打包的消息里面包括RpcEndpointRef本身是很重要的,从而可以由Server端识别出这个消息对应的是哪一个RpcEndpoint。

和发送端一样,在RpcEnv中,针对每个remote端的host:port维护一个队列,即OutBox,RpcEnv的发送仅仅是把消息放入到相应的队列中,但是和发送端不一样的是:在OutBox中没有维护一个所谓的线程池来定时清理OutBox,而是通过一堆synchronized来实现的,add之后立刻消费。

摘自:Github/ColZer

apache spark是什么意思

n.火花;火星;电火花;(指品质或感情)一星,丝毫,一丁点。averysmallburningpieceofmaterialthatisproducedbysththatisburningorbyhittingtwohardsubstancestogether。Asparkisatinybrightpieceofburningmaterialthatfliesupfromsomethingthatisburning.***隐藏网址***

org.apache.spark.api.java.optional在哪个包下

***隐藏网址***如果你只想导入包的话(了解不多,1.6版本可以导入google的optional,百度“com.google.common”,2.1版本已有自己的optional):在spark-core_对应版本号.jar。

apache spark是什么

Apache Spark是一个强大的开源处理引擎,最初由Matei Zaharia开发,是他在加州大学伯克利分校的博士论文的一部分。

Apache Spark是快速、易于使用的框架,允许你解决各种复杂的数据问题,无论是半结构化、结构化、流式,或机器学习、数据科学。网页链接

关于本次apachespark官网和如何使用spark将程序提交任务到yarn-Spark-about云开发的问题分享到这里就结束了,如果解决了您的问题,我们非常高兴。