Transformer Base Class

Transformers take a pyspark.sql.DataFrame as input, transform it accordingly, and return a PySpark DataFrame.

Each Transformer class has to have a transform method which takes a PySpark DataFrame as its only argument and returns a PySpark DataFrame.

Possible transformations include selecting the most up-to-date record by id, exploding an array, filtering (on an exploded array), applying basic threshold cleansing, or mapping the incoming DataFrame to a provided structure.
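To make this contract concrete, here is a minimal standalone sketch of a filter-style transformer. The class name RecordFilter and its filter_expression parameter are illustrative assumptions, not part of Spooq2's actual API:

```python
# Minimal sketch of the "DataFrame in, DataFrame out" contract.
# RecordFilter is a hypothetical name, not an actual Spooq2 class.
class RecordFilter(object):
    """Keeps only records matching a SQL filter expression (illustrative)."""

    def __init__(self, filter_expression):
        # All parameterization happens at construction time
        self.filter_expression = filter_expression

    def transform(self, input_df):
        # Takes a PySpark DataFrame, returns a PySpark DataFrame
        return input_df.filter(self.filter_expression)
```

Applied to a real DataFrame, transform delegates to pyspark.sql.DataFrame.filter, e.g. RecordFilter("id is not null").transform(df).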

class Transformer

Bases: object

Base class of Transformer classes.

name
    The __name__ of the class' type, which is essentially the class name.

logger
    Shared, class-level logger for all instances.

transform(input_df)
    Performs a transformation on a DataFrame.

    Parameters:  input_df (pyspark.sql.DataFrame) – Input DataFrame
    Returns:     Transformed DataFrame.
    Return type: pyspark.sql.DataFrame


This method only takes the input DataFrame as a parameter. All other needed parameters are defined in the initialization of the Transformer object.
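A consequence of this single-argument signature is that fully configured transformer instances compose naturally into a pipeline. The run_pipeline helper below is an illustrative sketch, not part of Spooq2:

```python
# Illustrative helper: because every transformer exposes
# transform(input_df) -> output_df, chaining is a simple fold
# over a list of pre-configured transformer instances.
def run_pipeline(input_df, transformers):
    output_df = input_df
    for transformer in transformers:
        output_df = transformer.transform(output_df)
    return output_df
```

With this shape, the transformers themselves carry all configuration, so the pipeline runner stays completely generic.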

Create your own Transformer

Let your transformer class inherit from the Transformer base class. This provides the name, string representation, and logger attributes from the superclass.
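The base class can be pictured roughly as follows; this is a simplified reconstruction for illustration, not the actual Spooq2 source:

```python
import logging


class Transformer(object):
    """Simplified, illustrative sketch of the Transformer base class."""

    # Shared, class-level logger for all transformer instances
    logger = logging.getLogger("spooq2")

    def __init__(self):
        # Store the class name of the concrete subclass as `name`
        self.name = type(self).__name__

    def __str__(self):
        return "Transformer Object of Class {name}".format(name=self.name)

    def transform(self, input_df):
        # Subclasses must override this with their own transformation
        raise NotImplementedError("transform() has to be implemented by the subclass")
```

Any subclass automatically gets a meaningful name, string representation, and access to the shared logger.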

The only mandatory thing is to provide a transform() method which
takes a
=> PySpark DataFrame!
and returns a
=> PySpark DataFrame!

All configuration and parameterization should be done while initializing the class instance.

Here is a simple example of a transformer which drops records without an Id:

Example Code

from transformer import Transformer

class NoIdDropper(Transformer):
    """
    This is a simplified example on how to implement a new transformer class.
    Please take your time to write proper docstrings as they are automatically
    parsed via Sphinx to build the HTML and PDF documentation.
    Docstrings use the style of Numpy (via the napoleon plug-in).

    This class uses the :meth:`pyspark.sql.DataFrame.dropna` method internally.

    Examples
    --------
    input_df = some_extractor_instance.extract()
    transformed_df = NoIdDropper(
        id_columns='user_id'
    ).transform(input_df)

    Parameters
    ----------
    id_columns: :any:`str` or :any:`list`
        The name of the column containing the identifying Id values.
        Defaults to "id"

    Raises
    ------
    :any:`exceptions.ValueError`:
        "how ('" + how + "') should be 'any' or 'all'"
    :any:`exceptions.ValueError`:
        "subset should be a list or tuple of column names"
    """

    def __init__(self, id_columns='id'):
        super(NoIdDropper, self).__init__()
        self.id_columns = id_columns

    def transform(self, input_df):
        self.logger.info(
            "Dropping records without an Id (columns to consider: {col})"
            .format(col=self.id_columns)
        )
        output_df = input_df.dropna(
            how='any',
            thresh=None,
            subset=self.id_columns
        )
        return output_df

References to include

Add a reference to the new class in the transformer subpackage's __init__.py. This makes it possible to import the new transformer class directly from spooq2.transformer instead of spooq2.transformer.no_id_dropper. It will also be imported if you use from spooq2.transformer import *.

--- original 
+++ adapted 
@@ -1,13 +1,15 @@
 from newest_by_group import NewestByGroup
 from mapper import Mapper
 from exploder import Exploder
 from threshold_cleaner import ThresholdCleaner
 from sieve import Sieve
+from no_id_dropper import NoIdDropper
 __all__ = [
     "NewestByGroup",
     "Mapper",
     "Exploder",
     "ThresholdCleaner",
     "Sieve",
+    "NoIdDropper",
 ]


One of Spooq2’s features is to provide tested code for multiple data pipelines. Please take your time to write sufficient unit tests! You can reuse test data from tests/data or create a new schema / data set if needed. A SparkSession is provided as a global fixture called spark_session.

import pytest
from pyspark.sql.dataframe import DataFrame

from spooq2.transformer import NoIdDropper

@pytest.fixture()
def default_transformer():
    return NoIdDropper(id_columns=["first_name", "last_name"])

@pytest.fixture()
def input_df(spark_session):
    # Inline example data; you can also reuse test data from tests/data
    return spark_session.createDataFrame(
        [("John", "Doe"), (None, "Doe"), ("John", None), (None, None)],
        schema=["first_name", "last_name"],
    )

@pytest.fixture()
def transformed_df(default_transformer, input_df):
    return default_transformer.transform(input_df)

class TestBasicAttributes(object):

    def test_logger_should_be_accessible(self, default_transformer):
        assert hasattr(default_transformer, "logger")

    def test_name_is_set(self, default_transformer):
        assert default_transformer.name == "NoIdDropper"

    def test_str_representation_is_correct(self, default_transformer):
        assert str(default_transformer) == "Transformer Object of Class NoIdDropper"

class TestNoIdDropper(object):

    def test_records_are_dropped(self, transformed_df, input_df):
        """Transformed DataFrame has no records with missing first_name and last_name"""
        assert input_df.where("first_name is null or last_name is null").count() > 0
        assert transformed_df.where("first_name is null or last_name is null").count() == 0

    def test_schema_is_unchanged(self, transformed_df, input_df):
        """Converted DataFrame has the expected schema"""
        assert transformed_df.schema == input_df.schema


You need to create an rst file for your transformer which needs to contain, at minimum, the automodule or the autoclass directive.

Record Dropper if Id is missing
===============================

Some text if you like...

.. automodule:: spooq2.transformer.no_id_dropper

To automatically include your new transformer in the HTML / PDF documentation you need to add it to a toctree directive. Just refer to your newly created no_id_dropper.rst file within the transformer overview page.

--- original
+++ adapted
@@ -7,14 +7,15 @@
 .. toctree::

+    no_id_dropper

 Class Diagram of Transformer Subpackage
 .. uml:: ../diagrams/from_thesis/class_diagram/transformers.puml

That should be it!