Pipeline
This type of object glues the aforementioned components together: it extracts, transforms (a chain of Transformers is possible), and loads the data from start to end.

class Pipeline(input_df=None, bypass_loader=False)
Bases: object
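Represents a Pipeline of an Extractor, one or more Transformers, and a Loader Object.
Parameters: input_df (pyspark.sql.DataFrame, optional) – A DataFrame to use instead of running an Extractor; providing it bypasses the extraction step. bypass_loader (bool) – If set to True, the output DataFrame from the Transformer(s) is returned instead of being loaded.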

extractor
The entry point of the Pipeline. Extracts a DataFrame from a Source.
Type: Subclass of spooq2.extractor.Extractor

transformers
The data-wrangling part of the Pipeline. A chain of Transformers, a single Transformer, or a PassThrough Transformer can be set and used.
Type: List of Subclasses of spooq2.transformer.Transformer Objects

loader
The exit point of the Pipeline. Loads a DataFrame to a target Sink.
Type: Subclass of spooq2.loader.Loader

logger
Shared, class-level logger for all instances.
Type: logging.Logger
Example
>>> from spooq2.pipeline import Pipeline
>>> import spooq2.extractor as E
>>> import spooq2.transformer as T
>>> import spooq2.loader as L
>>>
>>> # Define what the output table should look like and where the attributes come from:
>>> users_mapping = [
...     ("id", "id", "IntegerType"),
...     ("guid", "guid", "StringType"),
...     ("forename", "attributes.first_name", "StringType"),
...     ("surename", "attributes.last_name", "StringType"),
...     ("gender", "attributes.gender", "StringType"),
...     ("has_email", "attributes.email", "StringBoolean"),
...     ("has_university", "attributes.university", "StringBoolean"),
...     ("created_at", "meta.created_at_ms", "timestamp_ms_to_s"),
... ]
>>>
>>> # The main object where all steps are defined:
>>> users_pipeline = Pipeline()
>>>
>>> # Define the EXTRACTION:
>>> users_pipeline.set_extractor(E.JSONExtractor(
...     input_path="tests/data/schema_v1/sequenceFiles"
... ))
>>>
>>> # Define the TRANSFORMATION:
>>> users_pipeline.add_transformers([
...     T.Mapper(mapping=users_mapping),
...     T.ThresholdCleaner(thresholds={"created_at": {
...         "min": 0,
...         "max": 1580737513,
...         "default": None}}),
...     T.NewestByGroup(group_by="id", order_by="created_at")
... ])
>>>
>>> # Define the LOAD:
>>> users_pipeline.set_loader(L.HiveLoader(
...     db_name="users_and_friends",
...     table_name="users",
...     partition_definitions=[{
...         "column_name": "dt",
...         "column_type": "IntegerType",
...         "default_value": 20200201}],
...     repartition_size=10,
... ))
>>>
>>> # Execute the whole ETL pipeline:
>>> users_pipeline.execute()

execute()
Executes the whole Pipeline at once.
Extracts from the Source, transforms the DataFrame, and loads it into a target Sink.
Returns: input_df – If the bypass_loader attribute was set to True in the Pipeline class, the output DataFrame from the Transformer(s) is returned directly.
Return type: pyspark.sql.DataFrame
Note
This method does not take ANY input parameters. All needed parameters are defined at the initialization phase.
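To make the control flow of execute() concrete, here is a minimal sketch of how extract, transform and load could chain together. It is a hypothetical, simplified model, not spooq2's implementation: ToyPipeline and Rows are illustrative names, plain callables stand in for Extractor, Transformer and Loader Objects, and lists of dicts stand in for pyspark.sql.DataFrame. What the real execute() returns after an actual load is not documented here, so the sketch simply returns None in that case.

```python
from typing import Callable, List, Optional

Rows = List[dict]  # hypothetical stand-in for a pyspark.sql.DataFrame


class ToyPipeline:
    """Hypothetical model of the documented extract -> transform -> load flow.

    Not spooq2's source code: plain callables replace Extractor, Transformer
    and Loader Objects, and lists of dicts replace DataFrames.
    """

    def __init__(self, input_df: Optional[Rows] = None, bypass_loader: bool = False):
        self.input_df = input_df
        self.bypass_loader = bypass_loader
        self.extractor: Optional[Callable[[], Rows]] = None
        self.transformers: List[Callable[[Rows], Rows]] = []
        self.loader: Optional[Callable[[Rows], None]] = None

    def extract(self) -> Rows:
        # A DataFrame passed as input_df replaces the extraction step.
        if self.input_df is not None:
            return self.input_df
        return self.extractor()

    def transform(self, input_df: Rows) -> Rows:
        # Each transformer receives the output of the previous one.
        for transformer in self.transformers:
            input_df = transformer(input_df)
        return input_df

    def load(self, input_df: Rows) -> Optional[Rows]:
        # With bypass_loader, return the transformed data instead of loading it.
        if self.bypass_loader:
            return input_df
        self.loader(input_df)
        return None

    def execute(self) -> Optional[Rows]:
        # No parameters: everything was configured beforehand.
        return self.load(self.transform(self.extract()))


# Regular run: extract, filter, then "load" into an in-memory sink.
sink: Rows = []
pipeline = ToyPipeline()
pipeline.extractor = lambda: [{"id": 1}, {"id": 2}]
pipeline.transformers = [lambda rows: [r for r in rows if r["id"] > 1]]
pipeline.loader = sink.extend
pipeline.execute()
print(sink)  # [{'id': 2}]

# Bypass run: start from a given input_df and get the result back.
preview = ToyPipeline(input_df=[{"id": 5}], bypass_loader=True)
preview.transformers = [lambda rows: [{**r, "seen": True} for r in rows]]
print(preview.execute())  # [{'id': 5, 'seen': True}]
```

The bypass run mirrors the documented combination of input_df and bypass_loader: no Extractor and no Loader are needed, and the transformed data is handed back to the caller.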

extract()
Calls the extract Method on the Extractor Object.
Returns: The output_df from the Extractor, used as the input for the Transformer (chain).
Return type: pyspark.sql.DataFrame

transform(input_df)
Calls the transform Method on each Transformer Object in the order in which the Objects were added, passing the resulting DataFrame from one Transformer to the next.
Parameters: input_df (pyspark.sql.DataFrame) – The output DataFrame of the Extractor Object.
Returns: The input DataFrame for the Loader.
Return type: pyspark.sql.DataFrame
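Because each Transformer receives the output of the previous one, the order of the chain matters. The sketch below models the chain with functools.reduce; run_chain, rename_created and drop_old are hypothetical stand-ins (loosely modelled on a Mapper followed by a ThresholdCleaner), and lists of dicts replace DataFrames. It is an illustration of ordered chaining, not spooq2's code.

```python
from functools import reduce
from typing import Callable, List

Rows = List[dict]  # hypothetical stand-in for a pyspark.sql.DataFrame
Transformer = Callable[[Rows], Rows]


def run_chain(input_rows: Rows, transformers: List[Transformer]) -> Rows:
    """Pass the output of each transformer to the next one, in list order."""
    return reduce(lambda rows, transformer: transformer(rows), transformers, input_rows)


def rename_created(rows: Rows) -> Rows:
    # Hypothetical mapper step: "created_at_ms" (ms) becomes "created_at" (s).
    return [{"id": r["id"], "created_at": r["created_at_ms"] // 1000} for r in rows]


def drop_old(rows: Rows) -> Rows:
    # Hypothetical cleaner step: depends on the "created_at" column created above.
    return [r for r in rows if r["created_at"] >= 1_500_000_000]


raw = [
    {"id": 1, "created_at_ms": 1_400_000_000_000},
    {"id": 2, "created_at_ms": 1_580_000_000_000},
]

print(run_chain(raw, [rename_created, drop_old]))
# [{'id': 2, 'created_at': 1580000000}]
# The reversed order [drop_old, rename_created] raises KeyError,
# because drop_old would run before "created_at" exists.
```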

load(input_df)
Calls the load Method on the Loader Object.
Parameters: input_df (pyspark.sql.DataFrame) – The output DataFrame from the Transformer(s).
Returns: input_df – If the bypass_loader attribute was set to True in the Pipeline class, the output DataFrame from the Transformer(s) is returned directly.
Return type: pyspark.sql.DataFrame

set_extractor(extractor)
Sets an Extractor Object to be used within the Pipeline.
Parameters: extractor (Subclass of spooq2.extractor.Extractor) – An already initialized Object of any Subclass of spooq2.extractor.Extractor.
Raises: exceptions.AssertionError – An input_df was already provided, which bypasses the extraction step.
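The rule that set_extractor rejects an Extractor once an input_df has been provided can be sketched as a simple guard. GuardedPipeline is a hypothetical class for illustration only, not spooq2's source; strings stand in for Extractor Objects. It raises AssertionError explicitly rather than using an assert statement, so the check would also survive running Python with -O, which strips assert statements.

```python
from typing import Any, Optional


class GuardedPipeline:
    """Hypothetical sketch of the set_extractor guard; not spooq2's source."""

    def __init__(self, input_df: Optional[Any] = None):
        self.input_df = input_df
        self.extractor = None

    def set_extractor(self, extractor: Any) -> None:
        # An input_df supplied at construction replaces extraction,
        # so configuring an Extractor as well is treated as an error.
        if self.input_df is not None:
            raise AssertionError("input_df was already provided; extraction is bypassed")
        self.extractor = extractor


ok = GuardedPipeline()
ok.set_extractor("json-extractor")  # accepted

try:
    GuardedPipeline(input_df=[{"id": 1}]).set_extractor("json-extractor")
except AssertionError as err:
    print(err)  # input_df was already provided; extraction is bypassed
```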

add_transformers(transformers)
Adds a list of Transformer Objects to be used within the Pipeline.
Parameters: transformers (list of Subclass of spooq2.transformer.Transformer) – Already initialized Objects of any Subclass of spooq2.transformer.Transformer.

set_loader(loader)
Sets a Loader Object to be used within the Pipeline.
Parameters: loader (Subclass of spooq2.loader.Loader) – An already initialized Object of any Subclass of spooq2.loader.Loader.
Raises: exceptions.AssertionError – You cannot set a loader if the bypass_loader parameter is set.
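The loader-side counterpart can be sketched the same way: with bypass_loader set, the transformed DataFrame is returned instead of loaded, so a Loader would never be used and set_loader refuses it. LoaderGuardSketch is a hypothetical illustration, not spooq2's source; strings stand in for Loader Objects.

```python
from typing import Any


class LoaderGuardSketch:
    """Hypothetical sketch of the set_loader guard; not spooq2's source."""

    def __init__(self, bypass_loader: bool = False):
        self.bypass_loader = bypass_loader
        self.loader = None

    def set_loader(self, loader: Any) -> None:
        # With bypass_loader=True the data is returned instead of loaded,
        # so a Loader would never run: reject it early.
        if self.bypass_loader:
            raise AssertionError("cannot set a loader when bypass_loader is set")
        self.loader = loader


normal = LoaderGuardSketch()
normal.set_loader("hive-loader")  # accepted

try:
    LoaderGuardSketch(bypass_loader=True).set_loader("hive-loader")
except AssertionError as err:
    print(err)  # cannot set a loader when bypass_loader is set
```

Failing at configuration time, rather than silently ignoring the Loader, makes a contradictory Pipeline setup visible before any data is processed.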