零經驗也可的 PySpark 教學 - UDF (User Defined Function)
Posted on Dec 28, 2022 in Python 模組/套件推薦 , Python 程式設計 - 高階 by Amo Chen ‐ 4 min read
Spark SQL 提供許多好用的函式(functions),例如 concat() , count() , date_format() 等等,但這些內建函式不一定能夠滿足一切的需求,所以有時候需要做一些擴充以達到目的,此種讓使用者能夠進行擴充的功能就被稱為 UDF (User Defined Function), RDBMS 諸如 MySQL 與 PostgreSQL 等也都支援 UDF, 詳見:
Spark 也同樣支援 UDF, 讓使用者能夠擴充函式。
本文環境
- macOS 13
- Apache Spark 3.2.1
- Docker Desktop 4.4.2
啟動 PySpark Notebook Docker Container 指令
$ docker run -it -p 8888:8888 -v $PWD:/home/jovyan jupyter/pyspark-notebook
UDF (User Defined Function)
透過 PySpark 實作 UDF 是相當簡單的,實作 Python function 之後,只要呼叫 udf() 並將 Python function 與其回傳的資料型別代入,即可完成 UDF 。
以下範例時做 1 個將字串轉為 title case 的 UDF, 即將每個單字的第 1 個字元都轉為大寫:
from pyspark.sql.functions import udf
def title_case(s):
return s.title()
udf_title_case = udf(title_case, StringType())
由於 DataFrame 中的資料具有型別的概念,因此上述範例中呼叫 udf(title_case, StringType())
時,必須指明第 2 個參數 StringType()
告訴 Spark 該 UDF 會回傳 string 型別的資料。
關於更多 PySpark 中的型別,可以呼叫 help() 函式列出:
import pyspark
help(pyspark.sql.types)
上述函式執行結果:
CLASSES
builtins.object
DataType
ArrayType
MapType
NullType
StructField
StructType
builtins.tuple(builtins.object)
Row
AtomicType(DataType)
BinaryType
BooleanType
DateType
DayTimeIntervalType
StringType
TimestampType
FractionalType(NumericType)
DecimalType
DoubleType
FloatType
IntegralType(NumericType)
ByteType
IntegerType
LongType
ShortType
實作完 UDF 之後,就能夠在 DataFrame 中呼叫 UDF, 例如呼叫前述範例中的 udf_title_case
, 將以下的 df.Name
欄位轉為 title case:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[
['john cena'],
['the rock johnson'],
],
['Name']
)
df.withColumn(
'Name',
udf_title_case(col('Name'))
).show(truncate=False)
上述執行結果如下,可以看到 Name 欄位都被轉為 title case:
+----------------+
|Name |
+----------------+
|John Cena |
|The Rock Johnson|
+----------------+
udf 裝飾子(decorator)
udf() 也可以用裝飾子的形式進行呼叫,前述 title_case
的範例,可以改寫成以下形式:
@udf(returnType=StringType())
def title_case(s):
return s.title()
spark.udf.register
對於習慣用 SQL 方式操作 DataFrame 的使用者,可以呼叫 SparkSession 實例(instance)中的 udf.register()
註冊 UDF, 之後就能透過 SQL 呼叫該 UDF, 例如:
from pyspark.sql import SparkSession
@udf(returnType=StringType())
def title_case(s):
return s.title()
spark = SparkSession.builder.getOrCreate()
spark.udf.register('title_case', title_case)
df = spark.createDataFrame(
[
['john cena'],
['the rock johnson'],
],
['Name']
)
df.createOrReplaceTempView('table')
spark.sql('''
select title_case(Name) as Name
from table
''').show(truncate=False)
上述範例執行結果如下:
+----------------+
|Name |
+----------------+
|John Cena |
|The Rock Johnson|
+----------------+
用 StructType 定義更複雜的資料結構(schema), 以 API 為例
截至目前為止的範例都還是回傳很單純的資料型別,例如 StringType
。
不過也有些情況需要讓 UDF 回傳更複雜的資料型態,例如有些資料可能需要透過呼叫 API 才能取得的情況,這種情況下的 UDF 勢必得定義複雜的資料結構,此時可以透過 StructType 與 StructField 客製(customize)資料結構。
以下是 Rick And Morty APIs 的角色資料回傳範例:
{
"id": 1,
"name": "Rick Sanchez",
"status": "Alive",
"species": "Human",
"type": "Human",
"gender": "Male",
"origin": "Earth (C-137)",
"image": "https://...1.jpeg"
}
可以看到該資料包含 id
, name
, status
等等欄位,如果用 StructType 與 StructField 可以表示成:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
character_schema = StructType([
StructField('id', IntegerType(), False),
StructField('name', StringType(), False),
StructField('status', StringType(), False),
StructField('type', StringType(), False),
StructField('gender', StringType(), False),
StructField('origin', StringType(), False),
StructField('image', StringType(), False),
])
其中 StructField 的第 3 個參數代表是否為 nullable, 預設值為 True, 代表該欄位可以是 null (會轉為 Python 的 None), 如果該欄位一定會有值的話,可以改為 True
。
定義好資料結構(schema)之後,接著就能實作 UDF:
import requests
from pyspark.sql.functions import col, udf
@udf(returnType=character_schema)
def get_character(character_id):
return requests.get(f'https://api.sampleapis.com/rickandmorty/characters/{character_id}').json()
最後實際以 DataFrame 試試看:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.udf.register('title_case', title_case)
df = spark.createDataFrame(
[
[1],
[2],
],
['Id']
)
df.select(
get_character(df.Id).alias('character')
).show(truncate=False)
上述執行結果如下,可以看到我們透過 UDF 順利將 Id 轉為 API 所回傳的資料:
不過,上述範例讓所有的資料都混在同一個欄位,我們可以進一步呼叫 select 方法轉為多個欄位:
df.select(
get_character(df.Id).alias('c')
).select(
col('c.id').alias('Id'),
col('c.name').alias('Name'),
col('c.status').alias('Status'),
).show()
上述範例執行結果如下,可以看到欄位已經變為多個:
+---+------------+------+
| Id| Name|Status|
+---+------------+------+
| 1|Rick Sanchez| Alive|
| 2| Morty Smith| Alive|
+---+------------+------+
錯誤處理(Error handling)
使用 API 的 UDF, 並不 100% 保證每次呼叫 API 都能得到正確的結果,有可能會因為伺服器的錯誤、網路短暫錯誤等等,造成回應的資料並不是所預期的資料格式,例如 API 可能會回傳:
{
"error": "internal server error",
"code": 500
}
遇到非預期的資料, PySpark 就會出現類似以下的錯誤:
Py4JJavaError: An error occurred while calling o1714.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 218.0 failed 1 times, most recent failure: Lost task 0.0 in stage 218.0 (TID 248) (9b6dc08349a1 executor driver): java.lang.NullPointerException: Cannot invoke "org.apache.spark.unsafe.types.UTF8String.getBaseObject()" because "input" is null
at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:110)
如果要解決這種問題,就需要在 UDF 中加入重試(retry)的機制,甚至為資料結構加上錯誤處理專用的欄位,例如 error 與 code 欄位,另外所有的欄位都必須改為 nullable, 因為所有欄位都可能因為錯誤的關係,不再保證 100% 會正常出現:
from pyspark.sql.types import IntegerType, StringType, StructType, StructField
character_schema = StructType([
StructField('id', IntegerType(), True),
StructField('name', StringType(), True),
StructField('status', StringType(), True),
StructField('type', StringType(), True),
StructField('gender', StringType(), True),
StructField('origin', StringType(), True),
StructField('image', StringType(), True),
StructField('error', StringType(), True),
StructField('code', IntegerType(), True),
])
UDF 中則需要處理錯誤的情況,在錯誤發生時回傳 error 與 code 欄位,如此我們就能保證 Spark 不會因為預期外的資料而發生錯誤:
import requests
from pyspark.sql.functions import col, udf
@udf(returnType=character_schema)
def get_character(character_id):
try:
resp = requests.get(f'https://api.sampleapis.com/rickandmorty/characters/{character_id}')
resp.raise_for_status()
except Exception as e:
return {'error': str(e), 'code': resp.status_code}
return resp.json()
結論
UDF (User Defined Function) 雖然看起來相當厲害,充滿彈性運用的空間,不過實際上 UDF 應該視為最後手段,其原因在於 UDF 對 Spark 來說是個黑盒子,不像內建的函式有經過最佳化, UDF 很有可能造成 Spark 執行效率下降。
建議盡量使用內建的函式為上策,當內建函式無法滿足需求時,才考慮使用 UDF 。
References
PySpark Documentation — PySpark 3.1.3 documentation