Pipeline Factory

To reduce the complexity of building data pipelines for data engineers, an expert system or business rules engine can be used to automatically build and configure a data pipeline based on context variables, groomed metadata, and relevant rules.

class PipelineFactory(url='http://localhost:5000/pipeline/get')[source]

Bases: object

Provides an interface to automatically construct pipelines for Spooq.

Example

>>> pipeline_factory = PipelineFactory()
>>>
>>> # Fetch user data set with applied mapping, filtering,
>>> # and cleaning transformers
>>> df = pipeline_factory.execute({
...     "entity_type": "user",
...     "date": "2018-10-20",
...     "time_range": "last_day"})
>>>
>>> # Load user data partition with applied mapping, filtering,
>>> # and cleaning transformers to a Hive database
>>> pipeline_factory.execute({
...     "entity_type": "user",
...     "date": "2018-10-20",
...     "batch_size": "daily"})
url

The endpoint of an expert system that is called to infer component names and parameters.

Type: str (defaults to "http://localhost:5000/pipeline/get")
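
For example, to point the factory at a different expert system (the host name below is purely illustrative):

>>> pipeline_factory = PipelineFactory(url="http://rules-service:5000/pipeline/get")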

Note

PipelineFactory is only responsible for querying an expert system with the provided parameters and constructing a Spooq pipeline from the response. It has no reasoning capabilities itself! It therefore requires an HTTP service that responds with a JSON object of the following structure:

{
    "extractor": {"name": "Type1Extractor", "params": {"key 1": "val 1", "key N": "val N"}},
    "transformers": [
        {"name": "Type1Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type2Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type3Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type4Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type5Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
    ],
    "loader": {"name": "Type1Loader", "params": {"key 1": "val 1", "key N": "val N"}}
}
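
To make the mapping concrete, a response of this shape could be turned into a pipeline along the following lines. This is only a minimal sketch: the module paths spooq.extractor, spooq.transformer, spooq.loader, and spooq.pipeline, as well as the Pipeline methods set_extractor, add_transformers, and set_loader, are assumptions about the package layout and may differ from the actual implementation.

from spooq import extractor, transformer, loader
from spooq.pipeline import Pipeline

def build_pipeline(metadata):
    """Hypothetical helper: construct a Pipeline from the expert system's response."""
    pipeline = Pipeline()
    # Look up the extractor class by name and instantiate it with its parameters
    extractor_cls = getattr(extractor, metadata["extractor"]["name"])
    pipeline.set_extractor(extractor_cls(**metadata["extractor"]["params"]))
    # Instantiate and add the transformers in the order they were returned
    for item in metadata["transformers"]:
        transformer_cls = getattr(transformer, item["name"])
        pipeline.add_transformers(transformer_cls(**item["params"]))
    # Look up and attach the loader
    loader_cls = getattr(loader, metadata["loader"]["name"])
    pipeline.set_loader(loader_cls(**metadata["loader"]["params"]))
    return pipeline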

Hint

There is an experimental implementation of an expert system, called spooq_rules, which complies with the requirements of PipelineFactory. If you are interested, please ask the author of Spooq about it.

execute(context_variables)[source]

Fetches a ready-to-go pipeline instance via get_pipeline() and executes it.

Parameters: context_variables (dict) – This collection of parameters should describe the current context of the pipeline's use case. Please see the examples in the PipelineFactory class documentation.
Returns:
  • pyspark.sql.DataFrame – If the loader component is bypassed (for ad hoc use cases).
  • None – If the loader component does not return a value (when persisting data).
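
In essence, execute() is a thin wrapper around get_pipeline(). A minimal sketch, assuming the returned Pipeline object exposes an execute() method as used in the example at the top of this page:

def execute(self, context_variables):
    # Build a fully configured pipeline from the context variables ...
    pipeline = self.get_pipeline(context_variables)
    # ... and run it: returns a DataFrame for ad hoc use cases,
    # or None when a loader persists the data.
    return pipeline.execute()
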
get_metadata(context_variables)[source]

Sends a POST request to the defined endpoint (url) containing the supplied context variables.

Parameters: context_variables (dict) – This collection of parameters should describe the current context of the pipeline's use case. Please see the examples in the PipelineFactory class documentation.
Returns: Names and parameters of each ETL component needed to construct a Spooq pipeline.
Return type: dict
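
Conceptually this is a single POST request. A minimal sketch using the requests library; the real implementation's payload format and error handling may differ:

import requests

def get_metadata(self, context_variables):
    # Send the context variables as a JSON body to the expert system ...
    response = requests.post(self.url, json=context_variables)
    response.raise_for_status()
    # ... and return its answer, i.e. the component structure shown in the Note above
    return response.json()
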
get_pipeline(context_variables)[source]

Fetches the necessary metadata via get_metadata() and returns a ready-to-go pipeline instance.

Parameters: context_variables (dict) – This collection of parameters should describe the current context of the pipeline's use case. Please see the examples in the PipelineFactory class documentation.
Returns: A fully configured Spooq pipeline instance which can still be adapted and subsequently executed.
Return type: Pipeline
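
Combining the two methods above, get_pipeline() can be thought of along these lines (a minimal sketch, reusing the hypothetical build_pipeline() helper sketched in the Note above):

def get_pipeline(self, context_variables):
    # Ask the expert system which components to use ...
    metadata = self.get_metadata(context_variables)
    # ... and assemble them into a Pipeline that can still be adapted
    # before it is executed.
    return build_pipeline(metadata)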