Threshold-based Cleaner
class ThresholdCleaner(thresholds={}, column_to_log_cleansed_values=None, store_as_map=False)

Bases: spooq.transformer.base_cleaner.BaseCleaner
Cleans values based on defined boundaries and optionally logs the original values. The valid value ranges are provided via a dictionary for each column to be cleaned. Boundaries can be either static or dynamic, as long as they resolve to numbers, timestamps or dates.
Examples
>>> from pyspark.sql import functions as F
>>> from spooq.transformer import ThresholdCleaner
>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "created_at": {
>>>             "min": F.date_sub(F.current_date(), 365 * 10),  # Ten years ago
>>>             "max": F.current_date()
>>>         },
>>>         "size_cm": {
>>>             "min": 70,
>>>             "max": 250,
>>>             "default": None
>>>         },
>>>     }
>>> )
>>> from spooq.transformer import ThresholdCleaner
>>> from pyspark.sql import Row
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(id=0, integers=-5, doubles=-0.75),
>>>     Row(id=1, integers= 5, doubles= 1.25),
>>>     Row(id=2, integers=15, doubles= 0.67),
>>> ])
>>> transformer = ThresholdCleaner(
>>>     thresholds={
>>>         "integers": {"min": 0, "max": 10},
>>>         "doubles": {"min": -1, "max": 1}
>>>     },
>>>     column_to_log_cleansed_values="cleansed_values_threshold",
>>>     store_as_map=True,
>>> )
>>> output_df = transformer.transform(input_df)
>>> output_df.show(truncate=False)
+---+--------+-------+-------------------------+
|id |integers|doubles|cleansed_values_threshold|
+---+--------+-------+-------------------------+
|0  |null    |-0.75  |{integers -> -5}         |
|1  |5       |null   |{doubles -> 1.25}        |
|2  |null    |0.67   |{integers -> 15}         |
+---+--------+-------+-------------------------+
>>> output_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- integers: long (nullable = true)
 |-- doubles: double (nullable = true)
 |-- cleansed_values_threshold: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
Parameters:
- thresholds (dict) – Dictionary containing column names and their respective valid ranges.
- column_to_log_cleansed_values (str, defaults to None) – Defines a column in which the original (uncleansed) value will be stored in case of cleansing. If no column name is given, nothing will be logged.
- store_as_map (bool, defaults to False) – Specifies whether the logged cleansed values should be stored in a column as MapType with stringified values or as StructType with their original respective data types.
Note

The following cleansing rule attributes are supported per column:

- min, mandatory: str, int, Column, pyspark.sql.functions
  A number or timestamp/date which serves as the lower limit for allowed values. Values below this threshold will be cleansed. Supports literals, Spark functions and Columns.
- max, mandatory: str, int, Column, pyspark.sql.functions
  A number or timestamp/date which serves as the upper limit for allowed values. Values above this threshold will be cleansed. Supports literals, Spark functions and Columns.
- default, defaults to None: str, int, Column, pyspark.sql.functions
  If a value gets cleansed, it is replaced with the provided default value. Supports literals, Spark functions and Columns.
The between() method is used internally.

Returns: The transformed DataFrame
Return type: DataFrame
Raises: exceptions.ValueError – "Threshold-based cleaning only supports Numeric, Date and Timestamp Types! Column with name: {col_name} and type of: {col_type} was provided"

Warning

Only Numeric, TimestampType, and DateType data types are supported!
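The per-value semantics of a threshold rule can be summarized in plain Python. The following is an illustrative sketch only; the `clean_value` helper is hypothetical and not part of spooq, and the actual transformer applies the equivalent inclusive-range check via Spark's `Column.between()` on whole DataFrames:

```python
def clean_value(value, min_, max_, default=None):
    """Hypothetical sketch of one threshold rule: keep the value if it
    lies within [min_, max_] (inclusive, like Column.between());
    otherwise replace it with the default (None unless provided)."""
    if value is None:
        # Nulls are not out-of-range; they pass through untouched.
        return None
    return value if min_ <= value <= max_ else default

# Mirrors the "integers": {"min": 0, "max": 10} rule from the example:
clean_value(5, 0, 10)    # in range, kept
clean_value(-5, 0, 10)   # below min, cleansed to None
```

The same logic covers dates and timestamps, since those types are also ordered and comparable against the given boundaries.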
transform(input_df)

Performs a transformation on a DataFrame.

Parameters: input_df (DataFrame) – Input DataFrame
Returns: Transformed DataFrame
Return type: DataFrame

Note

This method only takes the input DataFrame as a parameter. Any other needed parameters are defined in the initialization of the Transformer object.
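To make the transform behaviour concrete, here is a pure-Python sketch of what happens to a single record when `column_to_log_cleansed_values` is set and `store_as_map=True`. The `clean_row` helper is hypothetical and only mirrors the documented semantics; the real implementation operates on Spark DataFrames, not dictionaries:

```python
def clean_row(row, thresholds, log_column="cleansed_values_threshold"):
    """Hypothetical per-record sketch of ThresholdCleaner.transform with
    store_as_map=True: out-of-range values are replaced by the rule's
    default and the original value is logged, stringified, in a map."""
    cleansed_log = {}
    out = dict(row)
    for col, rule in thresholds.items():
        value = row[col]
        if value is not None and not (rule["min"] <= value <= rule["max"]):
            cleansed_log[col] = str(value)      # map values are stringified
            out[col] = rule.get("default")      # "default" defaults to None
    out[log_column] = cleansed_log
    return out

# Reproduces the first record of the example above:
clean_row(
    {"id": 0, "integers": -5, "doubles": -0.75},
    {"integers": {"min": 0, "max": 10}, "doubles": {"min": -1, "max": 1}},
)
```

With `store_as_map=False`, the log column would instead be a struct with one typed sub-column per cleaned column, as described for the base class below.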
Base Class
This abstract class provides the functionality to log any cleansed values into a separate column that contains a struct with a sub column per cleansed column (according to the cleaning_definition). If a value was cleansed, the original value will be stored in its respective sub column. If a value was not cleansed, the sub column will be empty (None).
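As an illustration of that logging scheme, the following pure-Python sketch mimics the struct column for one record. The `log_as_struct` helper is hypothetical; the real class builds a Spark struct column with one sub-column per cleaned column:

```python
def log_as_struct(row, cleaning_definitions):
    """Hypothetical sketch of the base cleaner's struct logging: one
    sub-field per cleaned column, holding the original value if that
    value was cleansed, otherwise None (empty)."""
    struct = {}
    for col, rule in cleaning_definitions.items():
        value = row[col]
        was_cleansed = value is not None and not (rule["min"] <= value <= rule["max"])
        struct[col] = value if was_cleansed else None
    return struct

# A cleansed value is logged with its original data type preserved:
log_as_struct({"integers": 15}, {"integers": {"min": 0, "max": 10}})
```

Unlike the map variant, the struct keeps the original data types of the logged values instead of stringifying them.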
class BaseCleaner(cleaning_definitions, column_to_log_cleansed_values, store_as_map=False, temporary_columns_prefix='1b75cdd2e2356a35486230c69cfac5493488a919')

Bases: spooq.transformer.transformer.Transformer