import sys
import pyspark.sql.functions as F, types as T
from pyspark.sql.column import Column
from .base_cleaner import BaseCleaner
[docs]class EnumCleaner(BaseCleaner):
"""
Cleanses a dataframe based on lists of allowed|disallowed values.
Examples
--------
>>> from spooq.transformer import EnumCleaner
>>>
>>> transformer = EnumCleaner(
>>> cleaning_definitions={
>>> "status": {
>>> "elements": ["active", "inactive"],
>>> },
>>> "version": {
>>> "elements": ["", "None", "none", "null", "NULL"],
>>> "mode": "disallow",
>>> "default": None,
>>> },
>>> }
>>> )
>>> from spooq.transformer import EnumCleaner
>>> from pyspark.sql import Row
>>>
>>> input_df = spark.createDataFrame([
>>> Row(a="stay", b="positive"),
>>> Row(a="stay", b="negative"),
>>> Row(a="stay", b="positive"),
>>> ])
>>> transformer = EnumCleaner(
>>> cleaning_definitions={
>>> "b": {
>>> "elements": ["positive"],
>>> "mode": "allow",
>>> }
>>> },
>>> column_to_log_cleansed_values="cleansed_values_enum",
>>> store_as_map=True,
>>> )
>>> output_df = transformer.transform(input_df)
>>> output_df.show()
+----+--------+--------------------+
| a| b|cleansed_values_enum|
+----+--------+--------------------+
|stay|positive| []|
|stay| null| [b -> negative]|
|stay|positive| []|
+----+--------+--------------------+
>>> output_df.printSchema()
root
|-- a: string (nullable = true)
|-- b: string (nullable = true)
|-- cleansed_values_enum: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
Parameters
----------
cleaning_definitions : :py:class:`dict`
Dictionary containing column names and respective cleansing rules
column_to_log_cleansed_values : :any:`str`, Defaults to None
Defines a column in which the original (uncleansed) value will be stored in case of cleansing. If no column
name is given, nothing will be logged.
store_as_map : :any:`bool`, Defaults to False
Specifies if the logged cleansed values should be stored in a column as :any:`pyspark.sql.types.MapType` with
stringified values or as :any:`pyspark.sql.types.StructType` with the original respective data types.
Note
----
Following cleansing rule attributes per column are supported:
* elements, mandatory - :class:`list`
A list of elements which will be used to allow or reject (based on mode) values from the input DataFrame.
* mode, allow|disallow, defaults to 'allow' - :any:`str`
"allow" will set all values which are NOT in the list (ignoring NULL) to the default value.
"disallow" will set all values which ARE in the list (ignoring NULL) to the default value.
* default, defaults to None - :class:`~pyspark.sql.column.Column` or any primitive Python value
If a value gets cleansed it gets replaced with the provided default value.
Returns
-------
|SPARK_DATAFRAME|
The transformed DataFrame
Raises
------
:any:`exceptions.ValueError`
Enumeration-based cleaning requires a non-empty list of elements per cleaning rule!
Spooq did not find such a list for column: {column_name}
:any:`exceptions.ValueError`
Only the following modes are supported by EnumCleaner: 'allow' and 'disallow'.
Warning
-------
None values are explicitly ignored as input values because `F.lit(None).isin(["elem1", "elem2"])` will neither
return True nor False but None.
If you want to replace Null values you should use the transformer :py:class:`spooq.transformer.NullCleaner`
"""
def __init__(self, cleaning_definitions={}, column_to_log_cleansed_values=None, store_as_map=False):
super().__init__(
cleaning_definitions=cleaning_definitions,
column_to_log_cleansed_values=column_to_log_cleansed_values,
store_as_map=store_as_map,
temporary_columns_prefix="9b7798529fef529c8f2586be7ca43a66",
)
self.logger.debug("Enumeration List: " + str(self.cleaning_definitions))