Threshold-based Cleaner
- class ThresholdCleaner(thresholds={}, column_to_log_cleansed_values=None, store_as_map=False)[source]
Cleans values based on defined boundaries and optionally logs the original values. The valid value ranges are provided via a dictionary for each column to be cleaned. Boundaries can be either static or dynamic, as long as they resolve to numbers, timestamps or dates.
Examples
>>> from pyspark.sql import functions as F
>>> from spooq.transformer import ThresholdCleaner
>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "created_at": {
>>>             "min": F.date_sub(F.current_date(), 365 * 10),  # Ten years ago
>>>             "max": F.current_date()
>>>         },
>>>         "size_cm": {
>>>             "min": 70,
>>>             "max": 250,
>>>             "default": None
>>>         },
>>>     }
>>> )
>>> from spooq.transformer import ThresholdCleaner
>>> from pyspark.sql import Row
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(id=0, integers=-5, doubles=-0.75),
>>>     Row(id=1, integers= 5, doubles= 1.25),
>>>     Row(id=2, integers=15, doubles= 0.67),
>>> ])
>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "integers": {"min": 0, "max": 10},
>>>         "doubles": {"min": -1, "max": 1}
>>>     },
>>>     column_to_log_cleansed_values="cleansed_values_threshold",
>>>     store_as_map=True,
>>> )
>>> output_df = transformer.transform(input_df)
>>> output_df.show(truncate=False)
+---+--------+-------+-------------------------+
|id |integers|doubles|cleansed_values_threshold|
+---+--------+-------+-------------------------+
|0  |null    |-0.75  |{integers -> -5}         |
|1  |5       |null   |{doubles -> 1.25}        |
|2  |null    |0.67   |{integers -> 15}         |
+---+--------+-------+-------------------------+
>>> output_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- integers: long (nullable = true)
 |-- doubles: double (nullable = true)
 |-- cleansed_values_threshold: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
- Parameters
  - thresholds (dict) – Dictionary containing column names and their respective valid ranges
  - column_to_log_cleansed_values (str, defaults to None) – Defines a column in which the original (uncleansed) value will be stored in case of cleansing. If no column name is given, nothing will be logged.
  - store_as_map (bool, defaults to False) – Specifies whether the logged cleansed values should be stored in a column as MapType with stringified values or as StructType with the original respective data types.
Note
The following cleansing rule attributes are supported for each column:
- min, mandatory: str, int, Column, pyspark.sql.functions
  A number or timestamp/date which serves as the lower limit for allowed values. Values below this threshold will be cleansed. Supports literals, Spark functions and Columns.
- max, mandatory: str, int, Column, pyspark.sql.functions
  A number or timestamp/date which serves as the upper limit for allowed values. Values above this threshold will be cleansed. Supports literals, Spark functions and Columns.
- default, defaults to None: str, int, Column, pyspark.sql.functions
  If a value gets cleansed it gets replaced with the provided default value. Supports literals, Spark functions and Columns. (See the sketch after this list.)
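As a minimal sketch of how default interacts with the boundaries (reusing input_df from the example above; the replacement value 0 and the shown output are illustrative assumptions):

>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "integers": {"min": 0, "max": 10, "default": 0}  # replace instead of nulling
>>>     }
>>> )
>>> transformer.transform(input_df).show(truncate=False)  # assumed output
+---+--------+-------+
|id |integers|doubles|
+---+--------+-------+
|0  |0       |-0.75  |
|1  |5       |1.25   |
|2  |0       |0.67   |
+---+--------+-------+

Note that doubles is left untouched because it has no entry in thresholds.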
The between() method is used internally.
- Returns
The transformed DataFrame
- Return type
DataFrame
- Raises
exceptions.ValueError – Threshold-based cleaning only supports Numeric, Date and Timestamp Types! Column with name: {col_name} and type of: {col_type} was provided
Warning
Only Numeric, TimestampType, and DateType data types are supported!
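For instance, configuring a threshold on a string column should trigger the error above. A hedged sketch (the DataFrame, the column and the exact rendering of the type in the message are assumptions; the check is assumed to fire when transform inspects the schema):

>>> string_df = spark.createDataFrame([Row(id=0, name="mike")])
>>> cleaner = ThresholdCleaner(thresholds={"name": {"min": "a", "max": "z"}})
>>> cleaner.transform(string_df)  # assumed to raise
ValueError: Threshold-based cleaning only supports Numeric, Date and Timestamp Types! Column with name: name and type of: StringType was provided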
- transform(input_df)[source]
Performs a transformation on a DataFrame.
- Parameters
input_df (DataFrame) – Input DataFrame
- Returns
Transformed DataFrame.
- Return type
DataFrame
Note
This method takes only the input DataFrame as a parameter. Any other required parameters are defined in the initialization of the transformer object.
Base Class
This abstract class provides the functionality to log any cleansed values into a separate column that contains a struct with a sub column per cleansed column (according to the cleaning_definition). If a value was cleansed, the original value will be stored in its respective sub column. If a value was not cleansed, the sub column will be empty (None).
- class BaseCleaner(cleaning_definitions, column_to_log_cleansed_values, store_as_map=False, temporary_columns_prefix='1b75cdd2e2356a35486230c69cfac5493488a919')[source]