900字范文,内容丰富有趣,生活中的好帮手!
900字范文 > 运行支持kubernetes原生调度的Spark程序

运行支持kubernetes原生调度的Spark程序

时间:2021-07-03 04:36:16

相关推荐

运行支持kubernetes原生调度的Spark程序

全栈工程师开发手册 (作者:栾鹏)

架构系列文章

Spark 概念说明

Apache Spark 是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在由加州大学伯克利分校的AMPLab开发,并于成为Apache的开源项目之一。

在 Spark 中包括如下组件或概念:

Application:Spark Application 的概念和 Hadoop 中的 MapReduce 类似,指的是用户编写的 Spark 应用程序,包含了一个 Driver 功能的代码和分布在集群中多个节点上运行的 Executor 代码;

Driver:Spark 中的 Driver 即运行上述 Application 的 main() 函数并且创建 SparkContext,其中创建 SparkContext 的目的是为了准备Spark应用程序的运行环境。在 Spark 中由 SparkContext 负责和 ClusterManager 通信,进行资源的申请、任务的分配和监控等;当 Executor 部分运行完毕后,Driver负责将SparkContext 关闭。通常用 SparkContext 代表 Driver;

Executor:Application运行在Worker 节点上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor。在Spark on Yarn模式下,其进程名称为CoarseGrainedExecutorBackend,类似于 Hadoop MapReduce 中的 YarnChild。一个 CoarseGrainedExecutorBackend 进程有且仅有一个 executor 对象,它负责将 Task 包装成 taskRunner,并从线程池中抽取出一个空闲线程运行 Task。每个 CoarseGrainedExecutorBackend 能并行运行 Task 的数量就取决于分配给它的 CPU 的个数了;

Cluster Manager:指的是在集群上获取资源的外部服务,目前有:

Standalone:Spark原生的资源管理,由Master负责资源的分配;Hadoop Yarn:由YARN中的ResourceManager负责资源的分配; Worker:集群中任何可以运行Application代码的节点,类似于YARN中的NodeManager节点。在Standalone模式中指的就是通过Slave文件配置的Worker节点,在Spark on Yarn模式中指的就是NodeManager节点;作业(Job):包含多个Task组成的并行计算,往往由Spark Action催生,一个JOB包含多个RDD及作用于相应RDD上的各种Operation;阶段(Stage):每个Job会被拆分很多组 Task,每组task被称为Stage,也可称TaskSet,一个job分为多个阶段,每一个stage的分割点是action。比如一个job是:(transformation1 -> transformation1 -> action1 -> transformation3 -> action2),这个job就会被分为两个stage,分割点是action1和action2。任务(Task): 被送到某个Executor上的工作任务;

Context:启动spark application的时候创建,作为Spark 运行时环境。

Dynamic Allocation(动态资源分配):一个配置选项,可以将其打开。从Spark1.2之后,对于On Yarn模式,已经支持动态资源分配(Dynamic Resource Allocation),这样,就可以根据Application的负载(Task情况),动态的增加和减少executors,这种策略非常适合在YARN上使用spark-sql做数据开发和分析,以及将spark-sql作为长服务来使用的场景。Executor 的动态分配需要在 cluster mode 下启用 “external shuffle service”。

动态资源分配策略:开启动态分配策略后,application会在task因没有足够资源被挂起的时候去动态申请资源,这意味着该application现有的executor无法满足所有task并行运行。spark一轮一轮的申请资源,当有task挂起或等待 spark.dynamicAllocation.schedulerBacklogTimeout (默认1s)时间的时候,会开始动态资源分配;之后会每隔 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout (默认1s)时间申请一次,直到申请到足够的资源。每次申请的资源量是指数增长的,即1,2,4,8等。之所以采用指数增长,出于两方面考虑:其一,开始申请的少是考虑到可能application会马上得到满足;其次要成倍增加,是为了防止application需要很多资源,而该方式可以在很少次数的申请之后得到满足。

概念的重要理解:一个jar应用或者py代码是一个application,里面可能会包含多个job。一个job每个action就是一个stage,一次action都会出发任务的封装,分发,资源分配等问题。并行的task会被分发到worker上。

Spark作业基本运行原理解析

们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。提交作业的节点称为Master节点,Driver进程就是开始执行你Spark程序的那个Main函数(Driver进程不一定在Master节点上)。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。

Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点Worker上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

参考:/p/89e1c8c08933

思想

spark客户端通过在当前用户下的~/.kube/config来实现对k8s资源的调度,也就是说spark的控制端(spark-driver)是在k8s之外的,跟人为操作一样,在k8s里面生成作为worker的pod来执行任务. 当然我们也可以把外部是driver封装成镜像放在k8s里面.

注意:~/.kube/config是根据用户设定的,不同的用户下面的文件不一样,所以如果切换了用户需要注意config是否相同或存在.

背景:

Spark 2.3.0 开始支持使用k8s 作为资源管理原生调度spark。使用k8s原生调度的spark主要有以下好处:

采用k8s原生调度,不再需要二级调度,直接使用k8s原生的调度模块,实现与其他应用的混布;

资源隔离:任务可以提交到指定的namespace,这样可以复用k8s原生的qouta限制,实现任务资源的限制;

资源分配:可以指定每个spark任务的指定资源限制,任务之间更加隔离;

用户自定义:用户可以在spark基础镜像中打上自己的application, 更加灵活和方便;

试用条件:

· Spark 2.3或更高版本的可运行分发。

· 正在运行的Kubernetes集群版本> = 1.6,并使用kubectl为其配置了访问权限 。如果您还没有可用的Kubernetes群集,则可以使用minikube在本地计算机上设置测试群集 。

o 我们建议在启用DNS插件的情况下使用最新版本的minikube。

o 请注意,默认的minikube配置不足以运行Spark应用程序。我们推荐3个CPU和4g内存,以便能够使用一个执行程序启动一个简单的Spark应用程序。

· 您必须具有适当的权限才能在群集中列出,创建,编辑和删除 窗格。您可以验证您是否可以通过运行列出这些资源kubectl auth can-i <list|create|edit|delete> pods

o 必须允许驱动程序窗格使用的服务帐户凭据创建窗格,服务和配置图。

· 您必须在群集中配置Kubernetes DNS。

怎么运行的

spark-submit可以直接用于将Spark应用程序提交给Kubernetes集群。提交机制的工作原理如下:

Spark在Kubernetes窗格中创建一个Spark driver程序。

该Spark driver程序创建也在Kubernetes窗格中运行并连接到它们的执行程序,并执行应用程序代码。

当应用程序完成时,执行程序窗格会终止并被清理,但驱动程序窗格会保留日志并在Kubernetes API中保持“已完成”状态,直到最终收集垃圾或手动清理垃圾。

请注意,在完成状态,驾驶员舱并没有使用任何的计算或存储资源。

驱动程序和执行程序pod调度由Kubernetes处理。可以通过 使用其配置属性的节点选择器将驱动程序和执行程序窗格安排在可用节点的子集上。在将来的版本中,可以使用更高级的调度提示,如

node/pod affinities。

客户端依赖java环境

需要现在linux中安装java环境

下载地址

/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

tar -xvf jdk-8u191-linux-x64.tar.gz

使用Vi编辑器,设置环境变量

$ sudo vi /etc/profile

在文件最后,添加如下内容:

#Java Envexport JAVA_HOME=/soft/jdk1.8.0_191export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport PATH=$PATH:$JAVA_HOME/bin

退出vi编辑器,使环境变量设置立即生效

$ source /etc/profile

3.5 查看JDK版本

$ java -versionjava version "1.8.0_121"Java(TM) SE Runtime Environment (build 1.8.0_121-b13)Java HotSpot(TM) 64-Bit Server VM (build 25.121-b13, mixed mode)

为何使用 spark on kubernetes

使用kubernetes原生调度的spark on kubernetes是对现有的spark on yarn/mesos的资源使用方式的革命性的改进,主要表现在以下几点:

Kubernetes原生调度:不再需要二层调度,直接使用kubernetes的资源调度功能,跟其他应用共用整个kubernetes管理的资源池;

资源隔离,粒度更细:原先yarn中的queue在spark on kubernetes中已不存在,取而代之的是kubernetes中原生的namespace,可以为每个用户分别指定一个namespace,限制用户的资源quota;

细粒度的资源分配:可以给每个spark任务指定资源限制,实际指定多少资源就使用多少资源,因为没有了像yarn那样的二层调度(圈地式的),所以可以更高效和细粒度的使用资源;

监控的变革:因为做到了细粒度的资源分配,所以可以对用户提交的每一个任务做到资源使用的监控,从而判断用户的资源使用情况,所有的metric都记录在数据库中,甚至可以为每个用户的每次任务提交计量;

日志的变革:用户不再通过yarn的web页面来查看任务状态,而是通过pod的log来查看,可将所有的kuberentes中的应用的日志等同看待收集起来,然后可以根据标签查看对应应用的日志;

所有这些变革都可以让我们更高效的获取资源、更有效率的获取资源!

架构设计

关于 spark standalone 的局限性与 kubernetes native spark 架构之间的区别请参考 Anirudh Ramanathan 在 10月8日提交的 issue Support Spark natively in Kubernetes #34377。

简而言之,spark standalone on kubernetes 有如下几个缺点:

无法对于多租户做隔离,每个用户都想给 pod 申请 node 节点可用的最大的资源。

Spark 的 master/worker 本来不是设计成使用 kubernetes 的资源调度,这样会存在两层的资源调度问题,不利于与 kuberentes 集成。

而 kubernetes native spark 集群中,spark 可以调用 kubernetes API 获取集群资源和调度。要实现 kubernetes native spark 需要为 spark 提供一个集群外部的 manager 可以用来跟 kubernetes API 交互。

调度器后台

使用 kubernetes 原生调度的 spark 的基本设计思路是将 spark 的 driver 和 executor 都放在 kubernetes 的 pod 中运行,另外还有两个附加的组件:ResourceStagingServer 和 KubernetesExternalShuffleService。

Spark driver 其实可以运行在 kubernetes 集群内部(cluster mode)可以运行在外部(client mode),executor 只能运行在集群内部,当有 spark 作业提交到 kubernetes 集群上时,调度器后台将会为 executor pod 设置如下属性:

使用我们预先编译好的包含 kubernetes 支持的 spark 镜像,然后调用 CoarseGrainedExecutorBackend main class 启动 JVM。

调度器后台为 executor pod 的运行时注入环境变量,例如各种 JVM 参数,包括用户在 spark-submit 时指定的那些参数。

Executor 的 CPU、内存限制根据这些注入的环境变量保存到应用程序的 SparkConf 中。

可以在配置中指定 spark 运行在指定的 namespace 中。

参考:Scheduler backend 文档

下载客户端

下载spark2.3.0版本/dyn/closer.lua/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz

或者http://ftp.meisei-u.ac.jp/mirror/apache/dist/spark/

不同的客户端,文件所在的位置不一样,所以一定要注意

我是从这里下载的/apache-spark-on-k8s/spark/releases

我使用的是spark-2.2.0-k8s-0.5.0-bin-with-hadoop-2.7.3.tgz客户端

执行测试

先保证当前用户~/.kube/config下面有k8s的调用配置文件,也就是说你在命令行通过kubectl可以直接调度k8s资源.(包含自动下载需要的镜像,自动启动pod,自动创建命名空间,自动删除pod,自动起job之类的.我们只需要有一个spark的客户端就行了)

先来试试客户端的语言

交互式Python Shell

在spark文件夹下面

./bin/pyspark

并运行以下命令,该命令也应返回1000:

sc.parallelize(range(1000)).count()

交互式Scala Shell

开始使用Spark的最简单方法是通过Scala shell:

./bin/spark-shell

尝试以下命令,该命令应返回1000:

scala> sc.parallelize(1 to 1000).count()

将应用程序提交给Kubernetes

运行 SparkPi 测试

我们解压spark客户端后cd到spark客户端的目录下面, 将任务运行在 spark-cluster 的 namespace 中,启动 5 个 executor 实例。

先要为 spark 集群创建一个 serviceaccount 和 clusterrolebinding:

kubectl create serviceaccount spark --namespace spark-clusterkubectl create rolebinding spark-edit --clusterrole=edit --serviceaccount=spark-cluster:spark --namespace=spark-cluster

不然会使用default用户账户,将没有权限获取 spark-cluster 中的 pod 信息

bin/spark-submit \--deploy-mode cluster \--class org.apache.spark.examples.SparkPi \--master k8s://https://192.168.11.127:6443 \--kubernetes-namespace spark-cluster \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0 \local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar

其中

spark-submit [options] <app jar | python file> [app arguments]

kubernetes-namespace为spark调度任务使用的k8s命名空间

spark.executor.instances为worker的个数,后面的是镜像的地址

conf spark.kubernetes.authenticate.driver.serviceAccountName为创建的账户名称

应用程序名称通过--conf spark.app.name--name以参数 spark-submit使用默认命名规则,应用程序名称必须由小写字母,数字,字符组成-,and . 和必须以字母数字字符开头和结尾。

--master后面跟的是apiserver的信息,(所以要保障k8s已经部署了apiserver)

--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port>

k8s-apiserver-host可以在~/.kube/config文件中查看

或者通过下面的命令查看

$ kubectl cluster-infoKubernetes master is running at http://127.0.0.1:6443

通过将--master命令行参数传递给应用程序配置spark-submit或通过设置 spark.master在应用程序配置中指定的Spark主服务器必须是具有该格式的URLk8s://<api_server_url>。对主字符串进行前缀k8s://将导致Spark应用程序在Kubernetes集群上启动,并在其中联系API服务器api_server_url。如果URL中未指定HTTP协议,则默认为https。例如,将master设置k8s://:443为等同于将其设置为k8s://:443,但要在不同端口上没有TLS的情况下连接,则master将设置为k8s://:8443

我们可以直接使用官方已编译好的 docker 镜像来部署,先在k8s上的服务器docker中把下面的镜像下载下来, 下面是官方发布的镜像:

关于该命令参数的介绍请参考:https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

或者 /docs/latest/running-on-kubernetes.html

local:后面跟随的是应用jar的地址,这里使用的是官方自带的测试应用. 该 jar 包实际上是 spark-driver 和 spark-executor 镜像里的,在上一步构建镜像时已经构建并上传到了镜像仓库中。如果我们自己构建镜像需要自己将自己的上传到镜像中,并指定自己的应用地址。

注意: 该 jar 包实际上是 spark.kubernetes.executor.docker.image 镜像中的。

这时候提交任务运行还是失败,报错信息中可以看到两个问题:

Executor 无法找到 driver pod

用户 system:serviceaccount:spark-cluster:defaul 没有权限获取 spark-cluster 中的 pod 信息。

需要为 spark 集群创建一个 serviceaccount 和 clusterrolebinding:

kubectl create serviceaccount spark --namespace spark-cluster

kubectl create rolebinding spark-edit --clusterrole=edit --serviceaccount=spark-cluster:spark --namespace=spark-cluster

该 Bug 将在新版本中修复。

注意以下坑:

spark 自带的exemples是用jdk1.8编译的,如果启动过程中提示Unsupported major.minor version

52.0请更换jdk版本;spark-submit默认会去~/.kube/config去加载集群配置,故请将k8s集群config放在该目录下;spark driver 启动的时候报错Error: Could not find or load main class

org.apache.spark.examples.SparkPi spark 启动参数的local://后面应该跟你自己的spark

application在容器里的路径; spark driver 启动抛异常Caused by:

.UnknownHostException: kubernetes.default.svc: Try again, 请保证

k8d let节点间网络互通;

任务执行效果:

spark demo跑了起来后,可以看到spark-submit相当于起了一个controller, 用于管理单个spark任务,首先会创建该任务的service和driver,待driver运行后,会启动exeuctor,个数为–conf spark.executor.instances=5 指定的参数,待执行完毕后,submit会自动删除exeuctor, driver会用默认的gc机制清理。

打包构建自己的应用

在上面的测试中我们其中的是官方自带的测试jar,如果我们需要运行自己的应用,可以自己重构镜像,然后在summit中指定地址。Spark(从版本2.3开始)附带一个可用于此目的的Dockerfile,或自定义以匹配单个应用程序的需求。

spark-2.3.2-bin-hadoop2.7版本

dockerfile文件它可以在kubernetes/dockerfiles/ 目录中找到。

Spark还附带了一个bin/docker-image-tool.sh脚本,可用于构建和发布Docker映像以与Kubernetes后端一起使用。

用法示例是:

$ ./bin/docker-image-tool.sh -r <repo> -t my-tag build$ ./bin/docker-image-tool.sh -r <repo> -t my-tag push

spark-2.2.0-k8s-0.5.0-bin-2.7.3版本

Dockerfile相关文件位于dockerfiles/并且可以在使用提供的脚本构建之前进一步自定义,或者手动进行。

构建命令为

格式./sbin/build-push-docker-images.sh -r <repo> -t <tag> build./sbin/build-push-docker-images.sh -r <repo> -t <tag> push示例./sbin/build-push-docker-images.sh -r luanpeng/lp -t my-tag build./sbin/build-push-docker-images.sh -r luanpeng/lp -t my-tag push

代码会自动在自己的仓库后面添加相应的内容生成新镜像

luanpeng/lp/spark-driver:my-tagluanpeng/lp/spark-resource-staging-server:my-tagluanpeng/lp/spark-init:my-tagluanpeng/lp/spark-shuffle:my-tagluanpeng/lp/spark-executor:my-tagluanpeng/lp/spark-executor-py:my-tagluanpeng/lp/spark-driver-py:my-tag

构建以后就要summit就要使用自己的镜像地址了

依赖管理

上文中我们在运行测试程序时,命令行中指定的 jar 文件已包含在 docker 镜像中,是不是说我们每次提交任务都需要重新创建一个镜像呢?非也!如果真是这样也太麻烦了。

创建 resource staging server

为了方便用户提交任务,不需要每次提交任务的时候都创建一个镜像,我们使用了 resource staging server 。

kubectl create -f kubernetes-resource-staging-server.yaml --namespace=spark-cluster

在spark-2.2.0-k8s-0.5.0-bin-2.7.3版本中conf文件夹下面包含该文件,spark-2.3.2-bin-hadoop2.7版本中没有该文件.

我们同样将其部署在 spark-cluster namespace 下,该 yaml 文件如下

---apiVersion: apps/v1beta1kind: Deploymentmetadata:name: spark-resource-staging-serverspec:replicas: 1template:metadata:labels:resource-staging-server-instance: defaultspec:volumes:- name: resource-staging-server-propertiesconfigMap:name: spark-resource-staging-server-configcontainers:- name: spark-resource-staging-serverimage: kubespark/spark-resource-staging-server:v2.2.0-kubernetes-0.4.0resources:requests:cpu: 100mmemory: 256Milimits:cpu: 100mmemory: 1GivolumeMounts:- name: resource-staging-server-propertiesmountPath: '/etc/spark-resource-staging-server'args:- '/etc/spark-resource-staging-server/resource-staging-server.properties'---apiVersion: v1kind: ConfigMapmetadata:name: spark-resource-staging-server-configdata:resource-staging-server.properties: |spark.kubernetes.resourceStagingServer.port=10000spark.ssl.kubernetes.resourceStagingServer.enabled=false---apiVersion: v1kind: Servicemetadata:name: spark-resource-staging-servicespec:type: NodePortselector:resource-staging-server-instance: defaultports:- protocol: TCPport: 10000targetPort: 10000nodePort: 31000

优化

其中有一点需要优化,在使用下面的命令提交任务时,使用--conf spark.kubernetes.resourceStagingServer.uri参数指定 resource staging server 地址,用户不应该关注 resource staging server 究竟运行在哪台宿主机上,可以使用下面两种方式实现:

使用 nodeSelector 将 resource staging server 固定调度到某一台机器上,该地址依然使用宿主机的 IP 地址改变 spark-resource-staging-service service 的 type 为 ClusterIP, 然后使用 Ingress 将其暴露到集群外部,然后加入的内网 DNS 里,用户使用 DNS 名称指定 resource staging server 的地址。

这里比较简单,就指定node就行了,下面的命令使用node的ip和nodeport

然后可以执行下面的命令来提交本地的 jar 到 kubernetes 上运行。

bin/spark-submit \--deploy-mode cluster \--class org.apache.spark.examples.SparkPi \--master k8s://192.168.11.127:6443 \--kubernetes-namespace spark-cluster \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.resourceStagingServer.uri=http://192.168.11.127:31000 \./examples/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar

该命令将提交本地的 ./examples/jars/spark-examples_2.11-2.2.0-k8s-0.5.0-SNAPSHOT.jar 文件到 resource staging server,executor 将从该 server 上获取 jar 包并运行,这样用户就不需要每次提交任务都编译一个镜像了。

详见:https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html#dependency-management

设置 HDFS 用户

如果 Hadoop 集群没有设置 kerbros 安全认证的话,在指定 spark-submit 的时候可以通过指定如下四个环境变量, 设置 Spark 与 HDFS 通信使用的用户:

--conf spark.kubernetes.driverEnv.SPARK_USER=hadoop --conf spark.kubernetes.driverEnv.HADOOP_USER_NAME=hadoop --conf spark.executorEnv.HADOOP_USER_NAME=hadoop --conf spark.executorEnv.SPARK_USER=hadoop

使用 hadoop 用户提交本地 jar 包的命令示例:

./spark-submit \--deploy-mode cluster \--class com.talkingdata.alluxio.hadooptest \--master k8s://https://192.168.11.127:6443 \--kubernetes-namespace spark-cluster \--conf spark.kubernetes.driverEnv.SPARK_USER=hadoop \--conf spark.kubernetes.driverEnv.HADOOP_USER_NAME=hadoop \--conf spark.executorEnv.HADOOP_USER_NAME=hadoop \--conf spark.executorEnv.SPARK_USER=hadoop \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.resourceStagingServer.uri=http://192.168.11.127:31000 \~/Downloads/tendcloud_2.10-1.0.jar

我们可以使用这样的参数来传递环境变量的值 spark.executorEnv.[EnvironmentVariableName],只要将 EnvironmentVariableName 替换为环境变量名称即可.

详见:/apache-spark-on-k8s/spark/issues/408

限制 Driver 和 Executor 的资源使用

在执行 spark-submit 时使用如下参数设置内存和 CPU 资源限制:

--conf spark.driver.memory=3G--conf spark.executor.memory=3G--conf spark.driver.cores=2--conf spark.executor.cores=10

这几个参数中值如何传递到 Pod 的资源设置中的呢?

比如我们设置在执行 spark-submit 的时候传递了这样的两个参数:--conf spark.driver.cores=2--conf spark.driver.memory=100G那么查看 driver pod 的 yaml 输出结果将会看到这样的资源设置:

resources:limits:memory: 110Girequests:cpu: "2"memory: 100Gi

以上参数是对 request 值的设置,那么 limit 的资源设置的值又是从何而来?

可以使用spark.kubernetes.driver.limit.coresspark.kubernetes.executor.limit.cores来设置 CPU的 hard limit。

memory limit 的值是根据 memory request 的值加上 spark.kubernetes.executor.memoryOverhead 的值计算而来的,该配置项用于设置分配给每个 executor 的超过 heap 内存的值(可以使用k、m、g单位)。该值用于虚拟机的开销、其他本地服务开销。根据 executor 的大小设置(通常是 6%到10%)。

我们可以这样来提交一个任务,同时设置 driver 和 executor 的 CPU、内存的资源 request 和 limit 值(driver 的内存 limit 值为 request 值的 110%)。

./spark-submit \--deploy-mode cluster \--class org.apache.spark.examples.SparkPi \--master k8s://https://192.168.11.127:6443 \--kubernetes-namespace spark-cluster \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.driver.memory=100G \--conf spark.executor.memory=10G \--conf spark.driver.cores=30 \--conf spark.executor.cores=2 \--conf spark.driver.maxResultSize=10240m \--conf spark.kubernetes.driver.limit.cores=32 \--conf spark.kubernetes.executor.limit.cores=3 \--conf spark.kubernetes.executor.memoryOverhead=2g \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.resourceStagingServer.uri=http://192.168.11.127:31000 \local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.4.0-SNAPSHOT.jar 10000000

这将启动一个包含一千万个 task 的计算 pi 的 spark 任务,任务运行过程中,drvier 的 CPU 实际消耗大约为 3 核,内存 40G,每个 executor 的 CPU 实际消耗大约不到 1 核,内存不到 4G,我们可以根据实际资源消耗不断优化资源的 request 值。

SPARK_DRIVER_MEMORY 和 SPARK_EXECUTOR_MEMORY 和分别作为 Driver 容器和 Executor 容器启动的环境变量,比如下面这个 Driver 启动的 CMD 中:

CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS

我们可以看到对 SPARK_DRIVER_MEMORY 环境变量的引用。Executor 的设置与 driver 类似。

而我们可以使用这样的参数来传递环境变量的值 spark.executorEnv.[EnvironmentVariableName],只要将 EnvironmentVariableName 替换为环境变量名称即可。

关于Hadoop版本的说明

Spark使用Hadoop核心库与HDFS和其他支持Hadoop的存储系统进行通信。由于协议在不同版本的Hadoop中已更改,因此在集群中必须运行针对spark相同版本的hadoop。

有关构建特定Hadoop发行版的详细指导,请参阅“指定Hadoop版本”中的构建文档 ,包括构建特定的Hive和Hive Thriftserver发行版。

Python支持

随着数据科学家对Python的不断增长的支持,spark已经支持提交PySpark应用程序。这些应用程序遵循您期望从其他集群管理器获得的一般语法。提交PySpark作业类似于提交Java / Scala应用程序,除非开发者没有提供类,如预期的那样。以下是执行Spark-Pi示例的方法:

bin/spark-submit \--deploy-mode cluster \--class org.apache.spark.examples.SparkPi \--master k8s://https://192.168.11.127:6443 \--kubernetes-namespace spark-cluster \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver-py:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor-py:v2.2.0-kubernetes-0.5.0 \--jars local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.5.0.jar \local:///opt/spark/examples/src/main/python/pi.py 10

当然也可以传递本地py文件进入

bin/spark-submit \--deploy-mode cluster \--master k8s://https://192.168.11.127:6443 \--kubernetes-namespace spark-cluster \--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver-py:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor-py:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 \--conf spark.kubernetes.resourceStagingServer.uri=http://192.168.11.127:31000 \--jars local:///opt/spark/jars/RoaringBitmap-0.5.11.jar \./examples/src/main/python/pi.py 10

--jars local:///opt/spark/jars/RoaringBitmap-0.5.11.jar是必须要加入的一个应用,不然会报错

为了支持 Python,可以使用 --py-files 选项为 executor 指定分布式的 .egg、.zip 和 .py 库。

因为我们运行python文件,其实只是在driver驱动中执行, 在运行RDD数据时, 也就是进行分布式计算时才会启动executor进行计算响应的并行工作. --py-files选项可以为每个executor 指定分布式的py文件

bin/spark-submit \--deploy-mode cluster \--master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \--kubernetes-namespace <k8s-namespace> \--conf spark.executor.instances=5 \--conf spark.app.name=spark-pi \--conf spark.kubernetes.driver.docker.image=kubespark/driver-py:v2.2.0-kubernetes-0.4.0 \--conf spark.kubernetes.executor.docker.image=kubespark/executor-py:v2.2.0-kubernetes-0.4.0 \--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.4.0 \--jars local:///opt/spark/examples/jars/spark-examples_2.11-2.2.0-k8s-0.4.0-SNAPSHOT.jar \--py-files local:///opt/spark/examples/src/main/python/sort.py \local:///opt/spark/examples/src/main/python/pi.py 10

自定义python镜像

我们需要使用spark运行python镜像,但同时需要执行其他的python程序,这就需要其他的python包.

所以需要我们重新封装镜像.

在当前的spark-driver-pyDocker镜像的dockerfile中所看到的,我们已经注释掉了您可以取消注释使用的当前pip模块支持:

dockerfile在spark-2.2.0-k8s-0.5.0-bin-2.7.3/dockerfiles/driver-py/Dockerfile中

ADD examples /opt/spark/examplesADD python /opt/spark/pythonRUN apk add --no-cache python && \python -m ensurepip && \rm -r /usr/lib/python*/ensurepip && \pip install --upgrade pip setuptools && \rm -r /root/.cache# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES# RUN apk add --update alpine-sdk python-dev# RUN pip install numpy

我们可以取消注释,并加上自己需要安装的内容,以此来封装满足我们需求的镜像

spark配置参数

以下是一些特定于Kubernetes的其他常见属性。大多数其他配置与其他部署模式相同。有关这些内容的详细信息,请参阅配置页面。

对将来应该解除的当前实现的一些限制包括:*应用程序只能在群集模式下运行。*只能运行Scala,Java和Python应用程序。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。