Spark Low-Level Communication (RPC) Principles
- RpcEndpoint: the RPC communication endpoint. Spark treats every node (Client/Master/Worker) as an RPC endpoint; each one implements the RpcEndpoint interface and defines its own messages and handling logic according to that endpoint's needs. Sending (or asking) goes through the Dispatcher. Every endpoint in Spark has the lifecycle: constructor -> onStart -> receive* -> onStop.
- RpcEnv: the RPC context. The runtime context that every RPC endpoint depends on is called the RpcEnv; the current Spark version uses NettyRpcEnv.
- Dispatcher: the message dispatcher. It routes messages that an RPC endpoint wants to send to a remote endpoint, or messages received from remote endpoints, to the corresponding inbox or outbox: if the receiver is the local endpoint the message goes into the Inbox, otherwise it goes into an Outbox.
- Inbox: the inbox for incoming messages. One local RpcEndpoint corresponds to one Inbox. Each time the Dispatcher puts a message into an Inbox it also adds the corresponding EndpointData to its internal ReceiverQueue; when the Dispatcher is created it starts a dedicated thread that polls the ReceiverQueue and consumes the inbox messages.
- RpcEndpointRef: a reference to a remote RpcEndpoint. To send a message to a specific RpcEndpoint we usually first obtain a reference to it and then send the message through that reference.
- OutBox: the outbox for outgoing messages. For the current RpcEndpoint, one target RpcEndpoint corresponds to one Outbox; sending to multiple targets means multiple OutBoxes. Once a message is placed into an Outbox it is sent out through the TransportClient; placing the message into the outbox and sending it happen on the same thread.
- RpcAddress: the address of a remote RpcEndpointRef, i.e. host + port.
- TransportClient: the Netty communication client. One Outbox corresponds to one TransportClient; the TransportClient keeps polling the Outbox and, based on each message's receiver information, sends requests to the corresponding remote TransportServer.
- TransportServer: the Netty communication server. One RpcEndpoint corresponds to one TransportServer; after receiving a remote message it calls the Dispatcher to route the message to the corresponding inbox/outbox.
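As a quick orientation, the sketch below (not Spark's real classes — the trait and object names are illustrative) shows the endpoint lifecycle constructor -> onStart -> receive* -> onStop that every endpoint above follows:

trait ToyEndpoint {
  def onStart(): Unit = ()                        // called once after construction
  def receive: PartialFunction[Any, Unit]         // called for every delivered message
  def onStop(): Unit = ()                         // called once on shutdown
}

object ToyLifecycleDemo extends App {
  val endpoint = new ToyEndpoint {                // constructor
    override def onStart(): Unit = println("started")
    override def receive: PartialFunction[Any, Unit] = {
      case msg => println(s"received: $msg")
    }
    override def onStop(): Unit = println("stopped")
  }
  endpoint.onStart()
  Seq("RegisterExecutor", "LaunchTask").foreach(endpoint.receive)  // receive*
  endpoint.onStop()
}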
Executor Communication Endpoint
1) In IDEA, search globally (Ctrl + N) for org.apache.spark.executor.YarnCoarseGrainedExecutorBackend and open the corresponding companion object.
2) YarnCoarseGrainedExecutorBackend.scala: YarnCoarseGrainedExecutorBackend extends CoarseGrainedExecutorBackend, which extends RpcEndpoint.
RpcEndpoint.scala

// constructor -> onStart -> receive* -> onStop
private[spark] trait RpcEndpoint {

  val rpcEnv: RpcEnv

  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  def onStart(): Unit = {
    // By default, do nothing.
  }

  def onStop(): Unit = {
    // By default, do nothing.
  }
}

RpcEndpointRef.scala

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {

  ... ...

  def send(message: Any): Unit

  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  ... ...
}
Driver Communication Endpoint
After the ExecutorBackend sends its request to the Driver, the Driver starts receiving messages. Search globally (Ctrl + N) for the SparkContext class.
SparkContext.scala

class SparkContext(config: SparkConf) extends Logging {
  ... ...
  private var _schedulerBackend: SchedulerBackend = _
  ... ...
}

Click SchedulerBackend to open SchedulerBackend.scala, search for its implementations (Ctrl + H), and find CoarseGrainedSchedulerBackend.scala; the DriverEndpoint object is created inside this class.
private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging {

  class DriverEndpoint extends IsolatedRpcEndpoint with Logging {

    override def receive: PartialFunction[Any, Unit] = {
      ... ...
      // Receive the message sent after a successful registration
      case LaunchedExecutor(executorId) =>
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores
        }
        makeOffers(executorId)
    }

    // Receive ask messages and reply
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
        ... ...
        context.reply(true)
      ... ...
    }
  }
  ... ...

  val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())

  protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()
}
DriverEndpoint extends IsolatedRpcEndpoint, which in turn extends RpcEndpoint — the same RpcEndpoint trait and RpcEndpointRef shown in the Executor section above.
Executor Communication Environment Setup
Creating the RPC communication environment
1) In IDEA, search globally (Ctrl + N) for org.apache.spark.executor.YarnCoarseGrainedExecutorBackend and open the corresponding companion object.
2) Run CoarseGrainedExecutorBackend.
YarnCoarseGrainedExecutorBackend.scala
private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {

  def main(args: Array[String]): Unit = {
    val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile) =>
      new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
        arguments.bindAddress, arguments.hostname, arguments.cores,
        arguments.userClassPath, env, arguments.resourcesFileOpt, resourceProfile)
    }
    val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
      this.getClass.getCanonicalName.stripSuffix("$"))
    CoarseGrainedExecutorBackend.run(backendArgs, createFn)
    System.exit(0)
  }
}
CoarseGrainedExecutorBackend.scala
def run(
    arguments: Arguments,
    backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
      CoarseGrainedExecutorBackend): Unit = {

  SparkHadoopUtil.get.runAsSparkUser { () =>
    // Bootstrap to fetch the driver's Spark properties.
    val executorConf = new SparkConf
    val fetcher = RpcEnv.create(
      "driverPropsFetcher",
      arguments.bindAddress,
      arguments.hostname,
      executorConf,
      new SecurityManager(executorConf),
      numUsableCores = 0,
      clientMode = true)

    ... ...

    driverConf.set(EXECUTOR_ID, arguments.executorId)

    val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
      arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)

    env.rpcEnv.setupEndpoint("Executor",
      backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
    arguments.workerUrl.foreach { url =>
      env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
    }
    env.rpcEnv.awaitTermination()
  }
}
3) Click create to go to RpcEnv.scala.
def create(
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    numUsableCores: Int,
    clientMode: Boolean): RpcEnv = {
  val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
    numUsableCores, clientMode)
  new NettyRpcEnvFactory().create(config)
}
NettyRpcEnv.scala
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    ... ...
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}
Creating Multiple Outboxes
NettyRpcEnv.scala

private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager,
    numUsableCores: Int) extends RpcEnv(conf) with Logging {

  ... ...

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()

  ... ...
}
Starting the TransportServer
NettyRpcEnv.scala

def startServer(bindAddress: String, port: Int): Unit = {
  ... ...
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
TransportContext.scala
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
TransportServer.java
public TransportServer(
    TransportContext context,
    String hostToBind,
    int portToBind,
    RpcHandler appRpcHandler,
    List<TransportServerBootstrap> bootstraps) {
  ... ...
  init(hostToBind, portToBind);
  ... ...
}

private void init(String hostToBind, int portToBind) {
  // NIO mode by default
  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup = NettyUtils.createEventLoop(ioMode, 1,
    conf.getModuleName() + "-boss");
  EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
    conf.getModuleName() + "-server");

  bootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NettyUtils.getServerChannelClass(ioMode))
    .option(ChannelOption.ALLOCATOR, pooledAllocator)
    .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
    .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
  ... ...
}
NettyUtils.java
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
  switch (mode) {
    case NIO:
      return NioServerSocketChannel.class;
    case EPOLL:
      return EpollServerSocketChannel.class;
    default:
      throw new IllegalArgumentException("Unknown io mode: " + mode);
  }
}
Registering the RpcEndpoint
NettyRpcEnv.scala

def startServer(bindAddress: String, port: Int): Unit = {
  ... ...
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
Creating the TransportClient
Dispatcher.scala

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  ... ...
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  ... ...
}

private[netty] class NettyRpcEndpointRef(... ...) extends RpcEndpointRef(conf) {

  ... ...

  @transient @volatile var client: TransportClient = _
}

// Create the TransportClient
private[netty] def createClient(address: RpcAddress): TransportClient = {
  clientFactory.createClient(address.host, address.port)
}

private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

... ...
Inbox and Outbox
1) There is a single inbox for receiving messages.
Dispatcher.scala

def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  ... ...
  var messageLoop: MessageLoop = null
  try {
    messageLoop = endpoint match {
      case e: IsolatedRpcEndpoint =>
        new DedicatedMessageLoop(name, e, this)
      case _ =>
        sharedLoop.register(name, endpoint)
        sharedLoop
    }
    endpoints.put(name, messageLoop)
  } catch {
    ... ...
  }
  endpointRef
}
DedicatedMessageLoop.scala
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher)
  extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)

  ... ...
}
Inbox.scala
private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
  extends Logging {

  ... ...

  inbox.synchronized {
    messages.add(OnStart)
  }

  ... ...
}
Executor Registration
CoarseGrainedExecutorBackend.scala

// RPC lifecycle: constructor -> onStart -> receive* -> onStop
private[spark] class CoarseGrainedExecutorBackend(... ...)
  extends IsolatedRpcEndpoint with ExecutorBackend with Logging {

  ... ...

  override def onStart(): Unit = {
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      // 1. Register itself with the Driver
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
        extractAttributes, _resources, resourceProfile.id))
    }(ThreadUtils.sameThread).onComplete {
      // 2. When the Driver replies successfully, send itself a RegisteredExecutor message
      case Success(_) =>
        self.send(RegisteredExecutor)
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

  ... ...

  override def receive: PartialFunction[Any, Unit] = {
    // 3. On receiving the registration-success message, create and start the Executor
    case RegisteredExecutor =>
      try {
        // Create the Executor
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
          resources = _resources)
        driver.get.send(LaunchedExecutor(executorId))
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }
    ... ...
  }
}
Driver Receives the Message and Replies
After the ExecutorBackend sends the RegisterExecutor request, the Driver starts receiving messages. Search globally (Ctrl + N) for the SparkContext class.
SparkContext.scala

class SparkContext(config: SparkConf) extends Logging {
  ... ...
  private var _schedulerBackend: SchedulerBackend = _
  ... ...
}

Click SchedulerBackend to open SchedulerBackend.scala, search for its implementations (Ctrl + H), and find CoarseGrainedSchedulerBackend.scala.

private[spark]
class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
  extends ExecutorAllocationClient with SchedulerBackend with Logging {

  class DriverEndpoint extends IsolatedRpcEndpoint with Logging {

    override def receive: PartialFunction[Any, Unit] = {
      ... ...
      // Receive the message sent after a successful registration
      case LaunchedExecutor(executorId) =>
        executorDataMap.get(executorId).foreach { data =>
          data.freeCores = data.totalCores
        }
        makeOffers(executorId)
    }

    // Receive ask messages and reply
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
          attributes, resources, resourceProfileId) =>
        ... ...
        context.reply(true)
      ... ...
    }
  }
  ... ...

  val driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())

  protected def createDriverEndpoint(): DriverEndpoint = new DriverEndpoint()
}
Executor Runs the Application Code
Once SparkContext initialization has finished, the ApplicationMaster is notified so the remaining user code can continue to run.
1) Go to ApplicationMaster.
ApplicationMaster.scala

private[spark] class ApplicationMaster(
    args: ApplicationMasterArguments,
    sparkConf: SparkConf,
    yarnConf: YarnConfiguration) extends Logging {

  private def runDriver(): Unit = {
    addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
    userClassThread = startUserApplication()

    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        val rpcEnv = sc.env.rpcEnv

        val userConf = sc.getConf
        val host = userConf.get(DRIVER_HOST_ADDRESS)
        val port = userConf.get(DRIVER_PORT)
        registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)

        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
      } else {
        ... ...
      }
      // Let the user program continue running
      resumeDriver()
      userClassThread.join()
    } catch {
      ... ...
    } finally {
      resumeDriver()
    }
  }

  ... ...

  private def resumeDriver(): Unit = {
    sparkContextPromise.synchronized {
      sparkContextPromise.notify()
    }
  }
}
Receiving the "continue execution" notification
In SparkContext.scala, find _taskScheduler.postStartHook(), click postStartHook, and search for its implementations (Ctrl + H).

private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook(): Unit = {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }
}
Click super.postStartHook().
TaskSchedulerImpl.scala

override def postStartHook(): Unit = {
  waitBackendReady()
}

private def waitBackendReady(): Unit = {
  if (backend.isReady) {
    return
  }
  while (!backend.isReady) {
    if (sc.stopped.get) {
      throw new IllegalStateException("Spark context stopped while waiting for backend")
    }
    synchronized {
      this.wait(100)
    }
  }
}
Task Execution
Task splitting and task scheduling principles
Stage division
Task scheduling and execution
Locality-aware scheduling
Task assignment principle: based on each Task's preferred locations, determine the Task's locality level. There are five locality levels, listed below from highest to lowest priority. The guiding idea is that moving computation is cheaper than moving data.
Level         | Meaning
PROCESS_LOCAL | Process-local: the task and its data are in the same Executor; best performance.
NODE_LOCAL    | Node-local: the task and its data are on the same node but not in the same Executor, so the data must be transferred between processes.
RACK_LOCAL    | Rack-local: the task and its data are on two nodes in the same rack, so the data must be transferred between nodes over the network.
NO_PREF       | The task has no preference; fetching the data from anywhere is equally good.
ANY           | The task and its data may be anywhere in the cluster, not even in the same rack; worst performance.
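A minimal sketch of the idea that the scheduler prefers the highest-priority locality level that is currently achievable (an illustration following the table above, not Spark's internal TaskLocality enum):

object Locality extends Enumeration {
  // Ordered from best to worst, as in the table above
  val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, NO_PREF, ANY = Value
}

// Pick the best (lowest-index) level among those currently achievable for a task.
def pickBest(achievable: Set[Locality.Value]): Locality.Value =
  Locality.values.toSeq.filter(achievable).minBy(_.id)

println(pickBest(Set(Locality.RACK_LOCAL, Locality.ANY)))  // RACK_LOCAL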
Failure retry and the blacklist mechanism
Besides choosing a suitable Task to run, the scheduler must also monitor task execution status. As mentioned earlier, the SchedulerBackend is the component that deals with the outside world: after a Task is submitted to an Executor and starts running, the Executor reports its execution status to the SchedulerBackend, the SchedulerBackend informs the TaskScheduler, and the TaskScheduler finds and notifies the TaskSetManager responsible for that Task. In this way the TaskSetManager knows whether each Task failed or succeeded. For a failed Task it records the number of failures; if the failure count has not yet exceeded the maximum number of retries, the Task is put back into the pool of tasks waiting to be scheduled, otherwise the whole Application fails.
While recording a Task's failure count, the scheduler also records the Executor Id and Host of its last failure. The next time the Task is scheduled, a blacklist mechanism prevents it from being scheduled onto the node where it last failed, which provides a degree of fault tolerance. The blacklist records the Executor Id and Host of the last failure together with a "blackout" duration, i.e. the period during which this Task must not be scheduled onto that node again.
Stage Division
0) Start from the source of the WordCount program.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setAppName("WC").setMaster("local[*]")
    val sc: SparkContext = new SparkContext(conf)

    // 2. Read the data, e.g. "hello atguigu spark spark"
    val lineRDD: RDD[String] = sc.textFile("input")

    // 3. Split each line into words
    val wordRDD: RDD[String] = lineRDD.flatMap((x: String) => x.split(" "))

    // 4. Map each word to a (word, 1) pair
    val wordToOneRDD: RDD[(String, Int)] = wordRDD.map((x: String) => (x, 1))

    // 5. Aggregate the values of identical keys
    val wordToSumRDD: RDD[(String, Int)] = wordToOneRDD.reduceByKey((v1, v2) => v1 + v2)

    // 6. Collect and print
    wordToSumRDD.collect().foreach(println)

    // 7. Release resources
    sc.stop()
  }
}
1) In the WordCount code, click collect.
RDD.scala
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
SparkContext.scala
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: Iterator[T] => U,
    partitions: Seq[Int]): Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int]): Array[U] = {
  val results = new Array[U](partitions.size)
  runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
  results
}

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  ... ...
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  ... ...
}
DAGScheduler.scala
def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  ... ...
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ... ...
}

def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  ... ...
  val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    Utils.cloneProperties(properties)))
  waiter
}
EventLoop.scala
def post(event: E): Unit = {
  if (!stopped.get) {
    if (eventThread.isAlive) {
      eventQueue.put(event)
    } else {
      ... ...
    }
  }
}

private[spark] val eventThread = new Thread(name) {
  override def run(): Unit = {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        ... ...
      }
    }
  }
}
Search for the implementations of onReceive (Ctrl + H).
DAGScheduler.scala
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  ... ...

  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    ... ...
  }

  ... ...
}

private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  ... ...
}

private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}

private[scheduler] def getShuffleDependencies(
    rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
  val parents = new HashSet[ShuffleDependency[_, _, _]]
  val visited = new HashSet[RDD[_]]
  val waitingForVisit = new ListBuffer[RDD[_]]
  waitingForVisit += rdd
  while (waitingForVisit.nonEmpty) {
    val toVisit = waitingForVisit.remove(0)
    if (!visited(toVisit)) {
      visited += toVisit
      toVisit.dependencies.foreach {
        case shuffleDep: ShuffleDependency[_, _, _] =>
          parents += shuffleDep
        case dependency =>
          waitingForVisit.prepend(dependency.rdd)
      }
    }
  }
  parents
}

private def getOrCreateShuffleMapStage(
    shuffleDep: ShuffleDependency[_, _, _],
    firstJobId: Int): ShuffleMapStage = {
  shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
    case Some(stage) =>
      stage
    case None =>
      getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
        if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
          createShuffleMapStage(dep, firstJobId)
        }
      }
      // Finally, create a stage for the given shuffle dependency.
      createShuffleMapStage(shuffleDep, firstJobId)
  }
}

def createShuffleMapStage[K, V, C](
    shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
  ... ...
  val rdd = shuffleDep.rdd
  val numTasks = rdd.partitions.length
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ShuffleMapStage(
    id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
  ... ...
}

... ...
Task Splitting
DAGScheduler.scala
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  var finalStage: ResultStage = null
  try {
    finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    ... ...
  }
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  ... ...
  submitStage(finalStage)
}

private def submitStage(stage: Stage): Unit = {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id)
      if (missing.isEmpty) {
        submitMissingTasks(stage, jobId.get)
      } else {
        for (parent <- missing) {
          submitStage(parent)
        }
        waitingStages += stage
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  ... ...
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }

      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    ... ...
  }
  ... ...
}
Stage.scala
private[scheduler] abstract class Stage(... ...)
  extends Logging {

  ... ...

  def findMissingPartitions(): Seq[Int]

  ... ...
}
Search globally (Ctrl + H) for the implementations of findMissingPartitions.
ShuffleMapStage.scala
private[spark] class ShuffleMapStage(... ...)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {

  private[this] var _mapStageJobs: List[ActiveJob] = Nil

  ... ...

  override def findMissingPartitions(): Seq[Int] = {
    mapOutputTrackerMaster
      .findMissingPartitions(shuffleDep.shuffleId)
      .getOrElse(0 until numPartitions)
  }
}
ResultStage.scala
private[spark] class ResultStage(... ...)
  extends Stage(id, rdd, partitions.length, parents, firstJobId, callSite) {

  ... ...

  override def findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }

  ... ...
}
Task Scheduling
1) Submitting the tasks
DAGScheduler.scala

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  ... ...
  if (tasks.nonEmpty) {
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
  } else {
    markStageAsFinished(stage, None)
    stage match {
      case stage: ShuffleMapStage =>
        markMapStageJobsAsFinished(stage)
      case stage: ResultStage =>
        logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
    }
    submitWaitingChildStages(stage)
  }
}
TaskScheduler.scala
def submitTasks(taskSet: TaskSet): Unit
Search globally for the implementation of submitTasks: TaskSchedulerImpl.
TaskSchedulerImpl.scala
override def submitTasks(taskSet: TaskSet): Unit = {
  val tasks = taskSet.tasks
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    ... ...
    stageTaskSets(taskSet.stageAttemptId) = manager
    // Put the TaskSetManager into the scheduling queue
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    ... ...
  }
  // Fetch tasks to run
  backend.reviveOffers()
}
2) FIFO and Fair schedulers
Click schedulableBuilder and find where it is initialized.

private var schedulableBuilder: SchedulableBuilder = null

def initialize(backend: SchedulerBackend): Unit = {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
Click schedulingMode; the default scheduler is FIFO.

private val schedulingModeConf = conf.get(SCHEDULER_MODE)
val schedulingMode: SchedulingMode =
  ... ...
  SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
... ...

private[spark] val SCHEDULER_MODE =
  ConfigBuilder("spark.scheduler.mode")
    .version("0.8.0")
    .stringConf
    .createWithDefault(SchedulingMode.FIFO.toString)
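The scheduling mode above is driven by spark.scheduler.mode (FIFO by default). A minimal usage sketch for switching an application to the FAIR scheduler (the pool name below is an arbitrary example):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")    // default is FIFO
val sc = new SparkContext(conf)
// Optionally assign jobs submitted from this thread to a named pool
sc.setLocalProperty("spark.scheduler.pool", "production")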
3) Fetching tasks to run
SchedulerBackend.scala
private[spark] trait SchedulerBackend {

  ... ...

  def reviveOffers(): Unit

  ... ...
}
Search globally for the implementation of reviveOffers: CoarseGrainedSchedulerBackend.
CoarseGrainedSchedulerBackend.scala
override def reviveOffers(): Unit = {
  // The backend sends a message to itself
  driverEndpoint.send(ReviveOffers)
}

// The message is received by the backend itself
override def receive: PartialFunction[Any, Unit] = {
  ... ...
  case ReviveOffers =>
    makeOffers()
  ... ...
}

private def makeOffers(): Unit = {
  val taskDescs = withLock {
    ... ...
    // Fetch tasks to run
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}
TaskSchedulerImpl.scala
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  ... ...
  val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
  for (taskSet <- sortedTaskSets) {
    val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
    if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
      ... ...
    } else {
      var launchedAnyTask = false
      val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
        var launchedTaskAtCurrentMaxLocality = false
        do {
          launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
            currentMaxLocality, shuffledOffers, availableCpus,
            availableResources, tasks, addressesWithDescs)
          launchedAnyTask |= launchedTaskAtCurrentMaxLocality
        } while (launchedTaskAtCurrentMaxLocality)
      }
      ... ...
    }
  }
  ... ...
  return tasks
}
Pool.scala
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  val sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}

private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
    case _ =>
      ... ...
  }
}
4) FIFO and Fair scheduling rules
SchedulingAlgorithm.scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    res < 0
  }
}

private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    ... ...
  }
}
5) Sending tasks to the Executor for execution
CoarseGrainedSchedulerBackend.scala
private def makeOffers(): Unit = {
  val taskDescs = withLock {
    ... ...
    // Fetch tasks to run
    scheduler.resourceOffers(workOffers)
  }
  if (taskDescs.nonEmpty) {
    launchTasks(taskDescs)
  }
}

private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    if (serializedTask.limit() >= maxRpcMessageSize) {
      ... ...
    } else {
      // Serialize the task and send it to the remote Executor endpoint
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
Task Execution on the Executor
The LaunchTask message is received in CoarseGrainedExecutorBackend.scala.
override def receive: PartialFunction[Any, Unit] = {
  ... ...
  case LaunchTask(data) =>
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      taskResources(taskDesc.taskId) = taskDesc.resources
      executor.launchTask(this, taskDesc)
    }
  ... ...
}
Executor.scala
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)
}
Shuffle
Spark's earliest versions used HashShuffle.
Spark 0.8.1 added an optimized (consolidated) HashShuffle.
Spark 1.1 introduced SortShuffle, with HashShuffle still the default.
Spark 1.2 made SortShuffle the default, with HashShuffle still configurable.
Spark 2.0 removed HashShuffle; only SortShuffle remains.
Shuffle Principles and Execution Process
A shuffle always writes data to disk. If the amount of data spilled to disk during the shuffle can be reduced, performance improves. Operators that support map-side pre-aggregation can also improve shuffle performance.
HashShuffle Explained
Unoptimized HashShuffle
Optimized HashShuffle
The optimized HashShuffle enables the consolidation mechanism, which reuses buffers. It is turned on with the spark.shuffle.consolidateFiles parameter, whose default value is false; set it to true to enable the optimization. In general, if the HashShuffleManager is used, it is recommended to enable this option.
Official parameter documentation: http://spark.apache.org/docs/0.8.1/configuration.html
SortShuffle Explained
SortShuffle
In this mode, data is first written into an in-memory data structure. For reduceByKey it is written into a Map and partially aggregated there while being buffered in memory; for join-style operators it is written into an ArrayList directly in memory. The writer then checks whether a threshold has been reached; if so, the contents of the in-memory structure are spilled to disk and the structure is cleared.
Before spilling to disk, the data is sorted by key; the sorted data is then written to the disk file in batches, 10,000 records per batch by default. The file is written through a buffer, and each spill produces one disk file, so a single Task can produce multiple temporary files.
Finally, each Task merges all of its temporary files: this is the merge step, which reads all the temporary files and writes them out once into a single final file, so all of a Task's data ends up in that one file. A separate index file is also written, recording the start offset and end offset of each downstream Task's data within the file.
bypass Shuffle
The difference between the bypass shuffle and SortShuffle is that the bypass mechanism does not sort the data.
The bypass mechanism is triggered when:
1) the number of shuffle reduce tasks is at most spark.shuffle.sort.bypassMergeThreshold (200 by default), and
2) the shuffle operator is not an aggregating one (for example, reduceByKey does not qualify).
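Both conditions are checked in shouldBypassMergeSort (shown later). The threshold itself is just a configuration value; a minimal sketch of raising it (400 is an arbitrary example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("bypass-threshold-demo")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")   // default is 200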
Shuffle Write to Disk
shuffleWriterProcessor (the write processor)
DAGScheduler.scala

private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  ... ...
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      // Shuffle write path
      case stage: ShuffleMapStage =>
        stage.pendingPartitions.clear()
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = partitions(id)
          stage.pendingPartitions += id
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
            Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
        }

      // Shuffle read path
      case stage: ResultStage =>
        ... ...
    }
  } catch {
    ... ...
  }
  ... ...
}
Task.scala
private[spark] abstract class Task[T](... ...) extends Serializable {

  final def run(... ...): T = {
    ... ...
    runTask(context)
  }
}
Ctrl + H: find the runTask implementation in ShuffleMapTask.scala.
private[spark] class ShuffleMapTask(... ...)
  extends Task[MapStatus](... ...) {

  override def runTask(context: TaskContext): MapStatus = {
    ... ...
    dep.shuffleWriterProcessor.write(rdd, dep, mapId, context, partition)
  }
}
ShuffleWriteProcessor.scala
def write(... ...): MapStatus = {
  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](
      dep.shuffleHandle,
      mapId,
      context,
      createMetricsReporter(context))
    writer.write(
      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    ... ...
  }
}
Search (Ctrl + H) for the implementation of ShuffleManager: SortShuffleManager.
SortShuffleManager.scala
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  ... ...
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(... ...)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(... ...)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(... ...)
  }
}
Since the first argument of getWriter is dep.shuffleHandle, click dep.shuffleHandle.
Dependency.scala

val shuffleHandle: ShuffleHandle =
  _rdd.context.env.shuffleManager.registerShuffle(shuffleId, this)

ShuffleManager.scala

def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
Conditions for Using the Bypass Shuffle
BypassMergeSortShuffleHandle is used when:
1) map-side pre-aggregation is not used, and
2) the number of downstream partitions is at most 200 (configurable).

Handle                       | Writer                       | Conditions
SerializedShuffleHandle      | UnsafeShuffleWriter          | 1. The serializer supports relocation of serialized objects (Java serialization does not, Kryo does). 2. No map-side pre-aggregation. 3. The number of downstream partitions is at most 16777216.
BypassMergeSortShuffleHandle | BypassMergeSortShuffleWriter | 1. No map-side pre-aggregation. 2. The number of downstream partitions is at most 200 (configurable).
BaseShuffleHandle            | SortShuffleWriter            | All other cases.
Search (Ctrl + H) for the implementation of registerShuffle: SortShuffleManager.scala.

override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // Bypass conditions: no map-side pre-aggregation, and by default
  // no more than 200 downstream partitions
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
Click shouldBypassMergeSort.
SortShuffleWriter.scala

private[spark] object SortShuffleWriter {

  def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // If the operator has map-side pre-aggregation, bypass cannot be used
    if (dep.mapSideCombine) {
      false
    } else {
      // SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD defaults to 200 partitions
      val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
      // Bypass can be used when the number of downstream partitions
      // is at most 200 (configurable)
      dep.partitioner.numPartitions <= bypassMergeThreshold
    }
  }
}
Conditions for Using the Serialized Shuffle
SerializedShuffleHandle is used when:
1) the serializer supports relocation of serialized objects (Java serialization does not, Kryo does),
2) map-side pre-aggregation is not used, and
3) the number of downstream partitions is at most 16777216.
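Because Kryo supports relocation of serialized objects while the default Java serialization does not, condition 1 in practice usually means configuring Kryo. A minimal sketch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-demo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")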
Click canUseSerializedShuffle.
SortShuffleManager.scala

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // Whether two independently serialized objects can be relocated and concatenated:
  // 1. The default Java serialization does not support relocation; Kryo does
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false
  } else if (dependency.mapSideCombine) { // 2. Map-side pre-aggregation is not supported
    false
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    // 3. More than 16777216 downstream partitions cannot use it either
    false
  } else {
    ... ...
  }
}
Using the Base Shuffle
Click SortShuffleWriter.
SortShuffleWriter.scala

override def write(records: Iterator[Product2[K, V]]): Unit = {
  // With map-side pre-aggregation, the sorter is given an aggregator and a key ordering
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Insert the records (buffered in memory, spilled to disk when needed)
  sorter.insertAll(records)

  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Write out the partitioned data (merging spill files with in-memory data)
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
Inserting Data (In-Memory Buffering + Spilling)
ExternalSorter.scala

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  val shouldCombine = aggregator.isDefined
  // With map-side pre-aggregation a map structure is used; without it, a buffer is used
  if (shouldCombine) {
    val mergeValue = aggregator.get.mergeValue
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    while (records.hasNext) {
      addElementsRead()
      kv = records.next()
      // With pre-aggregation, values with the same key are combined on the map side
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Check whether a spill is needed
      maybeSpillCollection(usingMap = true)
    }
  } else {
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // Without pre-aggregation the values are not combined: (key, (value1, value2))
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      maybeSpillCollection(usingMap = false)
    }
  }
}

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }
  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}
Spillable.scala
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // myMemoryThreshold: the initial memory threshold is 5 MB by default
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    // Try to acquire more execution memory
    val granted = acquireMemory(amountToRequest)
    myMemoryThreshold += granted
    // If current memory is still at or above the (raised) threshold, a spill is needed
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // Force a spill when the number of elements read exceeds the force-spill threshold
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  if (shouldSpill) {
    _spillCount += 1
    logSpillage(currentMemory)
    // Spill to disk
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    // Release the memory
    releaseMemory()
  }
  shouldSpill
}

protected def spill(collection: C): Unit
Search (Ctrl + H) for the implementation of spill: ExternalSorter.
ExternalSorter.scala
override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  spills += spillFile
}

private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Create a temporary file
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // Before spilling, a write buffer of fileBufferSize (32 KB by default) is allocated
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)
  ... ...
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}
Merging
Back in SortShuffleWriter.scala:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Merge
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
ExternalSorter.scala
def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  // If there are no spill files, only the in-memory data needs to be handled
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    ... ...
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    // If there are spill files, they must be merged with the in-memory data
    for ((id, elements) <- this.partitionedIterator) {
      val blockId = ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      ... ...
      if (partitionPairsWriter != null) {
        partitionPairsWriter.close()
      }
      nextPartitionId = id + 1
    }
  }
  ... ...
}

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    if (ordering.isEmpty) {
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(None)))
    } else {
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge the spill files with the in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}

private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    if (aggregator.isDefined) {
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    } else if (ordering.isDefined) {
      // Merge sort
      (p, mergeSort(iterators, ordering.get))
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}
Back in SortShuffleWriter.scala:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  sorter.insertAll(records)

  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Merge
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  // Write to disk
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}
Search (Ctrl + H) for the implementation of commitAllPartitions: LocalDiskShuffleMapOutputWriter.java.

public long[] commitAllPartitions() throws IOException {
  if (outputFileChannel != null && outputFileChannel.position() != bytesWrittenToMergedFile) {
    ... ...
  }
  cleanUp();
  File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
  blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
  return partitionLengths;
}
IndexShuffleBlockResolver.scala
def writeIndexFileAndCommit(
    shuffleId: Int,
    mapId: Long,
    lengths: Array[Long],
    dataTmp: File): Unit = {
  val indexFile = getIndexFile(shuffleId, mapId)
  val indexTmp = Utils.tempFileWith(indexFile)
  try {
    val dataFile = getDataFile(shuffleId, mapId)
    synchronized {
      val existingLengths = checkIndexAndDataFile(indexFile, dataFile, lengths.length)
      if (existingLengths != null) {
        System.arraycopy(existingLengths, 0, lengths, 0, lengths.length)
        if (dataTmp != null && dataTmp.exists()) {
          dataTmp.delete()
        }
      } else {
        val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexTmp)))
        Utils.tryWithSafeFinally {
          var offset = 0L
          out.writeLong(offset)
          for (length <- lengths) {
            offset += length
            out.writeLong(offset)
          }
        } {
          out.close()
        }

        if (indexFile.exists()) {
          indexFile.delete()
        }
        if (dataFile.exists()) {
          dataFile.delete()
        }
        if (!indexTmp.renameTo(indexFile)) {
          throw new IOException("fail to rename file " + indexTmp + " to " + indexFile)
        }
        if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
          throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
        }
      }
    }
  } finally {
    ... ...
  }
}
Shuffle Read from Disk
DAGScheduler.scala
private def submitMissingTasks(stage: Stage, jobId: Int): Unit = {
  ... ...
  val tasks: Seq[Task[_]] = try {
    val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
    stage match {
      case stage: ShuffleMapStage =>
        ... ...
      case stage: ResultStage =>
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptNumber,
            taskBinary, part, locs, id, properties, serializedTaskMetrics,
            Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
            stage.rdd.isBarrier())
        }
    }
  } catch {
    ... ...
  }
  ... ...
}
ResultTask.scala
private[spark] class ResultTask[T, U](... ...)
  extends Task[U](... ...)
  with Serializable {

  override def runTask(context: TaskContext): U = {
    ... ...
    func(context, rdd.iterator(partition, context))
  }
}
RDD.scala
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}

private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  ... ...
  computeOrReadCheckpoint(partition, context)
  ... ...
}

def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}

def compute(split: Partition, context: TaskContext): Iterator[T]
Search globally for compute; since this RDD is a ShuffledRDD, open ShuffledRDD.scala and find compute.
override def compute(split: Partition, context: TaskContext): Iterator[(K, C)] = {
  val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
  val metrics = context.taskMetrics().createTempShuffleReadMetrics()
  SparkEnv.get.shuffleManager.getReader(
    dep.shuffleHandle, split.index, split.index + 1, context, metrics)
    .read()
    .asInstanceOf[Iterator[(K, C)]]
}
ShuffleManager.scala

def getReader[K, C](... ...): ShuffleReader[K, C]

Search (Ctrl + H) for the implementation of getReader: SortShuffleManager.scala.
override def getReader[K, C](... ...): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
In BlockStoreShuffleReader.scala, find the read method.
override def read(): Iterator[Product2[K, C]] = {
  val wrappedStreams = new ShuffleBlockFetcherIterator(
    ... ...
    // Read buffer size, 48 MB by default
    SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
    SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
    ... ...
  )
  ... ...
}
Spark Memory Management
On-Heap and Off-Heap Memory
Spark supports both on-heap and off-heap memory:
1) On-heap memory: memory that the program requests dynamically at runtime inside the JVM heap.
2) Off-heap memory: memory requested directly from the operating system, not managed by the JVM.
Advantages and disadvantages of on-heap vs. off-heap memory
1) Compared with on-heap memory, off-heap memory has several advantages:
(1) It reduces garbage-collection work, since garbage collection pauses other work.
(2) It speeds up copying: on-heap data must be serialized before being flushed to a remote destination, whereas off-heap data is already stored in serialized form, so that step is effectively skipped.
Note: off-heap memory holds serialized data, so the memory it occupies can be computed exactly. On-heap memory holds deserialized objects, whose footprint is approximated by periodic sampling — not every newly added record triggers a recalculation. This lowers the time overhead but can introduce significant error, so the actual memory in use at any moment may far exceed expectations. In addition, object instances that Spark has marked as released may not actually have been reclaimed by the JVM yet, so the memory actually available can be smaller than what Spark has recorded. Spark therefore cannot precisely track the available on-heap memory and cannot completely avoid OOM (out-of-memory) errors.
2) Compared with on-heap memory, off-heap memory also has drawbacks:
(1) Off-heap memory is hard to control; if it leaks, the leak is difficult to troubleshoot.
(2) Off-heap memory is not well suited to storing complex objects; simple or flattened objects fit better.
1) The on-heap memory size is set with --executor-memory or spark.executor.memory.
2) Off-heap memory is disabled by default; it is enabled with spark.memory.offHeap.enabled, and its size is set with spark.memory.offHeap.size.
Official configuration reference: http://spark.apache.org/docs/3.0.0/configuration.html
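A minimal configuration sketch for the settings above (the sizes are arbitrary examples):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g")           // on-heap executor memory
  .set("spark.memory.offHeap.enabled", "true")  // off-heap memory is disabled by default
  .set("spark.memory.offHeap.size", "2g")       // required when off-heap memory is enabled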
On-Heap Memory Layout
On-heap memory is divided into Storage memory, Execution memory, and Other memory.
Static memory management
Under the static memory management scheme that Spark originally used, the sizes of storage memory, execution memory, and other memory are fixed for the lifetime of the application, but can be configured by the user before the application starts. The on-heap layout is shown in the figure.
The usable on-heap memory is computed as follows:
Usable storage memory = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
Usable execution memory = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
Here systemMaxMemory is the size of the current JVM heap, and the usable execution or storage memory is obtained by multiplying it by the corresponding memoryFraction and safetyFraction parameters. The purpose of the two safetyFraction parameters is to logically reserve a safety region of size 1 - safetyFraction, reducing the risk of OOM caused by actual memory usage exceeding the preset range (as mentioned above, sampling-based estimation for non-serialized objects is imprecise). Note that this reserved region is only a logical plan: Spark does not treat it specially at runtime, and like the "other" memory it is simply left to the JVM to manage.
Both the Storage and Execution regions keep reserved space to guard against OOM, because Spark's accounting of on-heap memory is not exact and a safety margin must be kept.
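A quick worked example of these formulas, using the legacy static-mode defaults (spark.storage.memoryFraction = 0.6 with safetyFraction = 0.9, spark.shuffle.memoryFraction = 0.2 with safetyFraction = 0.8 — these defaults are taken from older Spark documentation and are an assumption here) for a 10 GB heap:

val systemMaxMemory = 10L * 1024 * 1024 * 1024              // 10 GB JVM heap
val storageMemory   = (systemMaxMemory * 0.6 * 0.9).toLong  // ≈ 5.4 GB usable storage memory
val executionMemory = (systemMaxMemory * 0.2 * 0.8).toLong  // ≈ 1.6 GB usable execution memory
println(s"storage = ${storageMemory >> 20} MB, execution = ${executionMemory >> 20} MB")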
Off-heap space allocation under static management is simpler: there are only storage memory and execution memory, as shown in the figure. The sizes of the execution and storage regions are determined directly by the spark.memory.storageFraction parameter; because off-heap memory usage can be computed exactly, no safety region is needed.
Static memory management is simple to implement, but if the user is unfamiliar with Spark's storage mechanism, or does not tune the configuration to the actual data sizes and workloads, it easily produces a "half seawater, half flame" situation: one of the two regions has plenty of free space while the other fills up early and has to evict or spill old content to make room for new content. Because a newer memory-management mechanism is now available, few developers still use this scheme; Spark keeps the implementation only for compatibility with older applications.
Unified memory management
The unified memory management mechanism introduced in Spark 1.6 differs from static management in that the storage and execution regions share one space and can dynamically occupy each other's free area. The on-heap layout under unified memory management is shown in the figure, and the off-heap layout in the following figure.
The most important optimization is the dynamic occupancy mechanism, whose rules are:
- A baseline storage region and execution region are defined (by the spark.memory.storageFraction parameter); this determines the range of space each side owns.
- If both sides run out of space, data is spilled to disk; if one side runs out while the other has free space, it can borrow the other side's space (running out of storage space means there is not enough room for a complete Block).
- When execution memory has been occupied by the other side, it can force the occupied portion to be spilled to disk and "reclaim" the borrowed space.
- When storage memory has been occupied by the other side, it cannot force a return, because too many factors in the shuffle process would have to be considered and the implementation would be complex.
The dynamic occupancy mechanism is illustrated in the figure.
With unified memory management, Spark improves the utilization of on-heap and off-heap memory to some extent and lowers the burden of tuning Spark memory, but this does not mean developers can relax entirely. If the storage region is too large or too much data is cached, frequent full garbage collections will hurt task performance, because cached RDD data usually stays in memory for a long time. To get the best out of Spark, developers should understand how storage memory and execution memory are each managed and implemented.
4.2.3 Memory Space Allocation
Search globally (Ctrl + N) for SparkEnv and find the create method.
SparkEnv.scala
private def create(
    conf: SparkConf,
    executorId: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Option[Int],
    isLocal: Boolean,
    numUsableCores: Int,
    ioEncryptionKey: Option[Array[Byte]],
    listenerBus: LiveListenerBus = null,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
  ... ...
  val memoryManager: MemoryManager = UnifiedMemoryManager(conf, numUsableCores)
  ... ...
}
UnifiedMemoryManager.scala
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
  // The maximum usable memory is 0.6 of (system memory - reserved memory)
  val maxMemory = getMaxMemory(conf)
  // Half of the usable memory is the storage region (MEMORY_STORAGE_FRACTION = 0.5)
  new UnifiedMemoryManager(
    conf,
    maxHeapMemory = maxMemory,
    onHeapStorageRegionSize =
      (maxMemory * conf.get(config.MEMORY_STORAGE_FRACTION)).toLong,
    numCores = numCores)
}

private def getMaxMemory(conf: SparkConf): Long = {
  // The system (heap) memory
  val systemMemory = conf.get(TEST_MEMORY)
  // The reserved memory, 300 MB by default (RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024)
  val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
    if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
  val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
  if (systemMemory < minSystemMemory) {
    throw new IllegalArgumentException(s"System memory $systemMemory must " +
      s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
      s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
  }
  if (conf.contains(config.EXECUTOR_MEMORY)) {
    val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
    if (executorMemory < minSystemMemory) {
      throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
        s"$minSystemMemory. Please increase executor memory using the " +
        s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
    }
  }
  val usableMemory = systemMemory - reservedMemory
  val memoryFraction = conf.get(config.MEMORY_FRACTION)
  (usableMemory * memoryFraction).toLong
}
config\package.scala
private[spark] val MEMORY_FRACTION = ConfigBuilder("spark.memory.fraction")
  ... ...
  .createWithDefault(0.6)
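Putting getMaxMemory and the 0.6 / 0.5 fractions together for, say, a 1 GB executor heap (the heap size is just an example):

val systemMemory   = 1024L * 1024 * 1024            // 1 GB heap
val reservedMemory = 300L * 1024 * 1024             // RESERVED_SYSTEM_MEMORY_BYTES
val usableMemory   = systemMemory - reservedMemory  // ≈ 724 MB
val unifiedMemory  = (usableMemory * 0.6).toLong    // spark.memory.fraction = 0.6 -> ≈ 434 MB
val storageRegion  = (unifiedMemory * 0.5).toLong   // spark.memory.storageFraction = 0.5 -> ≈ 217 MB
println(s"unified = ${unifiedMemory >> 20} MB, storage region = ${storageRegion >> 20} MB")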
Click UnifiedMemoryManager inside UnifiedMemoryManager.apply().

private[spark] class UnifiedMemoryManager(
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize) { // on-heap execution region: 0.6 - 0.3 = 0.3
Click MemoryManager.
MemoryManager.scala

private[spark] abstract class MemoryManager(
    conf: SparkConf,
    numCores: Int,
    onHeapStorageMemory: Long,
    onHeapExecutionMemory: Long) extends Logging { // on-heap execution region: 0.6 - 0.3 = 0.3

  ... ...

  // On-heap storage memory pool
  protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
  // Off-heap storage memory pool
  protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
  // On-heap execution memory pool
  protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
  // Off-heap execution memory pool
  protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

  protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
  // Off-heap storage memory (MEMORY_STORAGE_FRACTION = 0.5)
  protected[this] val offHeapStorageMemory =
    (maxOffHeapMemory * conf.get(MEMORY_STORAGE_FRACTION)).toLong

  ... ...
}
Storage Memory Management
The RDD persistence mechanism
The Resilient Distributed Dataset (RDD), Spark's most fundamental data abstraction, is a read-only collection of partitioned records. It can only be created from a dataset in stable physical storage or by applying a transformation to an existing RDD. The dependencies between a transformed RDD and its source RDDs form the lineage, and the lineage is what guarantees that every RDD can be recovered. All RDD transformations are lazy: only when an action that returns a result to the Driver occurs does Spark create tasks to read the RDD and actually trigger the transformations.
When a Task starts and reads a partition, it first checks whether the partition has already been persisted; if not, it checks the checkpoint or recomputes the partition from its lineage. So if several actions will run on the same RDD, you can call persist or cache in the first action to persist or cache the RDD in memory or on disk, speeding up the later actions.
In fact, the cache method persists the RDD to memory with the default MEMORY_ONLY storage level, so caching is a special case of persistence. The design of on-heap and off-heap storage memory allows unified planning and management of the memory used when caching RDDs.
RDD persistence is handled by Spark's Storage module, which decouples RDDs from physical storage. The Storage module manages the data Spark produces during computation and encapsulates the functions that store and retrieve data in memory or on disk, locally or remotely. Concretely, the Storage modules on the Driver and the Executors form a master-slave architecture: the BlockManager on the Driver is the master and the BlockManagers on the Executors are the slaves.
The Storage module logically uses the Block as its basic storage unit; each Partition of an RDD, once processed, corresponds to exactly one Block (the BlockId has the form rdd_RDD-ID_PARTITION-ID). The master on the Driver manages and maintains the metadata of all Blocks of the Spark application, while the slaves on the Executors report Block status updates to the master and receive commands from it, such as adding or removing an RDD.
When persisting an RDD, Spark defines 7 storage levels such as MEMORY_ONLY and MEMORY_AND_DISK; a storage level is a combination of the following 5 fields:
class StorageLevel private(
    private var _useDisk: Boolean,       // use disk
    private var _useMemory: Boolean,     // this actually refers to on-heap memory
    private var _useOffHeap: Boolean,    // use off-heap memory
    private var _deserialized: Boolean,  // whether stored in deserialized (non-serialized) form
    private var _replication: Int = 1)   // number of replicas
The 7 storage levels in Spark are:
Persistence level | Meaning
MEMORY_ONLY | Persist the RDD as deserialized Java objects in JVM memory. If not all partitions fit in memory, the partitions that were not persisted are recomputed the next time they are needed.
MEMORY_AND_DISK | As above, but partitions that cannot be stored in memory are persisted to disk and read from disk the next time they are needed.
MEMORY_ONLY_SER | Like MEMORY_ONLY, but the Java objects are serialized before being persisted. This reduces memory usage at the cost of extra CPU for deserialization.
MEMORY_AND_DISK_SER | Like MEMORY_AND_DISK, but the objects are persisted in serialized form.
DISK_ONLY | Persist the data entirely on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | A level ending in _2 keeps an extra replica of the persisted data on another node, so that if data is lost it does not need to be recomputed; the backup copy is used instead.
Looking at this data structure, a storage level defines how an RDD's Partitions (that is, its Blocks) are stored along three dimensions:
Storage location: disk / on-heap memory / off-heap memory. For example, MEMORY_AND_DISK stores data on disk and in on-heap memory at the same time, providing redundancy, while OFF_HEAP stores data only in off-heap memory; when off-heap memory is chosen, the data cannot simultaneously be stored elsewhere.
Storage format: whether a Block is kept in non-serialized form once cached in storage memory. For example, MEMORY_ONLY stores data in non-serialized form, whereas OFF_HEAP stores it serialized.
Number of replicas: a value greater than 1 requires remote replication to other nodes. For example, DISK_ONLY_2 requires one remote replica.
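For example, caching an RDD at one of these levels through the public API (this sketch assumes a SparkContext sc, as in the WordCount example earlier):

import org.apache.spark.storage.StorageLevel

val data = sc.parallelize(1 to 1000000)
data.persist(StorageLevel.MEMORY_AND_DISK_SER)  // serialized in memory, spilled to disk if needed
data.count()   // the first action materializes and caches the blocks
data.count()   // later actions read from the cache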
The RDD caching process
Before an RDD is cached in storage memory, the data of a Partition is generally accessed through an iterator, the standard way of traversing a data collection in Scala. Through the Iterator one can obtain each serialized or non-serialized record (Record) of the partition; logically these Record instances occupy the "other" part of the JVM heap, and the storage of different Records of the same Partition is not contiguous.
After the RDD is cached in storage memory, the Partition becomes a Block and the Records occupy a contiguous region of on-heap or off-heap storage memory. Spark calls this conversion of a Partition from non-contiguous to contiguous storage "unrolling" (Unroll).
Blocks have two storage formats, serialized and non-serialized, depending on the RDD's storage level. A non-serialized Block is defined by a DeserializedMemoryEntry, which uses an array to hold all object instances; a serialized Block is defined by a SerializedMemoryEntry, which uses a ByteBuffer to hold the binary data. The Storage module of each Executor uses a LinkedHashMap to manage all Block instances in on-heap and off-heap storage memory; adding to and removing from this LinkedHashMap indirectly records the allocation and release of memory.
Because there is no guarantee that the storage space can hold all of the Iterator's data at once, the current task must request enough Unroll space from the MemoryManager as a placeholder; if the space is insufficient the unroll fails, otherwise it can proceed.
For a serialized Partition, the required Unroll space can be accumulated directly and requested in one go.
For a non-serialized Partition, the space must be requested incrementally while traversing the Records: for each Record read, the required Unroll space is estimated by sampling and requested; if space runs out, the process can be interrupted and the occupied Unroll space released.
If the unroll eventually succeeds, the Unroll space occupied by the Partition is converted into normal storage space for the cached RDD, as shown in the figure.
Under static memory management Spark set aside a dedicated, fixed-size Unroll region inside storage memory; under unified memory management there is no special Unroll region, and insufficient storage space is handled by the dynamic occupancy mechanism.
Eviction and dropping to disk
All computing tasks of the same Executor share its limited storage memory. When a new Block needs to be cached but the remaining space is insufficient and cannot be dynamically borrowed, old Blocks in the LinkedHashMap must be evicted; if an evicted Block's storage level also includes disk, it is dropped (written) to disk, otherwise it is simply deleted.
The eviction rules for storage memory are:
The evicted old Block must have the same MemoryMode as the new Block, i.e. both off-heap or both on-heap.
The old and new Blocks must not belong to the same RDD, to avoid evicting in a loop.
The RDD that the old Block belongs to must not currently be being read, to avoid consistency problems.
Blocks in the LinkedHashMap are evicted in least-recently-used (LRU) order until enough space is freed for the new Block; the LRU ordering is a property of LinkedHashMap.
Dropping to disk is straightforward: if the storage level has _useDisk = true, then depending on _deserialized the data is serialized if necessary, written to disk, and its information is updated in the Storage module.
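The LRU behaviour comes from LinkedHashMap's access-order mode. A small self-contained sketch (a toy, not Spark's MemoryStore) of evicting least-recently-used blocks until enough space is freed:

import java.util.{LinkedHashMap => JLinkedHashMap}

// Toy block store: blockId -> size in bytes; iteration order = least recently used first.
val blocks = new JLinkedHashMap[String, Long](16, 0.75f, /* accessOrder = */ true)
blocks.put("rdd_0_0", 100L); blocks.put("rdd_0_1", 200L); blocks.put("rdd_1_0", 300L)
blocks.get("rdd_0_0")                        // touching a block makes it most recently used

def evictToFree(needed: Long): Long = {
  var freed = 0L
  val it = blocks.entrySet().iterator()      // iterates LRU-first
  while (freed < needed && it.hasNext) {
    val entry = it.next()
    freed += entry.getValue
    it.remove()                              // drop (or write to disk if the level allows it)
  }
  freed
}

println(evictToFree(250))                    // evicts rdd_0_1 (200) and rdd_1_0 (300) -> 500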
Execution Memory Management
Execution memory mainly stores the memory a task uses while performing a shuffle. A shuffle repartitions RDD data according to certain rules; consider how its Write and Read phases use execution memory:
1) Shuffle Write
If the ordinary sort path is chosen on the map side, an ExternalSorter performs the external sort, using mainly on-heap execution space for in-memory data.
If the Tungsten sort path is chosen on the map side, a ShuffleExternalSorter sorts data stored in serialized form directly, using on-heap or off-heap execution space depending on whether off-heap memory is enabled and whether the off-heap execution memory is sufficient.
2) Shuffle Read
When aggregating data on the reduce side, the data is handed to an Aggregator, which stores it in on-heap execution space.
If the final result needs to be sorted, the data is handed again to an ExternalSorter, which also uses on-heap execution space.
In the ExternalSorter and the Aggregator, Spark uses a hash table called AppendOnlyMap to store data in on-heap execution memory. Not all shuffle data can be kept in this hash table; its memory usage is estimated by periodic sampling, and once it grows large enough that no more execution memory can be obtained from the MemoryManager, its entire contents are written to disk files. This process is called spilling, and the spilled files are later merged.
The Tungsten used in the Shuffle Write phase is Databricks' plan (Project Tungsten) for optimizing Spark's memory and CPU usage, which works around some JVM performance limitations. Spark automatically decides, based on the shuffle, whether to use Tungsten sort.
Tungsten's page-based memory management is built on top of the MemoryManager: Tungsten abstracts the use of execution memory one step further, so that during the shuffle there is no need to care whether data is stored on-heap or off-heap.
Each memory page is defined by a MemoryBlock, and the two fields Object obj and long offset together identify a page's address in system memory.
An on-heap MemoryBlock is memory allocated as a long array: obj is the object reference of that array and offset is the array's initial offset inside the JVM; together they locate the array's absolute address in the heap. An off-heap MemoryBlock is a directly allocated block of memory: its obj is null and its offset is the block's 64-bit absolute address in system memory. MemoryBlock thus unifies on-heap and off-heap pages under one abstraction, and a page table (pageTable) manages the pages each Task has obtained.
Under Tungsten's page-based management, all memory is addressed by a 64-bit logical address consisting of a page number and an in-page offset:
Page number: 13 bits, uniquely identifying a memory page; Spark must obtain a free page number before requesting a page.
In-page offset: 51 bits, the offset of the data within the page when the page is used to store data.
With this unified addressing scheme, Spark can locate on-heap or off-heap memory with a 64-bit logical-address pointer. The whole Shuffle Write sort only needs to sort these pointers, with no deserialization, which makes the process very efficient and brings clear gains in memory access efficiency and CPU usage.
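A small sketch of the 64-bit logical-address layout described above (13-bit page number in the high bits, 51-bit in-page offset in the low bits); the helper names are illustrative, not Tungsten's actual API:

val OffsetBits = 51
val OffsetMask = (1L << OffsetBits) - 1                  // low 51 bits

def encode(pageNumber: Int, offsetInPage: Long): Long =
  (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)

def decodePage(address: Long): Int    = (address >>> OffsetBits).toInt
def decodeOffset(address: Long): Long = address & OffsetMask

val addr = encode(pageNumber = 5, offsetInPage = 1024L)
assert(decodePage(addr) == 5 && decodeOffset(addr) == 1024L)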
Spark manages storage memory and execution memory in completely different ways: for storage memory, Spark uses a LinkedHashMap to centrally manage all Blocks, which are converted from the Partitions of RDDs that need to be cached; for execution memory, Spark uses an AppendOnlyMap to store shuffle data, and in Tungsten sort even abstracts it into page-based memory management, opening up a brand-new JVM memory management mechanism.