from __future__ import absolute_import

import sys

if sys.version_info.major > 2:
    # Import ``str`` from ``builtins`` only on Python 3, where it resolves to the
    # stdlib built-in ``str`` and is a harmless no-op.  On Python 2 the same import
    # would come from the ``future`` package and yield a "newstr" type, which breaks
    # pyspark's internal string type checks for commands such as:
    #     data_type = input_df.schema[str(column_name)].dataType
    # NOTE(review): the original comment said this import is "needed for python 2",
    # but the guard only runs it on Python 3 — presumably to shield Python 2 from
    # the ``future`` newstr; confirm the intent against the project's py2 setup.
    from builtins import str

import pyspark.sql.functions as F
import pyspark.sql.types as sql_types
from pyspark.sql.column import Column

from .base_cleaner import BaseCleaner
class ThresholdCleaner(BaseCleaner):
    """
    Cleans values based on defined boundaries and optionally logs the original values.

    The valid value ranges are provided via dictionary for each column to be cleaned.
    Boundaries can either be of static or dynamic nature that resolve to numbers,
    timestamps or dates.

    Examples
    --------
    >>> from pyspark.sql import functions as F
    >>> from spooq.transformer import ThresholdCleaner
    >>> transformer = ThresholdCleaner(
    >>>     thresholds={
    >>>         "created_at": {
    >>>             "min": F.date_sub(F.current_date(), 365 * 10),  # Ten years ago
    >>>             "max": F.current_date()
    >>>         },
    >>>         "size_cm": {
    >>>             "min": 70,
    >>>             "max": 250,
    >>>             "default": None
    >>>         },
    >>>     }
    >>> )

    >>> from spooq.transformer import ThresholdCleaner
    >>> from pyspark.sql import Row
    >>>
    >>> input_df = spark.createDataFrame([
    >>>     Row(id=0, integers=-5, doubles=-0.75),
    >>>     Row(id=1, integers= 5, doubles= 1.25),
    >>>     Row(id=2, integers=15, doubles= 0.67),
    >>> ])
    >>> transformer = ThresholdCleaner(
    >>>     thresholds={
    >>>         "integers": {"min": 0, "max": 10},
    >>>         "doubles": {"min": -1, "max": 1}
    >>>     },
    >>>     column_to_log_cleansed_values="cleansed_values_threshold",
    >>>     store_as_map=True,
    >>> )
    >>> output_df = transformer.transform(input_df)
    >>> output_df.show(truncate=False)
    +---+--------+-------+-------------------------+
    |id |integers|doubles|cleansed_values_threshold|
    +---+--------+-------+-------------------------+
    |0  |null    |-0.75  |{integers -> -5}         |
    |1  |5       |null   |{doubles -> 1.25}        |
    |2  |null    |0.67   |{integers -> 15}         |
    +---+--------+-------+-------------------------+
    >>> output_df.printSchema()
     |-- id: long (nullable = true)
     |-- integers: long (nullable = true)
     |-- doubles: double (nullable = true)
     |-- cleansed_values_threshold: map (nullable = false)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)

    Parameters
    ----------
    thresholds : :py:class:`dict`
        Dictionary containing column names and respective valid ranges.
        Defaults to an empty dictionary (no columns are cleaned).
    column_to_log_cleansed_values : :any:`str`, Defaults to None
        Defines a column in which the original (uncleansed) value will be stored in
        case of cleansing. If no column name is given, nothing will be logged.
    store_as_map : :any:`bool`, Defaults to False
        Specifies if the logged cleansed values should be stored in a column as
        :class:`~pyspark.sql.types.MapType` with stringified values or as
        :class:`~pyspark.sql.types.StructType` with the original respective data types.

    Note
    ----
    Following cleansing rule attributes per column are supported:

        * min, mandatory: :any:`str`, :any:`int`, |SPARK_COLUMN|, |SPARK_FUNCTION|
            A number or timestamp/date which serves as the lower limit for allowed values.
            Values below this threshold will be cleansed. Supports literals, Spark
            functions and Columns.
        * max, mandatory: :any:`str`, :any:`int`, |SPARK_COLUMN|, |SPARK_FUNCTION|
            A number or timestamp/date which serves as the upper limit for allowed values.
            Values above this threshold will be cleansed. Supports literals, Spark
            functions and Columns.
        * default, defaults to None: :any:`str`, :any:`int`, |SPARK_COLUMN|, |SPARK_FUNCTION|
            If a value gets cleansed it gets replaced with the provided default value.
            Supports literals, Spark functions and Columns.

    The :py:meth:`~pyspark.sql.Column.between` method is used internally.

    Returns
    -------
    |SPARK_DATAFRAME|
        The transformed DataFrame

    Raises
    ------
    :any:`exceptions.ValueError`
        Threshold-based cleaning only supports Numeric, Date and Timestamp Types!
        Column with name: {col_name} and type of: {col_type} was provided

    Warning
    -------
    Only Numeric, TimestampType, and DateType data types are supported!
    """

    def __init__(self, thresholds=None, column_to_log_cleansed_values=None, store_as_map=False):
        # ``thresholds`` defaults to None instead of a mutable ``{}`` default
        # argument; a missing value still resolves to an empty rule set.
        super().__init__(
            cleaning_definitions=thresholds if thresholds is not None else {},
            column_to_log_cleansed_values=column_to_log_cleansed_values,
            store_as_map=store_as_map,
            # Prefix used by the base class to avoid clashes with user columns
            # while staging temporary cleansing columns.
            temporary_columns_prefix="dac28b56d8055953a7038bfe3b5097e7",
        )
        # Lazy %-formatting: the repr of the definitions is only built when
        # DEBUG logging is actually enabled.
        self.logger.debug("Range Definitions: %s", self.cleaning_definitions)