Mapper

Class

class Mapper(mapping)[source]

Bases: spooq2.transformer.transformer.Transformer

Constructs and applies a PySpark SQL expression, based on the provided mapping.

Examples

>>> mapping = [
>>>     ('id',              'data.relationships.food.data.id',     'StringType'),
>>>     ('message_id',      'data.id',                             'StringType'),
>>>     ('type',            'data.relationships.food.data.type',   'StringType'),
>>>     ('created_at',      'elem.attributes.created_at',          'timestamp_ms_to_s'),
>>>     ('updated_at',      'elem.attributes.updated_at',          'timestamp_ms_to_s'),
>>>     ('deleted_at',      'elem.attributes.deleted_at',          'timestamp_ms_to_s'),
>>>     ('brand',           'elem.attributes.brand',               'StringType')
>>> ]
>>> transformer = Mapper(mapping=mapping)
>>> mapping = [
>>>     ('id',          'data.relationships.food.data.id',   'StringType'),
>>>     ('updated_at',  'elem.attributes.updated_at',        'timestamp_ms_to_s'),
>>>     ('deleted_at',  'elem.attributes.deleted_at',        'timestamp_ms_to_s'),
>>>     ('name',        'elem.attributes.name',              'array')
>>> ]
>>> transformer = Mapper(mapping=mapping)
Parameters: mapping (list of tuples of three str) – This is the main parameter for this transformation. It defines the column names of the output DataFrame, the column names (paths) of the input DataFrame, and their data types. Custom data types are also supported, which can clean, pivot, anonymize, … the data itself. Please have a look at the spooq2.transformer.mapper_custom_data_types module for more information.

Note

Let’s talk about Mappings:

The mapping should be a list of tuples, each containing all the information for one column.

  • Column Name : str

    Sets the name of the column in the resulting output DataFrame.

  • Source Path / Name : str

    Points to the name of the column in the input DataFrame. If the input is a flat DataFrame, it will essentially be the column name. If it is of complex type, it will point to the path of the actual value. For example: data.relationships.sample.data.id, where id is the value we want.

  • DataType : str

    Data types can be types from pyspark.sql.types, selected custom data types, or injected, ad-hoc custom data types. The data type is interpreted as a PySpark built-in if it is a member of pyspark.sql.types. If it is not an importable PySpark data type, a method to construct the select expression is called by the data type’s name (see the quick check below).
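
A quick way to check whether a given data type string will be treated as a PySpark built-in (illustrative sketch only; the Mapper’s actual lookup logic may differ):

>>> from pyspark.sql import types as sql_types
>>> hasattr(sql_types, "StringType")         # importable built-in -> column is cast directly
True
>>> hasattr(sql_types, "timestamp_ms_to_s")  # not a built-in -> custom data type method is used
False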

Note

Please see spooq2.transformer.mapper_custom_data_types for all available custom data types and how to inject your own.

Note

Attention: Decimal is NOT SUPPORTED by Hive! Please use Double instead!
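
For example, a decimal-like source column could simply be mapped as a double (the column path below is purely illustrative):

>>> mapping = [("price", "attributes.price", "DoubleType")]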

transform(input_df)[source]

Performs a transformation on a DataFrame.

Parameters: input_df (pyspark.sql.DataFrame) – Input DataFrame
Returns: Transformed DataFrame.
Return type: pyspark.sql.DataFrame

Note

This method only takes the input DataFrame as a parameter. All other required parameters are defined in the initialization of the transformer object.
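
A typical call, assuming mapping and input_df are defined as in the examples above:

>>> transformer = Mapper(mapping=mapping)
>>> output_df = transformer.transform(input_df)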

Activity Diagram

@startuml
start

skinparam monochrome true
skinparam defaultFontname Bitstream Vera Sans Mono
' skinparam defaultFontSize 18

:**""mapping""**:
- **""source_column""**
- **""name""**
- **""data_type""**;
while (unprocessed definitions in **""mapping""**?) is (yes)
  if (**""data_type""** is Spark built-in?) then (yes)
    if (**""source_column""**\n is missing?) then (yes)
      :Value = **""None""**;
    else (no)
      :Value = **""source_column""**;
    endif
    :rename to **""name""**;
    :cast as **""data_type""**;
  else (no)
    if (**""source_column""**\n is missing?) then (yes)
      :Value = **""None""**;
      :rename to **""name""**;
    else (no)
      :Value = **""source_column""**;
      :_get_select_expression_↩
       ↪for_custom_type(
          **""source_column""**, 
          **""name""**, 
          **""data_type""**
      );
    endif
  endif
  :add to global select expression;
endwhile (no)
:return global select expression;

stop
@enduml

Activity Diagram for Mapper Transformer

Custom Mapping Methods

This is a collection of module-level methods that construct specific PySpark DataFrame queries for custom-defined data types.

These methods are not meant to be called directly but via the Mapper transformer. Please see that particular class on how to apply custom data types.

For injecting your own custom data types, please have a look at the add_custom_data_type() method!

add_custom_data_type(function_name, func)[source]

Registers a custom data type at runtime to be used with the Mapper transformer.

Example

>>> import spooq2.transformer.mapper_custom_data_types as custom_types
>>> import spooq2.transformer as T
>>> from pyspark.sql import Row, functions as F, types as sql_types
>>> def hello_world(source_column, name):
>>>     "A UDF (User Defined Function) in Python"
>>>     def _to_hello_world(col):
>>>         if not col:
>>>             return None
>>>         else:
>>>             return "Hello World"
>>>
>>>     udf_hello_world = F.udf(_to_hello_world, sql_types.StringType())
>>>     return udf_hello_world(source_column).alias(name)
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(hello_from=u'tsivorn1@who.int'),
>>>      Row(hello_from=u''),
>>>      Row(hello_from=u'gisaksen4@skype.com')]
>>> )
>>>
>>> custom_types.add_custom_data_type(function_name="hello_world", func=hello_world)
>>> transformer = T.Mapper(mapping=[("hello_who", "hello_from", "hello_world")])
>>> df = transformer.transform(input_df)
>>> df.show()
+-----------+
|  hello_who|
+-----------+
|Hello World|
|       null|
|Hello World|
+-----------+
>>> def first_and_last_name(source_column, name):
>>>     "A PySpark SQL expression referencing multiple columns"
>>>     return F.concat_ws("_", source_column, F.col("attributes.last_name")).alias(name)
>>>
>>> custom_types.add_custom_data_type(function_name="full_name", func=first_and_last_name)
>>>
>>> transformer = T.Mapper(mapping=[
>>>     ("first_name", "attributes.first_name", "StringType"),
>>>     ("last_name",  "attributes.last_name",  "StringType"),
>>>     ("full_name",  "attributes.first_name", "full_name"),
>>> ])
Parameters:
  • function_name (str) – The name of your custom data type
  • func (compatible function) – The PySpark DataFrame function which will be called on a column, as defined in the mapping of the Mapper class. Required input parameters are source_column and name. Please see the note about required input parameters of custom data types for more information!

Note

Required input parameters of custom data types:

  • source_column (pyspark.sql.Column) – This is where your logic will be applied. The Mapper transformer takes care of calling this method with the right column, so you can handle it like an object you would get from df["some_attribute"].

  • name (str) – The name of the resulting column. Nested attributes are not supported. The Mapper transformer takes care of calling this method with the right column name.

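A minimal skeleton that satisfies this signature (the function name and the cast are placeholders, not part of spooq2):

>>> def my_custom_type(source_column, name):
>>>     "A placeholder custom data type that only casts and renames the column"
>>>     return source_column.cast("string").alias(name)
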
_get_select_expression_for_custom_type(source_column, name, data_type)[source]

Internal method for calling functions dynamically
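
Roughly, the dispatch can be pictured like this (illustrative sketch only, based on the function names documented below; the real implementation may differ):

>>> import spooq2.transformer.mapper_custom_data_types as custom_types
>>> from pyspark.sql import functions as F
>>>
>>> data_type = "as_is"
>>> func = getattr(custom_types, "_generate_select_expression_for_" + data_type)
>>> select_expression = func(F.col("friends"), "my_friends")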

_generate_select_expression_for_as_is(source_column, name)[source]

alias for _generate_select_expression_without_casting

_generate_select_expression_for_keep(source_column, name)[source]

alias for _generate_select_expression_without_casting

_generate_select_expression_for_no_change(source_column, name)[source]

alias for _generate_select_expression_without_casting

_generate_select_expression_without_casting(source_column, name)[source]

Returns a column without casting. This is especially useful if you need to keep a complex data type, like an array or a struct.

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("my_friends", "friends", "as_is")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(my_friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(my_friends=[]),
 Row(my_friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
_generate_select_expression_for_json_string(source_column, name)[source]

Returns a column as a JSON-compatible string. Nested hierarchies are supported. The unicode representation of the column will be returned if an error occurs.

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("friends_json", "friends", "json_string")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(friends_json=u'[{"first_name": null, "last_name": null, "id": 3993}, {"first_name": "Ru\u00f2", "last_name": "Trank", "id": 17484}]'),
 Row(friends_json=None),
 Row(friends_json=u'[{"first_name": "Daphn\u00e9e", "last_name": "Lyddiard", "id": 16707}, {"first_name": "Ad\u00e9la\u00efde", "last_name": "Wisdom", "id": 17429}]')]
_generate_select_expression_for_timestamp_ms_to_ms(source_column, name)[source]

This constructor is used for Unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. The result is cast to pyspark.sql.types.LongType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in milliseconds, output in milliseconds.

_generate_select_expression_for_timestamp_ms_to_s(source_column, name)[source]

This constructor is used for Unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. The result is cast to pyspark.sql.types.LongType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_s")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in milliseconds, output in seconds.

_generate_select_expression_for_timestamp_s_to_ms(source_column, name)[source]

This constructor is used for Unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. The result is cast to pyspark.sql.types.LongType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),    # Invalid!
>>>     Row(time_sec=4737139200)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in seconds, output in milliseconds.

_generate_select_expression_for_timestamp_s_to_s(source_column, name)[source]

This constructor is used for Unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. The result is cast to pyspark.sql.types.LongType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),    # Invalid!
>>>     Row(time_sec=4737139200)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in seconds, output in seconds.

_generate_select_expression_for_StringNull(source_column, name)[source]

Used for Anonymizing. Input values will be ignored and replaced by NULL. Cast to pyspark.sql.types.StringType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=None), Row(email=None), Row(email=None)]
_generate_select_expression_for_IntNull(source_column, name)[source]

Used for Anonymizing. Input values will be ignored and replaced by NULL. Cast to pyspark.sql.types.IntegerType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=57815)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=None), Row(facebook_id=None), Row(facebook_id=None)]
_generate_select_expression_for_StringBoolean(source_column, name)[source]

Used for Anonymizing. The column’s value will be replaced by “1” if it is:

  • not NULL and
  • not an empty string

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=u'1'), Row(email=None), Row(email=u'1')]
_generate_select_expression_for_IntBoolean(source_column, name)[source]

Used for Anonymizing. The column’s value will be replaced by 1 if it contains a non-NULL value.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=None)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=1), Row(facebook_id=1), Row(facebook_id=None)]

Note

0 (zero) and negative numbers are still considered valid values and are therefore converted to 1.

_generate_select_expression_for_TimestampMonth(source_column, name)[source]

Used for Anonymizing. Can be used to keep the age but obscure the explicit birthday. This custom datatype requires a pyspark.sql.types.TimestampType column as input. The datetime value will be set to the first day of the month.

Example

>>> from pyspark.sql import Row
>>> from datetime import datetime
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(birthday=datetime(2019, 2, 9, 2, 45)),
>>>      Row(birthday=None),
>>>      Row(birthday=datetime(1988, 1, 31, 8))]
>>> )
>>>
>>> mapping = [("birthday", "birthday", "TimestampMonth")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(birthday=datetime.datetime(2019, 2, 1, 0, 0)),
 Row(birthday=None),
 Row(birthday=datetime.datetime(1988, 1, 1, 0, 0))]