Source code for spooq2.transformer.threshold_cleaner
from __future__ import absolute_import
import sys
if sys.version_info.major > 2:
# This is needed for python 2 as otherwise pyspark raises an exception for following command:
# data_type = input_df.schema[str(column_name)].dataType
# Pyspark checks if the input is a string, which does not work
# with the new strings from builtins
from builtins import str
import pyspark.sql.functions as F
import pyspark.sql.types as sql_types
from .transformer import Transformer
[docs]class ThresholdCleaner(Transformer):
"""
Sets outiers within a DataFrame to a default value.
Takes a dictionary with valid value ranges for each column to be cleaned.
Example
-------
>>> transformer = ThresholdCleaner(
>>> thresholds={
>>> "created_at": {
>>> "min": 0,
>>> "max": 1580737513,
>>> "default": None
>>> },
>>> "size_cm": {
>>> "min": 70,
>>> "max": 250,
>>> "default": None
>>> },
>>> }
>>> )
Parameters
----------
thresholds : :py:class:`dict`
Dictionary containing column names and respective valid ranges
Returns
-------
:any:`pyspark.sql.DataFrame`
The transformed DataFrame
Raises
------
:any:`exceptions.ValueError`
Threshold-based cleaning only supports Numeric, Date and Timestamp Types!
Column with name: {col_name} and type of: {col_type} was provided
Warning
-------
Only Numeric, TimestampType, and DateType data types are supported!
"""
def __init__(self, thresholds={}):
super(ThresholdCleaner, self).__init__()
self.thresholds = thresholds
self.logger.debug("Range Definitions: " + str(self.thresholds))
[docs] def transform(self, input_df):
self.logger.debug("input_df Schema: " + input_df._jdf.schema().treeString())
ordered_column_names = input_df.columns
for column_name, value_range in list(self.thresholds.items()):
data_type = input_df.schema[str(column_name)].dataType
if not isinstance(data_type, (sql_types.NumericType,
sql_types.DateType,
sql_types.TimestampType)):
raise ValueError(
"Threshold-based cleaning only supports Numeric, Date and Timestamp Types!\n",
"Column with name: {col_name} and type of: {col_type} was provided".format(
col_name=column_name, col_type=data_type
),
)
self.logger.debug(
"Ranges for column " + column_name + ": " + str(value_range)
)
input_df = input_df.withColumn(
column_name,
F.when(
input_df[column_name].between(
value_range["min"], value_range["max"]
),
input_df[column_name],
)
.otherwise(F.lit(value_range.get("default", None)))
.cast(data_type),
)
return input_df.select(ordered_column_names)