Mapper

Class

class Mapper(mapping, ignore_missing_columns=False, mode='replace')[source]

Bases: spooq2.transformer.transformer.Transformer

Constructs and applies a PySpark SQL expression, based on the provided mapping.

Examples

>>> from pyspark.sql import functions as F
>>> from spooq2.transformer import Mapper
>>>
>>> mapping = [
>>>     ("id",            "data.relationships.food.data.id",  "StringType"),
>>>     ("version",       "data.version",                     "extended_string_to_int"),
>>>     ("type",          "elem.attributes.type",             "StringType"),
>>>     ("created_at",    "elem.attributes.created_at",       "extended_string_to_timestamp"),
>>>     ("created_on",    "elem.attributes.created_at",       "extended_string_to_date"),
>>>     ("process_date",  F.current_timestamp(),              "DateType"),
>>> ]
>>> mapper = Mapper(mapping=mapping)
>>> mapper.transform(input_df).printSchema()
root
 |-- id: string (nullable = true)
 |-- version: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- created_on: date (nullable = true)
 |-- process_date: date (nullable = false)
Parameters:
  • mapping (list of tuple containing three str or Column or functions) – This is the main parameter for this transformation. It gives information about the column names for the output DataFrame, the column names (paths) from the input DataFrame, and their data types. Custom data types are also supported, which can clean, pivot, anonymize, … the data itself. Please have a look at the spooq2.transformer.mapper_custom_data_types module for more information.
  • ignore_missing_columns (bool, Defaults to False) – Specifies whether input columns referenced in the mapping but missing from the provided DataFrame should be replaced by NULL in the output. If set to False, a missing input column raises an exception.
  • mode (str, Defaults to “replace”) –

    Defines whether the mapping should fully replace the schema of the input DataFrame or just add to it (see the sketch after this list). The following modes are supported:

    • replace
      The output schema is the same as the provided mapping. => output schema: new columns
    • append
      The columns provided in the mapping are added at the end of the input schema. If a column already exists in the input DataFrame, its position is kept. => output schema: input columns + new columns
    • prepend
      The columns provided in the mapping are added at the beginning of the input schema. If a column already exists in the input DataFrame, its position is kept. => output schema: new columns + input columns
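
    A minimal sketch of how the three modes affect the output columns, assuming a hypothetical input DataFrame with the columns a and b and a mapping that defines b and c (the column orders shown simply follow the mode descriptions above):

    >>> from pyspark.sql import functions as F
    >>> from spooq2.transformer import Mapper
    >>>
    >>> input_df = spark.createDataFrame([(1, 2)], ["a", "b"])
    >>> mapping = [
    >>>     ("b", "b",          "StringType"),
    >>>     ("c", F.lit("new"), "StringType"),
    >>> ]
    >>>
    >>> Mapper(mapping, mode="replace").transform(input_df).columns
    ['b', 'c']
    >>> Mapper(mapping, mode="append").transform(input_df).columns
    ['a', 'b', 'c']
    >>> Mapper(mapping, mode="prepend").transform(input_df).columns
    ['c', 'a', 'b']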

Note

Let’s talk about Mappings:

The mapping should be a list of tuples that contain all necessary information per column.

  • Column Name: str
    Sets the name of the column in the resulting output DataFrame.
  • Source Path / Name / Column / Function: str or Column or functions
    Points to the name of the column in the input DataFrame. If the input is a flat DataFrame, it will essentially be the column name. If it is of complex type, it will point to the path of the actual value. For example: data.relationships.sample.data.id, where id is the value we want. It is also possible to directly pass a PySpark Column which will get evaluated. This can contain arbitrary logic supported by Spark. For example: F.current_date() or F.when(F.col("size") == 180, F.lit("tall")).otherwise(F.lit("tiny")).
  • DataType: str or DataType
    DataTypes can be types from pyspark.sql.types, selected custom datatypes or injected, ad-hoc custom datatypes. The datatype will be interpreted as a PySpark built-in if it is a member of pyspark.sql.types module. If it is not an importable PySpark data type, a method to construct the statement will be called by the data type’s name.

Note

The available input columns can vary from batch to batch if you use schema inference (e.g. on JSON data) for the extraction. Ignoring missing columns on the input DataFrame is highly encouraged in this case. However, if you have tight control over the structure of the extracted DataFrame, leaving ignore_missing_columns set to False is advised, as it can uncover typos and bugs.
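
A minimal, hedged sketch of the difference (the column names are purely illustrative and the exact exception is not spelled out here):

>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([("Alice",)], ["first_name"])
>>> mapping = [
>>>     ("first_name", "first_name", "StringType"),
>>>     ("last_name",  "last_name",  "StringType"),  # not present in input_df
>>> ]
>>>
>>> Mapper(mapping, ignore_missing_columns=True).transform(input_df).head()
Row(first_name=u'Alice', last_name=None)
>>> Mapper(mapping, ignore_missing_columns=False).transform(input_df)  # raises an exception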

Note

Please see spooq2.transformer.mapper_custom_data_types for all available custom data types and how to inject your own.

Note

Attention: Decimal is NOT SUPPORTED by Hive! Please use Double instead!

transform(input_df)[source]

Performs a transformation on a DataFrame.

Parameters:input_df (pyspark.sql.DataFrame) – Input DataFrame
Returns:Transformed DataFrame.
Return type:pyspark.sql.DataFrame

Note

This method only takes the input DataFrame as a parameter. All other required parameters are defined in the initialization of the Transformer object.

Activity Diagram

# todo: update to new logic

@startuml
start

skinparam monochrome true
skinparam defaultFontname Bitstream Vera Sans Mono
' skinparam defaultFontSize 18

:**""mapping""**:
- **""source_column""**
- **""name""**
- **""data_type""**;
while (unprocessed definitions in **""mapping""**?) is (yes)
  if (**""data_type""** is Spark built-in?) then (yes)
    if (**""source_column""**\n is missing?) then (yes)
      :Value = **""None""**;
    else (no)
      :Value = **""source_column""**;
    endif
    :rename to **""name""**;
    :cast as **""data_type""**;
  else (no)
    if (**""source_column""**\n is missing?) then (yes)
      :Value = **""None""**;
      :rename to **""name""**;
    else (no)
      :Value = **""source_column""**;
      :_get_select_expression_↩
       ↪for_custom_type(
          **""source_column""**, 
          **""name""**, 
          **""data_type""**
      );
    endif
  endif
  :add to global select expression;
endwhile (no)
:return global select expression;

stop
@enduml

Available Custom Mapping Methods

as_is / keep / no_change / without_casting (aliases)

Only renaming applied. No casting / transformation.

unix_timestamp_ms_to_spark_timestamp

unix timestamp in ms (LongType) -> timestamp (TimestampType)

extended_string_to_int

Number (IntegerType, FloatType, StringType) -> Number (IntegerType)

extended_string_to_long

Number (IntegerType, FloatType, StringType) -> Number (LongType)

extended_string_to_float

Number (IntegerType, FloatType, StringType) -> Number (FloatType)

extended_string_to_double

Number (IntegerType, FloatType, StringType) -> Number (DoubleType)

extended_string_to_boolean

Number (IntegerType, FloatType, StringType, BooleanType) -> boolean (BooleanType)

extended_string_to_timestamp

unix timestamp in s or text (IntegerType, FloatType, StringType) -> timestamp (TimestampType)

extended_string_to_date

unix timestamp in s or text (IntegerType, FloatType, StringType) -> date (DateType)

extended_string_unix_timestamp_ms_to_timestamp

unix timestamp in ms or text (IntegerType, FloatType, StringType) -> timestamp (TimestampType)

extended_string_unix_timestamp_ms_to_date

unix timestamp in ms or text (IntegerType, FloatType, StringType) -> date (DateType)

meters_to_cm

Number (IntegerType, FloatType, StringType) -> Number * 100 (IntegerType)

has_value

Any data -> False if NULL or empty string, otherwise True (BooleanType)

json_string

Any input data type will be returned as json (StringType). Complex data types are supported!

timestamp_ms_to_ms

Unix timestamp in ms (LongType) -> Unix timestamp in ms (LongType) if timestamp is between 1970 and 2099

timestamp_ms_to_s

Unix timestamp in ms (LongType) -> Unix timestamp in s (LongType) if timestamp is between 1970 and 2099

timestamp_s_to_ms

Unix timestamp in s (LongType) -> Unix timestamp in ms (LongType) if timestamp is between 1970 and 2099

timestamp_s_to_s

Unix timestamp in s (LongType) -> Unix timestamp in s (LongType) if timestamp is between 1970 and 2099

StringNull

Any data -> NULL (StringType)

IntNull

Any data -> NULL (IntegerType)

StringBoolean

Any data -> “1” (StringType) if the source column contains any valid data, otherwise NULL

IntBoolean

Any data -> 1 (IntegerType) if the source column contains any valid data, otherwise NULL

TimestampMonth

Timestamp (TimestampType / DateType) -> 1st day of the input value’s month (TimestampType)

Custom Mapping Methods Details

This is a collection of module level methods to construct a specific PySpark DataFrame query for custom defined data types.

These methods are not meant to be called directly but via the Mapper transformer. Please see that particular class on how to apply custom data types.

For injecting your own custom data types, please have a look at the add_custom_data_type() method!

add_custom_data_type(function_name, func)[source]

Registers a custom data type in runtime to be used with the Mapper transformer.

Example

>>> import spooq2.transformer.mapper_custom_data_types as custom_types
>>> import spooq2.transformer as T
>>> from pyspark.sql import Row, functions as F, types as sql_types
>>> def hello_world(source_column, name):
>>>     "A UDF (User Defined Function) in Python"
>>>     def _to_hello_world(col):
>>>         if not col:
>>>             return None
>>>         else:
>>>             return "Hello World"
>>>
>>>     udf_hello_world = F.udf(_to_hello_world, sql_types.StringType())
>>>     return udf_hello_world(source_column).alias(name)
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(hello_from=u'tsivorn1@who.int'),
>>>      Row(hello_from=u''),
>>>      Row(hello_from=u'gisaksen4@skype.com')]
>>> )
>>>
>>> custom_types.add_custom_data_type(function_name="hello_world", func=hello_world)
>>> transformer = T.Mapper(mapping=[("hello_who", "hello_from", "hello_world")])
>>> df = transformer.transform(input_df)
>>> df.show()
+-----------+
|  hello_who|
+-----------+
|Hello World|
|       null|
|Hello World|
+-----------+
>>> def first_and_last_name(source_column, name):
>>>     "A PySpark SQL expression referencing multiple columns"
>>>     return F.concat_ws("_", source_column, F.col("attributes.last_name")).alias(name)
>>>
>>> custom_types.add_custom_data_type(function_name="full_name", func=first_and_last_name)
>>>
>>> transformer = T.Mapper(mapping=[
>>>     ("first_name", "attributes.first_name", "StringType"),
>>>     ("last_name",  "attributes.last_name",  "StringType"),
>>>     ("full_name",  "attributes.first_name", "full_name"),
>>> ])
Parameters:
  • function_name (str) – The name of your custom data type
  • func (compatible function) – The PySpark DataFrame function which will be called on a column, defined in the mapping of the Mapper class. Required input parameters are source_column and name. Please see the note about required input parameters of custom data types for more information!

Note

Required input parameters of custom data types (a minimal skeleton is sketched below):

  • source_column (pyspark.sql.Column) - This is where your logic will be applied. The Mapper transformer takes care of calling this method with the right column, so you can handle it just like an object you would get from df["some_attribute"].
  • name (str) - The name of the resulting column. Nested attributes are not supported. The Mapper transformer takes care of calling this method with the right column name.
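
To make this contract concrete, here is a minimal, hypothetical skeleton (the trimming logic is only an example; the essential parts are accepting source_column and name and returning an aliased Column):

>>> from pyspark.sql import functions as F
>>>
>>> def my_custom_type(source_column, name):
>>>     """Hypothetical custom data type: trims whitespace and casts to string."""
>>>     return F.trim(source_column).cast("string").alias(name)
>>>
>>> # After registering it via add_custom_data_type(function_name="my_custom_type", func=my_custom_type),
>>> # it can be referenced by name in a mapping, e.g. ("col_out", "col_in", "my_custom_type").
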
_get_select_expression_for_custom_type(source_column, name, data_type)[source]

Internal method for calling functions dynamically

_generate_select_expression_for_as_is(source_column, name)[source]

alias for _generate_select_expression_without_casting

_generate_select_expression_for_keep(source_column, name)[source]

alias for _generate_select_expression_without_casting

_generate_select_expression_for_no_change(source_column, name)[source]

alias for _generate_select_expression_without_casting

_generate_select_expression_without_casting(source_column, name)[source]

Returns a column without casting. This is especially useful if you need to keep a complex data type, like an array, list or a struct.

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("my_friends", "friends", "as_is")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(my_friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(my_friends=[]),
 Row(my_friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
_generate_select_expression_for_json_string(source_column, name)[source]

Returns a column as a JSON-compatible string. Nested hierarchies are supported. The unicode representation of the column will be returned if an error occurs.

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("friends_json", "friends", "json_string")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(friends_json=u'[{"first_name": null, "last_name": null, "id": 3993}, {"first_name": "Ru\u00f2", "last_name": "Trank", "id": 17484}]'),
 Row(friends_json=None),
 Row(friends_json=u'[{"first_name": "Daphn\u00e9e", "last_name": "Lyddiard", "id": 16707}, {"first_name": "Ad\u00e9la\u00efde", "last_name": "Wisdom", "id": 17429}]')]
_generate_select_expression_for_timestamp_ms_to_ms(source_column, name)[source]

This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If the values are not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in milliseconds, output in milliseconds.

_generate_select_expression_for_timestamp_ms_to_s(source_column, name)[source]

This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If the values are not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_s")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in milliseconds, output in seconds.

_generate_select_expression_for_timestamp_s_to_ms(source_column, name)[source]

This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If the values are not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),    # Invalid!
>>>     Row(time_sec=4737139200)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in seconds, output in milliseconds.

_generate_select_expression_for_timestamp_s_to_s(source_column, name)[source]

This constructor is used for unix timestamps. The values are cleaned in addition to being cast and renamed. If the values are not between 01.01.1970 and 31.12.2099, NULL will be returned. Cast to pyspark.sql.types.LongType

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),    # Invalid!
>>>     Row(time_sec=4737139200)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]

Note

Input in seconds, output in seconds.

_generate_select_expression_for_StringNull(source_column, name)[source]

Used for Anonymizing. Input values will be ignored and replaced by NULL, cast to pyspark.sql.types.StringType

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=None), Row(email=None), Row(email=None)]
_generate_select_expression_for_IntNull(source_column, name)[source]

Used for Anonymizing. Input values will be ignored and replaced by NULL, cast to pyspark.sql.types.IntegerType

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=57815)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=None), Row(facebook_id=None), Row(facebook_id=None)]
_generate_select_expression_for_StringBoolean(source_column, name)[source]

Used for Anonymizing. The column’s value will be replaced by “1” if it is:

  • not NULL and
  • not an empty string

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=u'1'), Row(email=None), Row(email=u'1')]
_generate_select_expression_for_IntBoolean(source_column, name)[source]

Used for Anonymizing. The column’s value will be replaced by 1 if it contains a non-NULL value.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=None)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=1), Row(facebook_id=1), Row(facebook_id=None)]

Note

0 (zero) or negative numbers are still considered as valid values and therefore converted to 1.

_generate_select_expression_for_TimestampMonth(source_column, name)[source]

Used for Anonymizing. Can be used to keep the age but obscure the explicit birthday. This custom datatype requires a pyspark.sql.types.TimestampType column as input. The datetime value will be set to the first day of the month.

Example

>>> from pyspark.sql import Row
>>> from datetime import datetime
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(birthday=datetime(2019, 2, 9, 2, 45)),
>>>      Row(birthday=None),
>>>      Row(birthday=datetime(1988, 1, 31, 8))]
>>> )
>>>
>>> mapping = [("birthday", "birthday", "TimestampMonth")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(birthday=datetime.datetime(2019, 2, 1, 0, 0)),
 Row(birthday=None),
 Row(birthday=datetime.datetime(1988, 1, 1, 0, 0))]
_generate_select_expression_for_meters_to_cm(source_column, name)[source]

Convert meters to cm and cast the result to an IntegerType.

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(size_in_m=1.80),
>>>      Row(size_in_m=1.65),
>>>      Row(size_in_m=2.05)]
>>> )
>>>
>>> mapping = [("size_in_cm", "size_in_m", "meters_to_cm")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(size_in_cm=180),
 Row(size_in_cm=165),
 Row(size_in_cm=205)]
_generate_select_expression_for_has_value(source_column, name)[source]
Returns True if the source_column is
  • not NULL and
  • not “” (empty string)

otherwise it returns False

Warning

This means that it will return True for values which would otherwise indicate a False value, like "false" or 0!

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(input_key=1.80),
>>>      Row(input_key=None),
>>>      Row(input_key="some text"),
>>>      Row(input_key="")]
>>> )
>>>
>>> mapping = [("input_key", "result", "has_value")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(4)
[Row(result=True),
 Row(result=False),
 Row(result=True),
 Row(result=False)]
_generate_select_expression_for_unix_timestamp_ms_to_spark_timestamp(source_column, name)[source]

Convert unix timestamps in milliseconds to a Spark TimestampType. It is assumed that the timezone is already set to UTC in spark / java to avoid implicit timezone conversions.
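
If your Spark session does not already run in UTC, the session timezone can be pinned explicitly. A one-line sketch (spark.sql.session.timeZone is a standard Spark SQL setting; the spark session object is assumed to exist, as in the examples below):

>>> spark.conf.set("spark.sql.session.timeZone", "UTC")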

Example

>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(unix_timestamp_in_ms=1591627696951),
>>>      Row(unix_timestamp_in_ms=1596812952000),
>>>      Row(unix_timestamp_in_ms=946672200000)]
>>> )
>>>
>>> mapping = [("spark_timestamp", "unix_timestamp_in_ms", "unix_timestamp_ms_to_spark_timestamp")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(spark_timestamp=datetime.datetime(2020, 6, 8, 16, 48, 16, 951000)),
 Row(spark_timestamp=datetime.datetime(2020, 8, 7, 17, 9, 12)),
 Row(spark_timestamp=datetime.datetime(1999, 12, 31, 21, 30))]
_generate_select_expression_for_extended_string_to_int(source_column, name)[source]

More robust conversion from StringType to IntegerType. Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace
  • Underscores as thousand separators

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="  123456 "),
 Row(input_string="Hello"),
 Row(input_string="123_456")]
>>> mapping = [("output_value", "input_string", "extended_string_to_int")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=123456),
 Row(output_value=None),
 Row(output_value=123456)]
_generate_select_expression_for_extended_string_to_long(source_column, name)[source]

More robust conversion from StringType to LongType. Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace
  • Underscores as thousand separators

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="  21474836470 "),
 Row(input_string="Hello"),
 Row(input_string="21_474_836_470")]
>>> mapping = [("output_value", "input_string", "extended_string_to_long")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=21474836470),
 Row(output_value=None),
 Row(output_value=21474836470)]
_generate_select_expression_for_extended_string_to_float(source_column, name)[source]

More robust conversion from StringType to FloatType. Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace
  • Underscores as thousand separators

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="  836470.819 "),
 Row(input_string="Hello"),
 Row(input_string="836_470.819")]
>>> mapping = [("output_value", "input_string", "extended_string_to_float")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=836470.819),
 Row(output_value=None),
 Row(output_value=836470.819)]
_generate_select_expression_for_extended_string_to_double(source_column, name)[source]

More robust conversion from StringType to DoubleType. Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace
  • Underscores as thousand separators

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="  21474838464.70 "),
 Row(input_string="Hello"),
 Row(input_string="21_474_838_464.70")]
>>> mapping = [("output_value", "input_string", "extended_string_to_double")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=21474838464.7),
 Row(output_value=None),
 Row(output_value=21474838464.70)]
_generate_select_expression_for_extended_string_to_boolean(source_column, name)[source]

More robust conversion from StringType to BooleanType. Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace

Warning

This does not handle numbers (cast as string) the same way as the implicit number (cast as number) to boolean conversion! For example:

  • 100 to boolean => True
  • “100” to extended_string_to_boolean => False
  • “100” to boolean => False

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="  true "),
 Row(input_string="0"),
 Row(input_string="y")]
>>> mapping = [("output_value", "input_string", "extended_string_to_boolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=True),
 Row(output_value=False),
 Row(output_value=True)]
_generate_select_expression_for_extended_string_to_timestamp(source_column, name)[source]

More robust conversion from StringType to TimestampType. It is assumed that the timezone is already set to UTC in spark / java to avoid implicit timezone conversions.

The conversion can handle unix timestamps in seconds and in milliseconds:
  • Timestamps in the range [-MAX_TIMESTAMP_S, MAX_TIMESTAMP_S] are treated as seconds
  • Timestamps in the range [-inf, -MAX_TIMESTAMP_S) and (MAX_TIMESTAMP_S, inf] are treated as milliseconds
  • There is a time interval (1970-01-01 +- ~2.5 months) where we cannot correctly distinguish between seconds and milliseconds (e.g. 3974400000 would be treated as seconds (2095-12-11T00:00:00), as the value is smaller than MAX_TIMESTAMP_S, but it could also be a valid date in milliseconds (1970-02-16T00:00:00))

Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace
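
A hedged sketch of the seconds-vs-milliseconds heuristic described above, covering only the numeric branch (MAX_TIMESTAMP_S is an assumed constant here, not necessarily the exact value used by the library):

>>> from pyspark.sql import functions as F
>>>
>>> MAX_TIMESTAMP_S = 4102358400  # assumed cut-off: 2099-12-31 00:00:00 UTC
>>>
>>> def _unix_number_to_timestamp(source_column, name):
>>>     """Treat magnitudes up to MAX_TIMESTAMP_S as seconds, larger ones as milliseconds."""
>>>     seconds = source_column.cast("long")
>>>     return (
>>>         F.when(F.abs(seconds) <= MAX_TIMESTAMP_S, seconds)
>>>          .otherwise((seconds / 1000).cast("long"))
>>>          .cast("timestamp")
>>>          .alias(name)
>>>     )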

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"),
 Row(input_string="1597069446"),
 Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_to_timestamp")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.datetime(2020, 8, 12, 12, 43, 14)),
 Row(output_value=datetime.datetime(2020, 8, 10, 14, 24, 6)),
 Row(output_value=datetime.datetime(2020, 8, 12, 0, 0, 0))]
_generate_select_expression_for_extended_string_to_date(source_column, name)[source]

More robust conversion from StringType to DateType. It is assumed that the timezone is already set to UTC in spark / java to avoid implicit timezone conversions.

The conversion can handle unix timestamps in seconds and in milliseconds:
  • Timestamps in the range [-MAX_TIMESTAMP_S, MAX_TIMESTAMP_S] are treated as seconds
  • Timestamps in the range [-inf, -MAX_TIMESTAMP_S) and (MAX_TIMESTAMP_S, inf] are treated as milliseconds
  • There is a time interval (1970-01-01 +- ~2.5 months) where we cannot correctly distinguish between seconds and milliseconds (e.g. 3974400000 would be treated as seconds (2095-12-11T00:00:00), as the value is smaller than MAX_TIMESTAMP_S, but it could also be a valid date in milliseconds (1970-02-16T00:00:00))

Is able to additionally handle (compared to implicit Spark conversion):

  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"),
 Row(input_string="1597069446"),
 Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_to_date")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.date(2020, 8, 12)),
 Row(output_value=datetime.date(2020, 8, 10)),
 Row(output_value=datetime.date(2020, 8, 12))]
_generate_select_expression_for_extended_string_unix_timestamp_ms_to_timestamp(source_column, name)[source]

More robust conversion from StringType to TimestampType. It is assumed that the timezone is already set to UTC in spark / java to avoid implicit timezone conversions. Is able to additionally handle (compared to implicit Spark conversion):

  • Unix timestamps in milliseconds
  • Preceding whitespace
  • Trailing whitespace
  • Preceding and trailing whitespace

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"),
 Row(input_string="1597069446000"),
 Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_to_timestamp")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.datetime(2020, 8, 12, 12, 43, 14)),
 Row(output_value=datetime.datetime(2020, 8, 10, 14, 24, 6)),
 Row(output_value=datetime.datetime(2020, 8, 12, 0, 0, 0))]
_generate_select_expression_for_extended_string_unix_timestamp_ms_to_date(source_column, name)[source]

More robust conversion from StringType to DateType. It is assumed that the timezone is already set to UTC in spark / java to avoid implicit timezone conversions and that unix timestamps are in milliseconds.

Hint

Please have a look at the tests to get a better feeling how it behaves under tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py

Example

>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"),
 Row(input_string="1597069446000"),
 Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_to_date")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.date(2020, 8, 12)),
 Row(output_value=datetime.date(2020, 8, 10)),
 Row(output_value=datetime.date(2020, 8, 12))]