"""
This type of object glues the aforementioned processes together and extracts, transforms
(Transformer chain possible) and loads the data from start to end.
"""
import logging
class Pipeline(object):
    """
    Represents a Pipeline of an Extractor, (multiple) Transformers and a Loader Object.

    Attributes
    ----------
    extractor : Subclass of :py:class:`spooq2.extractor.Extractor`
        The entry point of the Pipeline. Extracts a DataFrame from a Source.
    transformers : List of Subclasses of :py:class:`spooq2.transformer.Transformer` Objects
        The Data Wrangling Part of the Pipeline. A chain of Transformers, a single Transformer
        or a PassThrough Transformer can be set and used.
    loader : Subclass of :py:class:`spooq2.loader.Loader`
        The exit point of the Pipeline. Loads a DataFrame to a target Sink.
    name : :any:`str`
        Sets the `__name__` of the class' type as `name`, which is essentially the Class' Name.
    logger : :any:`logging.Logger`
        Shared, class level logger for all instances.

    Example
    -------
    >>> from spooq2.pipeline import Pipeline
    >>> import spooq2.extractor as E
    >>> import spooq2.transformer as T
    >>> import spooq2.loader as L
    >>>
    >>> # Definition how the output table should look like and where the attributes come from:
    >>> users_mapping = [
    >>>     ("id",              "id",                     "IntegerType"),
    >>>     ("guid",            "guid",                   "StringType"),
    >>>     ("forename",        "attributes.first_name",  "StringType"),
    >>>     ("surename",        "attributes.last_name",   "StringType"),
    >>>     ("gender",          "attributes.gender",      "StringType"),
    >>>     ("has_email",       "attributes.email",       "StringBoolean"),
    >>>     ("has_university",  "attributes.university",  "StringBoolean"),
    >>>     ("created_at",      "meta.created_at_ms",     "timestamp_ms_to_s"),
    >>> ]
    >>>
    >>> # The main object where all steps are defined:
    >>> users_pipeline = Pipeline()
    >>>
    >>> # Defining the EXTRACTION:
    >>> users_pipeline.set_extractor(E.JSONExtractor(
    >>>     input_path="tests/data/schema_v1/sequenceFiles"
    >>> ))
    >>>
    >>> # Defining the TRANSFORMATION:
    >>> users_pipeline.add_transformers([
    >>>     T.Mapper(mapping=users_mapping),
    >>>     T.ThresholdCleaner(thresholds={"created_at": {
    >>>                                     "min": 0,
    >>>                                     "max": 1580737513,
    >>>                                     "default": None}}),
    >>>     T.NewestByGroup(group_by="id", order_by="created_at")
    >>> ])
    >>>
    >>> # Defining the LOAD:
    >>> users_pipeline.set_loader(L.HiveLoader(
    >>>     db_name="users_and_friends",
    >>>     table_name="users",
    >>>     partition_definitions=[{
    >>>         "column_name": "dt",
    >>>         "column_type": "IntegerType",
    >>>         "default_value": 20200201}],
    >>>     repartition_size=10,
    >>> ))
    >>>
    >>> # Executing the whole ETL pipeline
    >>> users_pipeline.execute()
    """

    def __init__(self, input_df=None, bypass_loader=False):
        """
        Parameters
        ----------
        input_df : :any:`pyspark.sql.DataFrame`, optional
            If provided, the extraction phase is bypassed and this DataFrame
            is used as the Pipeline's input instead.
        bypass_loader : :any:`bool`, defaults to False
            If True, the load phase is skipped and :py:meth:`execute` returns
            the transformed DataFrame directly.
        """
        # An explicitly supplied DataFrame short-circuits the extractor.
        if input_df is not None:
            self.bypass_extractor = True
            self.input_df = input_df
        else:
            self.bypass_extractor = False
        self.bypass_loader = bypass_loader
        self.extractor = None
        self.transformers = []
        self.loader = None
        self.name = type(self).__name__
        self.logger = logging.getLogger("spooq2")
        # Lazy %-style args: formatting only happens if INFO is enabled.
        # (Was `unicode(...)`, which is Python 2 only and raises NameError on Python 3.)
        self.logger.info("New %s Instance created\n%s", self.name, self)

    def execute(self):
        """
        Executes the whole Pipeline at once.

        Extracts from the Source, transformes the DataFrame and loads it into a target Sink.

        Returns
        -------
        input_df : :any:`pyspark.sql.DataFrame`
            **If** the ``bypass_loader`` attribute was set to True in the Pipeline class,
            the output DataFrame from the Transformer(s) will be directly returned.

        Note
        ----
        This method does not take ANY input parameters. All needed parameters are defined
        at the initialization phase.
        """
        extracted_df = self.extract()
        transformed_df = self.transform(extracted_df)
        return self.load(transformed_df)

    def load(self, input_df):
        """
        Calls the load Method on the Loader Object.

        Parameters
        ----------
        input_df : :any:`pyspark.sql.DataFrame`
            The output DataFrame from the Transformer(s).

        Returns
        -------
        input_df : :any:`pyspark.sql.DataFrame`
            **If** the ``bypass_loader`` attribute was set to True in the Pipeline class,
            the output DataFrame from the Transformer(s) will be directly returned.
        """
        if self.bypass_loader:
            return input_df
        return self.loader.load(input_df)

    def set_loader(self, loader):
        """
        Sets an Loader Object to be used within the Pipeline.

        Parameters
        ----------
        loader : Subclass of :py:class:`spooq2.loader.Loader`
            An already initialized Object of any Subclass of spooq2.loader.Loader.

        Raises
        ------
        :any:`exceptions.AssertionError`:
            You can not set a loader if the `bypass_loader` parameter is set.
        """
        # Explicit raise instead of `assert`: the check must survive `python -O`.
        # AssertionError is kept so existing callers catching it still work.
        if self.bypass_loader:
            raise AssertionError("You can not set a loader if the `bypass_loader` parameter is set.")
        self.loader = loader

    def __str__(self):
        # `str` replaces the Python-2-only `unicode`; output is unchanged.
        return """spooq2.pipeline Object
        Used Extractor:    {extr}
        Used Transformers: {trans}
        Used Loader:       {ldr}
        """.format(
            extr=str(self.extractor),
            trans=str(["{e}\n".format(e=extr) for extr in self.transformers]),
            ldr=str(self.loader),
        )