"""
This is a collection of module-level functions to be applied to a DataFrame.
These methods can be used with the :py:class:`~spooq.transformer.mapper.Mapper` transformer
or directly within a ``select`` or ``withColumn`` statement.
All functions support the following generic functionalities:
- ``alt_src_cols``: Alternative source columns that will be used within a coalesce function if provided
- ``cast``: Explicit casting after the transformation (sane defaults are set for each function)
``to_str``, ``to_int``, ``to_long``, ``to_float``, ``to_double`` are convenience methods with
a hardcoded cast that cannot be changed.
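Both generic keyword arguments can be combined in a single call, for example (column names here are purely
illustrative)::

    spq.to_num("user_id", alt_src_cols=["id", "legacy_id"], cast=T.IntegerType())
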
All examples assume that the following code has been executed beforehand:
>>> from pyspark.sql import Row
>>> from pyspark.sql import functions as F, types as T
>>> from spooq.transformer import Mapper
>>> from spooq.transformer import mapper_transformations as spq
"""
import re
from functools import partial
from typing import Any, Union, List, Tuple, Set, Callable
import json
from pyspark.sql import functions as F, types as T
from pyspark.sql.column import Column
COLUMN_NAME_PATTERN = re.compile(r".*\'(.*)\'")
def _coalesce_source_columns(
source_column: Union[str, Column],
alt_src_cols: Union[str, Column, List[Union[str, Column]], Tuple[Union[str, Column]], Set[Union[str, Column]]],
) -> Column:
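    """Wrap the source column in a ``coalesce`` with the alternative source column(s), if any were provided."""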
if alt_src_cols is None or alt_src_cols is False:
return source_column
if not isinstance(alt_src_cols, (list, tuple, set)):
return F.coalesce(source_column, alt_src_cols)
return F.coalesce(source_column, *alt_src_cols)
def _get_executable_function(
inner_func: Callable, source_column: Union[str, Column], name: str, **kwargs
) -> Union[partial, Column]:
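    """
    Dispatch between direct usage and usage via Spooq's Mapper:
    if ``source_column`` is a string or a Column, the transformation is executed immediately and an aliased
    Column is returned (for direct use in ``select``/``withColumn``/...); otherwise a ``partial`` is returned,
    which the Mapper calls later with the source column and output name taken from the mapping definition.
    """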
direct_call = True
if isinstance(source_column, str):
name = name or source_column.split(".")[-1]
source_column = F.col(source_column)
elif isinstance(source_column, Column):
name = name or re.search(COLUMN_NAME_PATTERN, source_column.name()).group(1)
else:
direct_call = False
if direct_call:
return inner_func(source_column, name, **kwargs)
else:
return partial(inner_func, **kwargs)
def as_is(source_column: Union[str, Column] = None, name: str = None, **kwargs) -> Union[partial, Column]:
"""
Returns a renamed column without any casting. This is especially useful if you need to
keep a complex data type (e.g. array, list or struct).
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> no casting, same return data type as input data type
Applies provided datatype on output column (``.cast(cast)``)
Examples
--------
>>> input_df = spark.createDataFrame([
... Row(friends=[Row(first_name="Gianni", id=3993, last_name="Weber"),
... Row(first_name="Arielle", id=17484, last_name="Greaves")]),
... ])
>>>
>>> input_df.select(spq.as_is("friends.first_name")).show(truncate=False)
+-----------------+
|first_name       |
+-----------------+
|[Gianni, Arielle]|
+-----------------+
>>>
>>> mapping = [("my_friends", "friends", spq.as_is)]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------------------------------------------+
|my_friends |
+--------------------------------------------------+
|[[Gianni, 3993, Weber], [Arielle, 17484, Greaves]]|
+--------------------------------------------------+
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
if cast:
source_column = source_column.cast(cast)
return source_column.alias(name)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", False),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def to_num(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
More robust conversion to number data types (Default: LongType).
This method is able to additionally handle (compared to implicit Spark conversion):
* Preceding and/or trailing whitespace
* underscores as 'thousand' separators
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.LongType()
Applies provided datatype on output column (``.cast(cast)``)
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... Row(input_key=" 123456 "),
... Row(input_key="Hello"),
... Row(input_key="123_456")
... ], schema="input_key string"
... )
>>>
>>> input_df.select(spq.to_num("input_key")).show(truncate=False)
+---------+
|input_key|
+---------+
|123456   |
|null     |
|123456   |
+---------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.to_num)
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+-----------------+
|original_value|transformed_value|
+--------------+-----------------+
| 123456 |123456 |
|Hello |null |
|123_456 |123456 |
+--------------+-----------------+
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
return F.regexp_replace(F.trim(source_column), "_", "").cast(cast).alias(name)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.LongType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def to_bool(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
More robust conversion to BooleanType.
This method is able to additionally handle (compared to implicit Spark conversion):
* Preceding and/or trailing whitespace
* Define additional strings for true/false values ("on"/"off", "enabled"/"disabled" are added by default)
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
case_sensitive : Bool, default -> False
Defines whether the case for the additional true/false lookup values is considered
true_values : list, default -> ["on", "enabled"]
A list of values that should result in a ``True`` value if they are found in the source column
false_values : list, default -> ["off", "disabled"]
A list of values that should result in a ``False`` value if they are found in the source column
replace_default_values : Bool, default -> False
Defines whether additionally provided true/false values replace or extend the default list
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.BooleanType()
Applies provided datatype on output column (``.cast(cast)``)
Warning
---------
Spark (and Spooq) handles number to boolean conversions depending on the input datatype!
Please see this table for clarification:
+-------+----------+-----------------+-------------+
| Input            | Result                        |
+-------+----------+-----------------+-------------+
| Value | Datatype | Cast to Boolean | spq.to_bool |
+=======+==========+=================+=============+
| -1    | int      | True            | NULL        |
+-------+----------+-----------------+-------------+
| -1    | str      | NULL            | NULL        |
+-------+----------+-----------------+-------------+
| 0     | int      | False           | False       |
+-------+----------+-----------------+-------------+
| 0     | str      | False           | False       |
+-------+----------+-----------------+-------------+
| 1     | int      | True            | True        |
+-------+----------+-----------------+-------------+
| 1     | str      | True            | True        |
+-------+----------+-----------------+-------------+
| 100   | int      | True            | NULL        |
+-------+----------+-----------------+-------------+
| 100   | str      | NULL            | NULL        |
+-------+----------+-----------------+-------------+
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... Row(input_key=" false "),
... Row(input_key="123"),
... Row(input_key="1"),
... Row(input_key="Enabled"),
... Row(input_key="?"),
... Row(input_key="n")
... ], schema="input_key string"
... )
>>>
>>> input_df.select(spq.to_bool("input_key", false_values=["?"])).show(truncate=False)
+---------+
|input_key|
+---------+
|false    |
|null     |
|true     |
|true     |
|false    |
|false    |
+---------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.to_bool(false_values=["?"]))
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+-----------------+
|original_value|transformed_value|
+--------------+-----------------+
| false |false |
|123 |null |
|1 |true |
|Enabled |true |
|? |false |
|n |false |
+--------------+-----------------+
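To consider only custom lookup values (instead of extending the defaults), set ``replace_default_values``;
a sketch that is not part of the example above::

    spq.to_bool("input_key", true_values=["y"], false_values=["n"], replace_default_values=True)
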
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, true_values, false_values, case_sensitive, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
if case_sensitive:
true_condition = F.trim(source_column).isin(true_values)
false_condition = F.trim(source_column).isin(false_values)
else:
true_values = [str(val).lower() for val in true_values]
false_values = [str(val).lower() for val in false_values]
true_condition = F.lower(F.trim(source_column)).isin(true_values)
false_condition = F.lower(F.trim(source_column)).isin(false_values)
return (
(
F.when(true_condition, F.lit(True))
.when(false_condition, F.lit(False))
.otherwise(F.trim(source_column).cast(T.BooleanType()))
)
.cast(cast)
.alias(name)
)
args = dict(
case_sensitive=kwargs.get("case_sensitive", False),
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.BooleanType()),
)
if kwargs.get("replace_default_values"):
args["true_values"] = kwargs.get("true_values", ["on", "enabled"])
args["false_values"] = kwargs.get("false_values", ["off", "disabled"])
else:
args["true_values"] = kwargs.get("true_values", []) + ["on", "enabled"]
args["false_values"] = kwargs.get("false_values", []) + ["off", "disabled"]
return _get_executable_function(_inner_func, source_column, name, **args)
def to_timestamp(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
More robust conversion to TimestampType (or as a formatted string).
This method supports following input types:
* Unix timestamps in seconds
* Unix timestamps in milliseconds
* Timestamps in any format supported by Spark
* Timestamps in any custom format (via ``input_format``)
* Preceding and/or trailing whitespace
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
max_timestamp_sec : int, default -> 4102358400 (=> 2099-12-31 01:00:00)
Defines the range in which unix timestamps are still considered as seconds (compared to milliseconds)
input_format : [str, Bool], default -> False
Spooq tries to convert the input string with the provided pattern (via ``F.unix_timestamp()``)
output_format : [str, Bool], default -> False
The output can be formatted according to the provided pattern (via ``F.date_format()``)
min_timestamp_ms : int, default -> -62135514321000 (=> Year 1)
Defines the overall allowed range to keep the timestamps within Python's ``datetime`` library limits
max_timestamp_ms : int, default -> 253402210800000 (=> Year 9999)
Defines the overall allowed range to keep the timestamps within Python's ``datetime`` library limits
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.TimestampType()
Applies provided datatype on output column (``.cast(cast)``)
Warning
---------
* Timestamps in the ranges (-inf, -max_timestamp_sec) and (max_timestamp_sec, inf) are treated as milliseconds
* There is a time interval (1970-01-01 +- ~2.5 months) in which seconds and milliseconds cannot be
  distinguished reliably (e.g. 3974400000 is treated as seconds (2095-12-11T00:00:00) because the value is
  smaller than max_timestamp_sec, but it could also be a valid date in milliseconds (1970-02-16T00:00:00))
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... Row(input_key="2020-08-12T12:43:14+0000"),
... Row(input_key="1597069446"),
... Row(input_key="1597069446000"),
... Row(input_key="2020-08-12"),
... ], schema="input_key string"
... )
>>>
>>> input_df.select(spq.to_timestamp("input_key")).show(truncate=False)
+-------------------+
|input_key          |
+-------------------+
|2020-08-12 14:43:14|
|2020-08-10 16:24:06|
|2020-08-10 16:24:06|
|2020-08-12 00:00:00|
+-------------------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.to_timestamp)
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+------------------------+-------------------+
|original_value |transformed_value |
+------------------------+-------------------+
|2020-08-12T12:43:14+0000|2020-08-12 14:43:14|
|1597069446 |2020-08-10 16:24:06|
|1597069446000 |2020-08-10 16:24:06|
|2020-08-12 |2020-08-12 00:00:00|
+------------------------+-------------------+
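Custom input and output patterns (Spark datetime patterns) can be provided via ``input_format`` and
``output_format``; a sketch that is not part of the example above::

    spq.to_timestamp("input_key", input_format="dd.MM.yyyy", output_format="yyyy-MM-dd HH:mm")
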
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(
source_column,
name,
max_timestamp_sec,
input_format,
output_format,
min_timestamp_ms,
max_timestamp_ms,
alt_src_cols,
cast,
):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
if input_format:
output_col = (
F.unix_timestamp(source_column.cast(T.StringType()), input_format).cast(T.TimestampType()).alias(name)
)
else:
output_col = (
F.when(
~F.trim(source_column).cast(T.LongType()).between(min_timestamp_ms, max_timestamp_ms),
F.lit(None),
)
.when(
F.abs(F.trim(source_column).cast(T.LongType())).between(0, max_timestamp_sec),
F.trim(source_column).cast(T.LongType()).cast(T.TimestampType()),
)
.when(
F.abs(F.trim(source_column).cast(T.LongType())) > max_timestamp_sec,
(F.trim(source_column) / 1000).cast(T.TimestampType()),
)
.otherwise(F.trim(source_column))
)
if output_format:
output_col = F.date_format(output_col, output_format)
cast = T.StringType()
return output_col.cast(cast).alias(name)
args = dict(
max_timestamp_sec=kwargs.get("max_timestamp_sec", 4102358400),
input_format=kwargs.get("input_format", False),
output_format=kwargs.get("output_format", False),
min_timestamp_ms=kwargs.get("min_timestamp_ms", -62135514321000),
max_timestamp_ms=kwargs.get("max_timestamp_ms", 253402210800000),
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.TimestampType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def str_to_array(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Splits a string into a list (ArrayType).
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.StringType()
Applies provided datatype on the elements of the output array (``.cast(T.ArrayType(cast))``)
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... Row(input_key="[item1,item2,3]"),
... Row(input_key="item1,it[e]m2,it em3"),
... Row(input_key=" item1, item2 , item3")
... ], schema="input_key string"
... )
>>>
>>> input_df.select(spq.str_to_array("input_key")).show(truncate=False)
+------------------------+
|input_key               |
+------------------------+
|[item1, item2, 3]       |
|[item1, it[e]m2, it em3]|
|[item1, item2, item3]   |
+------------------------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.str_to_array)
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.printSchema()
root
|-- original_value: string (nullable = true)
|-- transformed_value: array (nullable = true)
| |-- element: string (containsNull = true)
>>> output_df.show(truncate=False)
+-------------------------------+------------------------+
|original_value |transformed_value |
+-------------------------------+------------------------+
|[item1,item2,3] |[item1, item2, 3] |
|item1,it[e]m2,it em3 |[item1, it[e]m2, it em3]|
| item1, item2 , item3|[item1, item2, item3] |
+-------------------------------+------------------------+
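The ``cast`` keyword argument is applied to the elements of the resulting array, which is useful for
comma-separated numbers; a sketch that is not part of the example above::

    spq.str_to_array("input_key", cast=T.IntegerType())
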
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
if isinstance(cast, str):
output_type = f"array<{cast}>"
else:
output_type = T.ArrayType(cast)
return (
F.split(F.regexp_replace(source_column, r"^\s*\[*\s*|\s*\]*\s*$", ""), r"\s*,\s*")
.cast(output_type)
.alias(name)
)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.StringType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def map_values(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Maps input values to specified output values.
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
mapping : dict
Dictionary containing lookup / substitute value pairs.
default : [str, Column, Any], default -> "source_column"
Defines what will be returned if no matching lookup value was found.
ignore_case : bool, default -> True
Only relevant for "equals" and "sql_like" comparison operators.
pattern_type : str, default -> "equals"
Please choose among ['equals', 'regex' and 'sql_like'] for the comparison of input value and mapping key.
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.StringType()
Applies provided datatype on output column (``.cast(cast)``)
Hint
----
The following examples show what happens behind the curtains for each ``pattern_type``, using the
lookup/substitute pair ``whitelist`` -> ``allowlist``:

* ``pattern_type="equals"`` (lookup value ``whitelist``)::

    F.when(
        F.col("input_column") == "whitelist",
        F.lit("allowlist")
    ).otherwise(
        F.col("input_column")
    )

* ``pattern_type="sql_like"`` (lookup value ``%whitelist%``)::

    F.when(
        F.col("input_column").like("%whitelist%"),
        F.lit("allowlist")
    ).otherwise(
        F.col("input_column")
    )

* ``pattern_type="regex"`` (lookup value ``.*whitelist.*``)::

    F.when(
        F.col("input_column").rlike(".*whitelist.*"),
        F.lit("allowlist")
    ).otherwise(
        F.col("input_column")
    )
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... ("allowlist", ),
... ("WhiteList", ),
... ("blocklist", ),
... ("blacklist", ),
... ("Blacklist", ),
... ("Shoppinglist", ),
... ], schema="input_key string"
... )
>>> substitute_mapping = {"whitelist": "allowlist", "blacklist": "blocklist"}
>>>
>>> input_df.select(spq.map_values("input_key", mapping=substitute_mapping)).show(truncate=False)
+------------+
|input_key   |
+------------+
|allowlist   |
|allowlist   |
|blocklist   |
|blocklist   |
|blocklist   |
|Shoppinglist|
+------------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.map_values(mapping=substitute_mapping))
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+-----------------+
|original_value|transformed_value|
+--------------+-----------------+
|allowlist |allowlist |
|WhiteList |allowlist |
|blocklist |blocklist |
|blacklist |blocklist |
|Blacklist |blocklist |
|Shoppinglist |Shoppinglist |
+--------------+-----------------+
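The comparison logic can be switched via ``pattern_type``; a sketch with SQL LIKE patterns
(mapping keys and values here are purely illustrative)::

    spq.map_values("input_key", mapping={"%list%": "some list"}, pattern_type="sql_like", default="no list")
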
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, mapping, default, ignore_case, pattern_type, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
if isinstance(default, str) and default == "source_column":
default = source_column
elif not isinstance(default, Column):
default = F.lit(default)
if ignore_case and pattern_type != "regex":
mapping = {str(key).lower(): F.lit(value).cast(cast) for key, value in mapping.items()}
source_column = F.lower(source_column)
else:
mapping = {key: F.lit(value).cast(cast) for key, value in mapping.items()}
keys = list(mapping.keys())
if pattern_type == "equals":
when_clause = F.when(source_column == keys[0], mapping[keys[0]])
for key in keys[1:]:
when_clause = when_clause.when(source_column.cast(T.StringType()) == key, mapping[key])
elif pattern_type == "sql_like":
when_clause = F.when(source_column.like(keys[0]), mapping[keys[0]])
for key in keys[1:]:
when_clause = when_clause.when(source_column.like(key), mapping[key])
elif pattern_type == "regex":
when_clause = F.when(source_column.rlike(keys[0]), mapping[keys[0]])
for key in keys[1:]:
when_clause = when_clause.when(source_column.rlike(key), mapping[key])
else:
raise ValueError(
f"pattern_type <{pattern_type}> not recognized. "
"Please choose among ['equals' (default), 'regex' and 'sql_like']"
)
when_clause = when_clause.otherwise(default.cast(cast))
return when_clause.alias(name)
    try:
        mapping = kwargs["mapping"]
    except KeyError:
        raise TypeError("'map_values' is missing the mapping dict (e.g. mapping=dict(results='training'))")
    if not mapping:
        raise ValueError("'map_values' received an empty mapping dict (e.g. mapping=dict(results='training'))")
args = dict(
mapping=mapping,
default=kwargs.get("default", "source_column"),
ignore_case=kwargs.get("ignore_case", True),
pattern_type=kwargs.get("pattern_type", "equals"),
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.StringType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def meters_to_cm(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Converts meters to cm and casts the result to an IntegerType.
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.IntegerType()
Applies provided datatype on output column (``.cast(cast)``)
Examples
--------
>>> input_df = spark.createDataFrame([
... Row(size_in_m=1.80),
... Row(size_in_m=1.65),
... Row(size_in_m=2.05)
... ])
>>>
>>> input_df.select(spq.meters_to_cm("size_in_m")).show(truncate=False)
+---------+
|size_in_m|
+---------+
|180      |
|165      |
|204      |
+---------+
>>>
>>> mapping = [
... ("original_value", "size_in_m", spq.as_is),
... ("size_in_cm", "size_in_m", spq.meters_to_cm),
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+----------+
|original_value|size_in_cm|
+--------------+----------+
|1.8 |180 |
|1.65 |165 |
|2.05 |204 |
+--------------+----------+
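Note that 2.05 m is converted to 204 cm (not 205) because 2.05 has no exact binary floating point
representation: ``2.05 * 100`` evaluates to slightly less than 205 and the cast to IntegerType truncates.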
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
return (source_column * 100).cast(cast).alias(name)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.IntegerType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def has_value(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Returns True if the source_column is

- not NULL and
- not "" (empty string)

Otherwise it returns False.
Warning
-------
This means that it will return True for values which would otherwise be interpreted as False, like "false" or 0!
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.BooleanType()
Applies provided datatype on output column (``.cast(cast)``)
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... Row(input_key=False),
... Row(input_key=None),
... Row(input_key="some text"),
... Row(input_key="")
... ], schema="input_key string"
... )
>>>
>>> input_df.select(spq.has_value("input_key")).show(truncate=False)
+---------+
|input_key|
+---------+
|true     |
|false    |
|true     |
|false    |
+---------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("has_value", "input_key", spq.has_value)
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+---------+
|original_value|has_value|
+--------------+---------+
|false |true |
|null |false |
|some text |true |
| |false |
+--------------+---------+
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
return (
F.when((source_column.isNotNull()) & (source_column.cast(T.StringType()) != ""), F.lit(True))
.otherwise(F.lit(False))
.cast(cast)
.alias(name)
)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.BooleanType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def apply(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Applies a provided function or partial to the source column.
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
func : Callable
Function that takes the source column as single argument
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> no casting
Applies provided datatype on output column (``.cast(cast)``)
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... ("F", ),
... ("f", ),
... ("x", ),
... ("X", ),
... ("m", ),
... ("M", ),
... ], schema="input_key string"
... )
>>>
>>> input_df.select(spq.apply("input_key", func=F.lower)).show(truncate=False)
+---------+
|input_key|
+---------+
|f        |
|f        |
|x        |
|x        |
|m        |
|m        |
+---------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.apply(func=F.lower))
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+-----------------+
|original_value|transformed_value|
+--------------+-----------------+
|F |f |
|f |f |
|x |x |
|X |x |
|m |m |
|M |m |
+--------------+-----------------+
>>> input_df = spark.createDataFrame(
... [
... ("sarajishvilileqso@gmx.at", ),
... ("jnnqn@astrinurdin.art", ),
... ("321aw@hotmail.com", ),
... ("techbrenda@hotmail.com", ),
... ("sdsxcx@gmail.com", ),
... ], schema="input_key string"
... )
...
>>> def _has_hotmail(source_column):
... return F.when(
... source_column.cast(T.StringType()).endswith("@hotmail.com"),
... F.lit(True)
... ).otherwise(F.lit(False))
...
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("has_hotmail", "input_key", spq.apply(func=_has_hotmail))
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+------------------------+-----------+
|original_value          |has_hotmail|
+------------------------+-----------+
|sarajishvilileqso@gmx.at|false      |
|jnnqn@astrinurdin.art   |false      |
|321aw@hotmail.com       |true       |
|techbrenda@hotmail.com  |true       |
|sdsxcx@gmail.com        |false      |
+------------------------+-----------+
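The provided ``func`` must accept the (potentially coalesced) source column as its single argument;
functions that need additional arguments can be wrapped, for example with a lambda (a sketch)::

    spq.apply("input_key", func=lambda col: F.date_format(col, "yyyy-MM-dd"))
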
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, func, alt_src_cols, cast):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
source_column = func(source_column)
if cast:
source_column = source_column.cast(cast)
return source_column.alias(name)
    try:
        func = kwargs["func"]
    except KeyError:
        raise TypeError("'apply' transformation is missing the custom function (e.g. func=F.lower)")
args = dict(
func=func,
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", False),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def to_json_string(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Returns a column as a JSON-compatible string. Nested hierarchies are supported.
In contrast to Spark's built-in ``to_json``, this function also supports NULL and plain strings as input.
The string representation of the value is returned if an error occurs during serialization.
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
cast : T.DataType(), default -> T.StringType()
Defines the return data type of the serialized output column
Examples
--------
>>> input_df = spark.createDataFrame([
... Row(friends=[Row(first_name="Gianni", id=3993, last_name="Weber"),
... Row(first_name="Arielle", id=17484, last_name="Greaves")]),
... ])
>>>
>>> input_df.select(spq.to_json_string("friends")).show(truncate=False)
+----------------------------------------------------------------------------------------------------------------------------+
|[{"first_name": "Gianni", "id": 3993, "last_name": "Weber"}, {"first_name": "Arielle", "id": 17484, "last_name": "Greaves"}]|
+----------------------------------------------------------------------------------------------------------------------------+
>>>
>>> mapping = [("friends_json", "friends", spq.to_json_string)]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+----------------------------------------------------------------------------------------------------------------------------+
|friends_json |
+----------------------------------------------------------------------------------------------------------------------------+
|[{"first_name": "Gianni", "id": 3993, "last_name": "Weber"}, {"first_name": "Arielle", "id": 17484, "last_name": "Greaves"}]|
+----------------------------------------------------------------------------------------------------------------------------+
>>> input_df.select(spq.to_json_string("friends.first_name")).show(truncate=False)
+---------------------+
|first_name |
+---------------------+
|['Gianni', 'Arielle']|
+---------------------+
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols, cast):
def _to_json(col):
if not col:
return None
try:
if isinstance(col, list):
return json.dumps([x.asDict(recursive=True) for x in col])
else:
return json.dumps(col.asDict(recursive=True))
except (AttributeError, TypeError):
return str(col)
source_column = _coalesce_source_columns(source_column, alt_src_cols)
udf_to_json = F.udf(_to_json, cast)
return udf_to_json(source_column).alias(name)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast=kwargs.get("cast", T.StringType()),
)
return _get_executable_function(_inner_func, source_column, name, **args)
# Convenience Methods
def to_str(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Convenience transformation that only casts to string.
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
Examples
--------
>>> input_df = spark.createDataFrame(
... [
... Row(input_key=123456),
... Row(input_key=-123456),
... ], schema="input_key int"
... )
>>>
>>> input_df.select(spq.to_str("input_key")).show(truncate=False)
+---------+
|input_key|
+---------+
|123456   |
|-123456  |
+---------+
>>>
>>> mapping = [
... ("original_value", "input_key", spq.as_is),
... ("transformed_value", "input_key", spq.to_str)
... ]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.show(truncate=False)
+--------------+-----------------+
|original_value|transformed_value|
+--------------+-----------------+
|123456 |123456 |
|-123456 |-123456 |
+--------------+-----------------+
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
def _inner_func(source_column, name, alt_src_cols):
source_column = _coalesce_source_columns(source_column, alt_src_cols)
return source_column.cast(T.StringType()).alias(name)
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
)
return _get_executable_function(_inner_func, source_column, name, **args)
def to_int(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Syntactic sugar for calling ``to_num(cast="int")``
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast="int",
)
return to_num(source_column, name, **args)
def to_long(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Syntactic sugar for calling ``to_num(cast="long")``
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast="long",
)
return to_num(source_column, name, **args)
def to_float(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Syntactic sugar for calling ``to_num(cast="float")``
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast="float",
)
return to_num(source_column, name, **args)
def to_double(source_column=None, name=None, **kwargs: Any) -> Union[partial, Column]:
"""
Syntactic sugar for calling ``to_num(cast="double")``
Parameters
----------
source_column : str or Column
Input column. Can be a name, pyspark column or pyspark function
name : str, default -> derived from input column
Name of the output column. (``.alias(name)``)
Keyword Arguments
-----------------
alt_src_cols : str, default -> no coalescing, only source_column
Coalesce with source_column and columns from this parameter.
Returns
-------
partial or Column
This method returns a suitable type depending on how it was called. This ensures compatibility
with Spooq's mapper transformer - with or without explicit parameters - as well as direct calls via select,
withColumn, where, ...
"""
args = dict(
alt_src_cols=kwargs.get("alt_src_cols", False),
cast="double",
)
return to_num(source_column, name, **args)