PySpark OOP UDF
Create a PySpark UDF in an OOP code pattern
Most software engineers are taught to use Object-Oriented Programming (OOP). When they transition to processing data on Spark (or any Spark-based platform like Databricks or EMR), they sometimes run into interesting issues.
One of those issues is having their class object pickled and sent across all the worker nodes when the class contains a User-Defined Function (UDF), resulting in the following error:
PicklingError: Could not serialize object: TypeError: can't pickle <function name>
A UDF is a user-programmable routine that acts on one row.
In PySpark, a UDF is created like any other function, but it is decorated with the @udf decorator.
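For example, a standalone UDF (outside any class) could look like this; the name to_upper is just an illustration:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def to_upper(s):
    # runs once per row; guard against NULL values
    return s.upper() if s is not None else None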
In order to prevent the PicklingError, you will need to add the @staticmethod decorator on top of the @udf decorator. But why is that? Let's dive in.
PICKLING 🥒
What is pickling and why is it giving us errors? The term pickling comes from Python's pickle module, which in turn is named after the preservation technique. The pickle module is used for serialization and deserialization of data and code. Most data structures in Python are stored as chunks of data in different places in memory, with pointers connecting them. Serialization takes an object that is scattered around your computer's memory and turns it into a continuous series of bytes that can be saved or transmitted, and later converted back into the original object.
Because Spark uses distributed execution, objects defined on the driver need to be sent to the workers. This requires them to be serializable. In particular, UDFs need to be serializable.
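To see the failure mode outside Spark, here is a minimal sketch using only the standard library (the function name square is just an example):

import pickle

def square(x):            # a top-level function: pickled by reference to its name
    return x * x

restored = pickle.loads(pickle.dumps(square))
print(restored(4))        # 16

try:
    pickle.dumps(lambda x: x * x)   # a lambda has no top-level name to look up
except Exception as e:
    print(e)              # PicklingError: Can't pickle <function <lambda> ...>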
DECORATORS
Let's start with a small demo that shows the order in which decorators wrap a function. A decorator wraps the entire function it is decorating.

def make_bold(fn):
    return lambda: "<b>" + fn() + "</b>"

def make_italic(fn):
    return lambda: "<i>" + fn() + "</i>"

@make_bold
@make_italic
def ogl():
    return "One Giant Leap - Data Engineering"

print(ogl())
-------------------------------------
output: <b><i>One Giant Leap - Data Engineering</i></b>

In this case, the @make_italic decorator wraps the ogl function. The @make_bold decorator in turn wraps the ogl function that has already been wrapped by @make_italic.
This is visualized by the HTML italic tags sitting inside the HTML bold tags.
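Stacking the decorators is just syntactic sugar for nesting the calls by hand:

def ogl():
    return "One Giant Leap - Data Engineering"

ogl = make_bold(make_italic(ogl))   # same effect as the stacked decorators above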
OOP DEMO
Now let's have a look at how we would create a UDF when we are working in an OOP pattern.
The code snippet below shows a fictional class for data loading. It has a private method called _binary_to_text that takes a bytes object and returns its string representation. This was the actual function that led me to write this post.
This method has two decorators: PySpark's @udf decorator and a @staticmethod decorator. As mentioned earlier, the @staticmethod decorator is required to be able to pickle the class.

import io

from pydantic import BaseModel  # assumed: this import was not shown in the original snippet
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from tika import parser

class DataLoaderBase(BaseModel):
    """
    Base class for loading data
    """
    data_path: str

    @staticmethod
    @F.udf(returnType=StringType())
    def _binary_to_text(bytes) -> str:
        """
        Convert a binary PDF representation
        to a text representation.
        :param bytes: Binary representation of the PDF
        :return: Text representation of the PDF
        """
        parsed = parser.from_buffer(io.BytesIO(bytes))
        return parsed["content"]
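For illustration, a hypothetical way to use this UDF could look as follows (the spark session, the example path, and the loader instance are assumptions, not part of the original snippet; binaryFile is Spark's standard reader that exposes raw bytes in a content column):

loader = DataLoaderBase(data_path="/data/pdfs")

# read the raw PDF bytes, then convert them to text with the class's UDF
df = spark.read.format("binaryFile").load(loader.data_path)
df = df.withColumn("text", DataLoaderBase._binary_to_text(F.col("content")))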
BRINGING IT TOGETHER
When you program in an Object-Oriented style and you create a class with a method, that class is pickled by PySpark on the driver in order to send it to the worker nodes for actual execution on a subset of the data.
You can't pickle a single method of a class, because methods are not defined at the top level of a module (the class is part of the top level, but its methods are not). The @staticmethod decorator lets you call the method on instances without Python trying to bind self, so the function can be pickled on its own.
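What @staticmethod changes is easy to see outside Spark. A minimal sketch (the class name Demo is hypothetical):

class Demo:
    def bound(self):           # regular method: attribute access binds self
        return "bound"

    @staticmethod
    def unbound():             # static method: no binding takes place
        return "unbound"

d = Demo()
print(type(d.bound))              # <class 'method'> - a freshly bound method object
print(type(d.unbound))            # <class 'function'> - the plain function itself
print(Demo.unbound is d.unbound)  # True: nothing gets wrapped or bound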