零經驗也可的 PySpark 教學 - RDD 初體驗

Posted on  Jan 25, 2023  in  Python 模組/套件推薦 , Python 程式設計 - 高階  by  Amo Chen  ‐ 3 min read

先前的教學文中,我們已經理解 DataFrame 是基於 RDD(Resilient Distributed Dataset) 所演化出的資料結構,因此也有必要稍微理解 RDD 相關的操作,以自由地在 DataFrame 與 RDD 之間遊走。

本文透過實際範例操作 RDD 以使大家都能夠對 RDD 初步上手。

本文環境

啟動 PySpark Notebook Docker Container 指令

$ docker run -it -p 8888:8888 -v $PWD:/home/jovyan jupyter/pyspark-notebook

RDD(Resilient Distributed Dataset) 初體驗

如果對 RDD 有興趣的話,可以存取 DataFrame 的 rdd 屬性(attribute), 即可取得 Spark 的 RDD 資料:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    {"id": 1, "cost": 100},
    {"id": 2, "cost": 200},
    {"id": 3, "cost": 250},
])
df.rdd

例如,以下取得 RDD 的第 1 筆資料:

df.rdd.first()

上述範例結果如下,可以看到 RDD 每 1 筆資料都是 Row object:

Row(cost=100, id=1)

建立 RDD

如果要使用 PySpark 建立 RDD, 可以呼叫 pyspark.SparkContext.parallelize 方法,下列範例使用 parallelize 方法建立僅有 3 筆資料的 RDD:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rdd = spark.sparkContext.parallelize([
    {"id": 1, "cost": 100},
    {"id": 2, "cost": 200},
    {"id": 3, "cost": 250},
])

RDD 的運算

RDD 與 DataFrame 相似,也支援 map , reduce , filter , join , aggregate 等操作(operations),同樣可以使用 help 函式列出其文件:

help(rdd)

或者參閱 pyspark.RDD 官方文件

下文僅以 map, reduce 以及 filter 作為操作示範,以體驗 RDD 的相關操作。

map()

map 可以針對每 1 筆資料進行運算,並請回傳新的 RDD, 以下範例以 map 方法對每筆資料的 cost 除以 100:

rdd.map(lambda r: {'id': r['id'], 'cost': r['cost'] / 100}).collect()

上述範例結果如下,可以看到每 1 筆資料的 cost 都被除以 100:

[{'id': 1, 'cost': 1.0}, {'id': 2, 'cost': 2.0}, {'id': 3, 'cost': 2.5}]

reduce()

reduce 則是將所有符合條件的值累計或匯總為 1 個輸出值的方法,例如加總的操作,相當適合以 reduce 方法進行,以下範例以 map 方法對所有資料的 cost 除以 100 後,再以 reduce 方法將所有的 cost 進行加總:

rdd.map(lambda r: r['cost'] / 100).reduce(lambda a, b: a + b)

上述範例結果如下:

5.5

filter()

filter 則是將符合條件的資料過濾出來,例如以下範例將陳列 cost 大於 200 的資料:

rdd.filter(lambda r: r['cost'] > 200).collect()

上述範例結果如下:

[{'id': 3, 'cost': 250}]

RDD 轉 DataFrame

如果想將 RDD 轉換成 DataFrame, 最簡單的方式是呼叫 toDF() 方法,該方法是 spark.createDataFrame(rdd, schema, sampleRatio) 的簡寫,可以使用 help 函式查閱該方法的文件:

help(rdd.toDF)

上述結果如下:

Help on method toDF in module pyspark.sql.session:

toDF(schema=None, sampleRatio=None) method of pyspark.rdd.RDD instance
    Converts current :class:`RDD` into a :class:`DataFrame`
    
    This is a shorthand for ``spark.createDataFrame(rdd, schema, sampleRatio)``

該方法可接受 2 個參數:

  1. schema
  2. sampleRatio

schema 是資料結構的定義,如不指定則會自動進行推論(inferring)。

The first row will be used if samplingRatio is None .

sampleRatio 則是進行推論時的取樣率,如不指定則會以第 1 筆資料進行推論,如果希望抽樣 50% 的資料進行推論,則指定 0.5 即可。

下列範例為 RDD 建立 schema 定義後呼叫 toDF(schema=schema) 方法轉為 DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

rdd = spark.sparkContext.parallelize([
    {"id": 1, "cost": 100},
    {"id": 2, "cost": 200},
    {"id": 3, "cost": 250},
])
schema = StructType([       
    StructField('id', IntegerType(), False),
    StructField('cost', IntegerType(), False)
])
df = rdd.toDF(schema=schema)
df.printSchema()
df.show(truncate=False)

上述範例等同下列形式,可以改成呼叫 pyspark.sql.SparkSession.createDataFrame 將 RDD 轉為 DataFrame:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType

spark = SparkSession.builder.getOrCreate()

rdd = spark.sparkContext.parallelize([
    {"id": 1, "cost": 100},
    {"id": 2, "cost": 200},
    {"id": 3, "cost": 250},
])

schema = StructType([       
    StructField('id', IntegerType(), False),
    StructField('cost', IntegerType(), False)
])

df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show(truncate=False)

上述範例結果如下:

root
 |-- id: integer (nullable = false)
 |-- cost: integer (nullable = false)

+---+----+
|id |cost|
+---+----+
|1  |100 |
|2  |200 |
|3  |250 |
+---+----+

總結

一般來說,使用 DataFrame 能夠解決絕大多數的需求之外, DataFrame 的效能也比較好,如果有 DataFrame 無法滿足的需要或者當使用 RDD 效能較好時,再考慮使用 RDD, 否則 DataFrame 基本都是優先考慮的選擇。

本文僅是示範如何操作 RDD 以備不時之需。

Happy Coding!

References

PySpark Documentation — PySpark 3.2.1 documentation

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!