Getting Started with Spark

I've been needing Spark a lot lately, so I'm relearning it. Today I'll start with some basics.

Reference:
https://classroom.udacity.com/courses/ud2002

Processing Data with Spark

Functional Programming

First, the figure below illustrates the difference between functional and procedural programming.

[Figure: functional vs. procedural programming]

Functional programming is a natural fit for distributed systems. Python is not a functional programming language, but the PySpark API lets you write Spark programs while keeping your code functional. Under the hood, the Python code uses py4j to make calls to the Java Virtual Machine (JVM).

Suppose we have the following code:

log_of_songs = [
        "Despacito",
        "Nice for what",
        "No tears left to cry",
        "Despacito",
        "Havana",
        "In my feelings",
        "Nice for what",
        "Despacito",
        "All the stars"
]
play_count = 0
def count_plays(song_title):
    global play_count
    for song in log_of_songs:
        if song == song_title:
            play_count = play_count + 1
    return play_count

Calling count_plays("Despacito") twice returns different results, because play_count is a global variable that the function modifies. One way to fix this is the following:

def count_plays(song_title, play_count):
    for song in log_of_songs:
        if song == song_title:
            play_count = play_count + 1
    return play_count

This is how Spark approaches the problem.
In Spark we use pure functions. Think of a bread factory: the individual bread machines don't interfere with one another, and they don't damage the raw ingredients. Before a function runs, Spark makes copies of the data to feed into the different functions. To avoid running out of memory, Spark builds a directed acyclic graph (DAG) of the data in your code and, before executing, checks whether a given copy of the data is actually necessary.
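
As a quick illustration (a minimal sketch, not from the course; the app name is just for the demo), the song-counting example above can be rewritten as a pure per-record transformation, so Spark is free to apply it to copies of the data on any node:

from pyspark import SparkConf, SparkContext

# "PureFunctionsDemo" is an illustrative app name
conf = SparkConf().setAppName("PureFunctionsDemo").setMaster("local")
sc = SparkContext(conf=conf)

log_of_songs = ["Despacito", "Havana", "Despacito"]

# a pure function: its result depends only on its input and it mutates no shared state,
# so Spark can safely run it against whichever copy of the data lives on each node
plays = sc.parallelize(log_of_songs) \
    .map(lambda song: (song, 1)) \
    .reduceByKey(lambda a, b: a + b)

print(plays.collect())  # e.g. [('Despacito', 2), ('Havana', 1)] -- order may vary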


Runtime Configuration

References:
https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
https://spark.apache.org/docs/1.6.1/running-on-yarn.html

spark-submit \
    --master yarn \
    --deploy-mode cluster  \
    --num-executors 100 \
    --driver-memory 2g \
    --executor-memory 14g \
    --executor-cores 6 \
    --conf spark.default.parallelism=1000 \
    --conf spark.storage.memoryFraction=0.2 \
    --conf spark.shuffle.memoryFraction=0.6 \
    --conf spark.executor.extraJavaOptions='-Dlog4j.configuration=log4j.properties' \
    --driver-java-options -Dlog4j.configuration=log4j.properties \
    <your-python-file.py>
  • spark-submit: run which spark-submit to check whether the command comes from the Spark installation or from the pyspark pip package; use the one from the Spark installation
  • master:
    • standalone: Spark's built-in cluster resource manager
    • yarn
    • local: run locally
  • deploy-mode:
    • client: the driver runs on the local machine and can read the local filesystem directly
    • cluster: the driver runs on some arbitrary cluster node and cannot read the local filesystem
  • Spark runtime configuration, mainly memory and the number of executors:
    • num-executors
    • driver-memory
    • executor-memory
  • addFile vs. --files (distributing files your job needs to every machine; see the sketch after this list):
    • SparkContext.addFile(): distributes the file to every machine, including the driver
    • --files: only distributes the file to the executors
  • Referencing other modules:
    • third-party libraries: need to be packaged and uploaded so they can be used
    • your own modules: also need to be packaged and uploaded
  • Before running, make sure:
    • export SPARK_HOME=…../spark-1.6.2-bin-hadoop2.6
    • export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
    • export JAVA_HOME=…/jdk1.8.0_60
    • PYSPARK_PYTHON=./NLTK/conda-env/bin/python spark-submit --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./NLTK/conda-env/bin/python --master yarn-cluster --archives conda-env.zip#NLTK clean_step_two.py
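
To make the addFile vs. --files point concrete, here is a minimal sketch (the file name lookup.txt is hypothetical): a file shipped with SparkContext.addFile() can be resolved on whichever node the task runs on through SparkFiles.get().

from pyspark import SparkConf, SparkContext, SparkFiles

conf = SparkConf().setAppName("AddFileDemo").setMaster("local")
sc = SparkContext(conf=conf)

# ship a local file to every node (driver included); with spark-submit --files
# the file would only land in the executors' working directories
sc.addFile("lookup.txt")  # hypothetical file

def first_line(_):
    # resolve the node-local copy of the distributed file
    with open(SparkFiles.get("lookup.txt")) as f:
        return f.readline().strip()

print(sc.parallelize([0]).map(first_line).collect())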

Maps and Lambdas

Origin of the lambda function:
http://palmstroem.blogspot.com/2012/05/lambda-calculus-for-absolute-dummies.html

A map makes a copy of the original data and transforms the copies with the function you pass to it.

from pyspark import SparkConf, SparkContext

log_of_songs = [
    "Despacito",
    "Nice for what",
    "No tears left to cry",
    "Despacito",
    "Havana",
    "In my feelings",
    "Nice for what",
    "Despacito",
    "All the stars"
]

def convert_song_to_lowercase(song):
    return song.lower()

if __name__ == "__main__":
    conf = SparkConf()
    conf.setAppName("Testing").setMaster("local")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")

    # parallelize distributes the collection across the nodes
    distributed_song_log = sc.parallelize(log_of_songs)
    # declare that the data on every node should go through convert_song_to_lowercase;
    # Spark has not executed anything yet -- it waits until all transformations are
    # declared to see whether some of them can be optimized
    distributed_song_log.map(convert_song_to_lowercase)
    # to force Spark to execute, use collect, which gathers all the data back
    # note that Spark did not lowercase the original data; it copied the data
    # and transformed the copies
    distributed_song_log.collect()
    # you can also map with a Python anonymous (lambda) function
    distributed_song_log.map(lambda song: song.lower()).collect()

DataFrames

There are two ways to process the data: imperative programming with DataFrames and Python, or declarative programming with SQL. Imperative programming focuses on the "how"; declarative programming focuses on the "what".

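As a small sketch of the contrast (using the same sparkify dataset that is loaded in the sections below), the same question can be answered either way:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Imperative vs declarative").getOrCreate()
user_log = spark.read.json("data/sparkify_log_small.json")

# imperative ("how"): chain DataFrame transformations step by step
user_log.filter(user_log.userId == "1046").select("page").show()

# declarative ("what"): describe the result you want and let Spark work out how
user_log.createOrReplaceTempView("user_log_table")
spark.sql("SELECT page FROM user_log_table WHERE userId = '1046'").show()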

Reading and Writing DataFrames

import pyspark
from pyspark import SparkConf
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Our first Python Spark SQL example") \
    .getOrCreate()
# check that the configuration took effect
spark.sparkContext.getConf().getAll()
path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)
user_log.printSchema()
user_log.describe()
user_log.show(n=1)
# take the first 5 records
user_log.take(5)
out_path = "data/sparkify_log_small.csv"
user_log.write.save(out_path, format="csv", header=True)
# read it back as another dataframe
user_log_2 = spark.read.csv(out_path, header=True)
user_log_2.printSchema()
user_log_2.take(2)
user_log_2.select("userID").show()

DataFrame Data Wrangling

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

spark = SparkSession \
    .builder \
    .appName("Wrangling Data") \
    .getOrCreate()

path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)
# explore the data
user_log.take(5)
user_log.printSchema()
user_log.describe().show()
user_log.describe("artist").show()
user_log.describe("sessionId").show()
user_log.count()
user_log.select("page").dropDuplicates().sort("page").show()
user_log.select(["userId", "firstname", "page", "song"]).where(user_log.userId == "1046").collect()
# count plays by hour
get_hour = udf(lambda x: datetime.datetime.fromtimestamp(x / 1000.0).hour)
user_log = user_log.withColumn("hour", get_hour(user_log.ts))
user_log.head()
songs_in_hour = user_log.filter(user_log.page == "NextSong").groupby(user_log.hour).count().orderBy(user_log.hour.cast("float"))
songs_in_hour.show()
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd.hour = pd.to_numeric(songs_in_hour_pd.hour)
plt.scatter(songs_in_hour_pd["hour"], songs_in_hour_pd["count"])
plt.xlim(-1, 24);
plt.ylim(0, 1.2 * max(songs_in_hour_pd["count"]))
plt.xlabel("Hour")
plt.ylabel("Songs played");

# drop rows with missing values in userId or sessionId
user_log_valid = user_log.dropna(how = "any", subset = ["userId", "sessionId"])
user_log_valid.count()
user_log.select("userId").dropDuplicates().sort("userId").show()
user_log_valid = user_log_valid.filter(user_log_valid["userId"] != "")
user_log_valid.count()
# users who downgraded their service
user_log_valid.filter("page = 'Submit Downgrade'").show()
user_log.select(["userId", "firstname", "page", "level", "song"]).where(user_log.userId == "1138").collect()
flag_downgrade_event = udf(lambda x: 1 if x == "Submit Downgrade" else 0, IntegerType())
user_log_valid = user_log_valid.withColumn("downgraded", flag_downgrade_event("page"))
user_log_valid.head()
from pyspark.sql import Window
windowval = Window.partitionBy("userId").orderBy(desc("ts")).rangeBetween(Window.unboundedPreceding, 0)
user_log_valid = user_log_valid.withColumn("phase", Fsum("downgraded").over(windowval))
user_log_valid.select(["userId", "firstname", "ts", "page", "level", "phase"]).where(user_log.userId == "1138").sort("ts").collect()

Spark SQL

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import desc
from pyspark.sql.functions import asc
from pyspark.sql.functions import sum as Fsum

import datetime

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

spark = SparkSession \
    .builder \
    .appName("Data wrangling with Spark SQL") \
    .getOrCreate()

path = "data/sparkify_log_small.json"
user_log = spark.read.json(path)

user_log.take(1)
# the following creates a temporary view that you can run SQL queries against
user_log.createOrReplaceTempView("user_log_table")
spark.sql("SELECT * FROM user_log_table LIMIT 2").show()
spark.sql('''
          SELECT * 
          FROM user_log_table 
          LIMIT 2
          '''
          ).show()
spark.sql('''
          SELECT COUNT(*) 
          FROM user_log_table 
          '''
          ).show()
spark.sql('''
          SELECT userID, firstname, page, song
          FROM user_log_table 
          WHERE userID == '1046'
          '''
          ).collect()
spark.sql('''
          SELECT DISTINCT page
          FROM user_log_table 
          ORDER BY page ASC
          '''
          ).show()

# user-defined functions
spark.udf.register("get_hour", lambda x: int(datetime.datetime.fromtimestamp(x / 1000.0).hour))
spark.sql('''
          SELECT *, get_hour(ts) AS hour
          FROM user_log_table 
          LIMIT 1
          '''
          ).collect()
songs_in_hour = spark.sql('''
          SELECT get_hour(ts) AS hour, COUNT(*) as plays_per_hour
          FROM user_log_table
          WHERE page = "NextSong"
          GROUP BY hour
          ORDER BY cast(hour as int) ASC
          '''
          )
songs_in_hour.show()
# convert the result to Pandas
songs_in_hour_pd = songs_in_hour.toPandas()
print(songs_in_hour_pd)

RDD

References:
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
https://spark.apache.org/docs/latest/rdd-programming-guide.html

Whether you use PySpark or another language, under the hood Spark runs your code through Catalyst and turns it into an execution DAG:


At the lowest level, the DAG operates on RDD objects.
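
A minimal sketch of both points, assuming the sparkify dataset used above: explain() prints the plan Catalyst produces, and .rdd exposes the RDD of Row objects behind a DataFrame.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RDD demo").getOrCreate()
user_log = spark.read.json("data/sparkify_log_small.json")

# show the plan that Catalyst produced for this query
user_log.select("page").where(user_log.userId == "1046").explain()

# every DataFrame is backed by an RDD of Row objects
user_log.rdd.take(1)

# you can also build RDDs directly and transform them with plain functions
rdd = spark.sparkContext.parallelize([1, 2, 3, 4])
print(rdd.map(lambda x: x * x).collect())  # [1, 4, 9, 16]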

Machine Learning in Spark

Feature Extraction

from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, VectorAssembler, Normalizer, StandardScaler
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

import re
# create a SparkSession: note this step was left out of the screencast
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .getOrCreate()

# read the dataset
stack_overflow_data = 'Train_onetag_small.json'
df = spark.read.json(stack_overflow_data)
df.head()
# split strings into individual words; Spark has a Tokenizer class
# (https://spark.apache.org/docs/latest/ml-features.html#tokenizer) as well as
# RegexTokenizer, which gives more control over how the text is split
# split the body text into separate words
regexTokenizer = RegexTokenizer(inputCol="Body", outputCol="words", pattern="\\W")
df = regexTokenizer.transform(df)
df.head()
# count the number of words in each body tag
body_length = udf(lambda x: len(x), IntegerType())
df = df.withColumn("BodyLength", body_length(df.words))
# count the number of paragraphs and links in each body tag
number_of_paragraphs = udf(lambda x: len(re.findall("</p>", x)), IntegerType())
number_of_links = udf(lambda x: len(re.findall("</a>", x)), IntegerType())
df = df.withColumn("NumParagraphs", number_of_paragraphs(df.Body))
df = df.withColumn("NumLinks", number_of_links(df.Body))
df.head(2)
# combine the body length, paragraph count, and link count into a single feature vector
assembler = VectorAssembler(inputCols=["BodyLength", "NumParagraphs", "NumLinks"], outputCol="NumFeatures")
df = assembler.transform(df)
df.head()
# normalize the vectors
scaler = Normalizer(inputCol="NumFeatures", outputCol="ScaledNumFeatures")
df = scaler.transform(df)
df.head(2)
# scale the vectors
scaler2 = StandardScaler(inputCol="NumFeatures", outputCol="ScaledNumFeatures2", withStd=True)
scalerModel = scaler2.fit(df)
df = scalerModel.transform(df)
df.head(2)

Text Features

from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, CountVectorizer, \
    IDF, StringIndexer
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

import re
# create a SparkSession: note this step was left out of the screencast
spark = SparkSession.builder \
    .master("local") \
    .appName("Word Count") \
    .getOrCreate()

# read the dataset
stack_overflow_data = 'Train_onetag_small.json'
df = spark.read.json(stack_overflow_data)
df.head()

# tokenization splits strings into individual words; Spark has a Tokenizer class
# (https://spark.apache.org/docs/latest/ml-features.html#tokenizer) as well as
# RegexTokenizer, which gives more control over how the text is split
# split the body text into separate words
regexTokenizer = RegexTokenizer(inputCol="Body", outputCol="words", pattern="\\W")
df = regexTokenizer.transform(df)
df.head()

# CountVectorizer
# find the term frequencies of the words
cv = CountVectorizer(inputCol="words", outputCol="TF", vocabSize=1000)
cvmodel = cv.fit(df)
df = cvmodel.transform(df)
df.take(1)
# show the vocabulary, ordered by descending term frequency
cvmodel.vocabulary
# show the last 10 terms in the vocabulary
cvmodel.vocabulary[-10:]

# Inverse Document Frequency (IDF)
idf = IDF(inputCol="TF", outputCol="TFIDF")
idfModel = idf.fit(df)
df = idfModel.transform(df)
df.head()

# StringIndexer
indexer = StringIndexer(inputCol="oneTag", outputCol="label")
df = indexer.fit(df).transform(df)
df.head()