Spark Quick Start

Prerequisites

1. Create a SparkSession

from pyspark.sql import SparkSession

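# "spark.some.config.option" is a placeholder key; replace it with a real
# configuration option or drop the .config(...) line.
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()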

2. Create a DataFrame

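The people.json file ships with the Spark distribution under examples/src/main/resources/. The relative path below assumes the current working directory is the Spark installation directory.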

# spark is an existing SparkSession
df = spark.read.json("examples/src/main/resources/people.json")
# Displays the content of the DataFrame to stdout
df.show()

3. DataFrame Operations

# Print the schema in a tree format
df.printSchema()
# Select only the "name" column
df.select("name").show()
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
# Select people older than 21
df.filter(df['age'] > 21).show()
# Count people by age
df.groupBy('age').count().show()

4. Running SQL Queries Programmatically

# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

5. Global Temporary View

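Temporary views in Spark SQL are session-scoped: they disappear when the session that created them terminates. If you want a temporary view that is shared among all sessions and stays alive until the Spark application terminates, create a global temporary view. A global temporary view is tied to the system-preserved database global_temp, and it must be referenced by its qualified name, e.g. SELECT * FROM global_temp.view1.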

# Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
# Global temporary views live in the system-preserved database global_temp
spark.sql("SELECT * FROM global_temp.people").show()
# Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()

6. Interoperating with RDDs

6.1 Inferring the Schema Using Reflection

# In Jupyter/IPython a leading ! runs a shell command; elsewhere run cat in a terminal.
!cat examples/src/main/resources/people.txt
from pyspark.sql import Row

sc = spark.sparkContext

# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)
# Name: Justin

6.2 Programmatically Specifying the Schema

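When a dictionary of kwargs cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically in three steps:

1. Create an RDD of tuples or lists from the original RDD.
2. Create the schema, represented by a StructType, that matches the structure of the tuples or lists in the RDD created in step 1.
3. Apply the schema to the RDD via the createDataFrame method provided by SparkSession.

For example: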

# Import data types
from pyspark.sql.types import StringType, StructType, StructField

sc = spark.sparkContext

# Load a text file and split each line on commas.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)

# Creates a temporary view using the DataFrame
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
results = spark.sql("SELECT name FROM people")

results.show()
