Spark Case Study
First, a quick look at jieba's three segmentation modes:
import jieba

if __name__ == '__main__':
    # Full mode: list every word the dictionary can form from the sentence
    seg_list = jieba.cut("我来到北京清华大学", cut_all=True)
    print("Full Mode: " + "/ ".join(seg_list))
    # Precise mode (the default): cut into the most likely segmentation
    seg_list = jieba.cut("我来到北京清华大学", cut_all=False)
    print("Default Mode: " + "/ ".join(seg_list))
    # Search engine mode: further split long words, as search engines do for indexing
    seg_list = jieba.cut_for_search("我来到北京清华大学")
    print("Search Engine Mode: " + "/ ".join(seg_list))
Search Engine Log Analysis Case
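The SogouQ log is tab-separated. Judging from the field indexes used below, field 0 is the query time (HH:MM:SS), field 1 a user ID, and field 2 the search keyword; an illustrative line (made-up values, not taken from the real file) might look like:

00:00:01	u1234567	传智播客大数据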
from pyspark import SparkContext, SparkConf
import jieba

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    file_rdd = sc.textFile("file:///D:/git-python/大数据-Spark/files/SogouQ.txt")
    # Lines are tab-separated; cache the split RDD because it is reused three times below
    words_rdd = file_rdd.map(lambda line: line.split("\t"))
    words_rdd.cache()
    # Requirement 1: top search keywords (field 2 is the query)
    search_rdd = words_rdd.map(lambda word: word[2])
    result_search_rdd = search_rdd.flatMap(lambda line: jieba.cut_for_search(line))
    # jieba's default dictionary splits product names such as "传智播客" into fragments;
    # drop the stray single-character tokens, then map the fragments back to the full names
    filter_search_rdd = result_search_rdd.filter(lambda word: word not in ["谷", "帮", "客"])
    final_search_rdd = filter_search_rdd.map(
        lambda word: "传智播客" if word == "传智播" else "博学谷" if word == "博学"
        else "院校帮" if word == "院校" else "数据湖" if word == "湖" else word)
    word_count_rdd = final_search_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    word_count_rdd.cache()
    # Already sorted, so take(5); on an unsorted RDD use top(5) instead
    print("Top 5 search keywords:", word_count_rdd.take(5))
    # Requirement 2: top user_keyword combinations (field 1 is the user ID)
    user_and_word_rdd = words_rdd.map(lambda word: (word[1], word[2]))
    user_and_word_rdd2 = user_and_word_rdd.flatMap(
        lambda word: [(word[0], w) for w in jieba.cut_for_search(word[1]) if w not in ["谷", "帮", "客"]])
    final_user_and_word_rdd = user_and_word_rdd2.map(
        lambda word: (word[0], "传智播客" if word[1] == "传智播" else "博学谷" if word[1] == "博学"
                      else "院校帮" if word[1] == "院校" else "数据湖" if word[1] == "湖" else word[1]))
    user_word_count_rdd = final_user_and_word_rdd.map(lambda word: (word[0] + "_" + word[1], 1)) \
        .reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("Top 5 user_keyword pairs:", user_word_count_rdd.take(5))
    # Requirement 3: hottest time windows (field 0 is HH:MM:SS; keep HH:MM)
    time_rdd = words_rdd.map(lambda word: word[0].split(":"))
    time_count_rdd = time_rdd.map(lambda time: (time[0] + ":" + time[1], 1)) \
        .reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    print("Top 5 search time windows:", time_count_rdd.take(5))
    words_rdd.unpersist()
    sc.stop()
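The fragment patching above is needed because jieba's stock dictionary does not know these product names. For comparison, jieba can also be taught the names up front with jieba.add_word, a minimal sketch (note that cut_for_search deliberately emits sub-words of long dictionary words as well, which is why this case sticks with the patching approach in search mode):

import jieba

# Register the product names in jieba's in-memory dictionary (illustrative)
for w in ["传智播客", "博学谷", "院校帮", "数据湖"]:
    jieba.add_word(w)

# Precise mode should now keep the registered names whole
print("/ ".join(jieba.cut("传智播客的博学谷")))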
The YARN cluster nodes in this example still ship Python 2.7, so install pip and then jieba on every node first (the executors import jieba inside the tasks, so it must be present cluster-wide):

curl https://bootstrap.pypa.io/pip/2.7/get-pip.py -o get-pip.py
python2.7 get-pip.py
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba

Python 2's default string encoding is ASCII, so the script opens with the classic workaround (reload is required because site.py removes setdefaultencoding from sys at startup):

import sys
reload(sys)
sys.setdefaultencoding('utf-8')

The Python 2 version of the job then looks like this:
from pyspark import SparkContext, SparkConf
import jieba

if __name__ == '__main__':
    conf = SparkConf().setAppName("test").setMaster("yarn")
    # Force UTF-8 on both the driver and executor JVMs
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    # use_unicode=False makes textFile return raw bytes (str under Python 2)
    file_rdd = sc.textFile("hdfs://ns1/user/admin/files/SogouQ.txt", use_unicode=False)
    words_rdd = file_rdd.map(lambda line: str(line).split("\t"))
    words_rdd.cache()
    search_rdd = words_rdd.map(lambda word: str(word[2]))
    result_search_rdd = search_rdd.flatMap(lambda line: jieba.cut_for_search(str(line)))
    # jieba yields unicode tokens, so the byte-string literals must be decoded before comparison
    filter_search_rdd = result_search_rdd.filter(
        lambda word: word not in ["谷".decode("utf-8"), "帮".decode("utf-8"), "客".decode("utf-8")])
    final_search_rdd = filter_search_rdd.map(
        lambda word: "传智播客".decode("utf-8") if word == "传智播".decode("utf-8")
        else "博学谷".decode("utf-8") if word == "博学".decode("utf-8")
        else "院校帮".decode("utf-8") if word == "院校".decode("utf-8")
        else "数据湖".decode("utf-8") if word == "湖".decode("utf-8") else word)
    word_count_rdd = final_search_rdd.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b) \
        .sortBy(lambda x: x[1], ascending=False, numPartitions=1)
    # Already sorted, so take(5); on an unsorted RDD use top(5) instead
    print("Top 5 search keywords:", word_count_rdd.take(5))
    # Re-encode to UTF-8 bytes before writing the result back to HDFS
    word_count_rdd.map(lambda word: word[0].encode("utf-8") + " " + str(word[1])) \
        .saveAsTextFile("hdfs://ns1/user/admin/files/test3")
    words_rdd.unpersist()
    sc.stop()
Submit the script to YARN in client mode:
spark-submit --master yarn \
--conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--executor-memory 8g \
--executor-cores 8 \
--num-executors 4 \
--deploy-mode client search_jieba.py
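Once the job finishes, the result can be spot-checked straight from HDFS (output path as used above; the part-file name may vary):

hdfs dfs -cat hdfs://ns1/user/admin/files/test3/part-00000 | head

It can also be read back with Spark from a Windows development machine: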
from pyspark import SparkContext, SparkConf
import os

# Windows client settings: point at the local Hadoop config and the pip-installed Spark
os.environ["HADOOP_CONF_DIR"] = "D:/hadoop/hadoop-3.3.6/etc/hadoop"
os.environ['SPARK_HOME'] = 'D:/miniforge3/Lib/site-packages/pyspark'

if __name__ == '__main__':
    conf = SparkConf().setAppName("orderRDD").setMaster("local[*]")
    conf.set("spark.executor.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set("spark.driver.extraJavaOptions", "-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8")
    conf.set('spark.python.unicode.decode.charset', 'utf-8')
    sc = SparkContext(conf=conf)
    # wholeTextFiles yields (path, content) pairs; with use_unicode=False the content is raw bytes
    file_rdd = sc.wholeTextFiles("hdfs://ns1/user/admin/files/test3/*", use_unicode=False)
    for filePath, content in file_rdd.collect():
        print(f"File: {filePath}")
        print(f"Content: \n{content.decode('utf-8')}")
    sc.stop()
Spark3 Test
Smoke-test the Spark 3 installation with the bundled SparkPi example:
spark3-submit --conf "spark.default.parallelism=100" \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 8g \
--executor-memory 4g \
--executor-cores 4 \
--num-executors 3 \
--queue root.default \
/opt/cloudera/parcels/CDH/lib/spark3/examples/jars/spark-examples*.jar 1000
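In cluster mode the driver runs inside YARN, so SparkPi's "Pi is roughly ..." line ends up in the application logs rather than in the submitting shell. It can be retrieved with the application ID printed during submission:

yarn logs -applicationId <applicationId> | grep "Pi is roughly"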
Next, install the Python dependencies for the jieba job (as before, jieba must be present on every node):
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
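If pyspark is installed from PyPI on the client, it should match the cluster's Spark version; assuming Spark 3.3.4 as in this section, that would be:

pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark==3.3.4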
Spark 3.3.4 requires a Python version above 3.7, so build Python 3.8.20 from source:
sudo yum install -y gcc openssl-devel bzip2-devel libffi-devel
wget https://www.python.org/ftp/python/3.8.20/Python-3.8.20.tgz
tar xzf Python-3.8.20.tgz
mv Python-3.8.20 python-3.8.20
cd python-3.8.20
./configure --prefix=/usr/local/python-3.8.20
make
sudo make altinstall
/usr/local/python-3.8.20/bin/python3.8 -m pip install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple
sudo ln -s /usr/local/python-3.8.20/bin/python3.8 /usr/bin/python3
sudo ln -s /usr/local/python-3.8.20/bin/pip3.8 /usr/bin/pip3
sudo update-alternatives --install /usr/bin/python3 python3 /usr/local/python-3.8.20/bin/python3.8 1
python3 --version
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba --root-user-action=ignore
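A quick sanity check that the new interpreter and jieba are wired up correctly (jieba exposes its version string):

python3 -c "import jieba; print(jieba.__version__)"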
In search_jieba.py, point both the driver and the executors at the new interpreter before the SparkContext is created (make altinstall installs python3.8, not python):

import os
os.environ['PYSPARK_PYTHON'] = "/usr/local/python-3.8.20/bin/python3.8"
os.environ['PYSPARK_DRIVER_PYTHON'] = "/usr/local/python-3.8.20/bin/python3.8"
Then resubmit the jieba job under Spark 3:
spark3-submit --master yarn \
--conf "spark.executor.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.driver.extraJavaOptions=-Dfile.encoding=UTF-8 -Dsun.jnu.encoding=UTF-8" \
--conf "spark.default.parallelism=100" \
--executor-memory 8g \
--executor-cores 8 \
--num-executors 3 \
--deploy-mode client search_jieba.py
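Alternatively, the interpreter can be pinned at submit time with Spark's own properties instead of the os.environ lines inside the script (same assumed interpreter path):

spark3-submit --master yarn \
--conf "spark.pyspark.python=/usr/local/python-3.8.20/bin/python3.8" \
--conf "spark.pyspark.driver.python=/usr/local/python-3.8.20/bin/python3.8" \
--deploy-mode client search_jieba.py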