Loader Base Class
Loaders take a DataFrame as input and save it to a sink. Each loader class has to provide a load method which takes a DataFrame as its single parameter. Possible loader sinks are Hive tables, Kudu tables, HBase tables, JDBC sinks, or Parquet files.
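The interface can be summarized in a few lines; the following is a simplified sketch of the contract for illustration, not the actual Spooq source:

import logging


class Loader(object):
    """Simplified sketch of the loader base class contract (illustrative)."""

    def __init__(self):
        # Subclasses inherit the name, string representation and logger.
        self.name = type(self).__name__
        self.logger = logging.getLogger("spooq")

    def __str__(self):
        return "loader Object of Class {name}".format(name=self.name)

    def load(self, input_df):
        """Persist input_df to the sink; must be implemented by the subclass."""
        raise NotImplementedError("load() has to be implemented by the subclass")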
Create your own Loader
Let your loader class inherit from the Loader base class. This provides the name, string representation, and logger attributes from the superclass.
All configuration and parameterization should be done while initializing the class instance.
Here is a simple example of a loader that saves a DataFrame to Parquet files:
Sample Code
from pyspark.sql import functions as F
from loader import Loader
class ParquetLoader(Loader):
"""
This is a simplified example on how to implement a new loader class.
Please take your time to write proper docstrings as they are automatically
parsed via Sphinx to build the HTML and PDF documentation.
Docstrings use the style of Numpy (via the napoleon plug-in).
This class uses the :meth:`pyspark.sql.DataFrameWriter.parquet` method internally.
Examples
--------
input_df = some_extractor_instance.extract()
output_df = some_transformer_instance.transform(input_df)
ParquetLoader(
path="data/parquet_files",
partition_by="dt",
explicit_partition_values=20200201,
compression=""gzip""
).load(output_df)
Parameters
----------
path: :any:`str`
The path to where the loader persists the output parquet files.
If partitioning is set, this will be the base path where the partitions
are stored.
partition_by: :any:`str` or :any:`list` of (:any:`str`)
The column name or names by which the output should be partitioned.
If the partition_by parameter is set to None, no partitioning will be
performed.
Defaults to "dt"
explicit_partition_values: :any:`str` or :any:`int`
or :any:`list` of (:any:`str` and :any:`int`)
Only allowed if partition_by is not None.
        If explicit_partition_values is not None, the DataFrame will

        * overwrite the values of the partition_by columns if they already exist, or
        * create and fill the partition_by columns if they do not yet exist.
Defaults to None
    compression_codec: :any:`str`
The compression codec used for the parquet output files.
Defaults to "snappy"
Raises
------
:any:`exceptions.AssertionError`:
explicit_partition_values can only be used when partition_by is not None
:any:`exceptions.AssertionError`:
explicit_partition_values and partition_by must have the same length
"""
def __init__(self, path, partition_by="dt", explicit_partition_values=None, compression_codec="snappy"):
super(ParquetLoader, self).__init__()
self.path = path
self.partition_by = partition_by
self.explicit_partition_values = explicit_partition_values
self.compression_codec = compression_codec
        if explicit_partition_values is not None:
            assert partition_by is not None, \
                "explicit_partition_values can only be used when partition_by is not None"
            # Compare lengths only for a list of values; a single (scalar)
            # value pairs with a single partition column.
            if isinstance(explicit_partition_values, list):
                assert len(partition_by) == len(explicit_partition_values), \
                    "explicit_partition_values and partition_by must have the same length"
def load(self, input_df):
self.logger.info("Persisting DataFrame as Parquet Files to " + self.path)
if isinstance(self.explicit_partition_values, list):
for (k, v) in zip(self.partition_by, self.explicit_partition_values):
input_df = input_df.withColumn(k, F.lit(v))
        elif isinstance(self.explicit_partition_values, (str, int)):
input_df = input_df.withColumn(self.partition_by, F.lit(self.explicit_partition_values))
input_df.write.parquet(
path=self.path,
partitionBy=self.partition_by,
compression=self.compression_codec
)
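To make the explicit_partition_values behaviour concrete, here is a short usage sketch (path and values are illustrative):

loader = ParquetLoader(
    path="data/parquet_files",
    partition_by=["dt"],
    explicit_partition_values=[20200201],
)
loader.load(output_df)
# Every record gets a literal dt=20200201 column, so the whole output
# lands under data/parquet_files/dt=20200201/.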
References to include
Add an import for the new class to the loader subpackage's __init__.py. This makes it possible to import the new loader class directly from spooq.loader instead of spooq.loader.parquet. It will also be imported when you use from spooq.loader import *.
--- original
+++ adapted
@@ -1,7 +1,9 @@
from loader import Loader
from hive_loader import HiveLoader
+from parquet import ParquetLoader
__all__ = [
"Loader",
"HiveLoader",
+ "ParquetLoader",
]
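After this change, the new class resolves at subpackage level:

from spooq.loader import ParquetLoader  # served via spooq/loader/__init__.py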
Tests
One of Spooq’s features is to provide tested code for multiple data pipelines. Please take your time to write sufficient unit tests! You can reuse test data from tests/data or create a new schema / data set if needed. A SparkSession is provided as a global fixture called spark_session.
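The spark_session fixture ships with Spooq's test suite; if you want to run tests standalone, a comparable fixture could live in a conftest.py along these lines (a sketch, not the actual Spooq fixture):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    """Session-scoped SparkSession running in local mode."""
    spark = (
        SparkSession.builder
        .master("local[*]")
        .appName("spooq-unit-tests")
        .getOrCreate()
    )
    yield spark
    spark.stop()

The unit tests for the new loader could then look like this: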
import pytest
from pyspark.sql.dataframe import DataFrame
from spooq.loader import ParquetLoader
@pytest.fixture(scope="module")
def output_path(tmpdir_factory):
return str(tmpdir_factory.mktemp("parquet_output"))
@pytest.fixture(scope="module")
def default_loader(output_path):
return ParquetLoader(
path=output_path,
partition_by="attributes.gender",
explicit_partition_values=None,
compression_codec=None
)
@pytest.fixture(scope="module")
def input_df(spark_session):
return spark_session.read.parquet("../data/schema_v1/parquetFiles")
@pytest.fixture(scope="module")
def loaded_df(default_loader, input_df, spark_session, output_path):
default_loader.load(input_df)
return spark_session.read.parquet(output_path)
class TestBasicAttributes(object):
def test_logger_should_be_accessible(self, default_loader):
assert hasattr(default_loader, "logger")
def test_name_is_set(self, default_loader):
assert default_loader.name == "ParquetLoader"
def test_str_representation_is_correct(self, default_loader):
        assert str(default_loader) == "loader Object of Class ParquetLoader"
class TestParquetLoader(object):

    def test_count_did_not_change(self, loaded_df, input_df):
        """Persisted DataFrame has the same number of records as the input DataFrame"""
        assert input_df.count() == loaded_df.count() and input_df.count() > 0

    def test_schema_is_unchanged(self, loaded_df, input_df):
        """Loaded DataFrame has the same schema as the input DataFrame"""
        assert loaded_df.schema == input_df.schema
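The assertion logic in __init__ deserves coverage as well; a possible addition (hypothetical tests, not part of the original example):

class TestPartitionValueAssertions(object):

    def test_explicit_values_require_partition_by(self, output_path):
        """explicit_partition_values without partition_by raises an AssertionError"""
        with pytest.raises(AssertionError):
            ParquetLoader(
                path=output_path,
                partition_by=None,
                explicit_partition_values=["20200201"],
            )

    def test_explicit_values_must_match_partition_by_length(self, output_path):
        """Length mismatch between partition_by and explicit_partition_values raises an AssertionError"""
        with pytest.raises(AssertionError):
            ParquetLoader(
                path=output_path,
                partition_by=["dt"],
                explicit_partition_values=["20200201", "20200202"],
            )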
Documentation
You need to create an rst file for your loader which contains at minimum the automodule or autoclass directive.
Parquet Loader
===============================
Some text if you like...
.. automodule:: spooq.loader.parquet
To automatically include your new loader in the HTML / PDF documentation, you need to add it to a toctree directive. Just refer to your newly created parquet.rst file within the loader overview page.
--- original
+++ adapted
@@ -7,4 +7,5 @@
.. toctree::
hive_loader
+ parquet
Class Diagram of Loader Subpackage
That should be it!