Threshold-based Cleaner
- class ThresholdCleaner(thresholds={}, column_to_log_cleansed_values=None, store_as_map=False)
Cleans values based on defined boundaries and optionally logs the original values. The valid value ranges are provided via a dictionary, keyed by the name of each column to be cleaned. Boundaries can be either static or dynamic, as long as they resolve to numbers, timestamps, or dates.
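The per-value rule described above can be modeled in plain Python. This is a minimal sketch, not spooq's implementation: it assumes a dynamic boundary can be represented as a zero-argument callable that is evaluated when the rule is applied (spooq itself accepts Spark expressions such as `F.current_date()`), and the names `resolve`, `clean_value`, and `ten_years_ago` are hypothetical.

```python
from datetime import date, timedelta
from typing import Any, Callable, Optional, Union

# Assumption: a boundary is either a static value or a zero-argument callable
# that is evaluated each time the rule is applied (a "dynamic" boundary).
Bound = Union[Any, Callable[[], Any]]


def resolve(bound: Bound) -> Any:
    """Return the concrete boundary, calling it first if it is dynamic."""
    return bound() if callable(bound) else bound


def clean_value(value: Any, min_bound: Bound, max_bound: Bound,
                default: Optional[Any] = None) -> Any:
    """Keep `value` if min <= value <= max (inclusive); otherwise return `default`."""
    if value is None:
        # This sketch leaves missing values untouched.
        return None
    lower, upper = resolve(min_bound), resolve(max_bound)
    return value if lower <= value <= upper else default


def ten_years_ago() -> date:
    """Dynamic lower bound: roughly ten years before today."""
    return date.today() - timedelta(days=365 * 10)


# Static numeric boundaries, like the "size_cm" rule in the examples
print(clean_value(180, 70, 250))  # 180
print(clean_value(300, 70, 250))  # None

# Dynamic date boundaries, like the "created_at" rule in the examples
print(clean_value(date(1900, 1, 1), ten_years_ago, date.today))  # None
```

Keeping the boundary unresolved until the rule runs is what makes a bound like "ten years ago" stay correct when the same configuration is reused on later days.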
Examples
>>> from pyspark.sql import functions as F
>>> from spooq.transformer import ThresholdCleaner
>>> transformer = ThresholdCleaner(
...     thresholds={
...         "created_at": {
...             "min": F.date_sub(F.current_date(), 365 * 10),  # Roughly ten years ago
...             "max": F.current_date()
...         },
...         "size_cm": {
...             "min": 70,
...             "max": 250,
...             "default": None
...         },
...     }
... )
>>> from spooq.transformer import ThresholdCleaner
>>> from pyspark.sql import Row
>>>
>>> input_df = spark.createDataFrame([
...     Row(id=0, integers=-5, doubles=-0.75),
...     Row(id=1, integers= 5, doubles= 1.25),
...     Row(id=2, integers=15, doubles= 0.67),
... ])
>>> transformer = ThresholdCleaner(
...     thresholds={
...         "integers": {"min": 0, "max": 10},
...         "doubles": {"min": -1, "max": 1}
...     },
...     column_to_log_cleansed_values="cleansed_values_threshold",
...     store_as_map=True,
... )
>>> output_df = transformer.transform(input_df)
>>> output_df.show(truncate=False)
+---+--------+-------+-------------------------+
|id |integers|doubles|cleansed_values_threshold|
+---+--------+-------+-------------------------+
|0  |null    |-0.75  |{integers -> -5}         |
|1  |5       |null   |{doubles -> 1.25}        |
|2  |null    |0.67   |{integers -> 15}         |
+---+--------+-------+-------------------------+
>>> output_df.printSchema()
root
 |-- id: long (nullable = true)
 |-- integers: long (nullable = true)
 |-- doubles: double (nullable = true)
 |-- cleansed_values_threshold: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
- Parameters
thresholds (dict) – Dictionary containing column names and their respective valid ranges.
column_to_log_cleansed_values (str, Defaults to None) – Defines a column in which the original (uncleansed) value will be stored in case of cleansing. If no column name is given, nothing will be logged.
store_as_map (bool, Defaults to False) – Specifies whether the logged cleansed values should be stored in a column as MapType with stringified values or as StructType with the original respective data types.
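The difference between the two logging layouts can be illustrated with plain Python dicts standing in for the log column of one output row, using row `id=1` from the second example. This is a hypothetical sketch: `build_log` is not part of spooq, `str()` stands in for Spark's cast to string, and the struct layout (one field per configured column, None where nothing was cleansed) follows the description in the Base Class section.

```python
from typing import Any, Dict, Iterable, Optional, Set


def build_log(original: Dict[str, Any], cleansed: Set[str],
              columns: Iterable[str], store_as_map: bool) -> Dict[str, Optional[Any]]:
    """Build the log entry for one row (hypothetical helper).

    store_as_map=True  -> only cleansed columns, values stringified (MapType-like)
    store_as_map=False -> one field per configured column, original types,
                          None where the value was not cleansed (StructType-like)
    """
    if store_as_map:
        return {col: str(original[col]) for col in columns if col in cleansed}
    return {col: original[col] if col in cleansed else None for col in columns}


row = {"id": 1, "integers": 5, "doubles": 1.25}
columns = ["integers", "doubles"]
print(build_log(row, {"doubles"}, columns, store_as_map=True))
# {'doubles': '1.25'}
print(build_log(row, {"doubles"}, columns, store_as_map=False))
# {'integers': None, 'doubles': 1.25}
```

The map layout trades type fidelity for a uniform value type, which is why every logged value is a string there, while the struct layout keeps each column's original type.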
Note
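The following cleansing rule attributes are supported per column:
min – Lower boundary of the valid range. Can be a static value or a dynamic Column expression that resolves to a number, date, or timestamp.
max – Upper boundary of the valid range. Can be a static value or a dynamic Column expression that resolves to a number, date, or timestamp.
default – Value that replaces a cleansed value (defaults to None, i.e. null).
The between() method is used internally, so both boundaries are inclusive.
- Returns
The transformed DataFrame
- Return type
DataFrame
- Raises
exceptions.ValueError – Threshold-based cleaning only supports Numeric, Date and Timestamp Types! Column with name: {col_name} and type of: {col_type} was provided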
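The type restriction and the inclusive between() semantics can be sketched without Spark by checking Spark SQL simple type names (as returned by `DataType.simpleString()`, e.g. "bigint" or "decimal(10,2)"). This is a hypothetical model: `validate_thresholds` and `between` are illustrative names, and spooq works on the DataFrame's schema objects rather than on type-name strings.

```python
from typing import Dict

# Spark SQL simple type names of the supported types. Decimal names carry
# precision and scale (e.g. "decimal(10,2)"), so they are matched by prefix.
NUMERIC_TYPES = {"tinyint", "smallint", "int", "bigint", "float", "double"}
TEMPORAL_TYPES = {"date", "timestamp"}


def validate_thresholds(schema: Dict[str, str], thresholds: Dict[str, dict]) -> None:
    """Raise ValueError for any configured column with an unsupported type."""
    for col_name in thresholds:
        col_type = schema[col_name]
        supported = (col_type in NUMERIC_TYPES or col_type in TEMPORAL_TYPES
                     or col_type.startswith("decimal"))
        if not supported:
            raise ValueError(
                "Threshold-based cleaning only supports Numeric, Date and Timestamp Types! "
                f"Column with name: {col_name} and type of: {col_type} was provided"
            )


def between(value, lower, upper) -> bool:
    """Mirror Column.between(): both boundaries are inclusive."""
    return lower <= value <= upper


schema = {"id": "bigint", "integers": "bigint", "doubles": "double", "name": "string"}
validate_thresholds(schema, {"integers": {"min": 0, "max": 10}})  # passes
print(between(10, 0, 10), between(11, 0, 10))  # True False
```

Validating the schema before building any expressions lets the error name the offending column up front instead of failing later inside a Spark job.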
Warning
Only Numeric, TimestampType, and DateType data types are supported!
- transform(input_df)
Performs a transformation on a DataFrame.
- Parameters
input_df (DataFrame) – Input DataFrame
- Returns
Transformed DataFrame.
- Return type
DataFrame
Note
This method takes only the input DataFrame as a parameter. All other required parameters are defined when the transformer object is initialized.
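The calling convention (configuration in the constructor, data only in transform()) can be sketched with a hypothetical class hierarchy. Lists of dicts stand in for DataFrames so the sketch runs without Spark; `SketchTransformer` and `ColumnRangeCleaner` are illustrative names, not spooq classes.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List

Rows = List[Dict[str, Any]]  # stand-in for a DataFrame in this sketch


class SketchTransformer(ABC):
    @abstractmethod
    def transform(self, input_df: Rows) -> Rows:
        """Receive only the input data; all settings come from __init__."""


class ColumnRangeCleaner(SketchTransformer):
    def __init__(self, column: str, min_value: Any, max_value: Any) -> None:
        # All parameters are fixed once, at construction time.
        self.column = column
        self.min_value = min_value
        self.max_value = max_value

    def transform(self, input_df: Rows) -> Rows:
        # Return new rows instead of mutating the input, as DataFrames are immutable.
        output = []
        for row in input_df:
            new_row = dict(row)
            value = row[self.column]
            if value is not None and not self.min_value <= value <= self.max_value:
                new_row[self.column] = None
            output.append(new_row)
        return output


cleaner = ColumnRangeCleaner("integers", 0, 10)
print(cleaner.transform([{"integers": -5}, {"integers": 5}]))
# [{'integers': None}, {'integers': 5}]
```

Because the configuration lives on the instance, one configured object can be applied to any number of inputs that share the same schema.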
Base Class
This abstract class provides the functionality to log cleansed values into a separate column that contains a struct with one sub-column per cleansed column (according to cleaning_definitions). If a value was cleansed, its original value is stored in the respective sub-column. If a value was not cleansed, the sub-column is empty (None).
- class BaseCleaner(cleaning_definitions, column_to_log_cleansed_values, store_as_map=False, temporary_columns_prefix='1b75cdd2e2356a35486230c69cfac5493488a919')
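One way to read the split between this abstract class and a concrete cleaner is as a template-method design: the base class handles cleansing and logging generically, and each subclass supplies only its validity rule. The following plain-Python sketch illustrates that reading under stated assumptions: `SketchBaseCleaner`, `SketchThresholdCleaner`, `is_valid`, and `clean_row` are hypothetical names, only the struct-style log (one entry per configured column, None when nothing was cleansed) follows the description above, and temporary_columns_prefix is not modeled.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

Row = Dict[str, Any]


class SketchBaseCleaner(ABC):
    """Hypothetical stand-in: shared cleansing and logging, rule left to subclasses."""

    def __init__(self, cleaning_definitions: Dict[str, dict],
                 column_to_log_cleansed_values: Optional[str]) -> None:
        self.cleaning_definitions = cleaning_definitions
        self.column_to_log_cleansed_values = column_to_log_cleansed_values

    @abstractmethod
    def is_valid(self, value: Any, definition: dict) -> bool:
        """Subclass-specific rule deciding whether a value is kept."""

    def clean_row(self, row: Row) -> Row:
        cleaned = dict(row)
        log: Dict[str, Any] = {}
        for col, definition in self.cleaning_definitions.items():
            value = row[col]
            if value is not None and not self.is_valid(value, definition):
                cleaned[col] = definition.get("default")  # replace with default
                log[col] = value  # original value of a cleansed column
            else:
                log[col] = None  # not cleansed: sub-column stays empty
        if self.column_to_log_cleansed_values:
            cleaned[self.column_to_log_cleansed_values] = log
        return cleaned


class SketchThresholdCleaner(SketchBaseCleaner):
    def is_valid(self, value: Any, definition: dict) -> bool:
        # Inclusive range check, mirroring Column.between()
        return definition["min"] <= value <= definition["max"]


definitions = {"integers": {"min": 0, "max": 10}, "doubles": {"min": -1, "max": 1}}
cleaner = SketchThresholdCleaner(definitions, "cleansed_values_threshold")
print(cleaner.clean_row({"id": 2, "integers": 15, "doubles": 0.67}))
```

With this split, adding a new kind of cleaner only means implementing `is_valid`; the logging behavior stays identical across cleaners.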