Threshold-based Cleaner

class ThresholdCleaner(thresholds={}, column_to_log_cleansed_values=None, store_as_map=False)[source]

Cleans values based on defined boundaries and optionally logs the original values. The valid value range is provided as a dictionary entry for each column to be cleaned. Boundaries can be static or dynamic, as long as they resolve to numbers, timestamps, or dates.

Examples

>>> from pyspark.sql import functions as F
>>> from spooq.transformer import ThresholdCleaner
>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "created_at": {
>>>             "min": F.date_sub(F.current_date(), 365 * 10),  # Ten years ago
>>>             "max": F.current_date()
>>>         },
>>>         "size_cm": {
>>>             "min": 70,
>>>             "max": 250,
>>>             "default": None
>>>         },
>>>     }
>>> )
>>> from spooq.transformer import ThresholdCleaner
>>> from pyspark.sql import Row
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(id=0, integers=-5, doubles=-0.75),
>>>     Row(id=1, integers= 5, doubles= 1.25),
>>>     Row(id=2, integers=15, doubles= 0.67),
>>> ])
>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "integers": {"min":  0, "max": 10},
>>>         "doubles":  {"min": -1, "max":  1}
>>>     },
>>>     column_to_log_cleansed_values="cleansed_values_threshold",
>>>     store_as_map=True,
>>> )
>>> output_df = transformer.transform(input_df)
>>> output_df.show(truncate=False)
+---+--------+-------+-------------------------+
|id |integers|doubles|cleansed_values_threshold|
+---+--------+-------+-------------------------+
|0  |null    |-0.75  |{integers -> -5}         |
|1  |5       |null   |{doubles -> 1.25}        |
|2  |null    |0.67   |{integers -> 15}         |
+---+--------+-------+-------------------------+
>>> output_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- integers: long (nullable = true)
 |-- doubles: double (nullable = true)
 |-- cleansed_values_threshold: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Parameters
  • thresholds (dict) – Dictionary containing column names and respective valid ranges

  • column_to_log_cleansed_values (str, Defaults to None) – Defines a column in which the original (uncleansed) value will be stored in case of cleansing. If no column name is given, nothing will be logged.

  • store_as_map (bool, Defaults to False) – Specifies if the logged cleansed values should be stored in a column as MapType with stringified values or as StructType with the original respective data types.
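
For comparison, a minimal sketch of the same cleansing with store_as_map left at its default of False; the struct layout shown is an assumption derived from the struct logging described in the Base Class section below:

>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "integers": {"min":  0, "max": 10},
>>>         "doubles":  {"min": -1, "max":  1}
>>>     },
>>>     column_to_log_cleansed_values="cleansed_values_threshold",
>>> )
>>> transformer.transform(input_df).printSchema()
root
 |-- id: long (nullable = true)
 |-- integers: long (nullable = true)
 |-- doubles: double (nullable = true)
 |-- cleansed_values_threshold: struct (nullable = false)
 |    |-- integers: long (nullable = true)
 |    |-- doubles: double (nullable = true)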

Note

The following cleansing rule attributes are supported per column:

  • min, mandatory: str, int, Column, pyspark.sql.functions

    A number or timestamp/date which serves as the lower limit for allowed values. Values below this threshold will be cleansed. Supports literals, Spark functions and Columns.

  • max, mandatory: str, int, Column, pyspark.sql.functions

    A number or timestamp/date which serves as the upper limit for allowed values. Values above this threshold will be cleansed. Supports literals, Spark functions and Columns.

  • default, defaults to None: str, int, Column, pyspark.sql.functions

    If a value is cleansed, it is replaced with the provided default value, as shown in the sketch further below. Supports literals, Spark functions and Columns.

Spark's Column.between() method is used internally.
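
Conceptually, the per-column cleansing resembles the expression below; this is an illustrative sketch, not the library's literal implementation:

>>> from pyspark.sql import functions as F
>>> # keep values inside [0, 10]; everything else becomes the default (None here)
>>> conceptually_cleansed = F.when(
>>>     F.col("integers").between(0, 10), F.col("integers")
>>> ).otherwise(F.lit(None))
>>> input_df.withColumn("integers", conceptually_cleansed).show()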

Returns

The transformed DataFrame

Return type

DataFrame

Raises

exceptions.ValueError – Threshold-based cleaning only supports Numeric, Date and Timestamp Types! Column with name: {col_name} and type of: {col_type} was provided

Warning

Only Numeric, TimestampType, and DateType data types are supported!
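
A sketch of the failure mode when an unsupported column type is provided (the string column and the exact rendering of the type name are assumptions):

>>> string_df = spark.createDataFrame([Row(id=0, name="mike")])
>>> ThresholdCleaner(thresholds={"name": {"min": "a", "max": "z"}}).transform(string_df)
ValueError: Threshold-based cleaning only supports Numeric, Date and Timestamp Types! Column with name: name and type of: StringType was provided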

transform(input_df)[source]

Performs a transformation on a DataFrame.

Parameters

input_df (DataFrame) – Input DataFrame

Returns

Transformed DataFrame.

Return type

DataFrame

Note

This method only takes the input DataFrame as a parameter. Any other required parameters are defined in the initialization of the transformer object.

Base Class

This abstract class provides the functionality to log any cleansed values into a separate column that contains a struct with a sub column per cleansed column (according to the cleaning_definitions). If a value was cleansed, the original value is stored in its respective sub column. If a value was not cleansed, the sub column is empty (None).
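
For illustration, assuming the store_as_map=False transformer sketched in the Parameters section above is applied to the example input_df, the logged struct could render as follows (the exact display formatting depends on the Spark version):

>>> transformer.transform(input_df).show(truncate=False)
+---+--------+-------+-------------------------+
|id |integers|doubles|cleansed_values_threshold|
+---+--------+-------+-------------------------+
|0  |null    |-0.75  |{-5, null}               |
|1  |5       |null   |{null, 1.25}             |
|2  |null    |0.67   |{15, null}               |
+---+--------+-------+-------------------------+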

class BaseCleaner(cleaning_definitions, column_to_log_cleansed_values, store_as_map=False, temporary_columns_prefix='1b75cdd2e2356a35486230c69cfac5493488a919')[source]