`

Spark 以及 spark streaming 核心原理及实践 - (1)

 
阅读更多

导语

spark 已经成为广告、报表以及推荐系统等大数据计算场景中首选系统,因效率高,易用以及通用性越来越得到大家的青睐,我自己最近半年在接触spark以及spark streaming之后,对spark技术的使用有一些自己的经验积累以及心得体会,在此分享给大家。

本文依次从:

spark生态,原理,基本概念,

spark streaming原理及实践,

还有spark调优

以及环境搭建

等方面进行介绍,希望对大家有所帮助。

 

Spark 生态及运行原理

 

Spark 特点

  1. 运行速度快 => Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce的10倍以上,如果数据从内存中读取,速度可以高达100多倍。

  2. 适用场景广泛 => 大数据分析统计,实时数据处理,图计算及机器学习

  3. 易用性 => 编写简单,支持80种以上的高级算子,支持多种语言,数据源丰富,可部署在多种集群中

  4. 容错性高。Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即充许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错,而CheckPoint有两种方式:CheckPoint Data,和Logging The Updates,用户可以控制采用哪种方式来实现容错。

Spark的适用场景

目前大数据处理场景有以下几个类型:

  1. 复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;

  2. 基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间

  3. 基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

Spark成功案例

目前大数据在互联网公司主要应用在广告、报表、推荐系统等业务上

> 在广告业务方面需要大数据做应用分析、效果分析、定向优化等,

> 在推荐系统方面则需要大数据优化相关排名、个性化推荐以及热点点击分析等。

这些应用场景的普遍特点是计算量大、效率要求高。

腾讯 / yahoo / 淘宝 / 优酷土豆

 

spark运行架构

spark基础运行架构如下所示:

 spark结合yarn集群背后的运行流程如下所示:

 

spark 运行流程:

Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。

  • Master作为整个集群的控制器,负责整个集群的正常运行;

  • Worker相当于计算节点,接收主节点命令与进行状态汇报;

  • Executor负责任务的执行;

  • Client作为用户的客户端负责提交应用;

  • Driver负责控制一个应用的执行。

Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver和Worker是两个重要角色。Driver 程序是应用逻辑执行的起点,负责作业的调度,即Task任务的分发,而多个Worker用来管理计算节点和创建Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的file和jar序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进行处理。

  1. Excecutor /Task 每个程序自有,不同程序互相隔离,task多线程并行

  2. 集群对Spark透明,Spark只要能获取相关节点和进程

  3. Driver 与Executor保持通信,协作处理

三种集群模式:

1.Standalone 独立集群

2.Mesos, apache mesos

3.Yarn, hadoop yarn

基本概念:

  • Application =>Spark的应用程序,包含一个Driver program和若干Executor

  • SparkContext => Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor

  • Driver Program => 运行Application的main()函数并且创建SparkContext

  • Executor => 是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。每个Application都会申请各自的Executor来处理任务

  • Cluster Manager =>在集群上获取资源的外部服务 (例如:Standalone、Mesos、Yarn)

  • Worker Node => 集群中任何可以运行Application代码的节点,运行一个或多个Executor进程

  • Task => 运行在Executor上的工作单元

  • Job => SparkContext提交的具体Action操作,常和Action对应

  • Stage => 每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet

  • RDD => 是Resilient distributed datasets的简称,中文为弹性分布式数据集;是Spark最核心的模块和类

  • DAGScheduler => 根据Job构建基于Stage的DAG,并提交Stage给TaskScheduler

  • TaskScheduler => 将Taskset提交给Worker node集群运行并返回结果

  • Transformations => 是Spark API的一种类型,Transformation返回值还是一个RDD,所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的

  • Action => 是Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合;计算只有在Action被提交的时候计算才被触发。

Spark核心概念之RDD

Spark核心概念之Transformations / Actions

Transformation返回值还是一个RDD。它使用了链式调用的设计模式,对一个RDD进行计算后,变换成另外一个RDD,然后这个RDD又可以进行另外一次转换。这个过程是分布式的。 Action返回值不是一个RDD。它要么是一个Scala的普通集合,要么是一个值,要么是空,最终或返回到Driver程序,或把RDD写入到文件系统中。

Action是返回值返回给driver或者存储到文件,是RDD到result的变换,Transformation是RDD到RDD的变换。

只有action执行时,rdd才会被计算生成,这是rdd懒惰执行的根本所在。

 

Spark核心概念之Jobs / Stage

Job => 包含多个task的并行计算,一个action触发一个job

stage => 一个job会被拆为多组task,每组任务称为一个stage,以shuffle进行划分

Spark核心概念之Shuffle

以reduceByKey为例解释shuffle过程。

在没有task的文件分片合并下的shuffle过程如下:(spark.shuffle.consolidateFiles=false

fetch 来的数据存放到哪里?

刚 fetch 来的 FileSegment 存放在 softBuffer 缓冲区,经过处理后的数据放在内存 + 磁盘上。这里我们主要讨论处理后的数据,可以灵活设置这些数据是“只用内存”还是“内存+磁盘”。如果spark.shuffle.spill = false就只用内存。由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。

shuffle之所以需要把中间结果放到磁盘文件中,是因为虽然上一批task结束了,下一批task还需要使用内存。如果全部放在内存中,内存会不够。另外一方面为了容错,防止任务挂掉。

存在问题如下:

  1. 产生的 FileSegment 过多。每个 ShuffleMapTask 产生 R(reducer 个数)个 FileSegment,M 个 ShuffleMapTask 就会产生 M * R 个文件。一般 Spark job 的 M 和 R 都很大,因此磁盘上会存在大量的数据文件。

  2. 缓冲区占用内存空间大。每个 ShuffleMapTask 需要开 R 个 bucket,M 个 ShuffleMapTask 就会产生 MR 个 bucket。虽然一个 ShuffleMapTask 结束后,对应的缓冲区可以被回收,但一个 worker node 上同时存在的 bucket 个数可以达到 cores R 个(一般 worker 同时可以运行 cores 个 ShuffleMapTask),占用的内存空间也就达到了cores× R × 32 KB。对于 8 核 1000 个 reducer 来说,占用内存就是 256MB。

为了解决上述问题,我们可以使用文件合并的功能。

在进行task的文件分片合并下的shuffle过程如下:(spark.shuffle.consolidateFiles=true

可以明显看出,在一个 core 上连续执行的 ShuffleMapTasks 可以共用一个输出文件 ShuffleFile。先执行完的 ShuffleMapTask 形成 ShuffleBlock i,后执行的 ShuffleMapTask 可以将输出数据直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每个 ShuffleBlock 被称为 FileSegment。下一个 stage 的 reducer 只需要 fetch 整个 ShuffleFile 就行了。这样,每个 worker 持有的文件数降为 cores× R。FileConsolidation 功能可以通过spark.shuffle.consolidateFiles=true来开启。

Spark核心概念之Cache

val rdd1 = ... // 读取hdfs数据,加载成RDD
rdd1.cache

val rdd2 = rdd1.map(...)
val rdd3 = rdd1.filter(...)

rdd2.take(10).foreach(println)
rdd3.take(10).foreach(println)

rdd1.unpersist

cache和unpersisit两个操作比较特殊,他们既不是action也不是transformation。cache会将标记需要缓存的rdd,真正缓存是在第一次被相关action调用后才缓存;unpersisit是抹掉该标记,并且立刻释放内存。只有action执行时,rdd1才会开始创建并进行后续的rdd变换计算。

cache其实也是调用的persist持久化函数,只是选择的持久化级别为MEMORY_ONLY

persist支持的RDD持久化级别如下:

需要注意的问题:

Cache或shuffle场景序列化时, spark序列化不支持protobuf message,需要java 可以serializable的对象。一旦在序列化用到不支持java serializable的对象就会出现上述错误。

Spark只要写磁盘,就会用到序列化。除了shuffle阶段和persist会序列化,其他时候RDD处理都在内存中,不会用到序列化。

 

 

 

 

 

https://www.cnblogs.com/liuliliuli2017/p/6809094.html

分享到:
评论

相关推荐

    Spark以及sparkstreaming核心原理及实践

    本文依次从spark生态,原理,基本概念,sparkstreaming原理及实践,还有spark调优以及环境搭建等方面进行介绍,希望对大家有所帮助。运行速度快Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计

    3.Spark编程模型(上)--概念及SparkShell实战.pdf

    7.SparkStreaming(上)--SparkStreaming原理介绍.pdf 7.SparkStreaming(下)--SparkStreaming实战.pdf 8.SparkMLlib(上)--机器学习及SparkMLlib简介.pdf 8.SparkMLlib(下)--SparkMLlib实战.pdf 9.SparkGraphX...

    SparkStreaming原理介绍

    Spark Streaming 是 Spark 核心 API 的一个扩展,可以实现高吞吐量的、具备容错机制的 实时流数据的处理。支持从多种数据源获取数据,包括 Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及 TCP sockets,从数据源获取...

    Spark从入门到精通

    2、现场动手画图讲解Spark原理以及源码(绝对不是干讲源码和PPT); 3、覆盖Spark所有功能点(Spark RDD、Spark SQL、Spark Streaming,初级功能到高级特性,一个不少); 4、Scala全程案例实战讲解(近百个趣味性...

    Spark入门实战系列(上)-实时流计算SparkStreaming原理介绍

    SparkStreaming是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis以及TCPsockets,从数据源获取数据之后,...

    Spark技术内幕深入解析Spark内核架构设计与实现原理

    本书以源码为基础,深入分析spark内核的设计理念和架构实现,系统讲解各个核心模块的实现,为性能调优、二次开发和系统运维提供理论支持,为更好地使用Spark Streaming、MLlib、Spark SQL和GraphX等奠定基础。

    Spark核心技术与高级应用

    本书共分为四大部分:, 基础篇(1~10章)介绍了Spark的用途、扩展、安装、运行模式、程序开发、编程模型、工作原理,以及SparkSQL、SparkStreaming、MLlib、GraphX、Bagel等重要的扩展;, 实战篇(11~14)讲解了...

    Spark分布式内存计算框架视频教程

    5.SparkStreaming工作原理 6.DStream及函数 7.集成Kafka 8.案例:百度搜索风云榜(实时ELT、窗口Window和状态State) 9.SparkStreaming Checkpoint 10.消费Kafka偏移量管理 第六章、StructuredStreaming模块 1....

    深入理解Spark 核心思想与源码分析

    , 扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。通过阅读这部分内容,读者可以扩展实际项目...

    基于Spark Streaming+ALS的餐饮智能推荐系统.zip

    通过学习Spark,我了解了其基本概念和核心组件,如RDD(弹性分布式数据集)、Spark SQL、Spark Streaming和MLlib等。我学会了使用Spark编写分布式的数据处理程序,并通过调优技巧提高了程序的性能。在实践过程中,我...

    基于spark streaming和kafka,hbase的日志统计分析系统.zip

    通过学习Spark,我了解了其基本概念和核心组件,如RDD(弹性分布式数据集)、Spark SQL、Spark Streaming和MLlib等。我学会了使用Spark编写分布式的数据处理程序,并通过调优技巧提高了程序的性能。在实践过程中,我...

    SparkStreaming实时计算框架介绍

    本文将详细介绍SparkStreaming实时计算框架的原理与特点、适用场景。Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以在快速在内存中对数据集进行多次...

    spark商业实战三部曲

    第2章 Spark2.2技术及原理... 14 2.1 Spark 2.2综述... 14 2.1.1 连续应用程序... 14 2.1.2 新的API 15 2.2 Spark 2.2 Core. 16 2.2.1 第二代Tungsten引擎... 16 2.2.2 SparkSession. 16 2.2.3 累加器API 17 ...

    Spark技术内幕 深入解析Spark内核架构设计与实现原理(全)

    本书以源码为基础,深入分析Spark内核的设计理念和架构实现,系统讲解各个核心模块的实现,为性能调优、二次开发和系统运维提供理论支持,为更好地使用Spark Streaming、MLlib、Spark SQL和GraphX等奠定基础。

    spark 高级数据分析 高清 书签

    , 扩展篇(第8~11章),主要讲解基于Spark核心的各种扩展及应用,包括SQL处理引擎、Hive处理、流式计算框架Spark Streaming、图计算框架GraphX、机器学习库MLlib等内容。通过阅读这部分内容,读者可以扩展实际项目...

Global site tag (gtag.js) - Google Analytics