Pipeline

This type of object glues the aforementioned processes together: it extracts the data from a source, transforms it (a chain of multiple Transformers is supported), and loads it into a target sink, from start to end.

class Pipeline(input_df=None, bypass_loader=False)[source]

Bases: object

Represents a Pipeline consisting of an Extractor, one or more Transformers, and a Loader Object.

extractor

The entry point of the Pipeline. Extracts a DataFrame from a Source.

Type: Subclass of spooq2.extractor.Extractor
transformers

The Data Wrangling Part of the Pipeline. A chain of Transformers, a single Transformer, or a PassThrough Transformer can be set and used.

Type: List of Subclasses of spooq2.transformer.Transformer Objects
loader

The exit point of the Pipeline. Loads a DataFrame to a target Sink.

Type: Subclass of spooq2.loader.Loader
name

The name of the Pipeline, taken from the __name__ attribute of the class' type, which is essentially the class name itself.

Type: str
logger

Shared, class-level logger for all instances.

Type: logging.Logger

Example

>>> from spooq2.pipeline import Pipeline
>>> import spooq2.extractor as   E
>>> import spooq2.transformer as T
>>> import spooq2.loader as      L
>>>
>>> #  Definition of how the output table should look and where the attributes come from:
>>> users_mapping = [
>>>     ("id",              "id",                     "IntegerType"),
>>>     ("guid",            "guid",                   "StringType"),
>>>     ("forename",        "attributes.first_name",  "StringType"),
>>>     ("surename",        "attributes.last_name",   "StringType"),
>>>     ("gender",          "attributes.gender",      "StringType"),
>>>     ("has_email",       "attributes.email",       "StringBoolean"),
>>>     ("has_university",  "attributes.university",  "StringBoolean"),
>>>     ("created_at",      "meta.created_at_ms",     "timestamp_ms_to_s"),
>>> ]
>>>
>>> #  The main object where all steps are defined:
>>> users_pipeline = Pipeline()
>>>
>>> #  Defining the EXTRACTION:
>>> users_pipeline.set_extractor(E.JSONExtractor(
>>>     input_path="tests/data/schema_v1/sequenceFiles"
>>> ))
>>>
>>> #  Defining the TRANSFORMATION:
>>> users_pipeline.add_transformers([
>>>     T.Mapper(mapping=users_mapping),
>>>     T.ThresholdCleaner(thresholds={"created_at": {
>>>                                                 "min": 0,
>>>                                                 "max": 1580737513,
>>>                                                 "default": None}}),
>>>     T.NewestByGroup(group_by="id", order_by="created_at")
>>> ])
>>>
>>> #  Defining the LOAD:
>>> users_pipeline.set_loader(L.HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="users",
>>>     partition_definitions=[{
>>>         "column_name": "dt",
>>>         "column_type": "IntegerType",
>>>         "default_value": 20200201}],
>>>     repartition_size=10,
>>> ))
>>>
>>> #  Executing the whole ETL pipeline
>>> users_pipeline.execute()
execute()[source]

Executes the whole Pipeline at once.

Extracts from the Source, transforms the DataFrame, and loads it into a target Sink.

Returns: input_df – If the bypass_loader attribute was set to True in the Pipeline class, the output DataFrame from the Transformer(s) will be directly returned.
Return type: pyspark.sql.DataFrame

Note

This method does not take ANY input parameters. All needed parameters are defined during the initialization phase.
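
A minimal sketch of the bypass_loader behaviour, reusing the extractor and mapping from the Example above (the variable names are illustrative only):

>>> #  With bypass_loader=True, execute() skips the Loader and
>>> #  returns the transformed DataFrame directly:
>>> debug_pipeline = Pipeline(bypass_loader=True)
>>> debug_pipeline.set_extractor(E.JSONExtractor(
>>>     input_path="tests/data/schema_v1/sequenceFiles"
>>> ))
>>> debug_pipeline.add_transformers([T.Mapper(mapping=users_mapping)])
>>> output_df = debug_pipeline.execute()
>>> output_df.show(3)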

extract()[source]

Calls the extract Method on the Extractor Object.

Returns: The output_df from the Extractor used as the input for the Transformer (chain).
Return type: pyspark.sql.DataFrame
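
A short sketch, reusing the users_pipeline from the Example above:

>>> #  Run only the extraction step, e.g. to inspect the raw schema:
>>> raw_df = users_pipeline.extract()
>>> raw_df.printSchema()
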
transform(input_df)[source]

Calls the transform Method on each Transformer Object in the order in which they were added, passing the DataFrame from one Transformer to the next.

Parameters: input_df (pyspark.sql.DataFrame) – The output DataFrame of the Extractor Object.
Returns: The input DataFrame for the Loader.
Return type: pyspark.sql.DataFrame
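
A short sketch, reusing the users_pipeline from the Example above:

>>> #  Run the extract and transform steps manually, e.g. to
>>> #  validate the mapped output before loading:
>>> raw_df = users_pipeline.extract()
>>> transformed_df = users_pipeline.transform(raw_df)
>>> transformed_df.show(3)
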
load(input_df)[source]

Calls the load Method on the Loader Object.

Parameters: input_df (pyspark.sql.DataFrame) – The output DataFrame from the Transformer(s).
Returns: input_df – If the bypass_loader attribute was set to True in the Pipeline class, the output DataFrame from the Transformer(s) will be directly returned.
Return type: pyspark.sql.DataFrame
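
A short sketch, reusing the users_pipeline from the Example above:

>>> #  Run the three stages step by step instead of calling execute():
>>> transformed_df = users_pipeline.transform(users_pipeline.extract())
>>> users_pipeline.load(transformed_df)
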
set_extractor(extractor)[source]

Sets an Extractor Object to be used within the Pipeline.

Parameters: extractor (Subclass of spooq2.extractor.Extractor) – An already initialized Object of any Subclass of spooq2.extractor.Extractor.
Raises: AssertionError – An input_df was already provided, which bypasses the extraction step.
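
A sketch of the failure case; pre_fetched_df stands for a hypothetical, already extracted DataFrame:

>>> pipeline = Pipeline(input_df=pre_fetched_df)  # pre_fetched_df: hypothetical DataFrame
>>> pipeline.set_extractor(E.JSONExtractor(
>>>     input_path="tests/data/schema_v1/sequenceFiles"
>>> ))  # raises an AssertionError, as the provided input_df already bypasses extraction
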
add_transformers(transformers)[source]

Adds a list of Transformer Objects to be used within the Pipeline.

Parameters: transformers (list of Subclasses of spooq2.transformer.Transformer) – Already initialized Objects of any Subclass of spooq2.transformer.Transformer.
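
A short sketch, assuming repeated calls append to the existing chain (objects reused from the Example above):

>>> users_pipeline.add_transformers([T.Mapper(mapping=users_mapping)])
>>> users_pipeline.add_transformers([
>>>     T.NewestByGroup(group_by="id", order_by="created_at")
>>> ])
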
clear_transformers()[source]

Clears the list of already added Transformers.
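
A short sketch, reusing objects from the Example above:

>>> #  Replace the current Transformer chain with a single Mapper:
>>> users_pipeline.clear_transformers()
>>> users_pipeline.add_transformers([T.Mapper(mapping=users_mapping)])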

set_loader(loader)[source]

Sets a Loader Object to be used within the Pipeline.

Parameters: loader (Subclass of spooq2.loader.Loader) – An already initialized Object of any Subclass of spooq2.loader.Loader.
Raises: AssertionError – You cannot set a Loader if the bypass_loader parameter is set.
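
A sketch of the failure case, reusing the HiveLoader definition from the Example above:

>>> pipeline = Pipeline(bypass_loader=True)
>>> pipeline.set_loader(L.HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="users",
>>>     partition_definitions=[{
>>>         "column_name": "dt",
>>>         "column_type": "IntegerType",
>>>         "default_value": 20200201}],
>>>     repartition_size=10,
>>> ))  # raises an AssertionError because bypass_loader is set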