from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import lit
from pyspark.sql import types as sql_types
from transformer import Transformer
from mapper_custom_data_types import _get_select_expression_for_custom_type
[docs]class Mapper(Transformer):
"""
Constructs and applies a PySpark SQL expression, based on the provided mapping.
Examples
--------
>>> mapping = [
>>> ('id', 'data.relationships.food.data.id', 'StringType'),
>>> ('message_id', 'data.id', 'StringType'),
>>> ('type', 'data.relationships.food.data.type', 'StringType'),
>>> ('created_at', 'elem.attributes.created_at', 'timestamp_ms_to_s'),
>>> ('updated_at', 'elem.attributes.updated_at', 'timestamp_ms_to_s'),
>>> ('deleted_at', 'elem.attributes.deleted_at', 'timestamp_ms_to_s'),
>>> ('brand', 'elem.attributes.brand', 'StringType')
>>> ]
>>> transformer = Mapper(mapping=mapping)
>>> mapping = [
>>> ('id', 'data.relationships.food.data.id', 'StringType'),
>>> ('updated_at', 'elem.attributes.updated_at', 'timestamp_ms_to_s'),
>>> ('deleted_at', 'elem.attributes.deleted_at', 'timestamp_ms_to_s'),
>>> ('name', 'elem.attributes.name', 'array')
>>> ]
>>> transformer = Mapper(mapping=mapping)
Parameters
----------
mapping : :class:`list` of :any:`tuple` containing three :any:`str`
This is the main parameter for this transformation. It essentially gives information
about the column names for the output DataFrame, the column names (paths)
from the input DataFrame, and their data types. Custom data types are also supported; they can
clean, pivot, anonymize, or otherwise transform the data itself. Please have a look at the
:py:mod:`spooq2.transformer.mapper_custom_data_types` module for more information.
Note
----
Let's talk about Mappings:
The mapping should be a list of tuples, each containing all the information for one column.
* Column Name : :any:`str`
Sets the name of the column in the resulting output DataFrame.
* Source Path / Name : :any:`str`
Points to the name of the column in the input DataFrame. If the input
is a flat DataFrame, it will essentially be the column name. If it is of complex
type, it will point to the path of the actual value. For example:
``data.relationships.sample.data.id``, where id is the value we want.
* DataType : :any:`str`
DataTypes can be types from :any:`pyspark.sql.types`, selected custom datatypes or
injected, ad-hoc custom datatypes.
The datatype will be interpreted as a PySpark built-in if it is a member of ``pyspark.sql.types``.
If it is not an importable PySpark data type, a method to construct the statement will be
called by the data type's name.
Note
----
Please see :py:mod:`spooq2.transformer.mapper_custom_data_types` for all available custom
data types and how to inject your own.
Note
----
Attention: Decimal is NOT SUPPORTED by Hive! Please use Double instead!
"""
def __init__(self, mapping):
    """
    Store the mapping on the instance and log it at debug level.

    Parameters
    ----------
    mapping : :class:`list` of :any:`tuple` containing three :any:`str`
        Column name, source path and data type per output column.
        Kept as-is on ``self.mapping``; no validation is done here.
    """
    super(Mapper, self).__init__()
    self.mapping = mapping
    # ``unicode`` exists only on Python 2 and raises NameError on Python 3.
    # ``str`` renders the same text for a list of tuples on Python 2 and
    # keeps this line working on Python 3.
    # NOTE(review): ``self.logger`` is not set in this method -- presumably
    # provided by the base-class initializer; confirm.
    self.logger.debug("Mapping: {mp}".format(mp=str(self.mapping)))