6. Spark Case Studies


# jieba test
# pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
# coding=utf-8
import jieba

if __name__ == '__main__':
    # jieba test: cut_all=True, full mode
    seg_list = jieba.cut("我来到北京清华大学", cut_all=True)
    print("Full Mode: " + "/ ".join(seg_list))  # full mode

    # jieba test: cut_all=False, precise mode
    seg_list = jieba.cut("我来到北京清华大学", cut_all=False)
    print("Default Mode: " + "/ ".join(seg_list))  # precise mode

    # Search-engine mode: long words are segmented again
    seg_list = jieba.cut_for_search("我来到北京清华大学")  # search-engine mode
    print("Search Engine Mode: " + "/ ".join(seg_list))

# Full Mode: 我/ 来到/ 北京/ 清华/ 清华大学/ 华大/ 大学
# Default Mode: 我/ 来到/ 北京/ 清华大学
# Search Engine Mode: 我/ 来到/ 北京/ 清华/ 华大/ 大学/ 清华大学
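The log-analysis script below patches truncated tokens such as 传智播 back to 传智播客 by hand. An alternative is to register those names in jieba's dictionary before segmenting, so that precise mode keeps them whole (cut_for_search may still emit dictionary sub-words alongside the full word). A minimal sketch, using only the four names needed later; this is an assumed variation, not part of the original script:

import jieba

# Register the brand names as single dictionary words
for brand in ["传智播客", "博学谷", "院校帮", "数据湖"]:
    jieba.add_word(brand)

# Precise mode should now keep 传智播客 intact instead of splitting it into 传智播 / 客
print("/ ".join(jieba.cut("我在传智播客学习大数据")))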

Search Engine Log Analysis Case

# -*- coding: utf-8 -*-

from pyspark import SparkContext,SparkConf
import jieba

if __name__ == '__main__':
    # Initialize the execution environment and build the SparkContext
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/SogouQ.txt")

    # Split each line on the tab separator
    words_rdd = file_rdd.map(lambda line: line.split("\t"))

    # Cache words_rdd (it is reused by the jobs below)
    words_rdd.cache()

    # Sample a record to inspect the data
    # print(words_rdd.takeSample(True,1))

    # Extract the search keyword column
    search_rdd = words_rdd.map(lambda word: word[2])

    # Segment the search keywords with jieba
    result_search_rdd = search_rdd.flatMap(lambda line: jieba.cut_for_search(line))
    # print(result_search_rdd.collect())

    # Filter out the unwanted single-character tokens: 谷, 帮, 客
    filter_search_rdd = result_search_rdd.filter(lambda word: word not in ["谷","帮","客"])
    # print(filter_search_rdd.collect())

    # Fix up 4 keywords: 传智播 -> 传智播客; 博学 -> 博学谷; 院校 -> 院校帮; 湖 -> 数据湖
    final_search_rdd = filter_search_rdd.map(lambda word: "传智播客" if word == "传智播" else "博学谷" if word == "博学" else "院校帮" if word == "院校" else "数据湖" if word == "湖" else word)
    # print(final_search_rdd.collect())

    # Requirement 1: count each keyword, sort globally, and take the top 5
    word_count_rdd = final_search_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    word_count_rdd.cache()
    # print(word_count_rdd.collect())
    # Take the top 5
    print("搜索关键词前5名:",word_count_rdd.take(5))  # already sorted, so take(5); use top(5) on unsorted data

    # Requirement 2: user + keyword combination analysis
    # Extract the user ID and keyword
    user_and_word_rdd = words_rdd.map(lambda word: (word[1], word[2]))
    # Segment the keywords and apply the same filtering
    user_and_word_rdd2 = user_and_word_rdd.flatMap(lambda word: [(word[0], w) for w in jieba.cut_for_search(word[1]) if w not in ["谷","帮","客"]])
    # print(user_and_word_rdd2.collect())
    # Fix up 4 keywords: 传智播 -> 传智播客; 博学 -> 博学谷; 院校 -> 院校帮; 湖 -> 数据湖
    final_user_and_word_rdd = user_and_word_rdd2.map(lambda word: (word[0], "传智播客" if word[1] == "传智播" else "博学谷" if word[1] == "博学" else "院校帮" if word[1] == "院校" else "数据湖" if word[1] == "湖" else word[1]))
    # print(final_user_and_word_rdd.collect())
    # Count occurrences of each user_keyword pair and sort globally by count
    user_word_count_rdd = final_user_and_word_rdd.map(lambda word: (word[0] + "_" + word[1], 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    # print(user_word_count_rdd.collect())
    # Take the top 5
    print("用户搜索次数及关键词前5名:",user_word_count_rdd.take(5))

    # Requirement 3: popular search time-slot analysis
    # Extract the timestamp and split it on ':'
    time_rdd = words_rdd.map(lambda word: word[0].split(":"))
    # print(time_rdd.takeSample(True, 5))
    # Count searches per time slot
    time_count_rdd = time_rdd.map(lambda time: (time[0] + ":" + time[1], 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("热门搜索时间段前5名:",time_count_rdd.take(5))

    words_rdd.unpersist()
    sc.stop()

# 搜索关键词前5名: [('scala', 2310), ('hadoop', 2268), ('博学谷', 2002), ('传智汇', 1918), ('itheima', 1680)]
# 用户搜索次数及关键词前5名: [('6185822016522959_scala', 2016), ('41641664258866384_博学谷', 1372), ('44801909258572364_hadoop', 1260), ('7044693659960919_数据', 1120), ('7044693659960919_仓库', 1120)]
# 热门搜索时间段前5名: [('20:00', 3479), ('23:00', 3087), ('21:00', 2989), ('22:00', 2499), ('01:00', 1365)]
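A side note on the keyword fix-up above: the chained conditional expression works, but the same remapping reads more easily as a dictionary lookup. A sketch that reuses filter_search_rdd from the script above (fix_up is an illustrative name, not from the original code):

# Keyword fix-up expressed as a dict lookup; unmapped words pass through unchanged
fix_up = {"传智播": "传智播客", "博学": "博学谷", "院校": "院校帮", "湖": "数据湖"}
final_search_rdd = filter_search_rdd.map(lambda word: fix_up.get(word, word))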

# Install dependencies (pip and jieba for the cluster's Python 2.7)
curl https://bootstrap.pypa.io/pip/2.7/get-pip.py -o get-pip.py
python2.7 get-pip.py
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba

# Submit to the YARN cluster
# cat search_jieba.py
# -*- coding: utf-8 -*-
# Python 2 only: make str handling default to UTF-8
import sys
reload(sys)
sys.setdefaultencoding('utf-8')
from pyspark import SparkContext,SparkConf
import jieba

if __name__ == '__main__':
    # Initialize the execution environment and build the SparkContext
    conf = SparkConf().setAppName("test").setMaster("yarn")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    #file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/SogouQ.txt")
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/SogouQ.txt",use_unicode=False)

    # Split each line on the tab separator
    words_rdd = file_rdd.map(lambda line: str(line).split("\t"))

    # Cache words_rdd
    words_rdd.cache()

    # Sample a record to inspect the data
    # print(words_rdd.takeSample(True,1))

    # Extract the search keyword column
    search_rdd = words_rdd.map(lambda word: str(word[2]))

    # Segment the search keywords with jieba
    result_search_rdd = search_rdd.flatMap(lambda line: jieba.cut_for_search(str(line)))
    # print(result_search_rdd.collect())

    # Filter out the unwanted single-character tokens: 谷, 帮, 客
    filter_search_rdd = result_search_rdd.filter(lambda word: word not in ["谷".decode("utf-8"),"帮".decode("utf-8"),"客".decode("utf-8")])
    #filter_search_rdd = result_search_rdd.filter(lambda word: word not in ["谷","帮","客"])
    # print(filter_search_rdd.collect())

    # Fix up 4 keywords: 传智播 -> 传智播客; 博学 -> 博学谷; 院校 -> 院校帮; 湖 -> 数据湖
    final_search_rdd = filter_search_rdd.map(lambda word: "传智播客".decode("utf-8") if word == "传智播".decode("utf-8") else "博学谷".decode("utf-8") if word == "博学".decode("utf-8") else "院校帮".decode("utf-8") if word == "院校".decode("utf-8") else "数据湖".decode("utf-8") if word == "湖".decode("utf-8") else word)
    # print(final_search_rdd.collect())

    # Requirement 1: count each keyword, sort globally, and take the top 5
    word_count_rdd = final_search_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    #word_count_rdd.cache()
    # print(word_count_rdd.collect())
    # Take the top 5
    print("搜索关键词前5名:",word_count_rdd.take(5))  # already sorted, so take(5); use top(5) on unsorted data
    word_count_rdd.map(lambda word: word[0].encode("utf-8") + " " + str(word[1])).saveAsTextFile("hdfs://ns1/user/admin/files/test3")

    words_rdd.unpersist()
    sc.stop()

spark-submit --master yarn \
             --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
             --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
             --executor-memory 8g \
             --executor-cores 8 \
             --num-executors 4 \
             --deploy-mode client search_jieba.py
            #  --py-files /root/defs.py  # dependencies

# Each executor gets 8 GB of memory and 8 CPU cores; 4 executors in total
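Installing jieba node by node with pip works, but another common way to get Python dependencies onto YARN executors is to ship a packed environment with the job via --archives and point PYSPARK_PYTHON into it. A sketch, assuming conda and conda-pack are available and a Python 3 build of the script (names such as jieba_env are illustrative):

# Build and pack an environment that already contains jieba
conda create -y -n jieba_env python=3.8 conda-pack
conda run -n jieba_env pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
conda pack -n jieba_env -f -o jieba_env.tar.gz

# YARN unpacks the archive on each executor under the alias "environment"
export PYSPARK_DRIVER_PYTHON=python
export PYSPARK_PYTHON=./environment/bin/python
spark-submit --master yarn --deploy-mode client \
             --archives jieba_env.tar.gz#environment \
             search_jieba.py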

# hdfs dfs -cat /user/admin/files/test3/*
# coding: utf-8
from pyspark import SparkContext,SparkConf
import os
# Point to the Hadoop configuration directory
os.environ["HADOOP_CONF_DIR"] = "D:/hadoop/hadoop-3.3.6/etc/hadoop"
os.environ['SPARK_HOME'] = 'D:/miniforge3/Lib/site-packages/pyspark'

if __name__ == '__main__':
    conf = SparkConf().setAppName("orderRDD").setMaster("local[*]")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    file_rdd = sc.wholeTextFiles("hdfs://ns1/user/admin/files/test3/*",use_unicode=False)
    # file_rdd = sc.wholeTextFiles("hdfs://ns1/user/admin/files/test3/part-00000",use_unicode=False)
    for filePath, content in file_rdd.collect():
        print(f"File: {filePath}")
        print(f"Content: \n{content.decode('utf-8')}")
    sc.stop()

Spark 3 Test

spark3-submit --conf "spark.default.parallelism=100" \
              --class org.apache.spark.examples.SparkPi \
              --master yarn \
              --deploy-mode cluster \
              --driver-memory 8g \
              --executor-memory 4g \
              --executor-cores 4 \
              --num-executors 3 \
              --queue root.default \
              /opt/cloudera/parcels/CDH/lib/spark3/examples/jars/spark-examples*.jar 1000

pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark

Spark 3.3.4 requires Python 3.7 or newer.
## Install Python 3.8.20 on CentOS 7
sudo yum install -y gcc openssl-devel bzip2-devel libffi-devel
wget https://www.python.org/ftp/python/3.8.20/Python-3.8.20.tgz
tar xzf Python-3.8.20.tgz
mv Python-3.8.20 python-3.8.20
cd python-3.8.20
./configure --prefix=/usr/local/python-3.8.20
# Fix for the build error "Could not import runpy module":
# it appears when --enable-optimizations is passed to configure with older gcc versions;
# gcc 8.1.0 fixes the problem
make
sudo make altinstall
/usr/local/python-3.8.20/bin/python3.8 -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
# Create a python3 symlink
ln -s /usr/local/python-3.8.20/bin/python3.8 /usr/bin/python3
# Create a pip3 symlink
ln -s /usr/local/python-3.8.20/bin/pip3.8 /usr/bin/pip3
# or
sudo update-alternatives --install /usr/bin/python3 python3 /usr/local/python-3.8.20/bin/python3.8 1
python3 --version
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba --root-user-action=ignore
# Configure the PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON environment variables (here from Python code)
import os
os.environ['PYSPARK_PYTHON'] = "/usr/local/python-3.8.20/bin/python3.8"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/local/python-3.8.20/bin/python3.8"
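The same interpreter choice can also be made per job at submit time with the spark.pyspark.python / spark.pyspark.driver.python properties instead of environment variables, assuming the Python 3.8 path exists on every node; for example:

spark3-submit --master yarn \
             --conf "spark.pyspark.python=/usr/local/python-3.8.20/bin/python3.8" \
             --conf "spark.pyspark.driver.python=/usr/local/python-3.8.20/bin/python3.8" \
             --deploy-mode client search_jieba.py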

spark3-submit --master yarn \
             --conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
             --conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
             --conf "spark.default.parallelism=100" \
             --executor-memory 8g \
             --executor-cores 8 \
             --num-executors 3 \
             --deploy-mode client search_jieba.py