Spark 开篇

type
Post
status
Published
summary
Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提 高了运行速度、并提供丰富的操作数据的API提高了开发速度。Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用 程序计算数据。Spark 集成了多种大数据工具和框架,如 Kafka、Cassandra、HBase、HDFS 等,形成了一个强大的大数据处理生态系统。Spark 的统一编程模型和强大的性能使其成为大数据分析、实时流处理和机器学习等领域的重要工具。
slug
bigdata-spark-base
date
Jul 28, 2024
tags
大数据
Spark
Spark部署
category
大数据
password
icon
URL
Property
Jul 31, 2024 01:36 AM

概述

Apache Spark 是由加州大学伯克利分校 AMP 实验室于 2009 年发起的开源大数据处理引擎,旨在提高大数据处理的速度和效率。Spark 的设计理念是通过在内存中进行数据处理来提升性能,并提供统一的编程模型来简化大数据处理任务。
在 Spark 之前,大数据处理主要依赖于 Hadoop MapReduce。虽然 Hadoop MapReduce 为分布式计算提供了良好的框架,但其性能和编程模型存在一些局限性:
  1. 磁盘 I/O:MapReduce 在每个阶段之间需要将中间结果写入磁盘,这导致大量的磁盘 I/O 操作,影响了性能。
  1. 编程复杂性:MapReduce 的编程模型比较低级,对于复杂的数据处理任务(如迭代算法、交互式查询)来说,编写和维护代码变得困难。
为了克服 Hadoop MapReduce 的这些局限性,AMP 实验室的研究人员开发了 Spark。Spark 的主要目标是提供更快的计算速度和更简单的编程模型。其核心思想包括:
  1. 内存计算:通过在内存中进行数据处理,减少磁盘 I/O 操作,从而显著提高计算速度。
  1. 弹性分布式数据集(RDD):引入了 RDD 概念,提供了一种容错的、分布式的内存抽象,简化了分布式数据处理的编程模型。
自 2013 年 Spark 作为 Apache 项目孵化以来,它得到了迅速的发展和广泛的应用。2014 年,Spark 成为 Apache 顶级项目。随着版本的迭代,Spark 的功能和性能不断提升,逐渐成为大数据处理领域的重要工具。
Spark 借鉴了 MapReduce 思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提 高了运行速度、并提供丰富的操作数据的API提高了开发速度。Spark可以计算:结构化、半结构化、非结构化等各种类型的数据结构,同时也支持使用Python、Java、Scala、R以及SQL语言去开发应用 程序计算数据。Spark 集成了多种大数据工具和框架,如 Kafka、Cassandra、HBase、HDFS 等,形成了一个强大的大数据处理生态系统。Spark 的统一编程模型和强大的性能使其成为大数据分析、实时流处理和机器学习等领域的重要工具。
Spark 和 Hadoop 的对比
Hadoop
Spark
类型
基础平台, 包含计算, 存储, 调度
纯计算工具(分布式)
场景
海量数据批处理(磁盘迭代计算)
海量数据的批处理(内存迭代计算、交互式计算)、海量数 据流计算
价格
对机器要求低, 便宜
对内存有要求, 相对较贵
编程范式
Map+Reduce, API 较为底层, 算法适应性差
RDD组成DAG有向无环图, API 较为顶层, 方便使用
数据存储结构
MapReduce中间计算结果在HDFS磁盘上, 延迟大
RDD中间运算结果在内存中 , 延迟小
运行方式
Task以进程方式维护, 任务启动慢
Task以线程方式维护, 任务启动快,可批量创建提高并行能 力
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop  在计算层面,Spark相比较MR(MapReduce)有巨大的性能优势,但至今仍有许多计算工具基于MR构架,比如非常成熟的Hive  Spark仅做计算,而Hadoop生态圈不仅有计算(MR)也有存储(HDFS)和资源管理调度(YARN),HDFS和YARN仍是许多大数据 体系的核心架构。

工作原理

Spark架构

notion imagenotion image
 
Term(术语)
Meaning(含义)
Application
Spark 应用程序,由集群上的一个 Driver 节点和多个 Executor 节点组成。
Driver program
主运用程序,该进程运行应用的 main() 方法并且创建 SparkContext
Cluster manager
集群资源管理器(例如,Standlone Manager,Mesos,YARN)
Worker node
执行计算任务的工作节点
Executor
位于工作节点上的应用进程,负责执行计算任务并且将输出数据保存到内存或者磁盘中
Task
被发送到 Executor 中的工作单元
1. Driver(驱动程序):Driver 是运行 Spark 应用程序的主程序,负责整个应用程序的生命周期管理。
  • 创建 SparkContext:Driver 程序创建 SparkContext,作为与集群进行交互的主要接口。
  • 定义 RDD 转换和行动操作:Driver 程序定义 RDD 的转换(transformation)和行动(action)操作。
  • 任务调度:将作业分解为任务(tasks),并将这些任务分配给集群中的 Executors。
  • 收集结果:从 Executors 收集计算结果并进行汇总。
2. Executor(执行器):Executor 是运行在集群中的工作节点,负责执行任务并进行数据存储。
  • 执行任务:Executor 接收 Driver 分配的任务,并在其本地计算数据。
  • 数据缓存:根据需要将中间结果缓存到内存中,以加速后续的计算。
  • 任务结果反馈:将任务的执行结果返回给 Driver。
3. Cluster Manager(集群管理器):Cluster Manager 负责管理集群资源,调度和分配资源给 Spark 应用程序。常见的 Cluster Manager 包括 Spark 自带的 Standalone 模式、YARN 和 Mesos。
  • 资源分配:根据 Spark 应用程序的资源需求,分配计算资源(CPU、内存等)。
  • 启动 Executors:在合适的 Worker 节点上启动 Executors,并进行资源隔离和管理。
  • 监控资源使用:监控集群中各个节点的资源使用情况,并进行动态调整。
4. Worker(工作节点):Worker 是集群中的计算节点,负责运行 Executor 并执行实际的计算任务。
  • 运行 Executors:Worker 根据 Cluster Manager 的指令,启动和管理 Executors。
  • 资源管理:管理节点上的资源(如 CPU、内存),确保合理分配和使用。
  • 任务执行:执行分配到的任务,并将结果返回给 Driver。
角色之间的关系
  • Driver 与 Cluster Manager:Driver 向 Cluster Manager 请求资源,以便在集群中启动 Executors。
  • Cluster Manager 与 Worker:Cluster Manager 分配资源并在 Worker 节点上启动 Executors。
  • Driver 与 Executor:Driver 将任务分配给 Executors 并收集它们的执行结果。
  • Executor 与 Worker:Executor 运行在 Worker 节点上,执行实际的计算任务。
示例工作流程
  1. Driver 启动:Driver 程序启动,并创建 SparkContext。
  1. 请求资源:SparkContext 向 Cluster Manager 请求资源,以启动 Executors。
  1. 启动 Executors:Cluster Manager 在 Worker 节点上启动 Executors,并分配相应的资源。
  1. 任务调度:Driver 将任务分配给运行在 Worker 节点上的 Executors。
  1. 执行任务:Executors 执行任务并将结果返回给 Driver。
  1. 结果汇总:Driver 收集并处理任务结果,最终得到应用程序的输出。
 
Standalone 模式:Spark 启动后,服务器中启动的服务会有 Master、Worker、Historyserver;Master是在主节点上运行,Worker在工作节点上运行。当有任务请求时,任务会创建一个Driver 程序,Driver 程序向 Master 请求资源并启动执行Executors;当有第二个任务请求时,继续创建第二个Driver 程序,然后请求并执行第二批的Executors。以此类推
 

核心组件

整个Spark 框架模块包含:Spark Core、 Spark SQL、 Spark Streaming、 Spark GraphX、 Spark MLlib,而后四项的能力都是建立在核心引擎之上。
notion imagenotion image
  1. Spark Core:Spark的核心,Spark核心功能均由Spark Core模块提供,是Spark运行的基础。Spark Core提供了 RDD 的抽象和基本的任务调度、内存管理等功能,提供Python、Java、 Scala、R语言的API,可以编程进行海量离线数据批处理计算。
  1. Spark SQL:基于SparkCore之上,提供结构化数据的处理模块。SparkSQL支持以SQL语言对数据进行处理,SparkSQL本身针对离线计算场景。同 时基于SparkSQL,Spark提供了StructuredStreaming模块,可以以SparkSQL为基础,进行数据的流式计算。 能够将 SQL 查询与 Spark 程序无缝混合,允许您使用 SQL 或 DataFrame API 对结构化数据进行查询;支持多种数据源,包括 Hive,Avro,Parquet,ORC,JSON 和 JDBC;支持 HiveQL 语法以及用户自定义函数 (UDF),允许你访问现有的 Hive 仓库;支持标准的 JDBC 和 ODBC 连接;支持优化器,列式存储和代码生成等特性,以提高查询效率。
  1. Spark Streaming:Spark Streaming 以SparkCore为基础,主要用于快速构建可扩展,高吞吐量,高容错的流处理程序。支持从 HDFS,Flume,Kafka,Twitter 和 ZeroMQ 读取数据,并进行处理。Spark Streaming 的本质是微批处理,它将数据流进行极小粒度的拆分,拆分为多个批处理,从而达到接近于流处理的效果。
    1. notion imagenotion image
      notion imagenotion image
  1. MLlib:以SparkCore为基础,进行机器学习计算,内置了大量的机器学习库和API算法等。方便用户以分布式计算的模式进行机器学习计算。 常见的机器学习算法:如分类,回归,聚类和协同过滤;特征化:特征提取,转换,降维和选择;管道:用于构建,评估和调整 ML 管道的工具;持久性:保存和加载算法,模型,管道数据;实用工具:线性代数,统计,数据处理等。
  1. GraphX:以SparkCore为基础,进行图计算,提供了大量的图计算API,方便用于以分布式计算模式进行图计算。在高层次上,GraphX 通过引入一个新的图形抽象来扩展 RDD(一种具有附加到每个顶点和边缘的属性的定向多重图形)。为了支持图计算,GraphX 提供了一组基本运算符(如: subgraph,joinVertices 和 aggregateMessages)以及优化后的 Pregel API。此外,GraphX 还包括越来越多的图形算法和构建器,以简化图形分析任务。

运行模式

Apache Spark 提供了多种运行模式,以适应不同的使用场景和集群配置。主要的运行模式包括:
  1. Local 模式:在单台机器上运行 Spark,主要用于开发和测试。
      • 适合小规模数据集的快速开发和调试。
      • 方便在本地环境中快速进行开发,无需配置集群。
  1. Standalone 模式:Spark 自带的简单集群管理器,可以在单机上或多台机器上运行 Spark 集群。
      • 易于配置和使用,适合小规模集群和开发环境。
      • 不依赖于外部资源管理工具,简单直接。
  1. YARN 模式:Spark 可以作为 Hadoop YARN 的应用程序运行,利用 YARN 的资源管理能力。
      • 支持大规模集群,适合与 Hadoop 生态系统集成。
      • 利用 YARN 的资源调度功能,实现多种应用的共享和调度。
  1. Kubernetes 模式:Spark 可以在 Kubernetes 上运行,适合容器化环境。
      • 利用 Kubernetes 的编排和管理功能,简化集群管理。
      • 方便与其他容器化应用程序集成,适用于微服务架构。
  1. Mesos 模式:Apache Mesos 是一个通用的集群管理系统,Spark 可以在 Mesos 上运行。
      • 提供强大的资源管理和调度能力,支持多种框架共存。
      • 适用于需要动态资源分配的复杂场景。
各模式对比总结
模式
适用场景
特点
Local
开发和测试
便捷快速的本地开发
一个进程内多个线程来模拟Spark 集群
Standalone
小规模集群和开发环境
简单易用
一主多从,存在单点故障问题
YARN
大规模集群
与 Hadoop 生态系统集成
Kubernetes
容器化环境
适合微服务架构
Mesos
动态资源分配
支持多种框架共存

Spark特点

  • 使用先进的 DAG(有向无环图)调度程序,减少了任务之间的数据传输和等待时间,查询优化器和物理执行引擎,以实现性能上的保证;
  • 多语言支持,目前支持的有 Java,Scala,Python 和 R;
  • 提供了 80 多个高级 API,可以轻松地构建应用程序;
  • 支持批处理,流处理和复杂的业务分析;
  • 丰富的类库支持:包括 SQL,MLlib,GraphX 和 Spark Streaming 等库,并且可以将它们无缝地进行组合;
  • 丰富的部署模式:支持本地模式和自带的集群模式,也支持在 Hadoop,Mesos,Kubernetes 上运行;
  • 多数据源支持:支持访问 HDFS,Alluxio,Cassandra,HBase,Hive 以及数百个其他数据源中的数据。

Spark部署

conda下载安装

conda 二选一:(M芯片的 MacOS 上的 Ubuntu 虚拟机安装时选择 Linux-aarch64.sh 版本)
  • Anaconda:清华源 (原始包更多,体积更大)
安装 miniconda
  • 将下载好的 sh 文件上传到 export 文件夹,并运行 sh 文件,
  • 欢迎页面,按回车继续,显示很多协议项,协议显示完后会询问是否接收协议,输入 yes继续
  • 选择安装位置,[/root/miniconda3] >>> /export/server/miniconda3 将其安装在/export/server/miniconda3中,回车
  • 询问是否进行初始化:You can undo this by running conda init --reverse $SHELL? [yes|no] ,yes
  • 出现:Thank you for installing Miniconda3! 安装完成
  • control + d 或者 exit 退出,然后重新切换回 root 用户即可发现命令行开头有虚拟环境的标识出现了:(base) root@anjhon:~#
  • 修改三方库的镜像源:
    • 打开 .condarc 文件,如果不存在就创建:vim ~/.condarc
    • 添加镜像源:
      • # Conda 配置文件 # 默认的下载通道,优先级最高。设置为defaults将使用default_channels中定义的通道。 channels: - defaults # 安装时是否显示包的下载 URL show_channel_urls: true # 默认通道配置,将默认的下载通道指向清华大学的镜像源。 default_channels: - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main # 主通道 - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free # 开源通道 - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r # R 语言通道 - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2 # msys2 通道 # 自定义通道配置,将一些常用的社区通道指向清华大学的镜像源。 custom_channels: conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/conda-forge # conda-forge 通道 msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/msys2 # msys2 通道 bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/bioconda # bioconda 通道 menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/menpo # menpo 通道 pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/pytorch # pytorch 通道 simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud/simpleitk # simpleitk 通道
创建虚拟环境
  • 创建: conda create -n pyspark python=3.8 ,遇到问答输入 y
  • 切换:conda activate pyspark

Spark 下载安装

  • 下载上传解压到 /export/server/ 下
  • 创建软连接
配置环境变量
  • vim /etc/profile
    • export JAVA_HOME=/export/server/jdk export HADOOP_HOME=/export/server/hadoop export SPARK_HOME=/export/server/spark export PYSPARK_PYTHON=/export/server/miniconda3/envs/pyspark/bin/python3.8 export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    • 应用配置: source /etc/profile
  • vim ~/.bashrc
    • export PYSPARK_PYTHON=/export/server/miniconda3/envs/pyspark/bin/python3.8 export JAVA_HOME=/export/server/jdk

Local 模式

该模式只需要一台服务器,通过单进程内的多线程来模拟集群效果。如果时多任务就会开启多个进程。
本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task。
  • Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或 Local[*]
  • 其中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N, 则默认是1个线程(该线程有1个core)。 通常Cpu有几个Core,就指定几个 线程,最大化利用计算能力.
  • 如果是local[*],则代表 Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数
Local模式只能运行一个Spark程序, 如果执行多个Spark程序, 那就是由多个相互独立的Local进程在执行
Local模式就是以一个独立进程配合其内部线程来提供完成Spark运行时环境. Local模式可以通过spark-shell/pyspark/spark-submit等来开启
如果在 local 模式启动两个进程,那么两个进程会绑定到两个不同的端口以供 webUI 访问。

启动 Spark local python

使用默认进入Spark local:(pyspark) root@anjhon:/export/server/spark/bin# ./pyspark
指定进程参数进入 local:(pyspark) root@anjhon:/export/server/spark/bin# ./pyspark --master local[3]
运行测试:sc.parallelize([1,2,3,4]).map(lambda x: x*10).collect()
notion imagenotion image
Driver 的 web 监控页面:http://anjhon:4040
提交代码到Spark local运行:(pyspark) root@anjhon:/export/server/spark/bin# ./spark-submit --master local[*] /export/server/spark/examples/src/main/python/pi.py 10 计算 pi 的案例,迭代 10 次

启动 Spark local scala

(pyspark) root@anjhon:/export/server/spark/bin# ./spark-shell

Standalone 模式

主从模式

环境准备

  • 在集群的其他节点上也安装 conda 环境
    • 将 conda 的安装文件通过 scp 分发到其他节点,进行安装(不能直接复制安装好的文件夹)
    • scp Miniconda3-py38_23.9.0-0-Linux-aarch64.sh anjhon2:$PWD
    • 安装步骤详见上文
  • 配置所有集群节点上环境变量(详见上文参照主节点的配置进行)
  • 修改Spark 的配置文件
    • 将export/server/spark 的所有权修改为 Hadoop:(pyspark) root@anjhon:/export/server# chown -R hadoop:hadoop spark*
    • 切换到 Hadoop 用户:su - hadoop
    • 修改配置
      • workers:
        • hadoop@anjhon:/export/server/spark/conf$ mv workers.template workers
          hadoop@anjhon:/export/server/spark/conf$ vim workers
          anjhon anjhon2 anjhon3
      • spark-env.sh
        • hadoop@anjhon:/export/server/spark/conf$ vim spark-env.sh
          ## 设置JAVA安装目录 JAVA_HOME=/export/server/jdk ## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群 HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop ## 指定spark老大Master的IP和提交任务的通信端口 # 告知Spark的master运行在哪个机器上 export SPARK_MASTER_HOST=anjhon # 告知sparkmaster的通讯端口 export SPARK_MASTER_PORT=7077 # 告知spark master的 webui端口 SPARK_MASTER_WEBUI_PORT=8080 # worker cpu可用核数 SPARK_WORKER_CORES=1 # worker可用内存 SPARK_WORKER_MEMORY=1g # worker的工作通讯地址 SPARK_WORKER_PORT=7078 # worker的 webui地址 SPARK_WORKER_WEBUI_PORT=8081 ## 设置历史服务器 # 配置的意思是 将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中 SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://anjhon:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
          配置好后在 hdfs 中创建对应的文件夹:hdfs dfs -mkdir /sparklog ;修改权限:hdfs dfs -chmod 777 /sparklog
      • spark-defaults.conf
        • hadoop@anjhon:/export/server/spark/conf$ mv spark-defaults.conf.template spark-defaults.conf
          hadoop@anjhon:/export/server/spark/conf$ vim spark-defaults.conf
          # 开启spark的日期记录功能 spark.eventLog.enabled true # 设置spark日志记录的路径 spark.eventLog.dir hdfs://anjhon:8020/sparklog/ # 设置spark日志是否启动压缩 spark.eventLog.compress true
      • log4j2.properties
        • hadoop@anjhon:/export/server/spark/conf$ mv log4j2.properties.template log4j2.properties
          hadoop@anjhon:/export/server/spark/conf$ vim log4j2.properties
          rootLogger.level = info 改为 rootLogger.level = warn
  • 将 Spark 分发到其他集群节点
    • scp -r spark-3.4.3-bin-hadoop3 anjhon2:$PWD
      • 在 anjhon2 上切换到 Hadoop 用户,创建 Spark 的软连接
        • ln -s /export/server/spark-3.4.3-bin-hadoop3 /export/server/spark
    • scp -r spark-3.4.3-bin-hadoop3 anjhon3:$PWD
      • 在 anjhon3 上切换到 Hadoop 用户,创建 Spark 的软连接
        • ln -s /export/server/spark-3.4.3-bin-hadoop3 /export/server/spark

启动Spark集群

  • 启动历史服务器:hadoop@anjhon:/export/server/spark$ ./sbin/start-history-server.sh
  • 启动集群:hadoop@anjhon:/export/server/spark$ ./sbin/start-all.sh
    • hadoop@anjhon:/export/server/spark$ jps 42788 Master 42951 Worker 42344 HistoryServer

连接到集群工作

bin/pyspark
hadoop@anjhon:/export/server/spark$ bin/pyspark --master spark://anjhon:7077
sc.parallelize([1,2,3]).map(lambda x:x*10).collect()
notion imagenotion image
Driver 的 web 监控页面:http://anjhon:4040
bin/spark-submit
hadoop@anjhon:/export/server/spark$ bin/spark-submit --master spark://anjhon:7077 /export/server/spark/examples/src/main/python/pi.py 100
访问历史服务器的 webui 查看页面:http://anjhon:18080/ 查看程序运行细节
 
Spark程序运行层次结构
Spark 端口使用情况:
  • 4040: 是一个运行的Application在运行的过程中临时绑定的端口,用以查看当前任务的状态.4040被占用会顺延到4041.4042等;4040是一个临时端口,当前程序运行完成后, 4040就会被注销
  • 8080: 默认是StandAlone下, Master角色(进程)的WEB端口,用以查看当前Master(集群)的状态
  • 18080: 默认是历史服务器的端口, 由于每个程序运行完成后,4040端口就被注销了. 在以后想回看某个程序的运行状态就可以通过历史服务器查看,历史服务器长期稳定运行,可供随时查看被记录的程序的运行过程.
 
一个Spark程序会被分成多个子任务(Job)运行, 每一个Job会分成多个State(阶段)来 运行, 每一个State内会分出来多个Task(线程)来执行具体任务
Spark Application程序运行时三个核心概念:Job、Stage、 Task,说明如下:
  • Job:由多个 Task 的并行计算部分,一般 Spark 中的 action 操作(如 save、collect,后面进一步说明),会 生成一个 Job。
  • Stage:Job 的组成单位,一个 Job 会切分成多个 Stage ,Stage 彼此之间相互依赖顺序执行,而每个 Stage 是多 个 Task 的集合,类似 map 和 reduce stage。
  • Task:被分配到各个 Executor 的单位工作内容,它是 Spark 中的最小执行单位,一般来说有多少个 Paritition (物理层面的概念,即分支可以理解为将数据划分成不同 部分并行处理),就会有多少个 Task,每个 Task 只会处 理单一分支上的数据。

StandAlone HA 模式(最理想模式)

Spark Standalone集群是Master-Slaves架构的集群模式,和大部分的Master-Slaves结构集群一样,存在着Master 单点故障(SPOF)的问题(一个主节点多个子节点,如果主节点故障则集群瘫痪)。
Spark提供了两种方案:
1.基于文件系统的单点恢复(Single-Node Recovery with Local File System)--只能用于开发或测试环境。
2.基于zookeeper的Standby Masters(Standby Masters with ZooKeeper)--可以用于生产环境。
具体原理是:ZooKeeper提供了一个Leader Election机制,设置多个 Master,但是在服务启动后只会有一个 Master 能在 ZooKeeper 中注册成为领导主节点,其他 Master 为监听状态(随时等待上位),当Master 主节点注册完成后,Spark 的 worker 节点也会和 zookeeper 通信确认现任领导(活跃的 Master) ,然后和其建立联系组成集群工作。当活跃的 Master 出故障时,处于监听的Master 立马上位,在短时间内与 worker 节点取得联系并进入正常工作。由于集群的信息 ,包括Worker, Driver和Application的信息都已经持久化到文件系统,因此在切换的过程中只会影响新Job的提交,对 于正在进行的Job没有任何的影响。加入ZooKeeper的集群整体架构如下图所示。
notion imagenotion image

环境准备

  1. 启动 zookeeper 和 HDFS
      • 启动 zookeeper:/export/server/zookeeper/bin/zkServer.sh start 在需要需要设置备份 Master 的节点上都执行该启动程序,zookeeper 的集群节点需要手动启动,不会自动启动
      • 启动 HDFS:start-dfs.sh
  1. Spark 配置文件修改
      • 不指定 Master 运行的机器:先在spark-env.sh中,删除或注释: SPARK_MASTER_HOST=anjhon
      • spark-env.sh中新增配置项:
        • SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=anjhon:2181,anjhon2:2181,anjhon3:2181 -Dspark.deploy.zookeeper.dir=/spark-ha" # spark.deploy.recoveryMode 指定HA模式 基于Zookeeper实现 # 指定Zookeeper的连接地址 # 指定在Zookeeper中注册临时节点的路径
      • 分发文件到其他集群节点
        • hadoop@anjhon:/export/server/spark/conf$ scp spark-env.sh anjhon2:$PWD
          hadoop@anjhon:/export/server/spark/conf$ scp spark-env.sh anjhon3:$PWD
      • (如果 standalone 集群已启动则)停止集群:hadoop@anjhon:/export/server/spark$ sbin/stop-all.sh

启动 standalone HA

  1. 在第一个节点上启动集群:hadoop@anjhon:/export/server/spark$ sbin/start-all.sh
  1. 在其他节点上启动 Master:hadoop@anjhon2:/export/server/spark$ sbin/start-master.sh
  1. 如果想设置更多的备用 Master,在其他节点依次开启即可。开启之前注意 zookeeper 的开启
  1. 在浏览器监看 Master 节点的 8080 端口(端口可能顺延)可以查看 Master 运行状态。如果正常启动,则必定有一个 Master 处于活跃状态(ALIVE),其他的都是待命状态(STANDBY),如果是主备切换过程中则显示恢复中(RECOVER)。
    1. notion imagenotion image
遇到问题
  • 问题一:启动之后所有的 Master 都处于 STANDBY 待命状态,没有活跃的 Master
    • 怀疑时环境的问题,重启 zookeeper、hdfs 等服务,未解决
    • 活跃的 Master 由 zookeeper 注册决定,没有活跃的 Master 说明 大概率是 zookeeper 没在干活。
    • 检查 zookeeper 状态,发现提示如下:Error contacting service. It is probably not running. 确定时 zookeeper 的问题
      • 检查所有节点防火墙对 zookeeper 的端口 2181、2888、3888 的放行状态
      • 检查所有节点的 zookeeper 配置文件
    • 最后发现是其他节点的 zookeeper 服务没有启动(以为只需要启动一个,其他的就会跟着启动了;但是 zookeeper 中所有节点都需要手动启动)最终解决问题。

主备切换测试

  • 在 anjhon2 上任务,指定 Master 为 anjhon 上的:hadoop@anjhon2:/export/server/spark$ ./bin/spark-submit --master spark://anjhon:7077 /export/server/spark/examples/src/main/python/pi.py 1000
  • 在 anjhon 上 kill 掉 Master 进程,相当于任务在运行的过程中,Master 故障了,正常情况下备用的 Master 应该会马上接管集群并继续执行任务,切换过程约 30 秒左右
    • 24/07/18 19:59:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 24/07/18 20:00:20 WARN StandaloneAppClient$ClientEndpoint: Connection to anjhon:7077 failed; waiting for master to reconnect... 24/07/18 20:00:20 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection... 24/07/18 20:00:20 WARN StandaloneAppClient$ClientEndpoint: Connection to anjhon:7077 failed; waiting for master to reconnect... Pi is roughly 3.138240

Spark On Yarn(最常用模式)

按照前面环境部署中所学习的, 如果我们想要一个稳定的生产Spark环境, 那么最优的选择就是构建:HA StandAlone集 群。不过在企业中, 服务器的资源总是紧张的, 许多企业不管做什么业务,都基本上会有Hadoop集群. 也就是会有YARN集群。对于企业来说,在已有YARN集群的前提下在单独准备Spark StandAlone集群,对资源的利用就不高. 所以, 在企业中,多 数场景下,会将Spark运行到YARN集群中.
YARN本身是一个资源调度框架, 负责对运行在内部的计算框架进行资源调度管理. 作为典型的计算框架, Spark本身也是直接运行在YARN中, 并接受YARN的调度的。所以, 对于Spark On YARN, 无需部署Spark集群, 只要找一台服务器, 充当Spark的客户端, 即可提交任务到YARN集群 中运行.
Spark On Yarn的本质:
将资源管理的活儿(Master、Worker)交个 Yarn,Spark只关注程序运行层面(Driver、Executor)
  • Master角色由YARN的ResourceManager担任.
  • Worker角色由YARN的NodeManager担任.
  • Driver角色运行在YARN容器内 或 提交任务的客户端进程中
  • 真正干活的Executor运行在YARN提供的容器内
notion imagenotion image

环境配置

只需要确保 Spark 的配置文件中有 HADOOP_CONF_DIRYARN_CONF_DIR 的环境变量即可。
  • hadoop@anjhon:/export/server/spark/conf$ vim spark-env.sh
    • ## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群 HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop YARN_CONF_DIR=/export/server/hadoop/etc/hadoop

启动 Yarn 模式

  1. bin/pyspark
      • 启动 Yarn: hadoop@anjhon:/export/server/spark$ start-yarn.sh
      • 启动 Spark 的历史服务器:hadoop@anjhon:/export/server/spark$ sbin/start-history-server.sh
      • 启动 Spark: hadoop@anjhon:/export/server/spark$ bin/pyspark --master yarn
        • notion imagenotion image
      • 访问 anjhon 的 8088 端口,在 Yarn 中查看 Spark 任务管理
      • 运行程序:sc.parallelize([1,2,3]).map(lambda x:x*10).collect() 在 anjhon 的 4040 端口查看任务执行
  1. bin/spark-submit (PI)
      • hadoop@anjhon:/export/server/spark$ bin/spark-submit --master yarn /export/server/spark/examples/src/main/python/pi.py 100

部署模式

sSpark On YARN是有两种运行模式的,一种是Cluster模式一种是Client模式。这两种模式的区别就是Driver运行的位置.
  1. Cluster模式
Driver运行在YARN容器内部, 和ApplicationMaster在同一个容器内
  • Cluster模式执行流程
    • notion imagenotion image
      流程步骤如下:
      1)、Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster ;
      2)、随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的 ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存;
      3)、ResourceManager接到ApplicationMaster的资源申请后会分配Container,然后ApplicationMaster在资源分 配指定的NodeManager上启动Executor进程;
      4)、Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数;
      5)、之后执行到Action算子时,触发一个Job,并根据宽依赖开始划分Stage,每个Stage生成对应的TaskSet,之后 将Task分发到各个Executor上执行。
  • Cluster模式启动
    • hadoop@anjhon:/export/server/spark$ bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/server/spark/examples/src/main/python/pi.py 100
      • --deploy-mode client :部署模式client(默认)
      • --driver-memory 512m :driver 内存
      • --executor-memory 512m :executor 内存
      • --num-executors 3 :executor 个数
      • --total-executor-cores 3
    • 在 anjhon 的 18080 端口查看历史执行任务,在 8080 端口查看进程及日志
  1. Client模式
Driver运行在客户端进程中, 比如Driver运行在spark-submit程序的进程中
  • Client模式执行流程
    • notion imagenotion image
      具体流程步骤如下:
      1)、任务提交后会和ResourceManager通讯申请启动ApplicationMaster;
      2)、随后ResourceManager分配Container,在合适的NodeManager上启动ApplicationMaster,此时的 ApplicationMaster就是Driver;
      3)、Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请 后会分配Container,然后在合适的NodeManager上启动Executor进程;
      4)、Executor进程启动后会向Driver反向注册;
      5)、Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开 始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行;
  • Client模式启动
    • hadoop@anjhon:/export/server/spark$ bin/spark-submit --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/server/spark/examples/src/main/python/pi.py 100
Cluster模式
Client模式
Driver运行位置
YARN容器内
客户端进程内
通讯效率
低于Cluster模式
日志查看
日志输出在容器内, 查看不方便
日志输出在客户端 的标准输出流中,方 便查看
生产可用
推荐
不推荐
稳定性
稳定
基于客户端进程,受 到客户端进程影响

Spark On Hive

对于Hive来说,就2东西:
  1. SQL优化翻译器(执行引擎),翻译SQL到MapReduce并提交到YARN执行
  1. MetaStore 元数据管理中心
对于Spark来说,自身是一个 执行引擎,但是 Spark自己没有元数据管理功能,所以Spark On Hive就是 Spark 借用 Hive 的元数据管理服务,然后使用 Spark 自身的执行引擎。
根据原理,就是Spark能够连接上Hive的MetaStore就可以了。所以部署原理是:
  1. MetaStore需要存在并开机
  1. Spark知道MetaStore在哪里(IP 端口号)

环境配置

  1. 在 spark 的 conf 目录中,创建 hive-site.xml 文件
    1. <configuration> <property> <name>hive.metastore.warehouse.dir</name> <value>/user/hive/warehouse</value> </property> <property> <name>hive.metastore.uris</name> <value>thrift://anjhon:9083</value> </property> </configuration>
  1. 确保 Hive 的hive-site.xml 文件中有元数据服务的配置
    1. <property> <name>hive.metastore.uris</name> <value>thrift://anjhon:9083</value> </property>

在 Spark 中创建表,在 Hive 中查看

  • 确保 Hive 的元数据服务已经启动
  • 在终端建表检测
notion imagenotion image
notion imagenotion image
notion imagenotion image
  • 在代码中连接 Hive 元数据服务,并直接查表
    • from pyspark.sql import SparkSession if __name__ == '__main__': # 创建 SparkSession 并配置 JAR 文件路径 spark = SparkSession.builder \ .appName('spark on hive') \ .master('local[3]') \ .config('spark.sql.shuffle.partitions', 3) \ .config('spark.sql.warehouse.dir', 'hdfs://anjhon:8020/user/hive/warehouse') \ .config('hive.metastore.uris', 'thrift://anjhon:9083')\ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext try: spark.sql("select * from t1").show() finally: spark.stop()

Spark ThriftServer

Spark中有一个服务叫做:ThriftServer服务,可以启动并监听在 10000 端口;这个服务对外提供功能,我们可以用数据库工具或者代码连接上来 直接写SQL即可操作spark
启动服务:
  • 确保已经配置好了 Spark On Hive
  • 启动 spark thriftserver 服务
    • start-thriftserver.sh --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=anjhon --master local[3]
  • 在 DBeaver 中连接至 Spark 数据库
  • 在 python 代码中链接至 Spark 数据库
    • pyhive 包
      • 为了安装pyhive包需要安装一堆linux软件,执行如下命令进行linux软件安装:yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel libffi-devel gcc make gcc-c++ python-devel cyrus-sasl-devel cyrus-sasl-plain cyrus-sasl-gssapi -y
      • 安装pyhive 及相关包:/export/server/anaconda3/envs/pyspark/bin/python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyhive pymysql sasl thrift thrift_sasl
      • from pyhive import hive if __name__ == '__main__': # 获取到Hive(Spark ThriftServer的链接) conn = hive.Connection(host="node1", port=10000, username="hadoop") # 获取一个游标对象 cursor = conn.cursor() # 执行SQL cursor.execute("SELECT * FROM student") # 通过fetchall API 获得返回值 result = cursor.fetchall() print(result)

PySpark

PySpark 是 Apache Spark 的 Python API,允许数据科学家和工程师利用 Spark 的强大功能进行大规模数据处理和分析。它结合了 Python 的易用性和 Spark 的分布式计算能力,广泛用于处理和分析大数据集。

本地环境准备

PySpark的开发模式一般是本地写代码,提交到 Spark 集群上去运行。所以需要配置本地的开发环境。我一直使用的是VScode,所以接下来都是基于 VScode 的介绍和教程。
  • 在 VScode 中创建一个 pyspark 的虚拟环境(详细教程见本站的《Python虚拟环境》)
  • 配置远程开发环境
    • 将本地主机与服务器的 ssh 免密连接配置好,教程详见本站的《Linux 基础》
    • 安装 VScode 插件 Remote Development
    • Remote Development 插件内连接服务器集群(按照下面三步执行完成后会要求选择 condif 路径,Linux 服务器的配置路径一般是第一个)
      • notion imagenotion image
    • Linux 中的 conda 是使用 root 账户安装的,所以只有用 root 用户登陆才能使用 pyspark 虚拟环境
    • 如果要删除主机,需要去.ssh/config文件中删除对应的信息(VSCode没有删除按钮)

运行环境测试

import os from pyspark import SparkConf, SparkContext if __name__ == '__main__': # 设置配置 conf = SparkConf().setMaster("local[*]").setAppName("WordCountHelloWorld") # 如果使用 submit 客户端来提交代码,则需要删除.setMaster("local[*]")设置s # 创建或获取现有的 SparkContext sc = SparkContext.getOrCreate(conf=conf) # 更改日志级别,选择适合你的级别 sc.setLogLevel("WARN") # 输出警告和错误 try: # file_rdd = sc.textFile("hdfs://anjhon:8020/input/words.txt") # hdfs 中的文件 file_rdd = sc.textFile("file:///root/words.txt") # Linux 中的文件(一般不建议使用,因为当前主机上有的文件在其他节点上不一定有相同的文件) words_rdd = file_rdd.flatMap(lambda line: line.split(" ")) print("Initial RDD: ", words_rdd.take(5)) # 检查初始 RDD 数据 words_with_one_rdd = words_rdd.map(lambda x: (x, 1)) print("Mapped RDD: ", words_with_one_rdd.take(5)) # 检查映射后的 RDD 数据 result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b) print("Reduced RDD: ", result_rdd.take(5)) # 检查 reduceByKey 后的 RDD 数据 # 打印结果 print(result_rdd.collect()) finally: # 停止 SparkContext 以释放资源 sc.stop()
注意点:
  • 如果使用 submit 客户端来提交代码,则需要删除.setMaster("local[*]")设置s
  • Linux 中的文件(一般不建议使用,因为当前主机上有的文件在其他节点上不一定有相同的文件)
遇到问题:
AttributeError: Can't get attribute 'PySparkRuntimeError' on <module 'pyspark.errors.exceptions.base' from '/export/server/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/base.py'>
这通常与 PySpark 的版本、Spark版本、cloudpickle版本的不匹配或环境设置问题有关。
我的是PySpark 的版本与Spark版本不一致导致的,spark version 3.4.3,pyspark Version: 3.5.1。将pyspark的版本也安装成 3.4.3 之后问题解决。

Python On Spark 执行原理

PySpark宗旨是在不破坏Spark已有的运行时架构,在Spark架构外层包装一层Python API,借助Py4j实现Python和 Java的交互,进而实现通过Python编写Spark应用程序,其运行时架构如下图所示。
notion imagenotion image
  • Python Driver:用户编写的 Python 程序(Driver)会在本地机器上运行。
  • Py4J Gateway:PySpark 使用 Py4J(一个使 Python 可以调用 Java 代码的库)将 Python 代码转换成 JVM(Java Virtual Machine)可以执行的命令。Py4J Gateway 在 Python 端和 JVM 端之间建立了一个通信桥梁。
  • SparkContext:在 Driver 中创建的 SparkContext 对象是与 Spark 集群交互的主要接口。SparkContext 初始化后,会通过 Py4J 向 JVM 发送命令来初始化 Spark 的核心组件。
  • 任务调度:Driver 程序将任务(Task)分发到集群中的各个 Executor 上运行。任务被分配到 Executor 后,会被序列化并通过网络传输到 Executor 端。
  • 任务执行:Executor 端接收到任务后,使用 JVM 执行任务。Python 代码会通过 Py4J 被传递到 JVM 中执行,并与 JVM 端的 Spark API 交互。
  • 结果返回:任务执行完成后,Executor 会将结果返回给 Driver 程序。
If you have any questions, please contact me.