Mapper¶
Class¶
class Mapper(mapping)[source]¶
Bases: spooq2.transformer.transformer.Transformer
Constructs and applies a PySpark SQL expression, based on the provided mapping.
Examples
>>> mapping = [
>>>     ('id',         'data.relationships.food.data.id',   'StringType'),
>>>     ('message_id', 'data.id',                           'StringType'),
>>>     ('type',       'data.relationships.food.data.type', 'StringType'),
>>>     ('created_at', 'elem.attributes.created_at',        'timestamp_ms_to_s'),
>>>     ('updated_at', 'elem.attributes.updated_at',        'timestamp_ms_to_s'),
>>>     ('deleted_at', 'elem.attributes.deleted_at',        'timestamp_ms_to_s'),
>>>     ('brand',      'elem.attributes.brand',             'StringType')
>>> ]
>>> transformer = Mapper(mapping=mapping)
>>> mapping = [
>>>     ('id',         'data.relationships.food.data.id', 'StringType'),
>>>     ('updated_at', 'elem.attributes.updated_at',      'timestamp_ms_to_s'),
>>>     ('deleted_at', 'elem.attributes.deleted_at',      'timestamp_ms_to_s'),
>>>     ('name',       'elem.attributes.name',            'array')
>>> ]
>>> transformer = Mapper(mapping=mapping)
Parameters: mapping (list of tuple containing three str) – This is the main parameter of this transformation. It defines the column names of the output DataFrame, the column names (paths) of the input DataFrame, and their data types. Custom data types are also supported, which can clean, pivot, anonymize, … the data itself. Please have a look at the spooq2.transformer.mapper_custom_data_types module for more information.

Note
Let’s talk about Mappings:
The mapping should be a list of tuples, each containing all information for one column.
- Column Name : str
  Sets the name of the column in the resulting output DataFrame.
- Source Path / Name : str
  Points to the name of the column in the input DataFrame. If the input is a flat DataFrame, this is simply the column name. If it is of complex type, it points to the path of the actual value, for example data.relationships.sample.data.id, where id is the value we want.
- DataType : str
  DataTypes can be types from pyspark.sql.types, selected custom data types, or injected, ad-hoc custom data types. The data type is interpreted as a PySpark built-in if it is a member of pyspark.sql.types. If it is not an importable PySpark data type, a method to construct the statement is called by the data type’s name.
Note
Please see spooq2.transformer.mapper_custom_data_types for all available custom data types and how to inject your own.

Note
Attention: Decimal is NOT SUPPORTED by Hive! Please use Double instead!
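The name-based dispatch described for the DataType field can be sketched in plain Python. The helper name and the set of built-in types below are illustrative assumptions, not the actual spooq2 internals:

```python
# Simplified stand-in for membership in pyspark.sql.types (assumption)
PYSPARK_BUILTINS = {"StringType", "LongType", "IntegerType", "TimestampType"}

def resolve_data_type(data_type):
    """Decide how a mapping's data type string would be interpreted."""
    if data_type in PYSPARK_BUILTINS:
        # importable PySpark type: a plain cast + rename is generated
        return ("cast", data_type)
    # anything else is dispatched to a constructor method by naming convention
    return ("method", "_generate_select_expression_for_" + data_type)
```

For example, "StringType" resolves to a cast, while "timestamp_ms_to_s" resolves to the constructor method of the same name.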
transform(input_df)[source]¶
Performs a transformation on a DataFrame.
Parameters: input_df (pyspark.sql.DataFrame) – Input DataFrame
Returns: Transformed DataFrame.
Return type: pyspark.sql.DataFrame
Note
This method only takes the input DataFrame as a parameter. All other required parameters are defined in the initialization of the transformer object.
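Conceptually, transform() builds one select expression per mapping tuple and applies them all at once. A rough pure-Python stand-in, with plain dicts instead of a DataFrame (illustrative only; it ignores data types and does not resolve nested paths):

```python
def transform(rows, mapping):
    """Apply a (name, source_path, data_type)-style mapping to dict rows.

    `rows` stands in for an input DataFrame; the data type element of each
    tuple is ignored in this sketch.
    """
    return [
        {name: row.get(source_path) for (name, source_path, _dtype) in mapping}
        for row in rows
    ]

rows = [{"data.id": 42, "unused": "x"}]
mapping = [("message_id", "data.id", "StringType")]
result = transform(rows, mapping)
```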
Activity Diagram¶
Custom Mapping Methods¶
This is a collection of module level methods to construct a specific PySpark DataFrame query for custom defined data types.
These methods are not meant to be called directly but via the Mapper transformer.
Please see that particular class on how to apply custom data types.
For injecting your own custom data types, please have a look at the add_custom_data_type() method!
add_custom_data_type(function_name, func)[source]¶
Registers a custom data type at runtime to be used with the Mapper transformer.

Example
>>> import spooq2.transformer.mapper_custom_data_types as custom_types
>>> import spooq2.transformer as T
>>> from pyspark.sql import Row, functions as F, types as sql_types
>>> def hello_world(source_column, name):
>>>     "A UDF (User Defined Function) in Python"
>>>     def _to_hello_world(col):
>>>         if not col:
>>>             return None
>>>         else:
>>>             return "Hello World"
>>>
>>>     udf_hello_world = F.udf(_to_hello_world, sql_types.StringType())
>>>     return udf_hello_world(source_column).alias(name)
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(hello_from=u'tsivorn1@who.int'),
>>>      Row(hello_from=u''),
>>>      Row(hello_from=u'gisaksen4@skype.com')]
>>> )
>>>
>>> custom_types.add_custom_data_type(function_name="hello_world", func=hello_world)
>>> transformer = T.Mapper(mapping=[("hello_who", "hello_from", "hello_world")])
>>> df = transformer.transform(input_df)
>>> df.show()
+-----------+
|  hello_who|
+-----------+
|Hello World|
|       null|
|Hello World|
+-----------+
>>> def first_and_last_name(source_column, name):
>>>     "A PySpark SQL expression referencing multiple columns"
>>>     return F.concat_ws("_", source_column, F.col("attributes.last_name")).alias(name)
>>>
>>> custom_types.add_custom_data_type(function_name="full_name", func=first_and_last_name)
>>>
>>> transformer = T.Mapper(mapping=[
>>>     ("first_name", "attributes.first_name", "StringType"),
>>>     ("last_name",  "attributes.last_name",  "StringType"),
>>>     ("full_name",  "attributes.first_name", "full_name"),
>>> ])
Parameters:
- function_name (str) – The name of your custom data type
- func (compatible function) – The PySpark DataFrame function which will be called on a column, defined in the mapping of the Mapper class. Required input parameters are source_column and name. Please see the note about required input parameters of custom data types for more information!
Note
Required input parameters of custom data types:
- source_column (pyspark.sql.Column) – This is where your logic will be applied. The Mapper transformer takes care of calling this method with the right column, so you can handle it like an object you would get from df["some_attribute"].
- name (str) – The name the resulting column will get. Nested attributes are not supported. The Mapper transformer takes care of calling this method with the right column name.
_get_select_expression_for_custom_type(source_column, name, data_type)[source]¶
Internal method for calling functions dynamically.
_generate_select_expression_for_as_is(source_column, name)[source]¶
alias for _generate_select_expression_without_casting

_generate_select_expression_for_keep(source_column, name)[source]¶
alias for _generate_select_expression_without_casting

_generate_select_expression_for_no_change(source_column, name)[source]¶
alias for _generate_select_expression_without_casting
_generate_select_expression_without_casting(source_column, name)[source]¶
Returns a column without casting. This is especially useful if you need to keep a complex data type, like an array, list, or struct.

Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("my_friends", "friends", "as_is")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(my_friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(my_friends=[]),
 Row(my_friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
_generate_select_expression_for_json_string(source_column, name)[source]¶
Returns a column as a JSON-compatible string. Nested hierarchies are supported. The unicode representation of a column will be returned if an error occurs.
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("friends_json", "friends", "json_string")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(friends_json=u'[{"first_name": null, "last_name": null, "id": 3993}, {"first_name": "Ru\u00f2", "last_name": "Trank", "id": 17484}]'),
 Row(friends_json=None),
 Row(friends_json=u'[{"first_name": "Daphn\u00e9e", "last_name": "Lyddiard", "id": 16707}, {"first_name": "Ad\u00e9la\u00efde", "last_name": "Wisdom", "id": 17429}]')]
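The error fallback mentioned above (returning the unicode representation when serialization fails) might look roughly like this in plain Python. The helper name is hypothetical and this is a per-value sketch, not the actual column-level code:

```python
import json

def to_json_string(value):
    """Serialize to a JSON string; fall back to the plain string form on error."""
    try:
        return json.dumps(value)
    except (TypeError, ValueError):
        # e.g. sets are not JSON-serializable: fall back to str()
        return str(value)
```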
_generate_select_expression_for_timestamp_ms_to_ms(source_column, name)[source]¶
This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in milliseconds, output in milliseconds.
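The validity window (01.01.1970 – 31.12.2099) amounts to a plain range check per value. The helper name is hypothetical and the upper bound below is the assumed epoch value in milliseconds for 31.12.2099; the real method expresses this as a PySpark column expression:

```python
# 31.12.2099 00:00 UTC in epoch milliseconds (assumed bound)
MAX_VALID_MS = 4102358400000

def clean_timestamp_ms_to_ms(value):
    """Return the value unchanged if inside the valid window, else None."""
    if value is None or not (0 <= value <= MAX_VALID_MS):
        return None
    return value
```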
_generate_select_expression_for_timestamp_ms_to_s(source_column, name)[source]¶
This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_s")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in milliseconds, output in seconds.
_generate_select_expression_for_timestamp_s_to_ms(source_column, name)[source]¶
This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),   # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),     # Invalid!
>>>     Row(time_sec=4737139200)    # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in seconds, output in milliseconds.
_generate_select_expression_for_timestamp_s_to_s(source_column, name)[source]¶
This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If a value is not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),   # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),     # Invalid!
>>>     Row(time_sec=4737139200)    # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_s")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in seconds, output in seconds.
_generate_select_expression_for_StringNull(source_column, name)[source]¶
Used for anonymizing. Input values will be ignored and replaced by NULL. Cast to pyspark.sql.types.StringType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=None), Row(email=None), Row(email=None)]
_generate_select_expression_for_IntNull(source_column, name)[source]¶
Used for anonymizing. Input values will be ignored and replaced by NULL. Cast to pyspark.sql.types.IntegerType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=57815)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=None), Row(facebook_id=None), Row(facebook_id=None)]
_generate_select_expression_for_StringBoolean(source_column, name)[source]¶
Used for anonymizing. The column’s value will be replaced by “1” if it is:
- not NULL and
- not an empty string
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=u'1'), Row(email=None), Row(email=u'1')]
_generate_select_expression_for_IntBoolean(source_column, name)[source]¶
Used for anonymizing. The column’s value will be replaced by 1 if it contains a non-NULL value.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=None)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=1), Row(facebook_id=1), Row(facebook_id=None)]
Note
0 (zero) and negative numbers are still considered valid values and are therefore converted to 1.
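The row-level semantics of StringBoolean and IntBoolean can be sketched in plain Python (hypothetical helper names; the real methods build PySpark column expressions rather than operating per value):

```python
def string_boolean(value):
    """'1' for non-NULL, non-empty strings; None otherwise (StringBoolean)."""
    return "1" if value not in (None, "") else None

def int_boolean(value):
    """1 for any non-NULL value, including 0 and negatives; None otherwise (IntBoolean)."""
    return 1 if value is not None else None
```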
_generate_select_expression_for_TimestampMonth(source_column, name)[source]¶
Used for anonymizing. Can be used to keep the age but obscure the exact birthday. This custom data type requires a pyspark.sql.types.TimestampType column as input. The datetime value will be set to the first day of the month.

Example
>>> from pyspark.sql import Row
>>> from datetime import datetime
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(birthday=datetime(2019, 2, 9, 2, 45)),
>>>      Row(birthday=None),
>>>      Row(birthday=datetime(1988, 1, 31, 8))]
>>> )
>>>
>>> mapping = [("birthday", "birthday", "TimestampMonth")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(birthday=datetime.datetime(2019, 2, 1, 0, 0)), Row(birthday=None), Row(birthday=datetime.datetime(1988, 1, 1, 0, 0))]
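The truncation shown in the example corresponds to resetting every field below the month, which can be sketched as follows (hypothetical helper; the real method works on a PySpark column rather than a single datetime):

```python
from datetime import datetime

def truncate_to_month(dt):
    """Set a datetime to the first day of its month at midnight; keep None as None."""
    if dt is None:
        return None
    return dt.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
```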