0. Remote access to a CDH Spark cluster


# Download Hadoop
https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz

# Download the bin package required for a Windows install and overwrite hadoop's bin directory with it
https://gitee.com/nkuhyx/winutils/tree/master/hadoop-3.3.0/bin

# Configure environment variables
HADOOP_HOME=D:\hadoop\hadoop-3.3.6
HADOOP_CONF_DIR=%HADOOP_HOME%/etc/hadoop
PYSPARK_PYTHON=D:\miniforge3\python.exe
SPARK_HOME=D:\miniforge3\Lib\site-packages\pyspark
Add %HADOOP_HOME%\bin and %HADOOP_HOME%\sbin to PATH
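
A minimal sketch for sanity-checking these variables from Python before launching PySpark (the variable names are the ones listed above; the winutils.exe check reflects an assumption about the Windows setup):

# check_env.py - verify the environment variables required by PySpark on Windows
import os
required = ["HADOOP_HOME", "HADOOP_CONF_DIR", "PYSPARK_PYTHON", "SPARK_HOME"]
for name in required:
    value = os.environ.get(name)
    print(f"{name} = {value}")
    if value is None:
        raise SystemExit(f"missing environment variable: {name}")
# winutils.exe from the bin package above must sit in %HADOOP_HOME%\bin
winutils = os.path.join(os.environ["HADOOP_HOME"], "bin", "winutils.exe")
print("winutils.exe present:", os.path.exists(winutils))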

# Download the CDH cluster's Hadoop configuration and overwrite the local Hadoop conf directory with it
# Edit hadoop-env.cmd
set HADOOP_HEAPSIZE=1024
set HADOOP_OPTS=%HADOOP_OPTS% -Djava.net.preferIPv4Stack=true

# yarn
su - admin
mkdir config
cp /usr/local/hadoop/etc/hadoop/core-site.xml config/
cp /usr/local/hadoop/etc/hadoop/yarn-site.xml config/
export HADOOP_CONF_DIR=D:\\git-python\\大数据-Spark\\config
$Env:HADOOP_CONF_DIR='config'   # powershell

# CDH 6.3.1: download the YARN client configuration files
export HADOOP_CONF_DIR=D:\\git-python\\大数据-Spark\\yarn-conf
# powershell
$Env:HADOOP_CONF_DIR='D:\git-python\大数据-Spark\yarn-conf'
$Env:YARN_CONF_DIR='D:\git-python\大数据-Spark\yarn-conf'
# Older versions have trouble with Chinese characters in the path, so use an ASCII-only path
$Env:HADOOP_CONF_DIR='D:\yarn-conf'
$Env:YARN_CONF_DIR='D:\yarn-conf'
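
The same variables can also be set from inside the script itself, before the SparkContext is created, so the submission does not depend on the shell session (a minimal sketch, reusing the D:\yarn-conf directory from above):

import os
# must be set before pyspark starts the JVM, otherwise the cluster config is not picked up
os.environ["HADOOP_CONF_DIR"] = r"D:\yarn-conf"
os.environ["YARN_CONF_DIR"] = r"D:\yarn-conf"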

vim py/test_spark_yarn.py
from pyspark import SparkConf
from pyspark import SparkContext

# submit to YARN: one 2-core / 6g executor to start, with dynamic allocation
# and the external shuffle service enabled
conf = SparkConf()
conf.setMaster('yarn')
conf.setAppName('testing')
conf.set("spark.executor.memory", "6g")
conf.set("spark.executor.instances", "1")
conf.set("spark.executor.cores", "2")
conf.set("spark.dynamicAllocation.enabled", "true")
conf.set("spark.driver.memory", "4g")
conf.set("spark.shuffle.service.enabled", "true")

# a trivial job to verify that the cluster connection works
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3])
count = rdd.count()
print(sc.master)   # yarn
print(count)       # 3

# package the Spark jars and upload them to HDFS (used by spark.yarn.archive / spark.yarn.jars below)
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/jars
zip -q -r spark_jars_.zip *
su - spark
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark
hdfs dfs -put jars /user/spark/

# also upload the hive-exec jar shipped with CDH
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark/hive
hdfs dfs -put hive-exec-2.1.1-cdh6.3.2.jar /user/spark/jars/
hdfs dfs -ls /user/spark/jars
hdfs dfs -chmod -R 777 /user/spark/jars

# upload the Hadoop native libraries (referenced by the extraLibraryPath settings below)
cd /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/lib/native
hdfs dfs -put native /user/spark/

# submit from a cluster node
spark-submit --master yarn --deploy-mode cluster py/test_spark_yarn.py
# submit from the Windows client
spark-submit.cmd --master yarn --deploy-mode cluster py/test_spark_yarn.py

# Submitting from a remote client
# spark-defaults.conf
cd D:\miniforge3\Lib\site-packages\pyspark\conf
vim spark-defaults.conf
spark.authenticate=false
spark.driver.log.dfsDir=/user/spark/driverLogs
spark.driver.log.persistToDfs.enabled=true
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=60
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.eventLog.enabled=true
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.ui.enabled=true
spark.ui.killEnabled=true
spark.lineage.log.dir=/var/log/spark/lineage
spark.lineage.enabled=true
spark.master=yarn
spark.submit.deployMode=client
spark.eventLog.dir=hdfs://ns1/user/spark/applicationHistory
spark.yarn.historyServer.address=http://kube-41:18088
# when both are set, spark.yarn.archive takes precedence over spark.yarn.jars
spark.yarn.archive=hdfs://ns1/user/spark/jars/spark_jars_.zip
spark.yarn.jars=hdfs://ns1/user/spark/jars/*
spark.driver.extraLibraryPath=hdfs://ns1/user/spark/native
spark.executor.extraLibraryPath=hdfs://ns1/user/spark/native
spark.yarn.am.extraLibraryPath=hdfs://ns1/user/spark/native
spark.yarn.historyServer.allowTracking=true
spark.yarn.appMasterEnv.MKL_NUM_THREADS=1
spark.executorEnv.MKL_NUM_THREADS=1
spark.yarn.appMasterEnv.OPENBLAS_NUM_THREADS=1
spark.executorEnv.OPENBLAS_NUM_THREADS=1
spark.extraListeners=com.cloudera.spark.lineage.NavigatorAppListener
spark.sql.queryExecutionListeners=com.cloudera.spark.lineage.NavigatorQueryListener
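
With these defaults in place under %SPARK_HOME%\conf, a script run with the local Python should pick up yarn / client mode automatically; a minimal sketch to verify this (the app name is just a placeholder):

# test_spark_client.py - run with the local python.exe, no spark-submit needed;
# master and deploy mode come from spark-defaults.conf (yarn / client)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("remote-client-test").getOrCreate()
print(spark.sparkContext.master)   # expected: yarn
df = spark.range(100)
print(df.count())                  # 100
spark.stop()

In client mode the driver runs on the Windows machine, so the cluster nodes must be able to reach it over the network. Cluster mode still works by passing --deploy-mode cluster explicitly, as below.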

spark-submit.cmd --master yarn --deploy-mode cluster py/test_spark_yarn.py

hdfs dfs -ls /user/spark
hdfs dfs -chmod 777 /user
hdfs dfs -chmod 777 /user/spark

# Commonly used parameters
{
  "spark.dynamicAllocation.minExecutors": "200",
  "spark.dynamicAllocation.initialExecutors": "200",
  "spark.dynamicAllocation.maxExecutors": "500",
  "spark.driver.memory": "2g",
  "spark.executor.memory": "4g",
  "spark.driver.cores": "4",
  "spark.yarn.executor.memoryOverhead": "4g",
  "spark.sql.adaptive.enabled": "true",
  "spark.sql.adaptive.skewedJoin.enabled": "true",
  "spark.sql.adaptive.skewedJoinWithAgg.enabled": "true",
  "spark.sql.adaptive.multipleSkewedJoin.enabled": "true"
}
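
A minimal sketch of how such a parameter dictionary might be applied when building a session (the values are the ones from the JSON above, and spark.master is assumed to come from spark-defaults.conf as described earlier):

from pyspark import SparkConf
from pyspark.sql import SparkSession

params = {
    "spark.dynamicAllocation.minExecutors": "200",
    "spark.dynamicAllocation.initialExecutors": "200",
    "spark.dynamicAllocation.maxExecutors": "500",
    "spark.driver.memory": "2g",
    "spark.executor.memory": "4g",
    "spark.driver.cores": "4",
    "spark.yarn.executor.memoryOverhead": "4g",
    "spark.sql.adaptive.enabled": "true",
}
# setAll takes a list of (key, value) pairs
conf = SparkConf().setAll(list(params.items()))
spark = SparkSession.builder.config(conf=conf).getOrCreate()
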
# Example
# ~conda_path/envs/py37/lib/python3.7/site-packages
# spark-submit \
#     --master yarn \
#     --deploy-mode cluster \
#     --queue ${queue_name} \
#     --conf spark.driver.memory=4g \
#     --conf spark.dynamicAllocation.enabled=true \
#     --conf spark.dynamicAllocation.minExecutors=10 \
#     --conf spark.dynamicAllocation.initialExecutors=10 \
#     --conf spark.dynamicAllocation.maxExecutors=20 \
#     --conf spark.executor.memory=4g \
#     --conf spark.executor.cores=4 \
#     --name spark_test_env \
#     --conf spark.hadoop.yarn.cluster.name=${cluster_name} \
#     --conf spark.pyspark.python=./py37_env/bin/python3 \
#     --conf spark.pyspark.driver.python=./py37_env/bin/python3 \
#     --archives hdfs_path/py37_env.tar.gz#py37_env \
#     main.py
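
The py37_env.tar.gz archive referenced by --archives has to be built somehow; one possible approach (an assumption, not part of the original notes) is conda-pack, which exposes a Python API:

# pack_env.py - one possible way to build the py37_env.tar.gz archive
# (assumes the conda-pack package is installed alongside the py37 environment)
import conda_pack
conda_pack.pack(name="py37", output="py37_env.tar.gz")
# afterwards upload it: hdfs dfs -put py37_env.tar.gz hdfs_path/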

Using remote HDFS files with Spark

HADOOP_HOME=D:\hadoop\hadoop-3.3.6
HADOOP_CONF_DIR=%HADOOP_HOME%/etc/hadoop
# Copy the remote HDFS configuration files into the local Hadoop conf directory, overwriting the existing ones
pyspark.cmd --master local[*]
>>> resultRDD = sc.textFile("hdfs://ns1/user/admin/sh/test.sh") \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda x: (x, 1)) \
    .reduceByKey(lambda a, b: a + b)
>>> resultRDD.collect()

Spark3

spark-submit.cmd --master yarn --deploy-mode cluster  D:\miniforge3\Lib\site-packages\pyspark\examples\src\main\python\pi.py