零經驗也可的 PySpark 教學 - RDD 初體驗
Posted on Jan 25, 2023 in Python 模組/套件推薦 , Python 程式設計 - 高階 by Amo Chen ‐ 3 min read
先前的教學文中,我們已經理解 DataFrame 是基於 RDD(Resilient Distributed Dataset) 所演化出的資料結構,因此也有必要稍微理解 RDD 相關的操作,以自由地在 DataFrame 與 RDD 之間遊走。
本文透過實際範例操作 RDD 以使大家都能夠對 RDD 初步上手。
本文環境
- macOS 13
- Apache Spark 3.2.1
- Docker Desktop 4.4.2
啟動 PySpark Notebook Docker Container 指令
$ docker run -it -p 8888:8888 -v $PWD:/home/jovyan jupyter/pyspark-notebook
RDD(Resilient Distributed Dataset) 初體驗
如果對 RDD 有興趣的話,可以存取 DataFrame 的 rdd 屬性(attribute), 即可取得 Spark 的 RDD 資料:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
{"id": 1, "cost": 100},
{"id": 2, "cost": 200},
{"id": 3, "cost": 250},
])
df.rdd
例如,以下取得 RDD 的第 1 筆資料:
df.rdd.first()
上述範例結果如下,可以看到 RDD 每 1 筆資料都是 Row object:
Row(cost=100, id=1)
建立 RDD
如果要使用 PySpark 建立 RDD, 可以呼叫 pyspark.SparkContext.parallelize 方法,下列範例使用 parallelize 方法建立僅有 3 筆資料的 RDD:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize([
{"id": 1, "cost": 100},
{"id": 2, "cost": 200},
{"id": 3, "cost": 250},
])
RDD 的運算
RDD 與 DataFrame 相似,也支援 map , reduce , filter , join , aggregate 等操作(operations),同樣可以使用 help 函式列出其文件:
help(rdd)
或者參閱 pyspark.RDD 官方文件 。
下文僅以 map, reduce 以及 filter 作為操作示範,以體驗 RDD 的相關操作。
map()
map 可以針對每 1 筆資料進行運算,並請回傳新的 RDD, 以下範例以 map 方法對每筆資料的 cost 除以 100:
rdd.map(lambda r: {'id': r['id'], 'cost': r['cost'] / 100}).collect()
上述範例結果如下,可以看到每 1 筆資料的 cost 都被除以 100:
[{'id': 1, 'cost': 1.0}, {'id': 2, 'cost': 2.0}, {'id': 3, 'cost': 2.5}]
reduce()
reduce 則是將所有符合條件的值累計或匯總為 1 個輸出值的方法,例如加總的操作,相當適合以 reduce 方法進行,以下範例以 map 方法對所有資料的 cost 除以 100 後,再以 reduce 方法將所有的 cost 進行加總:
rdd.map(lambda r: r['cost'] / 100).reduce(lambda a, b: a + b)
上述範例結果如下:
5.5
filter()
filter 則是將符合條件的資料過濾出來,例如以下範例將陳列 cost 大於 200 的資料:
rdd.filter(lambda r: r['cost'] > 200).collect()
上述範例結果如下:
[{'id': 3, 'cost': 250}]
RDD 轉 DataFrame
如果想將 RDD 轉換成 DataFrame, 最簡單的方式是呼叫 toDF() 方法,該方法是 spark.createDataFrame(rdd, schema, sampleRatio)
的簡寫,可以使用 help 函式查閱該方法的文件:
help(rdd.toDF)
上述結果如下:
Help on method toDF in module pyspark.sql.session:
toDF(schema=None, sampleRatio=None) method of pyspark.rdd.RDD instance
Converts current :class:`RDD` into a :class:`DataFrame`
This is a shorthand for ``spark.createDataFrame(rdd, schema, sampleRatio)``
該方法可接受 2 個參數:
- schema
- sampleRatio
schema
是資料結構的定義,如不指定則會自動進行推論(inferring)。
The first row will be used if samplingRatio is
None
.
sampleRatio
則是進行推論時的取樣率,如不指定則會以第 1 筆資料進行推論,如果希望抽樣 50% 的資料進行推論,則指定 0.5 即可。
下列範例為 RDD 建立 schema 定義後呼叫 toDF(schema=schema)
方法轉為 DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize([
{"id": 1, "cost": 100},
{"id": 2, "cost": 200},
{"id": 3, "cost": 250},
])
schema = StructType([
StructField('id', IntegerType(), False),
StructField('cost', IntegerType(), False)
])
df = rdd.toDF(schema=schema)
df.printSchema()
df.show(truncate=False)
上述範例等同下列形式,可以改成呼叫 pyspark.sql.SparkSession.createDataFrame 將 RDD 轉為 DataFrame:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize([
{"id": 1, "cost": 100},
{"id": 2, "cost": 200},
{"id": 3, "cost": 250},
])
schema = StructType([
StructField('id', IntegerType(), False),
StructField('cost', IntegerType(), False)
])
df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show(truncate=False)
上述範例結果如下:
root
|-- id: integer (nullable = false)
|-- cost: integer (nullable = false)
+---+----+
|id |cost|
+---+----+
|1 |100 |
|2 |200 |
|3 |250 |
+---+----+
總結
一般來說,使用 DataFrame 能夠解決絕大多數的需求之外, DataFrame 的效能也比較好,如果有 DataFrame 無法滿足的需要或者當使用 RDD 效能較好時,再考慮使用 RDD, 否則 DataFrame 基本都是優先考慮的選擇。
本文僅是示範如何操作 RDD 以備不時之需。
Happy Coding!
References
PySpark Documentation — PySpark 3.2.1 documentation