Pipeline Factory

To reduce the complexity of building data pipelines for data engineers, an expert system or business rules engine can automatically construct and configure a data pipeline based on context variables, groomed metadata, and relevant rules.

class PipelineFactory(url='http://localhost:5000/pipeline/get')[source]

Provides an interface to automatically construct pipelines for Spooq.

Example

>>> pipeline_factory = PipelineFactory()
>>>
>>> # Fetch the user data set with the mapping, filtering,
>>> # and cleaning transformers applied
>>> df = pipeline_factory.execute({
...     "entity_type": "user",
...     "date": "2018-10-20",
...     "time_range": "last_day"})
>>>
>>> # Load a user data partition with the mapping, filtering,
>>> # and cleaning transformers applied to a Hive database
>>> pipeline_factory.execute({
...     "entity_type": "user",
...     "date": "2018-10-20",
...     "batch_size": "daily"})

url

The endpoint of an expert system which will be called to infer the names and parameters of the pipeline components.

Type

str, (Defaults to “http://localhost:5000/pipeline/get”)
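
For example, to point the factory at a different expert system endpoint (the host below is made up for illustration):

>>> pipeline_factory = PipelineFactory(url="http://rules-service:8080/pipeline/get")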

Note

PipelineFactory is only responsible for querying an expert system with the provided parameters and for constructing a Spooq pipeline out of the response. It does not have any reasoning capabilities itself! It therefore requires an HTTP service that responds with a JSON object of the following structure:

{
    "extractor": {"name": "Type1Extractor", "params": {"key 1": "val 1", "key N": "val N"}},
    "transformers": [
        {"name": "Type1Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type2Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type3Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type4Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
        {"name": "Type5Transformer", "params": {"key 1": "val 1", "key N": "val N"}},
    ],
    "loader": {"name": "Type1Loader", "params": {"key 1": "val 1", "key N": "val N"}}
}
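
To illustrate the contract, here is a minimal, hypothetical stub of such a service. Flask, the hard-coded component names, and all parameters below are assumptions for demonstration only; any HTTP server that returns the structure above will satisfy PipelineFactory:

# stub_expert_system.py -- a hypothetical stand-in for a real expert
# system; it ignores most of the context and always returns the same
# pipeline definition.
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/pipeline/get", methods=["POST"])
def get_pipeline_metadata():
    context = request.get_json()  # e.g. {"entity_type": "user", ...}
    # A real expert system would reason over rules and groomed
    # metadata here; this stub returns fixed component definitions.
    return jsonify({
        "extractor": {"name": "JSONExtractor",
                      "params": {"input_path": "user/2018/10/20"}},
        "transformers": [
            {"name": "ThresholdCleaner", "params": {"thresholds": {}}},
        ],
        "loader": {"name": "HiveLoader",
                   "params": {"db_name": "users", "table_name": "daily"}},
    })

if __name__ == "__main__":
    app.run(port=5000)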

Hint

There is an experimental implementation of an expert system, called spooq_rules, which complies with the requirements of PipelineFactory. If you are interested, please ask the author of Spooq about it.

execute(context_variables)[source]

Fetches a ready-to-go pipeline instance via get_pipeline() and executes it.

Parameters

context_variables (dict) – This collection of parameters should describe the current context of the pipeline's use case. Please see the examples in the PipelineFactory class documentation.

Returns

  • pyspark.sql.DataFrame – If the loader component is bypassed (for ad hoc use cases).

  • None – If the loader component does not return a value (when data is persisted).
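
Conceptually, this method reduces to the following sketch (an illustration, not the actual implementation):

def execute(self, context_variables):
    # Sketch: fetch a fully configured pipeline for the given
    # context and run it end to end.
    pipeline = self.get_pipeline(context_variables)
    return pipeline.execute()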

get_metadata(context_variables)[source]

Sends a POST request to the defined endpoint (url) containing the supplied context variables.

Parameters

context_variables (dict) – This collection of parameters should describe the current context of the pipeline's use case. Please see the examples in the PipelineFactory class documentation.

Returns

Names and parameters of each ETL component needed to construct a Spooq pipeline

Return type

dict
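
Behaviorally, this is roughly equivalent to the following sketch using the requests library (the error handling is an added assumption):

import requests

def get_metadata(self, context_variables):
    # Sketch: POST the context variables as JSON to the expert
    # system and return its parsed JSON response.
    response = requests.post(self.url, json=context_variables)
    response.raise_for_status()  # assumption: fail fast on HTTP errors
    return response.json()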

get_pipeline(context_variables)[source]

Fetches the necessary metadata via get_metadata() and returns a ready-to-go pipeline instance.

Parameters

context_variables (dict) – This collection of parameters should describe the current context of the pipeline's use case. Please see the examples in the PipelineFactory class documentation.

Returns

A fully configured Spooq pipeline instance, which can still be adapted and subsequently executed.

Return type

Pipeline
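
As a rough sketch, the construction could look like the following; the import path, module names, and the Pipeline methods used (set_extractor(), add_transformers(), set_loader()) are assumptions about Spooq's interface rather than its actual implementation:

import importlib

from spooq.pipeline import Pipeline  # assumed import path

def _construct(component_metadata, module_name):
    # Look up a component class by name and instantiate it with the
    # parameters supplied by the expert system.
    module = importlib.import_module(module_name)
    cls = getattr(module, component_metadata["name"])
    return cls(**component_metadata["params"])

def get_pipeline(self, context_variables):
    # Sketch: fetch the metadata and assemble a pipeline from it.
    metadata = self.get_metadata(context_variables)
    pipeline = Pipeline()
    pipeline.set_extractor(_construct(metadata["extractor"], "spooq.extractor"))
    pipeline.add_transformers(
        [_construct(t, "spooq.transformer") for t in metadata["transformers"]])
    pipeline.set_loader(_construct(metadata["loader"], "spooq.loader"))
    return pipeline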