零經驗也可的 PySpark 教學 - UDF (User Defined Function)

Posted on  Dec 28, 2022  in  Python 模組/套件推薦 , Python 程式設計 - 高階  by  Amo Chen  ‐ 4 min read

Spark SQL 提供許多好用的函式(functions),例如 concat() , count() , date_format() 等等,但這些內建函式不一定能夠滿足一切的需求,所以有時候需要做一些擴充以達到目的,此種讓使用者能夠進行擴充的功能就被稱為 UDF (User Defined Function), RDBMS 諸如 MySQL 與 PostgreSQL 等也都支援 UDF, 詳見:

  1. MySQL - Adding a Loadable Function
  2. PostgreSQL - User-Defined Functions

Spark 也同樣支援 UDF, 讓使用者能夠擴充函式。

本文環境

  • macOS 13

啟動 PySpark Notebook Docker Container 指令

$ docker run -it -p 8888:8888 -v $PWD:/home/jovyan jupyter/pyspark-notebook

UDF (User Defined Function)

透過 PySpark 實作 UDF 是相當簡單的,實作 Python function 之後,只要呼叫 udf() 並將 Python function 與其回傳的資料型別代入,即可完成 UDF 。

以下範例時做 1 個將字串轉為 title case 的 UDF, 即將每個單字的第 1 個字元都轉為大寫:

from pyspark.sql.functions import udf


def title_case(s):
    return s.title()

udf_title_case = udf(title_case, StringType())

由於 DataFrame 中的資料具有型別的概念,因此上述範例中呼叫 udf(title_case, StringType()) 時,必須指明第 2 個參數 StringType() 告訴 Spark 該 UDF 會回傳 string 型別的資料。

關於更多 PySpark 中的型別,可以呼叫 help() 函式列出:

import pyspark

help(pyspark.sql.types)

上述函式執行結果:

CLASSES
    builtins.object
        DataType
            ArrayType
            MapType
            NullType
            StructField
            StructType
    builtins.tuple(builtins.object)
        Row
    AtomicType(DataType)
        BinaryType
        BooleanType
        DateType
        DayTimeIntervalType
        StringType
        TimestampType
    FractionalType(NumericType)
        DecimalType
        DoubleType
        FloatType
    IntegralType(NumericType)
        ByteType
        IntegerType
        LongType
        ShortType

實作完 UDF 之後,就能夠在 DataFrame 中呼叫 UDF, 例如呼叫前述範例中的 udf_title_case , 將以下的 df.Name 欄位轉為 title case:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ['john cena'],
        ['the rock johnson'],
    ],
    ['Name']
)
df.withColumn(
    'Name',
    udf_title_case(col('Name'))
).show(truncate=False)

上述執行結果如下,可以看到 Name 欄位都被轉為 title case:

+----------------+
|Name            |
+----------------+
|John Cena       |
|The Rock Johnson|
+----------------+

udf 裝飾子(decorator)

udf() 也可以用裝飾子的形式進行呼叫,前述 title_case 的範例,可以改寫成以下形式:

@udf(returnType=StringType())
def title_case(s):
    return s.title()

spark.udf.register

對於習慣用 SQL 方式操作 DataFrame 的使用者,可以呼叫 SparkSession 實例(instance)中的 udf.register() 註冊 UDF, 之後就能透過 SQL 呼叫該 UDF, 例如:

from pyspark.sql import SparkSession

@udf(returnType=StringType())
def title_case(s):
    return s.title()

spark = SparkSession.builder.getOrCreate()
spark.udf.register('title_case', title_case)

df = spark.createDataFrame(
    [
        ['john cena'],
        ['the rock johnson'],
    ],
    ['Name']
)

df.createOrReplaceTempView('table')

spark.sql('''
    select title_case(Name) as Name
    from table
''').show(truncate=False)

上述範例執行結果如下:

+----------------+
|Name            |
+----------------+
|John Cena       |
|The Rock Johnson|
+----------------+

用 StructType 定義更複雜的資料結構(schema), 以 API 為例

截至目前為止的範例都還是回傳很單純的資料型別,例如 StringType

不過也有些情況需要讓 UDF 回傳更複雜的資料型態,例如有些資料可能需要透過呼叫 API 才能取得的情況,這種情況下的 UDF 勢必得定義複雜的資料結構,此時可以透過 StructTypeStructField 客製(customize)資料結構。

以下是 Rick And Morty APIs 的角色資料回傳範例:

{
  "id": 1,
  "name": "Rick Sanchez",
  "status": "Alive",
  "species": "Human",
  "type": "Human",
  "gender": "Male",
  "origin": "Earth (C-137)",
  "image": "https://...1.jpeg"
}

可以看到該資料包含 id , name , status 等等欄位,如果用 StructTypeStructField 可以表示成:

from pyspark.sql.types import IntegerType, StringType, StructType, StructField

character_schema = StructType([
  StructField('id', IntegerType(), False),
  StructField('name', StringType(), False),
  StructField('status', StringType(), False),
  StructField('type', StringType(), False),
  StructField('gender', StringType(), False),
  StructField('origin', StringType(), False),
  StructField('image', StringType(), False),
])

其中 StructField 的第 3 個參數代表是否為 nullable, 預設值為 True, 代表該欄位可以是 null (會轉為 Python 的 None), 如果該欄位一定會有值的話,可以改為 True

定義好資料結構(schema)之後,接著就能實作 UDF:

import requests
from pyspark.sql.functions import col, udf


@udf(returnType=character_schema)
def get_character(character_id):
    return requests.get(f'https://api.sampleapis.com/rickandmorty/characters/{character_id}').json()

最後實際以 DataFrame 試試看:

from pyspark.sql import SparkSession


spark = SparkSession.builder.getOrCreate()
spark.udf.register('title_case', title_case)

df = spark.createDataFrame(
    [
        [1],
        [2],
    ],
    ['Id']
)


df.select(
    get_character(df.Id).alias('character')
).show(truncate=False)

上述執行結果如下,可以看到我們透過 UDF 順利將 Id 轉為 API 所回傳的資料:

不過,上述範例讓所有的資料都混在同一個欄位,我們可以進一步呼叫 select 方法轉為多個欄位:

df.select(
    get_character(df.Id).alias('c')
).select(
    col('c.id').alias('Id'),
    col('c.name').alias('Name'),
    col('c.status').alias('Status'),
).show()

上述範例執行結果如下,可以看到欄位已經變為多個:

+---+------------+------+
| Id|        Name|Status|
+---+------------+------+
|  1|Rick Sanchez| Alive|
|  2| Morty Smith| Alive|
+---+------------+------+

錯誤處理(Error handling)

使用 API 的 UDF, 並不 100% 保證每次呼叫 API 都能得到正確的結果,有可能會因為伺服器的錯誤、網路短暫錯誤等等,造成回應的資料並不是所預期的資料格式,例如 API 可能會回傳:

{
    "error": "internal server error",
    "code": 500
}

遇到非預期的資料, PySpark 就會出現類似以下的錯誤:

Py4JJavaError: An error occurred while calling o1714.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 218.0 failed 1 times, most recent failure: Lost task 0.0 in stage 218.0 (TID 248) (9b6dc08349a1 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.unsafe.types.UTF8String.getBaseObject()" because "input" is null
	at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)

如果要解決這種問題,就需要在 UDF 中加入重試(retry)的機制,甚至為資料結構加上錯誤處理專用的欄位,例如 error 與 code 欄位,另外所有的欄位都必須改為 nullable, 因為所有欄位都可能因為錯誤的關係,不再保證 100% 會正常出現:

from pyspark.sql.types import IntegerType, StringType, StructType, StructField

character_schema = StructType([
  StructField('id', IntegerType(), True),
  StructField('name', StringType(), True),
  StructField('status', StringType(), True),
  StructField('type', StringType(), True),
  StructField('gender', StringType(), True),
  StructField('origin', StringType(), True),
  StructField('image', StringType(), True),
  StructField('error', StringType(), True),
  StructField('code', IntegerType(), True),
])

UDF 中則需要處理錯誤的情況,在錯誤發生時回傳 error 與 code 欄位,如此我們就能保證 Spark 不會因為預期外的資料而發生錯誤:

import requests
from pyspark.sql.functions import col, udf


@udf(returnType=character_schema)
def get_character(character_id):
    try:
        resp = requests.get(f'https://api.sampleapis.com/rickandmorty/characters/{character_id}')
        resp.raise_for_status()
    except Exception as e:
        return {'error': str(e), 'code': resp.status_code}

    return resp.json()

結論

UDF (User Defined Function) 雖然看起來相當厲害,充滿彈性運用的空間,不過實際上 UDF 應該視為最後手段,其原因在於 UDF 對 Spark 來說是個黑盒子,不像內建的函式有經過最佳化, UDF 很有可能造成 Spark 執行效率下降。

建議盡量使用內建的函式為上策,當內建函式無法滿足需求時,才考慮使用 UDF 。

References

PySpark Documentation — PySpark 3.1.3 documentation

Sample APIs

How to Execute a REST API call on Apache Spark the Right Way | by James S Hocking | Geek Culture | Aug, 2021 | Medium | Geek Culture

對抗久坐職業傷害

研究指出每天增加 2 小時坐著的時間,會增加大腸癌、心臟疾病、肺癌的風險,也造成肩頸、腰背疼痛等常見問題。

然而對抗這些問題,卻只需要工作時定期休息跟伸展身體即可!

你想輕鬆改變現狀嗎?試試看我們的 PomodoRoll 番茄鐘吧! PomodoRoll 番茄鐘會根據你所設定的專注時間,定期建議你 1 項辦公族適用的伸展運動,幫助你打敗久坐所帶來的傷害!

追蹤新知

看完這篇文章了嗎?還意猶未盡的話,追蹤粉絲專頁吧!

我們每天至少分享 1 篇文章/新聞或者實用的軟體/工具,讓你輕鬆增廣見聞提升專業能力!如果你喜歡我們的文章,或是想了解更多特定主題的教學,歡迎到我們的粉絲專頁按讚、留言讓我們知道。你的鼓勵,是我們的原力!

贊助我們的創作

看完這篇文章了嗎? 休息一下,喝杯咖啡吧!

如果你覺得 MyApollo 有讓你獲得實用的資訊,希望能看到更多的技術分享,邀請你贊助我們一杯咖啡,讓我們有更多的動力與精力繼續提供高品質的文章,感謝你的支持!