2. 部署安装Spark集群

部署安装Spark集群

PySpark环境安装

conda create -n spark24 python=3.7
conda init powershell
conda activate spark24
conda deactivate

# 使用PyPI安装PySpark如下:也可以指定版本安装
conda install pyspark=3.3
conda install pyspark=2.4.0      # 指定版本安装
# 或者指定清华镜像(对于网络较差的情况):
conda install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark  # 指定清华镜像源
# 如果要为特定组件安装额外的依赖项,可以按如下方式安装(此步骤暂不执行,后面Sparksql部分会执行):
conda install pyspark[sql]

SPARK_HOME=D:\miniforge3\envs\spark24\Lib\site-packages\pyspark
PYSPARK_PYTHON=D:\miniforge3\envs\spark24\python.exe
PATH增加: %SPARK_HOME%\bin

# 测试-基于bin/pyspark
pyspark --master local[*]  # --master local[*]表示使用本地模式,*表示使用所有可用的CPU核心
pyspark --master yarn --deploy-mode cluster  # --master yarn表示使用YARN模式
# 参数解释:
--master yarn:指定YARN作为集群管理器。
--deploy-mode cluster:集群模式,Driver运行在YARN上。
--driver-memory:Driver分配的内存。
--executor-memory:每个Executor分配的内存。
--executor-cores:每个Executor分配的CPU核心数。

>>> sc.parallelize([1,2,3,4,5]).map(lambda x: x + 1).collect()

# 基于bin/spark-shell测试
spark-shell
scala> sc.parallelize(Array(1,2,3,4,5)).map(x=>x+ 1).collect()
res0: Array[Int] = Array(2, 3, 4, 5, 6)

字符集问题

修改spark字符集
CM界面spark配置搜索spark-default.conf,添加:
spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8

Spark Local

本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task
- Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或Local[*]
- 其中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N,则默认是1个线程(该线程有1个core)。通常Cpu有几个Core,就指定几个线程,最大化利用计算能力.
- 如果是local[*],则代表Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数

Local 下的角色分布:
- 资源管理:
  - Master:Local进程本身
  - Worker:Local进程本身
- 任务执行:
  - Driver:Local进程本身
  - Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算能力

PS: Driver也算一种特殊的Executor, 只不过多数时候, 我们将Executor当做纯Worker对待, 这样和Driver好区分(一类是管理 一类是工人)

注意: Local模式只能运行一个Spark程序, 如果执行多个Spark程序, 那就是由多个相互独立的Local进程在执行
yum install python36
cd /usr/local/src; wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar  xf spark-3.5.3-bin-hadoop3.tgz
ln -s /usr/local/src/spark-3.5.3-bin-hadoop3 /usr/local/spark
cd /usr/local/spark/bin
./spark-shell
scala> sc.parallelize(Array(1,2,3,4,5)).map(x=>x+ 1).collect()
res0: Array[Int] = Array(2, 3, 4, 5, 6)

./pyspark --master local[*]
>>> sc.parallelize([1,2,3,4,5]).map(lambda x: x + 1).collect()
[2, 3, 4, 5, 6]

# 访问Spark Web UI
http://kube-42:4040

# Spark-shell说明:
# 1.直接使用./spark-shell
# 表示使用local模式启动,在本机启动一个SparkSubmit进程
# 2.还可指定参数 --master,如:
spark-shell --master local[N] # 表示在本地模拟N个线程来运行当前任务
spark-shell --master local[*] # 表示使用当前机器上所有可用的资源
# 3.不携带参数默认就是
spark-shell --master local[*]
# 4.后续还可以使用--master指定集群地址,表示把任务提交到集群上运行,如
./spark-shell --master spark://kube-41:7077,kube-42:7077
# 5.退出spark-shell
# 使用 :quit

# 基于bin/spark-submit测试
spark-submit.cmd --master local[*]  py/test_spark_yarn.py
# spark-submit程序, 作用: 提交指定的Spark代码到Spark环境中运行
# 计算pi圆周率值,后面的100表示计算的精度
./spark-submit /usr/local/spark/examples/src/main/python/pi.py 100
Pi is roughly 3.142160

Spark Standalone集群环境

Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模
式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。

StandAlone 是完整的Spark运行环境,其中:

  • Master角色以Master进程存在, Worker角色以Worker进程存在
  • Driver和Executor运行于Worker进程内, 由Worker提供资源供给它们运行

Standalone 架构

StandAlone集群在进程上主要有3类进程:

  • 主节点Master进程:
    • Master角色, 管理整个集群资源,并托管运行各个任务的Driver
  • 从节点Workers:
    • Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task);
    • 每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
  • 历史服务器HistoryServer(可选):
    • Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。
# 所有节点安装python3.6
yum install python36
# 所有节点安装spark
cd /usr/local/src; wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar  xf spark-3.5.3-bin-hadoop3.tgz
ln -s /usr/local/src/spark-3.5.3-bin-hadoop3 /usr/local/spark
# 所有节点配置环境变量
cd /usr/local/spark/conf/
cp workers.template  workers
vim workers
# 添加以下内容:
kube-41
kube-42
kube-43

cp spark-env.sh.template spark-env.sh
vim spark-env.sh
cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
# 增加如下内容:
export JAVA_HOME=/usr/java/jdk1.8.0_361-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop

export SPARK_MASTER_HOST=kube-41
export SPARK_MASTER_PORT=7077

SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081

SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://kube-41:8020/user/spark/sparklog/ -Dspark.history.fs.cleaner.enabled=true"

# 注意:
#   Jdk,hadoop, yarn的路径, 需要配置为自己的路径(可能与此位置不一致)
#   History配置中, 需要指定hdfs的地址, 其中端口号为8020或者9820, 大家需要参考hdfs配置的namenode通信端口号

# 配置spark应用日志
# 第一步: 在HDFS上创建应用运行事件日志目录:
hdfs dfs -mkdir -p /user/spark/sparklog/
# 第二步: 配置spark-defaults.conf
cd /usr/local/spark/conf
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
# 添加以下内容:
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://kube-41:8020/user/spark/sparklog/
spark.eventLog.compress true
# 配置【log4j.properties】
cp log4j2.properties.template log4j2.properties
vim log4j2.properties
# 可以更改日志级别 warn info  debug

# 配置分发至其他节点
scp -r /usr/local/spark root@kube-42:/usr/local/
scp -r /usr/local/spark root@kube-43:/usr/local/

# 在主节点上启动spark集群
/usr/local/spark/sbin/start-all.sh
# 在主节点上停止spark集群
/usr/local/spark/sbin/stop-all.sh
# 在主节点上启动历史服务器
/usr/local/spark/sbin/start-history-server.sh
http://kube-41:18080

# 单独启动和停止
# 在 master 安装节点上启动和停止 master:
start-master.sh
stop-master.sh
# 在 Master 所在节点上启动和停止worker(work指的是slaves 配置文件中的主机名)
start-slaves.sh
stop-slaves.sh

# WEB UI页面
http://kube-41:8080

# 连接集群
cd /usr/local/spark/bin
spark-shell --master  spark://kube-41:7077
# pyspark 连接
pyspark --master spark://kube-41:7077

Spark Standalone HA 模式安装

# 官方文档
https://spark.apache.org/docs/3.5.3/spark-standalone.html#standby-masters-with-zookeeper

cd /usr/local/spark/conf
vim spark-env.sh
# 注释或删除MASTER_HOST内容:
# SPARK_MASTER_HOST=kube-41
# 增加以下配置:
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url= kube-41:2181,kube-42:2181,kube-43:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"

# 说明函数说明:
spark.deploy.recoveryMode:恢复模式
spark.deploy.zookeeper.url:ZooKeeper的Server地址
spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息。

# 配置分发至其他节点
scp -r /usr/local/spark root@kube-42:/usr/local/
scp -r /usr/local/spark root@kube-43:/usr/local/

# 首先启动zookeeper服务: 三个节点都需要启动
zkServer.sh status  --查看状态
zkServer.sh stop   --停止命令
zkServer.sh start   --启动命令

# 接着在kube-41启动spark集群
cd /usr/local/spark
./sbin/start-all.sh

# 最后在kube-42上单独启动一个master
cd /usr/local/spark
./sbin/start-master.sh
# 查看WebUI 一主一从
http://kube-41:8080/
http://kube-42:8080/

Spark on YARN 环境搭建

# 修改spark-env.sh
cd /usr/local/spark/conf
vim /usr/local/spark/conf/spark-env.sh

# 添加以下内容:
HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop

# 同步到其他两台
cd /usr/local/spark/conf
scp -r spark-env.sh kube-42:$PWD
scp -r spark-env.sh kube-43:$PWD

# 修改hadoop的yarn-site.xml
cd /usr/local/hadoop/etc/hadoop/
vim /usr/local/hadoop/etc/hadoop/yarn-site.xml

# 添加以下内容:
<configuration>
    <!-- 配置yarn主节点的位置 -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>kube-41</value>
    </property>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <!-- 设置yarn集群的内存分配方案 -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>20480</value>
    </property>
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>2048</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-pmem-ratio</name>
        <value>2.1</value>
    </property>
    <!-- 开启日志聚合功能 -->
    <property>
        <name>yarn.log-aggregation-enable</name>
        <value>true</value>
    </property>
    <!-- 设置聚合日志在hdfs上的保存时间 -->
    <property>
        <name>yarn.log-aggregation.retain-seconds</name>
        <value>604800</value>
    </property>
    <!-- 设置yarn历史服务器地址 -->
    <property>
        <name>yarn.log.server.url</name>
        <value>http://kube-41:19888/jobhistory/logs</value>
    </property>
    <!-- 关闭yarn内存检查 -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>

# Spark设置历史服务地址
cd /usr/local/spark/conf
vim spark-defaults.conf

# 添加以下内容:
spark.eventLog.enabled                  true
spark.eventLog.dir                      hdfs://kube-41:8020/user/spark/sparklog/
spark.eventLog.compress                 true
spark.yarn.historyServer.address        kube-41:18080

# 配置依赖spark jar包
# 当Spark Application应用提交运行在YARN上时,默认情况下,每次提交应用都需要将依赖Spark相关jar包上传到YARN 集群中,为了节省提交时间和存储空间,将Spark相关jar包上传到HDFS目录中,设置属性告知Spark Application应用。
hadoop fs -mkdir -p /user/spark/jars/
hadoop fs -put /usr/local/spark/jars/* /user/spark/jars/
# 修改spark-defaults.conf
cd /usr/local/spark/conf
vim spark-defaults.conf
# 添加以下内容:
spark.yarn.jars  hdfs://kube-41:8020/spark/jars/*
# 同步到其他节点
scp spark-defaults.conf root@kube-42:/usr/local/spark/conf
scp spark-defaults.conf root@kube-43:/usr/local/spark/conf

# 启动服务
Spark Application运行在YARN上时,上述配置完成
启动服务:HDFS、YARN、MRHistoryServer和Spark HistoryServer,命令如下:
## 启动HDFS和YARN服务,在kube-41执行命令
start-dfs.sh
start-yarn.sh
或
start-all.sh
# 注意:在onyarn模式下不需要启动start-all.sh(jps查看一下看不到worker和master)
## 启动MRHistoryServer服务,在kube-41执行命令
mapred --daemon start 
## 启动Spark HistoryServer服务,在kube-41执行命令
/usr/local/spark/sbin/start-history-server.sh
# Spark HistoryServer服务WEB UI页面地址:
http://kube-41:18080/

## 提交测试
# 先将圆周率PI程序提交运行在YARN上,命令如下:
/usr/local/spark/bin/spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/usr/bin/python3" \
--conf "spark.pyspark.python=/usr/bin/python3" \
/usr/local/spark/examples/src/main/python/pi.py \
10

# 设置资源信息,提交运行pi程序至YARN上,命令如下:
/usr/local/spark/bin/spark-submit \
--master yarn \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--conf "spark.pyspark.driver.python=/usr/bin/python3" \
--conf "spark.pyspark.python=/usr/bin/python3" \
/usr/local/spark/examples/src/main/python/pi.py \
10
# 当pi应用运行YARN上完成以后,从8080 WEB 页面点击应用历史服务连接,查看应用运行状态信息。

# pyspark测试
pyspark --master yarn --conf "spark.pyspark.driver.python=/usr/bin/python3" --conf "spark.pyspark.python=/usr/bin/python3"

>>> resultRDD = sc.textFile("hdfs://ns1/user/admin/sh/test.sh") \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b)
>>> resultRDD.collect()

SparkSQL与Hive整合

# 将hive-site.xml拷贝到spark安装路径conf目录
cp /usr/local/hive/conf/hive-site.xml /usr/local/spark/conf/
# 同步到其他节点
scp hive-site.xml root@kube-42:/usr/local/spark/conf/
scp hive-site.xml root@kube-43:/usr/local/spark/conf/

# 将mysql的连接驱动包拷贝到spark的jars目录下
cd /usr/local/hive/lib
cp mysql-connector-java-5.1.32.jar  /usr/local/spark/jars/
# 同步到其他节点
scp mysql-connector-java-5.1.32.jar  root@kube-42:/usr/local/spark/jars/
scp mysql-connector-java-5.1.32.jar  root@kube-43:/usr/local/spark/jars/

# Hive开启MetaStore服务
# 修改 hive/conf/hive-site.xml新增如下配置
# 远程模式部署metastore 服务地址
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://kube-41:9083</value>
    </property>
</configuration>

# 启动 Hive MetaStore服务
# 前台启动:
bin/hive --service metastore 
# 后台启动:
nohup /usr/local/hive/bin/hive --service metastore 2>&1 >> /var/log.log &

# 完整的hive-site.xml文件
<configuration>
    <!-- 存储元数据mysql相关配置 -->
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value> jdbc:mysql://kube-41:3306/hive?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=UTF-8</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
    </property>
    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
    </property>
 <!-- H2S运行绑定host -->
    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>kube-41</value>
    </property>
    <!-- 远程模式部署metastore 服务地址 -->
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://kube-41:9083</value>
    </property>
    <!-- 关闭元数据存储授权  -->
    <property>
        <name>hive.metastore.event.db.notification.api.auth</name>
        <value>false</value>
    </property>
    <!-- 关闭元数据存储版本的验证 -->
    <property>
        <name>hive.metastore.schema.verification</name>
        <value>false</value>
    </property>
</configuration>

## 测试Sparksql整合Hive是否成功
# Spark-sql方式测试
# 先启动hadoop集群,在启动spark集群,确保启动成功之后kube-41执行命令,指明master地址、每一个executor的内存大小、一共所需要的核数、mysql数据库连接驱动:
cd /usr/local/spark
bin/spark-sql --master local[2] --executor-memory 512m --total-executor-cores 1
# 或
bin/spark-sql --master spark://kube-41:7077 --executor-memory 512m --total-executor-cores 1
# 执行成功后的界面:进入到spark-sql 客户端命令行界面
# 查看当前有哪些数据库, 并创建数据库
show databases;
create database sparkhive;
# 看到数据的结果,说明sparksql整合hive成功!

# 注意:日志太多,我们可以修改spark的日志输出级别(conf/log4j.properties)
# 在spark2.0版本后由于出现了sparkSession,在初始化sqlContext的时候,会设置默认的spark.sql.warehouse.dir=spark-warehouse,
# 此时将hive与sparksql整合完成之后,在通过spark-sql脚本启动的时候,还是会在那里启动spark-sql脚本,就会在当前目录下创建一个spark.sql.warehouse.dir为spark-warehouse的目录,存放由spark-sql创建数据库和创建表的数据信息,与之前hive的数据息不是放在同一个路径下(可以互相访问)。但是此时spark-sql中表的数据在本地,不利于操作,也不安全。
# 所有在启动的时候需要加上这样一个参数:
# --conf  spark.sql.warehouse.dir=hdfs://kube-41:8020/user/hive/warehouse
# 保证spark-sql启动时不在产生新的存放数据的目录,sparksql与hive最终使用的是hive同一存放数据的目录。如果使用的是spark2.0之前的版本,由于没有sparkSession,不会出现spark.sql.warehouse.dir配置项,不会出现上述问题。
# Spark2之后最后的执行脚本,kube-41执行以下命令重新进去spark-sql
cd /usr/local/spark
bin/spark-sql \
 --master spark://kube-41:7077 \
 --executor-memory 512m --total-executor-cores 1 \
 --conf spark.sql.warehouse.dir=hdfs://kube-41:8020/user/hive/warehouse

# Spark-Shell方式启动:
bin/spark-shell --master local[3]
spark.sql("show databases").show()

# PySpark方式启动:
bin/pyspark --master local[2]
spark.sql("show databases").show()

PyCharm整合Hive

原理

Hive表的元数据库中,描述了有哪些database、table、以及表有多少列,每一列是什么类型,以及表的数据保存在hdfs的什么位置
执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务,简单来说Hive就是将SQL根据MySQL中元数据信息转成MapReduce执行,但是速度慢
使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表
所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据

API

在Spark2.0之后,SparkSession对HiveContext和SqlContext在进行了统一
可以通过操作SparkSession来操作HiveContext和SqlContext。

SparkSQL整合Hive MetaStore

默认Spark 有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据【上面案例】,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore
SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它,并且能够使用 HDFS保存WareHouse,所以可以直接拷贝Hadoop和Hive的配置文件到Spark的配置目录。

使用SparkSQL操作集群Hive表

# 在PyCharm中开发应用,集成Hive读取表的数据进行分析,构建SparkSession时需要设置HiveMetaStore服务器地址及集成Hive选项:
# 范例演示代码如下:

# -*- coding: utf-8 -*-
# Program function:
from pyspark.sql import SparkSession
import os

os.environ['SPARK_HOME'] = '/usr/local/spark'
PYSPARK_PYTHON = "/usr/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
    # _SPARK_HOST = "spark://kube-41:7077"
    _SPARK_HOST = "local[3]"
    _APP_NAME = "test"
    spark = SparkSession.builder \
        .master(_SPARK_HOST) \
        .appName(_APP_NAME) \
        .enableHiveSupport() \
        .getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    #PROJECT_ROOT = os.path.dirname(os.path.realpath(__file__))  # 获取项目根目录
    # print(PROJECT_ROOT)#/export/pyfolder1/pyspark-chapter03_3.8/main
    #path = os.path.join(PROJECT_ROOT, "data\\edge\\0_fuse.txt")  # 文件路径
    # 查看有哪些表
    spark.sql("show databases").show()
    spark.sql("use sparkhive").show()
    spark.sql("show tables").show()
    # 创建表
    spark.sql(
        "create table if not exists person (id int, name string, age int) row format delimited fields terminated by ','")
    # 加载数据, 数据为当前目录下的person.txt(和src平级)
    spark.sql("LOAD DATA LOCAL INPATH '/tmp/student.csv' INTO TABLE person")
    # 查询数据
    spark.sql("select * from person ").show()
    print("===========================================================")
    import pyspark.sql.functions as fn
    spark.read \
        .table("person ") \
        .groupBy("name") \
        .agg(fn.round(fn.avg("age"), 2).alias("avg_age")) \
        .show(10, truncate=False)
    spark.stop()