# Source code for spooq.transformer.null_cleaner
import pyspark.sql.functions as F
from pyspark.sql.column import Column
from .base_cleaner import BaseCleaner
class NullCleaner(BaseCleaner):
    """
    Fills Null values of the specified fields.

    Takes a dictionary with the fields to be cleaned and the default value to be set when the field is null.

    Examples
    --------
    >>> from pyspark.sql import functions as F
    >>> from spooq.transformer import NullCleaner
    >>> transformer = NullCleaner(
    >>>     cleaning_definitions={
    >>>         "points": {
    >>>             "default": 0
    >>>         }
    >>>     }
    >>> )

    >>> from pyspark.sql import Row
    >>>
    >>> input_df = spark.createDataFrame([
    >>>     Row(id=0, points=5),
    >>>     Row(id=1, points=None),
    >>>     Row(id=2, points=15),
    >>> ])
    >>> transformer = NullCleaner(
    >>>     cleaning_definitions={
    >>>         "points": {"default": 0},
    >>>     },
    >>>     column_to_log_cleansed_values="cleansed_values_null",
    >>>     store_as_map=True,
    >>> )
    >>> output_df = transformer.transform(input_df)
    >>> output_df.show()
    +---+------+--------------------+
    | id|points|cleansed_values_null|
    +---+------+--------------------+
    |  0|     5|                null|
    |  1|     0|    [points -> null]|
    |  2|    15|                null|
    +---+------+--------------------+
    >>> output_df.printSchema()
     |-- id: long (nullable = true)
     |-- points: long (nullable = true)
     |-- cleansed_values_null: map (nullable = true)
     |    |-- key: string
     |    |-- value: string (valueContainsNull = true)

    Parameters
    ----------
    cleaning_definitions : :py:class:`dict`
        Dictionary containing column names and respective default values
    column_to_log_cleansed_values : :any:`str`, Defaults to None
        Defines a column in which the original (uncleansed) value will be stored in case of cleansing. If no column
        name is given, nothing will be logged.
    store_as_map : :any:`bool`, Defaults to False
        Specifies if the logged cleansed values should be stored in a column as :any:`pyspark.sql.types.MapType` or as
        :any:`pyspark.sql.types.StructType` with stringified values.

    Note
    ----
    The following cleaning_definitions attributes per column are mandatory:
        * default - :class:`~pyspark.sql.column.Column` or any primitive Python value
            If a value gets cleansed it gets replaced with the provided default value.

    Returns
    -------
    |SPARK_DATAFRAME|
        The transformed DataFrame

    Raises
    ------
    :any:`exceptions.ValueError`
        Null-based cleaning requires the field default.
        Default parameter is not specified for column with name: {column_name}
    """

    def __init__(self, cleaning_definitions=None, column_to_log_cleansed_values=None, store_as_map=False):
        """
        Store the cleaning configuration by delegating to the base cleaner.

        A missing ``cleaning_definitions`` is replaced with a fresh empty dict
        so that no mutable default is shared between instances.
        """
        cleaning_definitions = cleaning_definitions if cleaning_definitions is not None else {}
        super().__init__(
            cleaning_definitions=cleaning_definitions,
            column_to_log_cleansed_values=column_to_log_cleansed_values,
            store_as_map=store_as_map,
            # Fixed opaque prefix handed to the base class; presumably used to
            # build temporary column names that will not collide with real
            # columns -- confirm against BaseCleaner.
            temporary_columns_prefix="ead8cn9f7tf0sf1cs1ua61464zti82kj",
        )
        self.logger.debug(f"Cleansing Definitions: {self.cleaning_definitions}")

    def transform(self, input_df):
        """
        Replace null values in the configured columns with their defaults.

        Parameters
        ----------
        input_df : |SPARK_DATAFRAME|
            DataFrame containing every column named in ``cleaning_definitions``.

        Returns
        -------
        |SPARK_DATAFRAME|
            The input columns in their original order, with nulls in the
            configured columns replaced and each cleaned column cast back to
            its original data type. If ``column_to_log_cleansed_values`` is
            set, that column is included as well.

        Raises
        ------
        :any:`exceptions.ValueError`
            If a column's cleaning definition has no ``default`` entry (or the
            entry is ``None``).
        """
        self.logger.debug("input_df Schema: " + input_df._jdf.schema().treeString())
        # Work on a private copy: the list is extended below when logging is on.
        ordered_column_names = list(input_df.columns)

        if self.column_to_log_cleansed_values:
            column_names_to_clean = self.cleaning_definitions.keys()
            temporary_column_names = self._get_temporary_column_names(column_names_to_clean)
            input_df = self._add_temporary_columns(input_df, column_names_to_clean, temporary_column_names)

        cleansing_expressions = []
        for column_name, cleansing_value in self.cleaning_definitions.items():
            data_type = input_df.schema[str(column_name)].dataType
            substitute = cleansing_value.get("default")
            if substitute is None:
                # One message string, so str(exc) is readable instead of a
                # tuple repr of two separate arguments.
                raise ValueError(
                    "Null-based cleaning requires the field default. "
                    f"Default parameter is not specified for column with name: {column_name}"
                )

            # Plain Python values are wrapped as literals; Column expressions
            # are used unchanged.
            if not isinstance(substitute, Column):
                substitute = F.lit(substitute)

            self.logger.debug(f"Cleansing value for column {column_name}: {cleansing_value}")

            # Cast back to the column's original type so that substituting a
            # default does not change the schema.
            cleansing_expression = (
                F.when(F.col(column_name).isNull(), substitute)
                .otherwise(F.col(column_name))
                .cast(data_type)
            )
            self.logger.debug(f"Cleansing Expression for {column_name}: {cleansing_expression}")
            cleansing_expressions.append((column_name, cleansing_expression))

        self.logger.info("Full null cleansing expression:")
        self.logger.info(
            ".".join(
                f"withColumn({column_name}, {str(cleansing_expr)})"
                for (column_name, cleansing_expr) in cleansing_expressions
            )
        )

        for column_name, cleansing_expr in cleansing_expressions:
            input_df = input_df.withColumn(column_name, cleansing_expr)

        if self.column_to_log_cleansed_values:
            input_df = self._log_cleansed_values(input_df, column_names_to_clean, temporary_column_names)
            # If the input already carried a column with the log column's name,
            # appending it again would select it twice and yield duplicate
            # column names in the output.
            if self.column_to_log_cleansed_values not in ordered_column_names:
                ordered_column_names.append(self.column_to_log_cleansed_values)

        return input_df.select(ordered_column_names)