Transformer Base Class

Transformers take a DataFrame as input, transform it, and return a DataFrame.

Each Transformer class has to have a transform method which takes a DataFrame as its only argument and returns a DataFrame.

Possible transformations include selecting the most up-to-date record by id, exploding an array, filtering (on an exploded array), applying basic threshold cleansing, or mapping the incoming DataFrame to a provided structure.

Create your own Transformer

Let your transformer class inherit from the transformer base class. Your class then inherits the name, string representation and logger attributes from the superclass.
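
To make the inherited attributes concrete, here is a minimal sketch of what such a base class could look like. It is a stand-in written for illustration only: the logger name "spooq", the NotImplementedError and the PassThrough subclass are assumptions, and the actual Spooq base class may be implemented differently. Only the name, the logger attribute and the string representation format are taken from the tests shown later on this page.

```python
import logging


class Transformer(object):
    """Stand-in for the base class (assumption: the real one may differ)."""

    def __init__(self):
        # The name is derived from the concrete subclass, e.g. "NoIdDropper".
        self.name = type(self).__name__
        # The logger name "spooq" is an assumption made for this sketch.
        self.logger = logging.getLogger("spooq")

    def transform(self, input_df):
        # Subclasses are expected to override this method.
        raise NotImplementedError("transform() has to be implemented by the subclass")

    def __str__(self):
        return "Transformer Object of Class {nm}".format(nm=self.name)


class PassThrough(Transformer):
    """Hypothetical subclass that returns its input unchanged."""

    def transform(self, input_df):
        self.logger.info("Passing the DataFrame through unchanged")
        return input_df
```

A subclass only overrides transform(); name, logger and the string representation come from the superclass as long as super().__init__() is called.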

The only mandatory requirement is a transform() method which takes a PySpark DataFrame and returns a PySpark DataFrame.

All configuration and parameterization should be done while initializing the class instance.
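
One consequence of configuring everything in the constructor is that every transformer exposes the same transform(input_df) call, so several of them can be applied in sequence without knowing their parameters. The following is a rough sketch of that idea. The apply_transformers helper and the AddSuffix class are hypothetical and not part of Spooq, and plain strings stand in for DataFrames so that the snippet runs without Spark.

```python
from functools import reduce


def apply_transformers(input_df, transformers):
    """Hypothetical helper: pass input_df through each transformer in order."""
    return reduce(
        lambda df, transformer: transformer.transform(df),
        transformers,
        input_df,
    )


class AddSuffix(object):
    """Toy stand-in for a transformer; it works on strings, not DataFrames."""

    def __init__(self, suffix):
        # All configuration happens at initialization time ...
        self.suffix = suffix

    def transform(self, input_df):
        # ... so transform() needs nothing but the data itself.
        return input_df + self.suffix


result = apply_transformers("df", [AddSuffix("-cleaned"), AddSuffix("-mapped")])
```

Here result is the input after both steps were applied in list order.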

Here is a simple example of a transformer which drops records without an Id:

Sample Code

spooq/transformer/no_id_dropper.py:
from transformer import Transformer

class NoIdDropper(Transformer):
    """
    This is a simplified example of how to implement a new transformer class.
    Please take your time to write proper docstrings as they are automatically
    parsed via Sphinx to build the HTML and PDF documentation.
    Docstrings use the style of Numpy (via the napoleon plug-in).

    This class uses the :meth:`pyspark.sql.DataFrame.dropna` method internally.

    Examples
    --------
    >>> input_df = some_extractor_instance.extract()
    >>> transformed_df = NoIdDropper(
    ...     id_columns='user_id'
    ... ).transform(input_df)

    Parameters
    ----------
    id_columns: :any:`str` or :any:`list`
        The name(s) of the column(s) containing the identifying Id values.
        Defaults to "id".
    
    Raises
    ------
    :any:`exceptions.ValueError`: 
        "how ('" + how + "') should be 'any' or 'all'"
    :any:`exceptions.ValueError`: 
        "subset should be a list or tuple of column names"
    """

    def __init__(self, id_columns='id'):
        super(NoIdDropper, self).__init__()
        self.id_columns = id_columns


    def transform(self, input_df):
        self.logger.info("Dropping records without an Id (columns to consider: {col})"
            .format(col=self.id_columns))
        output_df = input_df.dropna(
            how='all', 
            thresh=None, 
            subset=self.id_columns
        )
        
        return output_df
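
Because the example passes how='all', a record is only dropped when every column listed in id_columns is null. With how='any', a single null among those columns would be enough. The snippet below imitates that behaviour in plain Python on a list of dicts, purely to illustrate the semantics. The drop_rows function is a hypothetical stand-in and not the PySpark implementation.

```python
def drop_rows(rows, subset, how="all"):
    """Plain-Python imitation of dropna(how=..., subset=...) on a list of dicts."""
    if how not in ("any", "all"):
        raise ValueError("how ('" + how + "') should be 'any' or 'all'")
    # 'all': drop a row only if every subset column is None.
    # 'any': drop a row as soon as one subset column is None.
    check = all if how == "all" else any
    return [row for row in rows if not check(row[col] is None for col in subset)]


rows = [
    {"first_name": "Ada", "last_name": "Lovelace"},
    {"first_name": None, "last_name": "Hopper"},
    {"first_name": None, "last_name": None},
]

kept_all = drop_rows(rows, ["first_name", "last_name"], how="all")
kept_any = drop_rows(rows, ["first_name", "last_name"], how="any")
```

With how="all" the record that still has a last_name survives, while how="any" keeps only the fully populated record.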

References to include

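Add an import for your new class to the __init__.py of the transformer subpackage and list it in __all__. This makes it possible to import the new transformer class directly from spooq.transformer instead of spooq.transformer.no_id_dropper. It will also be imported if you use from spooq.transformer import *.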

spooq/transformer/__init__.py:
--- original 
+++ adapted 
@@ -1,13 +1,15 @@
 from newest_by_group import NewestByGroup
 from mapper import Mapper
 from exploder import Exploder
 from threshold_cleaner import ThresholdCleaner
 from sieve import Sieve
+from no_id_dropper import NoIdDropper
 
 __all__ = [
     "NewestByGroup",
     "Mapper",
     "Exploder",
     "ThresholdCleaner",
     "Sieve",
+    "NoIdDropper",
 ]

Tests

One of Spooq’s features is to provide tested code for multiple data pipelines. Please take your time to write sufficient unit tests! You can reuse test data from tests/data or create a new schema / data set if needed. A SparkSession is provided as a global fixture called spark_session.

tests/unit/transformer/test_no_id_dropper.py:
import pytest
from pyspark.sql.dataframe import DataFrame

from spooq.transformer import NoIdDropper


@pytest.fixture()
def default_transformer():
    return NoIdDropper(id_columns=["first_name", "last_name"])


@pytest.fixture()
def input_df(spark_session):
    return spark_session.read.parquet("../data/schema_v1/parquetFiles")


@pytest.fixture()
def transformed_df(default_transformer, input_df):
    return default_transformer.transform(input_df)


class TestBasicAttributes(object):

    def test_logger_should_be_accessible(self, default_transformer):
        assert hasattr(default_transformer, "logger")

    def test_name_is_set(self, default_transformer):
        assert default_transformer.name == "NoIdDropper"

    def test_str_representation_is_correct(self, default_transformer):
        assert str(default_transformer) == "Transformer Object of Class NoIdDropper"

class TestNoIdDropper(object):

    def test_records_are_dropped(self, transformed_df, input_df):
        """Transformed DataFrame has no records with missing first_name and last_name"""
        # NoIdDropper uses how='all', so only records where both columns are null are dropped
        assert input_df.where("first_name is null and last_name is null").count() > 0
        assert transformed_df.where("first_name is null and last_name is null").count() == 0

    def test_schema_is_unchanged(self, transformed_df, input_df):
        """Converted DataFrame has the expected schema"""
        assert transformed_df.schema == input_df.schema

Documentation

You need to create an rst file for your transformer which contains at least the automodule or the autoclass directive.

docs/source/transformer/no_id_dropper.rst:
Record Dropper if Id is missing
===============================

Some text if you like...

.. automodule:: spooq.transformer.no_id_dropper

To automatically include your new transformer in the HTML / PDF documentation you need to add it to a toctree directive. Just refer to your newly created no_id_dropper.rst file within the transformer overview page.

docs/source/transformer/overview.rst:
--- original
+++ adapted
@@ -7,14 +7,15 @@
 .. toctree::

     exploder
     sieve
     mapper
     threshold_cleaner
     newest_by_group
+    no_id_dropper

 Class Diagram of Transformer Subpackage
 ------------------------------------------------
 .. uml:: ../diagrams/from_thesis/class_diagram/transformers.puml

That should be it!