Source code for spooq.transformer.mapper_custom_data_types

"""This is a collection of module level methods to construct a specific
PySpark DataFrame query for custom defined data types.

These methods are not meant to be called directly but via
the :py:class:`~spooq.transformer.mapper.Mapper` transformer.
Please see that particular class on how to apply custom data types.

To inject your **own custom data types**, please have a look at the
:py:meth:`add_custom_data_type` method!
"""
import sys
from functools import wraps
from functools import partial
import logging
import warnings

from pyspark.sql import functions as F
from pyspark.sql import types as T

from spooq.transformer import mapper_transformations as spq

__all__ = ["add_custom_data_type"]


def add_custom_data_type(function_name, func):
    """
    Registers a custom data type at runtime so that it can be referenced by name in the
    mapping of the :py:class:`~spooq.transformer.mapper.Mapper` transformer.

    The function is stored as an attribute of this module. Its attribute name is
    ``function_name`` prefixed with ``_generate_select_expression_for_``, unless
    ``function_name`` already starts with that prefix. An existing attribute of the
    same name is overwritten.

    Example
    -------
    >>> import spooq.transformer.mapper_custom_data_types as custom_types
    >>> from spooq.transformer import Mapper
    >>> from pyspark.sql import Row, functions as F, types as T

    >>> def hello_world(source_column, name):
    >>>     "A UDF (User Defined Function) in Python"
    >>>     def _to_hello_world(col):
    >>>         if not col:
    >>>             return None
    >>>         else:
    >>>             return "Hello World"
    >>>
    >>>     udf_hello_world = F.udf(_to_hello_world, T.StringType())
    >>>     return udf_hello_world(source_column).alias(name)
    >>>
    >>> input_df = spark.createDataFrame(
    >>>     [Row(hello_from=u'tsivorn1@who.int'),
    >>>      Row(hello_from=u''),
    >>>      Row(hello_from=u'gisaksen4@skype.com')]
    >>> )
    >>>
    >>> custom_types.add_custom_data_type(function_name="hello_world", func=hello_world)
    >>> transformer = Mapper(mapping=[("hello_who", "hello_from", "hello_world")])
    >>> df = transformer.transform(input_df)
    >>> df.show()
    +-----------+
    |  hello_who|
    +-----------+
    |Hello World|
    |       null|
    |Hello World|
    +-----------+

    >>> def first_and_last_name(source_column, name):
    >>>     "A PySpark SQL expression referencing multiple columns"
    >>>     return F.concat_ws("_", source_column, F.col("attributes.last_name")).alias(name)
    >>>
    >>> custom_types.add_custom_data_type(function_name="full_name", func=first_and_last_name)
    >>>
    >>> transformer = Mapper(mapping=[
    >>>     ("first_name", "attributes.first_name", "StringType"),
    >>>     ("last_name",  "attributes.last_name",  "StringType"),
    >>>     ("full_name",  "attributes.first_name", "full_name"),
    >>> ])

    Parameters
    ----------
    function_name: :any:`str`
        The name of your custom data type, i.e. the value used as data type in the mapping.

    func: compatible function
        The PySpark dataframe function which will be called on a column, defined in the mapping
        of the Mapper class.
        It has to accept the two parameters ``source_column`` and ``name``
        (see the note below).

    Note
    ----
    Required input parameters of custom data types:

    **source_column** (:py:class:`pyspark.sql.Column`) - This is where your logic will be applied.
      The mapper transformer takes care of calling this method with the right column so you can just
      handle it like an object which you would get from ``df["some_attribute"]``.

    **name** (:any:`str`) - The name of the resulting column. Nested attributes are not
      supported. The Mapper transformer takes care of calling this method with the right column name.
    """
    prefix = "_generate_select_expression_for_"
    # Accept both the bare data type name and an already prefixed attribute name.
    attribute_name = function_name if function_name.startswith(prefix) else prefix + function_name
    # The dispatcher looks functions up as attributes of this very module.
    setattr(sys.modules[__name__], attribute_name, func)


def _get_select_expression_for_custom_type(source_column, name, data_type):
    """
    Internal method for resolving a custom data type by name and applying it.

    Lookup order:

    1. An attribute called ``data_type`` on
       ``spooq.transformer.mapper_transformations``. It is called without arguments
       and the returned object is called with ``source_column`` and ``name``.
    2. A function of this module called ``_generate_select_expression_for_<data_type>``
       (this is where :py:meth:`add_custom_data_type` stores its functions). It is called
       with ``source_column`` and ``name``.

    Parameters
    ----------
    source_column
        Column the select expression is built from; passed through unchanged.
    name: :any:`str`
        Name of the resulting column; passed through unchanged.
    data_type: :any:`str`
        Name of the custom data type to resolve.

    Returns
    -------
    Whatever the resolved function returns.

    Raises
    ------
    AttributeError
        If no function can be found for ``data_type``. The original lookup error is
        attached as ``__cause__``.
    """
    if hasattr(spq, data_type):
        return getattr(spq, data_type)()(source_column, name)

    function_name = "_generate_select_expression_for_" + data_type
    # Only the lookup is guarded: an AttributeError raised *inside* a transformation
    # is a genuine bug there and must not be reported as "data type not found".
    try:
        select_expression_generator = getattr(sys.modules[__name__], function_name)
    except AttributeError as e:
        error_msg = (
            "Spooq could not find a Select Expression Generator Method \n"
            f'for the custom DataType: "{data_type}"\n'
            f"Original Error Message: {e}"
        )
        raise AttributeError(error_msg) from e
    return select_expression_generator(source_column, name)


def deprecated(func):
    """
    Decorator which marks a function as deprecated.

    Every call of the decorated function

    * logs a warning on the ``"spooq"`` logger,
    * emits a :py:class:`FutureWarning` which contains the docstring of the decorated
      function (that is where the alternatives are described) and
    * finally calls the decorated function with unchanged arguments and returns its result.

    A leading ``_generate_select_expression_for_`` is stripped from the function name
    in both messages.

    Parameters
    ----------
    func: callable
        The function to be marked as deprecated.

    Returns
    -------
    callable
        Wrapper with the metadata (name, docstring, ...) of ``func``.
    """

    @wraps(func)
    def deprecation_warning(*args, **kwargs):
        nice_name = str(func.__name__).replace("_generate_select_expression_for_", "")
        logging.getLogger("spooq").warning(
            f"The function {nice_name} is deprecated!"
        )
        warnings.warn(
            message=(
                f"The function {nice_name} is deprecated! "
                f"Please have a look at its docstring for alternatives:\n{func.__doc__}"
            ),
            category=FutureWarning,
            # Attribute the warning to the caller of the deprecated function
            # instead of to this wrapper.
            stacklevel=2,
        )
        return func(*args, **kwargs)

    return deprecation_warning


@deprecated
def _generate_select_expression_for_as_is(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.as_is` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.as_is()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_keep(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.as_is` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.as_is()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_no_change(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.as_is` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.as_is()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_without_casting(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.as_is` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # NOTE(review): this name lacks the ``_for_`` infix, so the name based dispatch
    # ("_generate_select_expression_for_" + data_type) does not resolve to it.
    transformation = spq.as_is()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_json_string(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_json_string` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.to_json_string()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_timestamp_ms_to_ms(source_column, name):
    """
    Deprecated!
    Please just use :dt.:`~spooq.transformer.mapper_transformations.as_is` with
    ``"long"`` as data_type instead. This method doesn't do any cleansing anymore!
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Pass-through transformation which is configured to cast to "long".
    transformation = spq.as_is(cast="long")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_timestamp_ms_to_s(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.apply` with a lambda instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.

    def _milliseconds_to_seconds(val):
        # Divide as double; the final type is set via ``cast="long"`` below.
        return F.lit(val).cast("double") / 1000.0

    transformation = spq.apply(func=_milliseconds_to_seconds, cast="long")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_timestamp_s_to_ms(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.apply` with a lambda instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.

    def _seconds_to_milliseconds(val):
        # Multiply as double; the final type is set via ``cast="long"`` below.
        return F.lit(val).cast("double") * 1000.0

    transformation = spq.apply(func=_seconds_to_milliseconds, cast="long")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_timestamp_s_to_s(source_column, name):
    """
    Deprecated!
    Please just use :dt.:`~spooq.transformer.mapper_transformations.as_is` with
    ``"long"`` as data_type instead. This method doesn't do any cleansing anymore!
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Pass-through transformation which is configured to cast to "long".
    transformation = spq.as_is(cast="long")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_StringNull(source_column, name):  # noqa: N802
    """
    Deprecated!
    Please just use ``F.lit(None)`` as source_column and ``"string"`` as data_type instead!
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # ``source_column`` is intentionally ignored: the result is always a NULL literal.
    null_literal = F.lit(None)
    return null_literal.cast("string").alias(name)
@deprecated
def _generate_select_expression_for_IntNull(source_column, name):  # noqa: N802
    """
    Deprecated!
    Please just use ``F.lit(None)`` as source_column and ``"int"`` as data_type instead!
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # ``source_column`` is intentionally ignored: the result is always a NULL literal.
    null_literal = F.lit(None)
    return null_literal.cast("int").alias(name)
@deprecated
def _generate_select_expression_for_StringBoolean(source_column, name):  # noqa: N802
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.has_value` instead.

    Used for Anonymizing.
    The column's value will be replaced by ``"1"`` if it is:

        * not NULL and
        * not an empty string

    Example
    -------
    >>> from pyspark.sql import Row
    >>> from spooq.transformer import Mapper
    >>>
    >>> input_df = spark.createDataFrame(
    >>>     [Row(email=u'tsivorn1@who.int'),
    >>>      Row(email=u''),
    >>>      Row(email=u'gisaksen4@skype.com')]
    >>> )
    >>>
    >>> mapping = [("email", "email", "StringBoolean")]
    >>> output_df = Mapper(mapping).transform(input_df)
    >>> output_df.head(3)
    [Row(email=u'1'), Row(email=None), Row(email=u'1')]
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    is_null = source_column.isNull()
    is_empty_string = source_column == ""
    # NULL and "" stay NULL, everything else is flagged with "1".
    flag = (
        F.when(is_null, F.lit(None))
        .when(is_empty_string, F.lit(None))
        .otherwise("1")
    )
    return flag.cast("string").alias(name)
@deprecated
def _generate_select_expression_for_IntBoolean(source_column, name):  # noqa: N802
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.has_value` instead.

    Used for Anonymizing.
    The column's value will be replaced by ``1`` if it contains a non-NULL value.

    Example
    -------
    >>> from pyspark.sql import Row
    >>> from spooq.transformer import Mapper
    >>>
    >>> input_df = spark.createDataFrame(
    >>>     [Row(facebook_id=3047288),
    >>>      Row(facebook_id=0),
    >>>      Row(facebook_id=None)]
    >>> )
    >>>
    >>> mapping = [("facebook_id", "facebook_id", "IntBoolean")]
    >>> output_df = Mapper(mapping).transform(input_df)
    >>> output_df.head(3)
    [Row(facebook_id=1), Row(facebook_id=1), Row(facebook_id=None)]

    Note
    ----
    ``0`` (zero) or negative numbers are still considered as valid values and therefore
    converted to ``1``.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    is_null = source_column.isNull()
    # NULL stays NULL, every other value is flagged with 1.
    flag = F.when(is_null, F.lit(None)).otherwise(1)
    return flag.cast("int").alias(name)
@deprecated
def _generate_select_expression_for_TimestampMonth(source_column, name):  # noqa: N802
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.apply` instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # ``F.trunc`` with its ``format`` pre-bound to "month".
    truncate_to_month = partial(F.trunc, format="month")
    # NOTE(review): unlike the sibling functions, the column and the name are passed
    # to ``spq.apply`` directly instead of calling its return value - presumably
    # ``apply`` supports both call styles; confirm against mapper_transformations.
    return spq.apply(
        func=truncate_to_month,
        cast="timestamp",
        source_column=source_column,
        name=name,
    )
@deprecated
def _generate_select_expression_for_meters_to_cm(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.meters_to_cm` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.meters_to_cm()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_has_value(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.has_value` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.has_value()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_unix_timestamp_ms_to_spark_timestamp(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_timestamp` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.to_timestamp()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_int(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_num` directly instead
    and define the ``cast`` as ``"int"``.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Number transformation which is configured to cast to "int".
    transformation = spq.to_num(cast="int")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_long(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_num` directly instead
    and define the ``cast`` as ``"long"``.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Number transformation which is configured to cast to "long".
    transformation = spq.to_num(cast="long")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_float(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_num` directly instead
    and define the ``cast`` as ``"float"``.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Number transformation which is configured to cast to "float".
    transformation = spq.to_num(cast="float")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_double(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_num` directly instead
    and define the ``cast`` as ``"double"``.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Number transformation which is configured to cast to "double".
    transformation = spq.to_num(cast="double")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_boolean(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_bool` directly instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.to_bool()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_timestamp(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_timestamp` instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.to_timestamp()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_to_date(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_timestamp` and cast to date instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Timestamp transformation which is configured to cast to "date".
    transformation = spq.to_timestamp(cast="date")
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_unix_timestamp_ms_to_timestamp(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_timestamp` instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Build the transformation with its defaults, then apply it to the column.
    transformation = spq.to_timestamp()
    return transformation(source_column, name)
@deprecated
def _generate_select_expression_for_extended_string_unix_timestamp_ms_to_date(source_column, name):
    """
    Deprecated!
    Please use :dt.:`~spooq.transformer.mapper_transformations.to_timestamp` and cast to date instead.
    """
    # NOTE: the docstring above is embedded in the FutureWarning emitted by ``deprecated``.
    # Timestamp transformation which is configured to cast to "date".
    transformation = spq.to_timestamp(cast="date")
    return transformation(source_column, name)