部署安装Spark集群
PySpark环境安装
conda create -n spark24 python=3.7
conda init powershell
conda activate spark24
conda deactivate
# 使用PyPI安装PySpark如下:也可以指定版本安装
conda install pyspark=3.3
conda install pyspark=2.4.0 # 指定版本安装
# 或者指定清华镜像(对于网络较差的情况):
conda install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark # 指定清华镜像源
# 如果要为特定组件安装额外的依赖项,可以按如下方式安装(此步骤暂不执行,后面Sparksql部分会执行):
conda install pyspark[sql]
SPARK_HOME=D:\miniforge3\envs\spark24\Lib\site-packages\pyspark
PYSPARK_PYTHON=D:\miniforge3\envs\spark24\python.exe
PATH增加: %SPARK_HOME%\bin
# 测试-基于bin/pyspark
pyspark --master local[*] # --master local[*]表示使用本地模式,*表示使用所有可用的CPU核心
pyspark --master yarn --deploy-mode cluster # --master yarn表示使用YARN模式
# 参数解释:
--master yarn:指定YARN作为集群管理器。
--deploy-mode cluster:集群模式,Driver运行在YARN上。
--driver-memory:Driver分配的内存。
--executor-memory:每个Executor分配的内存。
--executor-cores:每个Executor分配的CPU核心数。
>>> sc.parallelize([1,2,3,4,5]).map(lambda x: x + 1).collect()
# 基于bin/spark-shell测试
spark-shell
scala> sc.parallelize(Array(1,2,3,4,5)).map(x=>x+ 1).collect()
res0: Array[Int] = Array(2, 3, 4, 5, 6)
字符集问题
修改spark字符集
CM界面spark配置搜索spark-default.conf,添加:
spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8
Spark Local
本质:启动一个JVM Process进程(一个进程里面有多个线程),执行任务Task
- Local模式可以限制模拟Spark集群环境的线程数量, 即Local[N] 或Local[*]
- 其中N代表可以使用N个线程,每个线程拥有一个cpu core。如果不指定N,则默认是1个线程(该线程有1个core)。通常Cpu有几个Core,就指定几个线程,最大化利用计算能力.
- 如果是local[*],则代表Run Spark locally with as many worker threads as logical cores on your machine.按照Cpu最多的Cores设置线程数
Local 下的角色分布:
- 资源管理:
- Master:Local进程本身
- Worker:Local进程本身
- 任务执行:
- Driver:Local进程本身
- Executor:不存在,没有独立的Executor角色, 由Local进程(也就是Driver)内的线程提供计算能力
PS: Driver也算一种特殊的Executor, 只不过多数时候, 我们将Executor当做纯Worker对待, 这样和Driver好区分(一类是管理 一类是工人)
注意: Local模式只能运行一个Spark程序, 如果执行多个Spark程序, 那就是由多个相互独立的Local进程在执行
yum install python36
cd /usr/local/src; wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xf spark-3.5.3-bin-hadoop3.tgz
ln -s /usr/local/src/spark-3.5.3-bin-hadoop3 /usr/local/spark
cd /usr/local/spark/bin
./spark-shell
scala> sc.parallelize(Array(1,2,3,4,5)).map(x=>x+ 1).collect()
res0: Array[Int] = Array(2, 3, 4, 5, 6)
./pyspark --master local[*]
>>> sc.parallelize([1,2,3,4,5]).map(lambda x: x + 1).collect()
[2, 3, 4, 5, 6]
# 访问Spark Web UI
http://kube-42:4040
# Spark-shell说明:
# 1.直接使用./spark-shell
# 表示使用local模式启动,在本机启动一个SparkSubmit进程
# 2.还可指定参数 --master,如:
spark-shell --master local[N] # 表示在本地模拟N个线程来运行当前任务
spark-shell --master local[*] # 表示使用当前机器上所有可用的资源
# 3.不携带参数默认就是
spark-shell --master local[*]
# 4.后续还可以使用--master指定集群地址,表示把任务提交到集群上运行,如
./spark-shell --master spark://kube-41:7077,kube-42:7077
# 5.退出spark-shell
# 使用 :quit
# 基于bin/spark-submit测试
spark-submit.cmd --master local[*] py/test_spark_yarn.py
# spark-submit程序, 作用: 提交指定的Spark代码到Spark环境中运行
# 计算pi圆周率值,后面的100表示计算的精度
./spark-submit /usr/local/spark/examples/src/main/python/pi.py 100
Pi is roughly 3.142160
Spark Standalone集群环境
Standalone模式是Spark自带的一种集群模式,不同于前面本地模式启动多个进程来模拟集群的环境,Standalone模
式是真实地在多个机器之间搭建Spark集群的环境,完全可以利用该模式搭建多机器集群,用于实际的大数据处理。
StandAlone 是完整的Spark运行环境,其中:
- Master角色以Master进程存在, Worker角色以Worker进程存在
- Driver和Executor运行于Worker进程内, 由Worker提供资源供给它们运行
Standalone 架构
StandAlone集群在进程上主要有3类进程:
- 主节点Master进程:
- Master角色, 管理整个集群资源,并托管运行各个任务的Driver
- 从节点Workers:
- Worker角色, 管理每个机器的资源,分配对应的资源来运行Executor(Task);
- 每个从节点分配资源信息给Worker管理,资源信息包含内存Memory和CPU Cores核数
- 历史服务器HistoryServer(可选):
- Spark Application运行完成以后,保存事件日志数据至HDFS,启动HistoryServer可以查看应用运行相关信息。
# 所有节点安装python3.6
yum install python36
# 所有节点安装spark
cd /usr/local/src; wget https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
tar xf spark-3.5.3-bin-hadoop3.tgz
ln -s /usr/local/src/spark-3.5.3-bin-hadoop3 /usr/local/spark
# 所有节点配置环境变量
cd /usr/local/spark/conf/
cp workers.template workers
vim workers
# 添加以下内容:
kube-41
kube-42
kube-43
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
cd /usr/local/spark/conf
cp spark-env.sh.template spark-env.sh
vim spark-env.sh
# 增加如下内容:
export JAVA_HOME=/usr/java/jdk1.8.0_361-amd64
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
export SPARK_MASTER_HOST=kube-41
export SPARK_MASTER_PORT=7077
SPARK_MASTER_WEBUI_PORT=8080
SPARK_WORKER_CORES=2
SPARK_WORKER_MEMORY=2g
SPARK_WORKER_PORT=7078
SPARK_WORKER_WEBUI_PORT=8081
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://kube-41:8020/user/spark/sparklog/ -Dspark.history.fs.cleaner.enabled=true"
# 注意:
# Jdk,hadoop, yarn的路径, 需要配置为自己的路径(可能与此位置不一致)
# History配置中, 需要指定hdfs的地址, 其中端口号为8020或者9820, 大家需要参考hdfs配置的namenode通信端口号
# 配置spark应用日志
# 第一步: 在HDFS上创建应用运行事件日志目录:
hdfs dfs -mkdir -p /user/spark/sparklog/
# 第二步: 配置spark-defaults.conf
cd /usr/local/spark/conf
cp spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
# 添加以下内容:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://kube-41:8020/user/spark/sparklog/
spark.eventLog.compress true
# 配置【log4j.properties】
cp log4j2.properties.template log4j2.properties
vim log4j2.properties
# 可以更改日志级别 warn info debug
# 配置分发至其他节点
scp -r /usr/local/spark root@kube-42:/usr/local/
scp -r /usr/local/spark root@kube-43:/usr/local/
# 在主节点上启动spark集群
/usr/local/spark/sbin/start-all.sh
# 在主节点上停止spark集群
/usr/local/spark/sbin/stop-all.sh
# 在主节点上启动历史服务器
/usr/local/spark/sbin/start-history-server.sh
http://kube-41:18080
# 单独启动和停止
# 在 master 安装节点上启动和停止 master:
start-master.sh
stop-master.sh
# 在 Master 所在节点上启动和停止worker(work指的是slaves 配置文件中的主机名)
start-slaves.sh
stop-slaves.sh
# WEB UI页面
http://kube-41:8080
# 连接集群
cd /usr/local/spark/bin
spark-shell --master spark://kube-41:7077
# pyspark 连接
pyspark --master spark://kube-41:7077
Spark Standalone HA 模式安装
# 官方文档
https://spark.apache.org/docs/3.5.3/spark-standalone.html#standby-masters-with-zookeeper
cd /usr/local/spark/conf
vim spark-env.sh
# 注释或删除MASTER_HOST内容:
# SPARK_MASTER_HOST=kube-41
# 增加以下配置:
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url= kube-41:2181,kube-42:2181,kube-43:2181 -Dspark.deploy.zookeeper.dir=/spark-ha"
# 说明函数说明:
spark.deploy.recoveryMode:恢复模式
spark.deploy.zookeeper.url:ZooKeeper的Server地址
spark.deploy.zookeeper.dir:保存集群元数据信息的文件、目录。包括Worker、Driver、Application信息。
# 配置分发至其他节点
scp -r /usr/local/spark root@kube-42:/usr/local/
scp -r /usr/local/spark root@kube-43:/usr/local/
# 首先启动zookeeper服务: 三个节点都需要启动
zkServer.sh status --查看状态
zkServer.sh stop --停止命令
zkServer.sh start --启动命令
# 接着在kube-41启动spark集群
cd /usr/local/spark
./sbin/start-all.sh
# 最后在kube-42上单独启动一个master
cd /usr/local/spark
./sbin/start-master.sh
# 查看WebUI 一主一从
http://kube-41:8080/
http://kube-42:8080/
Spark on YARN 环境搭建
# 修改spark-env.sh
cd /usr/local/spark/conf
vim /usr/local/spark/conf/spark-env.sh
# 添加以下内容:
HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop
# 同步到其他两台
cd /usr/local/spark/conf
scp -r spark-env.sh kube-42:$PWD
scp -r spark-env.sh kube-43:$PWD
# 修改hadoop的yarn-site.xml
cd /usr/local/hadoop/etc/hadoop/
vim /usr/local/hadoop/etc/hadoop/yarn-site.xml
# 添加以下内容:
<configuration>
<!-- 配置yarn主节点的位置 -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>kube-41</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 设置yarn集群的内存分配方案 -->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>20480</value>
</property>
<property>
<name>yarn.scheduler.minimum-allocation-mb</name>
<value>2048</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>2.1</value>
</property>
<!-- 开启日志聚合功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 设置聚合日志在hdfs上的保存时间 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
<!-- 设置yarn历史服务器地址 -->
<property>
<name>yarn.log.server.url</name>
<value>http://kube-41:19888/jobhistory/logs</value>
</property>
<!-- 关闭yarn内存检查 -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
</configuration>
# Spark设置历史服务地址
cd /usr/local/spark/conf
vim spark-defaults.conf
# 添加以下内容:
spark.eventLog.enabled true
spark.eventLog.dir hdfs://kube-41:8020/user/spark/sparklog/
spark.eventLog.compress true
spark.yarn.historyServer.address kube-41:18080
# 配置依赖spark jar包
# 当Spark Application应用提交运行在YARN上时,默认情况下,每次提交应用都需要将依赖Spark相关jar包上传到YARN 集群中,为了节省提交时间和存储空间,将Spark相关jar包上传到HDFS目录中,设置属性告知Spark Application应用。
hadoop fs -mkdir -p /user/spark/jars/
hadoop fs -put /usr/local/spark/jars/* /user/spark/jars/
# 修改spark-defaults.conf
cd /usr/local/spark/conf
vim spark-defaults.conf
# 添加以下内容:
spark.yarn.jars hdfs://kube-41:8020/spark/jars/*
# 同步到其他节点
scp spark-defaults.conf root@kube-42:/usr/local/spark/conf
scp spark-defaults.conf root@kube-43:/usr/local/spark/conf
# 启动服务
Spark Application运行在YARN上时,上述配置完成
启动服务:HDFS、YARN、MRHistoryServer和Spark HistoryServer,命令如下:
## 启动HDFS和YARN服务,在kube-41执行命令
start-dfs.sh
start-yarn.sh
或
start-all.sh
# 注意:在onyarn模式下不需要启动start-all.sh(jps查看一下看不到worker和master)
## 启动MRHistoryServer服务,在kube-41执行命令
mapred --daemon start
## 启动Spark HistoryServer服务,在kube-41执行命令
/usr/local/spark/sbin/start-history-server.sh
# Spark HistoryServer服务WEB UI页面地址:
http://kube-41:18080/
## 提交测试
# 先将圆周率PI程序提交运行在YARN上,命令如下:
/usr/local/spark/bin/spark-submit \
--master yarn \
--conf "spark.pyspark.driver.python=/usr/bin/python3" \
--conf "spark.pyspark.python=/usr/bin/python3" \
/usr/local/spark/examples/src/main/python/pi.py \
10
# 设置资源信息,提交运行pi程序至YARN上,命令如下:
/usr/local/spark/bin/spark-submit \
--master yarn \
--driver-memory 512m \
--executor-memory 512m \
--executor-cores 1 \
--num-executors 2 \
--queue default \
--conf "spark.pyspark.driver.python=/usr/bin/python3" \
--conf "spark.pyspark.python=/usr/bin/python3" \
/usr/local/spark/examples/src/main/python/pi.py \
10
# 当pi应用运行YARN上完成以后,从8080 WEB 页面点击应用历史服务连接,查看应用运行状态信息。
# pyspark测试
pyspark --master yarn --conf "spark.pyspark.driver.python=/usr/bin/python3" --conf "spark.pyspark.python=/usr/bin/python3"
>>> resultRDD = sc.textFile("hdfs://ns1/user/admin/sh/test.sh") \
.flatMap(lambda line: line.split(" ")) \
.map(lambda x: (x, 1)) \
.reduceByKey(lambda a, b: a + b)
>>> resultRDD.collect()
SparkSQL与Hive整合
# 将hive-site.xml拷贝到spark安装路径conf目录
cp /usr/local/hive/conf/hive-site.xml /usr/local/spark/conf/
# 同步到其他节点
scp hive-site.xml root@kube-42:/usr/local/spark/conf/
scp hive-site.xml root@kube-43:/usr/local/spark/conf/
# 将mysql的连接驱动包拷贝到spark的jars目录下
cd /usr/local/hive/lib
cp mysql-connector-java-5.1.32.jar /usr/local/spark/jars/
# 同步到其他节点
scp mysql-connector-java-5.1.32.jar root@kube-42:/usr/local/spark/jars/
scp mysql-connector-java-5.1.32.jar root@kube-43:/usr/local/spark/jars/
# Hive开启MetaStore服务
# 修改 hive/conf/hive-site.xml新增如下配置
# 远程模式部署metastore 服务地址
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://kube-41:9083</value>
</property>
</configuration>
# 启动 Hive MetaStore服务
# 前台启动:
bin/hive --service metastore
# 后台启动:
nohup /usr/local/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
# 完整的hive-site.xml文件
<configuration>
<!-- 存储元数据mysql相关配置 -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value> jdbc:mysql://kube-41:3306/hive?createDatabaseIfNotExist=true&useSSL=false&useUnicode=true&characterEncoding=UTF-8</value>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<!-- H2S运行绑定host -->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>kube-41</value>
</property>
<!-- 远程模式部署metastore 服务地址 -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://kube-41:9083</value>
</property>
<!-- 关闭元数据存储授权 -->
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<!-- 关闭元数据存储版本的验证 -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
</configuration>
## 测试Sparksql整合Hive是否成功
# Spark-sql方式测试
# 先启动hadoop集群,在启动spark集群,确保启动成功之后kube-41执行命令,指明master地址、每一个executor的内存大小、一共所需要的核数、mysql数据库连接驱动:
cd /usr/local/spark
bin/spark-sql --master local[2] --executor-memory 512m --total-executor-cores 1
# 或
bin/spark-sql --master spark://kube-41:7077 --executor-memory 512m --total-executor-cores 1
# 执行成功后的界面:进入到spark-sql 客户端命令行界面
# 查看当前有哪些数据库, 并创建数据库
show databases;
create database sparkhive;
# 看到数据的结果,说明sparksql整合hive成功!
# 注意:日志太多,我们可以修改spark的日志输出级别(conf/log4j.properties)
# 在spark2.0版本后由于出现了sparkSession,在初始化sqlContext的时候,会设置默认的spark.sql.warehouse.dir=spark-warehouse,
# 此时将hive与sparksql整合完成之后,在通过spark-sql脚本启动的时候,还是会在那里启动spark-sql脚本,就会在当前目录下创建一个spark.sql.warehouse.dir为spark-warehouse的目录,存放由spark-sql创建数据库和创建表的数据信息,与之前hive的数据息不是放在同一个路径下(可以互相访问)。但是此时spark-sql中表的数据在本地,不利于操作,也不安全。
# 所有在启动的时候需要加上这样一个参数:
# --conf spark.sql.warehouse.dir=hdfs://kube-41:8020/user/hive/warehouse
# 保证spark-sql启动时不在产生新的存放数据的目录,sparksql与hive最终使用的是hive同一存放数据的目录。如果使用的是spark2.0之前的版本,由于没有sparkSession,不会出现spark.sql.warehouse.dir配置项,不会出现上述问题。
# Spark2之后最后的执行脚本,kube-41执行以下命令重新进去spark-sql
cd /usr/local/spark
bin/spark-sql \
--master spark://kube-41:7077 \
--executor-memory 512m --total-executor-cores 1 \
--conf spark.sql.warehouse.dir=hdfs://kube-41:8020/user/hive/warehouse
# Spark-Shell方式启动:
bin/spark-shell --master local[3]
spark.sql("show databases").show()
# PySpark方式启动:
bin/pyspark --master local[2]
spark.sql("show databases").show()
PyCharm整合Hive
原理
Hive表的元数据库中,描述了有哪些database、table、以及表有多少列,每一列是什么类型,以及表的数据保存在hdfs的什么位置
执行HQL时,先到MySQL元数据库中查找描述信息,然后解析HQL并根据描述信息生成MR任务,简单来说Hive就是将SQL根据MySQL中元数据信息转成MapReduce执行,但是速度慢
使用SparkSQL整合Hive其实就是让SparkSQL去加载Hive 的元数据库,然后通过SparkSQL执行引擎去操作Hive表
所以首先需要开启Hive的元数据库服务,让SparkSQL能够加载元数据
API
在Spark2.0之后,SparkSession对HiveContext和SqlContext在进行了统一
可以通过操作SparkSession来操作HiveContext和SqlContext。
SparkSQL整合Hive MetaStore
默认Spark 有一个内置的 MateStore,使用 Derby 嵌入式数据库保存数据【上面案例】,但是这种方式不适合生产环境,因为这种模式同一时间只能有一个 SparkSession 使用,所以生产环境更推荐使用 Hive 的 MetaStore
SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它,并且能够使用 HDFS保存WareHouse,所以可以直接拷贝Hadoop和Hive的配置文件到Spark的配置目录。
使用SparkSQL操作集群Hive表
# 在PyCharm中开发应用,集成Hive读取表的数据进行分析,构建SparkSession时需要设置HiveMetaStore服务器地址及集成Hive选项:
# 范例演示代码如下:
# -*- coding: utf-8 -*-
# Program function:
from pyspark.sql import SparkSession
import os
os.environ['SPARK_HOME'] = '/usr/local/spark'
PYSPARK_PYTHON = "/usr/bin/python3"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
if __name__ == '__main__':
# _SPARK_HOST = "spark://kube-41:7077"
_SPARK_HOST = "local[3]"
_APP_NAME = "test"
spark = SparkSession.builder \
.master(_SPARK_HOST) \
.appName(_APP_NAME) \
.enableHiveSupport() \
.getOrCreate()
spark.sparkContext.setLogLevel("WARN")
#PROJECT_ROOT = os.path.dirname(os.path.realpath(__file__)) # 获取项目根目录
# print(PROJECT_ROOT)#/export/pyfolder1/pyspark-chapter03_3.8/main
#path = os.path.join(PROJECT_ROOT, "data\\edge\\0_fuse.txt") # 文件路径
# 查看有哪些表
spark.sql("show databases").show()
spark.sql("use sparkhive").show()
spark.sql("show tables").show()
# 创建表
spark.sql(
"create table if not exists person (id int, name string, age int) row format delimited fields terminated by ','")
# 加载数据, 数据为当前目录下的person.txt(和src平级)
spark.sql("LOAD DATA LOCAL INPATH '/tmp/student.csv' INTO TABLE person")
# 查询数据
spark.sql("select * from person ").show()
print("===========================================================")
import pyspark.sql.functions as fn
spark.read \
.table("person ") \
.groupBy("name") \
.agg(fn.round(fn.avg("age"), 2).alias("avg_age")) \
.show(10, truncate=False)
spark.stop()