# Source code for spooq2.transformer.enum_cleaner
import sys
import pyspark.sql.functions as F, types as T
from pyspark.sql.column import Column
from .transformer import Transformer
class EnumCleaner(Transformer):
    """
    Cleanses a dataframe based on lists of allowed|disallowed values.

    Example
    -------
    >>> transformer = EnumCleaner(
    ...     cleaning_definitions={
    ...         "status": {
    ...             "elements": ["active", "inactive"],
    ...         },
    ...         "version": {
    ...             "elements": ["", "None", "none", "null", "NULL"],
    ...             "mode": "disallow",
    ...             "default": None
    ...         },
    ...     }
    ... )

    Parameters
    ----------
    cleaning_definitions : :py:class:`dict`, optional
        Dictionary containing column names and respective cleansing rules.
        Defaults to an empty dictionary (no column is cleansed).

    Note
    ----
    Following cleansing rule attributes per column are supported:

        * elements, mandatory - :class:`list`
            A list of elements which will be used to allow or reject (based on mode) values from the input DataFrame.
        * mode, allow|disallow, defaults to 'allow' - :any:`str`
            "allow" will set all values which are NOT in the list (ignoring NULL) to the default value.
            "disallow" will set all values which ARE in the list (ignoring NULL) to the default value.
        * default, defaults to None - :class:`~pyspark.sql.column.Column` or any primitive Python value
            If a value gets cleansed it gets replaced with the provided default value.

    Returns
    -------
    :any:`pyspark.sql.DataFrame`
        The transformed DataFrame

    Raises
    ------
    :any:`exceptions.ValueError`
        Enumeration-based cleaning requires a non-empty list of elements per cleaning rule!
        Spooq did not find such a list for column: {column_name}
    :any:`exceptions.ValueError`
        Only the following modes are supported by EnumCleaner: 'allow' and 'disallow'.

    Warning
    -------
    None values are explicitly ignored as input values because `F.lit(None).isin(["elem1", "elem2"])` will neither
    return True nor False but None.
    If you want to replace Null values you should use the method :py:meth:`~pyspark.sql.DataFrame.fillna` from Spark.
    """

    def __init__(self, cleaning_definitions=None):
        super().__init__()
        # Build a fresh dict per instance: a literal ``{}`` default would be one object
        # shared by every EnumCleaner created without arguments.
        self.cleaning_definitions = {} if cleaning_definitions is None else cleaning_definitions
        # NOTE(review): ``self.logger`` is presumably provided by ``Transformer.__init__``
        # (not visible in this file) — confirm.
        self.logger.debug("Enumeration List: " + str(self.cleaning_definitions))

    def transform(self, input_df):
        """
        Replace values of each configured column according to its cleaning definition.

        Parameters
        ----------
        input_df : :any:`pyspark.sql.DataFrame`
            Input DataFrame. It must contain every column named in ``cleaning_definitions``
            (the schema lookup for a missing column fails).

        Returns
        -------
        :any:`pyspark.sql.DataFrame`
            The DataFrame with every configured column cleansed and cast back to its
            original data type.

        Raises
        ------
        :any:`exceptions.ValueError`
            If a cleaning definition has no (or an empty) ``elements`` list, or if its
            ``mode`` is neither ``"allow"`` nor ``"disallow"``.
        """
        self.logger.debug("input_df Schema: " + input_df._jdf.schema().treeString())

        for column_name, cleaning_definition in self.cleaning_definitions.items():
            self.logger.debug(f"Cleaning Definition for Column {column_name}: {str(cleaning_definition)}")

            elements = cleaning_definition.get("elements", None)
            if not elements:
                # Adjacent string literals (no comma) so the exception carries ONE message
                # string rather than a tuple of two arguments.
                raise ValueError(
                    "Enumeration-based cleaning requires a non-empty list of elements per cleaning rule!"
                    f"\nSpooq did not find such a list for column: {column_name}"
                )

            mode = cleaning_definition.get("mode", "allow")
            substitute = cleaning_definition.get("default", None)
            # Remember the input type so the result can be cast back to it, whatever
            # the type of the substitute value is.
            data_type = input_df.schema[column_name].dataType
            if not isinstance(substitute, Column):
                substitute = F.lit(substitute)

            column = F.col(column_name)
            is_listed = column.isin(elements)
            if mode == "allow":
                # Keep listed values, replace everything else.
                cleansed = F.when(is_listed, column).otherwise(substitute)
            elif mode == "disallow":
                # Replace listed values, keep everything else.
                cleansed = F.when(is_listed, substitute).otherwise(column)
            else:
                raise ValueError(
                    "Only the following modes are supported by EnumCleaner: 'allow' and 'disallow'."
                )

            # NULL inputs are passed through untouched: ``NULL.isin(...)`` yields NULL, which
            # ``when`` does not treat as true, so without this guard "allow" mode would
            # replace NULLs with the substitute.
            input_df = input_df.withColumn(
                column_name,
                F.when(column.isNull(), F.lit(None)).otherwise(cleansed).cast(data_type),
            )

        return input_df