Mapper¶
Class¶
class Mapper(mapping, ignore_missing_columns=False, mode='replace')[source]¶
Bases: spooq2.transformer.transformer.Transformer
Constructs and applies a PySpark SQL expression based on the provided mapping.
Examples
>>> from pyspark.sql import functions as F
>>> from spooq2.transformer import Mapper
>>>
>>> mapping = [
>>>     ("id",           "data.relationships.food.data.id", "StringType"),
>>>     ("version",      "data.version",                    "extended_string_to_int"),
>>>     ("type",         "elem.attributes.type",            "StringType"),
>>>     ("created_at",   "elem.attributes.created_at",      "extended_string_to_timestamp"),
>>>     ("created_on",   "elem.attributes.created_at",      "extended_string_to_date"),
>>>     ("process_date", F.current_timestamp(),             "DateType"),
>>> ]
>>> mapper = Mapper(mapping=mapping)
>>> mapper.transform(input_df).printSchema()
root
 |-- id: string (nullable = true)
 |-- version: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- created_on: date (nullable = true)
 |-- process_date: date (nullable = false)
Parameters:
- mapping (list of tuple containing three str or Column or functions) – This is the main parameter for this transformation. It defines the column names for the output DataFrame, the column names (paths) from the input DataFrame, and their data types. Custom data types are also supported, which can clean, pivot, anonymize, … the data itself. Please have a look at the spooq2.transformer.mapper_custom_data_types module for more information.
- ignore_missing_columns (bool, defaults to False) – Specifies whether the mapping transformation should use NULL if a referenced input column is missing from the provided DataFrame. If set to False, it will raise an exception.
- mode (str, defaults to "replace") – Defines whether the mapping should fully replace the schema of the input DataFrame or just add to it. The following modes are supported:
  - replace – The output schema is the same as the provided mapping. => output schema: new columns
  - append – The columns provided in the mapping are added at the end of the input schema. If a column already exists in the input DataFrame, its position is kept. => output schema: input columns + new columns
  - prepend – The columns provided in the mapping are added at the beginning of the input schema. If a column already exists in the input DataFrame, its position is kept. => output schema: new columns + input columns
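The three modes can be illustrated with a small pure-Python sketch of the resulting column order; combine_columns is a hypothetical helper for illustration only, not part of spooq2:

```python
def combine_columns(input_cols, mapped_cols, mode="replace"):
    """Sketch of the output column order per mode (illustrative only)."""
    # columns from the mapping that do not yet exist in the input DataFrame
    new_cols = [c for c in mapped_cols if c not in input_cols]
    if mode == "replace":
        # output schema: new columns (exactly the mapping)
        return list(mapped_cols)
    if mode == "append":
        # output schema: input columns + new columns (existing ones keep their position)
        return list(input_cols) + new_cols
    if mode == "prepend":
        # output schema: new columns + input columns (existing ones keep their position)
        return new_cols + list(input_cols)
    raise ValueError("mode must be 'replace', 'append' or 'prepend'")
```

For an input schema of ["a", "b"] and a mapping producing ["b", "c"], the three modes yield ["b", "c"], ["a", "b", "c"], and ["c", "a", "b"] respectively.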
Note
Let’s talk about Mappings:
The mapping should be a list of tuples that contain all necessary information per column.
- Column Name (str) – Sets the name of the column in the resulting output DataFrame.
- Source Path / Name / Column / Function (str or Column or functions) – Points to the name of the column in the input DataFrame. If the input is a flat DataFrame, it is essentially the column name. If it is of complex type, it points to the path of the actual value, for example data.relationships.sample.data.id, where id is the value we want. It is also possible to directly pass a PySpark Column, which will get evaluated and can contain arbitrary logic supported by Spark, for example F.current_date() or F.when(F.col("size") == 180, F.lit("tall")).otherwise(F.lit("tiny")).
- DataType (str or DataType) – DataTypes can be types from pyspark.sql.types, selected custom data types, or injected, ad-hoc custom data types. The data type will be interpreted as a PySpark built-in if it is a member of the pyspark.sql.types module. If it is not an importable PySpark data type, a method to construct the statement will be called by the data type’s name.
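The data type dispatch described above can be sketched in plain Python; resolve_data_type, builtin_types, and custom_methods are hypothetical stand-ins for the members of pyspark.sql.types and the module's construction methods:

```python
def resolve_data_type(data_type, builtin_types, custom_methods):
    """Illustrative dispatch sketch, not the actual Mapper code."""
    if data_type in builtin_types:
        # importable member of pyspark.sql.types -> plain cast
        return ("cast", data_type)
    # otherwise a construction method is looked up by the data type's name
    return ("custom", custom_methods[data_type])
```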
Note
The available input columns can vary from batch to batch if you use schema inference (e.g. on JSON data) for the extraction. Ignoring missing columns on the input DataFrame is highly encouraged in this case. If you have tight control over the structure of the extracted DataFrame, however, leaving ignore_missing_columns set to False is advised, as it can uncover typos and bugs.
Note
Please see spooq2.transformer.mapper_custom_data_types for all available custom data types and how to inject your own.
Note
Attention: Decimal is NOT SUPPORTED by Hive! Please use Double instead!
transform(input_df)[source]¶
Performs a transformation on a DataFrame.
Parameters: input_df (pyspark.sql.DataFrame) – Input DataFrame
Returns: Transformed DataFrame
Return type: pyspark.sql.DataFrame
Note
This method only takes the input DataFrame as a parameter. All other needed parameters are defined in the initialization of the transformer object.
Available Custom Mapping Methods¶
as_is / keep / no_change / without_casting (aliases)¶
Only renaming applied. No casting / transformation.
unix_timestamp_ms_to_spark_timestamp¶
unix timestamp in ms (LongType) -> timestamp (TimestampType)
extended_string_to_int¶
Number (IntegerType, FloatType, StringType) -> Number (IntegerType)
extended_string_to_long¶
Number (IntegerType, FloatType, StringType) -> Number (LongType)
extended_string_to_float¶
Number (IntegerType, FloatType, StringType) -> Number (FloatType)
extended_string_to_double¶
Number (IntegerType, FloatType, StringType) -> Number (DoubleType)
extended_string_to_boolean¶
Number (IntegerType, FloatType, StringType, BooleanType) -> boolean (BooleanType)
extended_string_to_timestamp¶
unix timestamp in s or text (IntegerType, FloatType, StringType) -> timestamp (TimestampType)
extended_string_to_date¶
unix timestamp in s or text (IntegerType, FloatType, StringType) -> date (DateType)
extended_string_unix_timestamp_ms_to_timestamp¶
unix timestamp in ms or text (IntegerType, FloatType, StringType) -> timestamp (TimestampType)
extended_string_unix_timestamp_ms_to_date¶
unix timestamp in ms or text (IntegerType, FloatType, StringType) -> date (DateType)
meters_to_cm¶
Number (IntegerType, FloatType, StringType) -> Number * 100 (IntegerType)
has_value¶
Any data -> False if NULL or empty string, otherwise True (BooleanType)
json_string¶
Any input data type will be returned as a JSON string (StringType). Complex data types are supported!
timestamp_ms_to_ms¶
Unix timestamp in ms (LongType) -> Unix timestamp in ms (LongType) if timestamp is between 1970 and 2099
timestamp_ms_to_s¶
Unix timestamp in ms (LongType) -> Unix timestamp in s (LongType) if timestamp is between 1970 and 2099
timestamp_s_to_ms¶
Unix timestamp in s (LongType) -> Unix timestamp in ms (LongType) if timestamp is between 1970 and 2099
timestamp_s_to_s¶
Unix timestamp in s (LongType) -> Unix timestamp in s (LongType) if timestamp is between 1970 and 2099
StringNull¶
Any data -> NULL (StringType)
IntNull¶
Any data -> NULL (IntegerType)
StringBoolean¶
Any data -> “1” (StringType) if the source column contains any valid data, otherwise NULL
IntBoolean¶
Any data -> 1 (IntegerType) if the source column contains any valid data, otherwise NULL
TimestampMonth¶
Timestamp (TimestampType / DateType) -> 1st day of the input value’s month (TimestampType)
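The four timestamp_* cleaners above share a validity window (1970 to 2099). A pure-Python sketch of the ms-to-s variant, with the window bounds assumed for illustration rather than taken from the implementation:

```python
MIN_TIMESTAMP_MS = 0                  # assumed lower bound: 1970-01-01
MAX_TIMESTAMP_MS = 4102358400000      # assumed upper bound: roughly 2099-12-31

def clean_timestamp_ms_to_s(value_ms):
    """Sketch of timestamp_ms_to_s: values outside the window become NULL (None)."""
    if value_ms is None or not (MIN_TIMESTAMP_MS <= value_ms <= MAX_TIMESTAMP_MS):
        return None
    return value_ms // 1000
```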
Custom Mapping Methods Details¶
This is a collection of module-level methods to construct a specific PySpark DataFrame query for custom-defined data types.
These methods are not meant to be called directly but via the Mapper transformer. Please see that particular class on how to apply custom data types.
For injecting your own custom data types, please have a look at the add_custom_data_type() method!
add_custom_data_type(function_name, func)[source]¶
Registers a custom data type at runtime to be used with the Mapper transformer.
Example
>>> import spooq2.transformer.mapper_custom_data_types as custom_types
>>> import spooq2.transformer as T
>>> from pyspark.sql import Row, functions as F, types as sql_types
>>>
>>> def hello_world(source_column, name):
>>>     "A UDF (User Defined Function) in Python"
>>>     def _to_hello_world(col):
>>>         if not col:
>>>             return None
>>>         else:
>>>             return "Hello World"
>>>
>>>     udf_hello_world = F.udf(_to_hello_world, sql_types.StringType())
>>>     return udf_hello_world(source_column).alias(name)
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(hello_from=u'tsivorn1@who.int'),
>>>      Row(hello_from=u''),
>>>      Row(hello_from=u'gisaksen4@skype.com')]
>>> )
>>>
>>> custom_types.add_custom_data_type(function_name="hello_world", func=hello_world)
>>> transformer = T.Mapper(mapping=[("hello_who", "hello_from", "hello_world")])
>>> df = transformer.transform(input_df)
>>> df.show()
+-----------+
|  hello_who|
+-----------+
|Hello World|
|       null|
|Hello World|
+-----------+
>>> def first_and_last_name(source_column, name):
>>>     "A PySpark SQL expression referencing multiple columns"
>>>     return F.concat_ws("_", source_column, F.col("attributes.last_name")).alias(name)
>>>
>>> custom_types.add_custom_data_type(function_name="full_name", func=first_and_last_name)
>>>
>>> transformer = T.Mapper(mapping=[
>>>     ("first_name", "attributes.first_name", "StringType"),
>>>     ("last_name", "attributes.last_name", "StringType"),
>>>     ("full_name", "attributes.first_name", "full_name"),
>>> ])
Parameters:
- function_name (str) – The name of your custom data type
- func (compatible function) – The PySpark DataFrame function which will be called on a column defined in the mapping of the Mapper class. Required input parameters are source_column and name. Please see the note about required input parameters of custom data types for more information!
Note
Required input parameters of custom data types:
- source_column (pyspark.sql.Column) – This is where your logic will be applied. The Mapper transformer takes care of calling this method with the right column, so you can just handle it like an object you would get from df["some_attribute"].
- name (str) – The name the resulting column will get. Nested attributes are not supported. The Mapper transformer takes care of calling this method with the right column name.
_get_select_expression_for_custom_type(source_column, name, data_type)[source]¶
Internal method for calling functions dynamically.
_generate_select_expression_for_as_is(source_column, name)[source]¶
Alias for _generate_select_expression_without_casting.
_generate_select_expression_for_keep(source_column, name)[source]¶
Alias for _generate_select_expression_without_casting.
_generate_select_expression_for_no_change(source_column, name)[source]¶
Alias for _generate_select_expression_without_casting.
_generate_select_expression_without_casting(source_column, name)[source]¶
Returns a column without casting. This is especially useful if you need to keep a complex data type, like an array, list, or struct.
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("my_friends", "friends", "as_is")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(my_friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(my_friends=[]),
 Row(my_friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
_generate_select_expression_for_json_string(source_column, name)[source]¶
Returns a column as a JSON-compatible string. Nested hierarchies are supported. The unicode representation of the column will be returned if an error occurs.
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(friends=[Row(first_name=None, id=3993, last_name=None), Row(first_name=u'Ruò', id=17484, last_name=u'Trank')]),
 Row(friends=[]),
 Row(friends=[Row(first_name=u'Daphnée', id=16707, last_name=u'Lyddiard'), Row(first_name=u'Adélaïde', id=17429, last_name=u'Wisdom')])]
>>> mapping = [("friends_json", "friends", "json_string")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(friends_json=u'[{"first_name": null, "last_name": null, "id": 3993}, {"first_name": "Ru\u00f2", "last_name": "Trank", "id": 17484}]'),
 Row(friends_json=None),
 Row(friends_json=u'[{"first_name": "Daphn\u00e9e", "last_name": "Lyddiard", "id": 16707}, {"first_name": "Ad\u00e9la\u00efde", "last_name": "Wisdom", "id": 17429}]')]
_generate_select_expression_for_timestamp_ms_to_ms(source_column, name)[source]¶
This constructor is used for unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 1970-01-01 and 2099-12-31, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in milliseconds, output in milliseconds.
_generate_select_expression_for_timestamp_ms_to_s(source_column, name)[source]¶
This constructor is used for unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 1970-01-01 and 2099-12-31, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839000),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839000),    # Invalid!
>>>     Row(time_sec=4737139200000)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_ms_to_s")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in milliseconds, output in seconds.
_generate_select_expression_for_timestamp_s_to_ms(source_column, name)[source]¶
This constructor is used for unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 1970-01-01 and 2099-12-31, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),    # Invalid!
>>>     Row(time_sec=4737139200)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_ms")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839000), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in seconds, output in milliseconds.
_generate_select_expression_for_timestamp_s_to_s(source_column, name)[source]¶
This constructor is used for unix timestamps. In addition to casting and renaming, the values are cleaned: if a value is not between 1970-01-01 and 2099-12-31, NULL will be returned. Cast to pyspark.sql.types.LongType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame([
>>>     Row(time_sec=1581540839),  # 02/12/2020 @ 8:53pm (UTC)
>>>     Row(time_sec=-4887839),    # Invalid!
>>>     Row(time_sec=4737139200)   # 02/12/2120 @ 12:00am (UTC)
>>> ])
>>>
>>> mapping = [("unix_ts", "time_sec", "timestamp_s_to_s")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(unix_ts=1581540839), Row(unix_ts=None), Row(unix_ts=None)]
Note
Input in seconds, output in seconds.
_generate_select_expression_for_StringNull(source_column, name)[source]¶
Used for anonymizing. Input values will be ignored and replaced by NULL. Cast to pyspark.sql.types.StringType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=None), Row(email=None), Row(email=None)]
_generate_select_expression_for_IntNull(source_column, name)[source]¶
Used for anonymizing. Input values will be ignored and replaced by NULL. Cast to pyspark.sql.types.IntegerType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=57815)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntNull")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=None), Row(facebook_id=None), Row(facebook_id=None)]
_generate_select_expression_for_StringBoolean(source_column, name)[source]¶
Used for anonymizing. The column’s value will be replaced by “1” if it is:
- not NULL and
- not an empty string
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(email=u'tsivorn1@who.int'),
>>>      Row(email=u''),
>>>      Row(email=u'gisaksen4@skype.com')]
>>> )
>>>
>>> mapping = [("email", "email", "StringBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(email=u'1'), Row(email=None), Row(email=u'1')]
_generate_select_expression_for_IntBoolean(source_column, name)[source]¶
Used for anonymizing. The column’s value will be replaced by 1 if it contains a non-NULL value.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(facebook_id=3047288),
>>>      Row(facebook_id=0),
>>>      Row(facebook_id=None)]
>>> )
>>>
>>> mapping = [("facebook_id", "facebook_id", "IntBoolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(facebook_id=1), Row(facebook_id=1), Row(facebook_id=None)]
Note
0 (zero) and negative numbers are still considered valid values and are therefore converted to 1.
_generate_select_expression_for_TimestampMonth(source_column, name)[source]¶
Used for anonymizing. Can be used to keep the age but obscure the exact birthday. This custom data type requires a pyspark.sql.types.TimestampType column as input. The datetime value will be set to the first day of the month.
Example
>>> from pyspark.sql import Row
>>> from datetime import datetime
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(birthday=datetime(2019, 2, 9, 2, 45)),
>>>      Row(birthday=None),
>>>      Row(birthday=datetime(1988, 1, 31, 8))]
>>> )
>>>
>>> mapping = [("birthday", "birthday", "TimestampMonth")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(birthday=datetime.datetime(2019, 2, 1, 0, 0)), Row(birthday=None), Row(birthday=datetime.datetime(1988, 1, 1, 0, 0))]
_generate_select_expression_for_meters_to_cm(source_column, name)[source]¶
Converts meters to centimeters and casts the result to an IntegerType.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(size_in_m=1.80),
>>>      Row(size_in_m=1.65),
>>>      Row(size_in_m=2.05)]
>>> )
>>>
>>> mapping = [("size_in_cm", "size_in_m", "meters_to_cm")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(size_in_cm=180), Row(size_in_cm=165), Row(size_in_cm=205)]
_generate_select_expression_for_has_value(source_column, name)[source]¶
Returns True if the source_column is:
- not NULL and
- not “” (empty string)
Otherwise it returns False.
Warning
This means it will return True even for values that would normally indicate False, like “false” or 0!
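The semantics above can be captured in a short pure-Python sketch (illustrative only, not the actual Spark expression):

```python
def has_value(value):
    """Sketch: False only for NULL (None) and the empty string, True otherwise."""
    return value is not None and value != ""
```

Note that has_value(0) and has_value("false") both return True, exactly as the warning describes.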
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(input_key=1.80),
>>>      Row(input_key=None),
>>>      Row(input_key="some text"),
>>>      Row(input_key="")]
>>> )
>>>
>>> mapping = [("result", "input_key", "has_value")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(4)
[Row(result=True), Row(result=False), Row(result=True), Row(result=False)]
_generate_select_expression_for_unix_timestamp_ms_to_spark_timestamp(source_column, name)[source]¶
Converts unix timestamps in milliseconds to a Spark TimestampType. It is assumed that the timezone is already set to UTC in Spark / Java to avoid implicit timezone conversions.
Example
>>> from pyspark.sql import Row
>>> from spooq2.transformer import Mapper
>>>
>>> input_df = spark.createDataFrame(
>>>     [Row(unix_timestamp_in_ms=1591627696951),
>>>      Row(unix_timestamp_in_ms=1596812952000),
>>>      Row(unix_timestamp_in_ms=946672200000)]
>>> )
>>>
>>> mapping = [("spark_timestamp", "unix_timestamp_in_ms", "unix_timestamp_ms_to_spark_timestamp")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(spark_timestamp=datetime.datetime(2020, 6, 8, 16, 48, 16, 951000)),
 Row(spark_timestamp=datetime.datetime(2020, 8, 7, 17, 9, 12)),
 Row(spark_timestamp=datetime.datetime(1999, 12, 31, 21, 30))]
_generate_select_expression_for_extended_string_to_int(source_column, name)[source]¶
More robust conversion from StringType to IntegerType. Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
- Underscores as thousand separators
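The extra handling can be approximated in plain Python; extended_parse_int is an illustrative sketch of the behavior, not the actual Spark expression:

```python
def extended_parse_int(value):
    """Sketch: trim whitespace, drop underscore thousand separators, NULL on failure."""
    if value is None:
        return None
    cleaned = value.strip().replace("_", "")
    try:
        return int(cleaned)
    except ValueError:
        return None
```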
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string=" 123456 "), Row(input_string="Hello"), Row(input_string="123_456")]
>>> mapping = [("output_value", "input_string", "extended_string_to_int")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=123456), Row(output_value=None), Row(output_value=123456)]
_generate_select_expression_for_extended_string_to_long(source_column, name)[source]¶
More robust conversion from StringType to LongType. Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
- Underscores as thousand separators
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string=" 21474836470 "), Row(input_string="Hello"), Row(input_string="21_474_836_470")]
>>> mapping = [("output_value", "input_string", "extended_string_to_long")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=21474836470), Row(output_value=None), Row(output_value=21474836470)]
_generate_select_expression_for_extended_string_to_float(source_column, name)[source]¶
More robust conversion from StringType to FloatType. Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
- Underscores as thousand separators
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string=" 836470.819 "), Row(input_string="Hello"), Row(input_string="836_470.819")]
>>> mapping = [("output_value", "input_string", "extended_string_to_float")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=836470.819), Row(output_value=None), Row(output_value=836470.819)]
_generate_select_expression_for_extended_string_to_double(source_column, name)[source]¶
More robust conversion from StringType to DoubleType. Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
- Underscores as thousand separators
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string=" 21474838464.70 "), Row(input_string="Hello"), Row(input_string="21_474_838_464.70")]
>>> mapping = [("output_value", "input_string", "extended_string_to_double")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=21474838464.7), Row(output_value=None), Row(output_value=21474838464.7)]
_generate_select_expression_for_extended_string_to_boolean(source_column, name)[source]¶
More robust conversion from StringType to BooleanType. Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
Warning
This does not handle numbers (cast as string) the same way as a plain number-to-boolean conversion! For example:
- 100 to boolean => True
- “100” to extended_string_to_boolean => False
- “100” to boolean => False
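A rough pure-Python approximation of this string-to-boolean behavior; the set of accepted truthy strings is an assumption for illustration, not taken from the implementation:

```python
TRUTHY_STRINGS = {"true", "t", "yes", "y", "1", "on"}  # assumed truthy set

def extended_parse_bool(value):
    """Sketch: trim whitespace, then compare against the assumed truthy set."""
    if value is None:
        return None
    return value.strip().lower() in TRUTHY_STRINGS
```

This reproduces the warning above: extended_parse_bool("100") is False, while a plain numeric 100 cast to boolean would be True.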
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string=" true "), Row(input_string="0"), Row(input_string="y")]
>>> mapping = [("output_value", "input_string", "extended_string_to_boolean")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=True), Row(output_value=False), Row(output_value=True)]
_generate_select_expression_for_extended_string_to_timestamp(source_column, name)[source]¶
More robust conversion from StringType to TimestampType. It is assumed that the timezone is already set to UTC in Spark / Java to avoid implicit timezone conversions.
The conversion can handle unix timestamps in seconds and in milliseconds:
- Timestamps in the range [-MAX_TIMESTAMP_S, MAX_TIMESTAMP_S] are treated as seconds
- Timestamps in the ranges [-inf, -MAX_TIMESTAMP_S) and (MAX_TIMESTAMP_S, inf] are treated as milliseconds
- There is a time interval (1970-01-01 +- ~2.5 months) where we cannot distinguish correctly between seconds and milliseconds. For example, 3974400000 would be treated as seconds (2095-12-11T00:00:00) because the value is smaller than MAX_TIMESTAMP_S, but it could also be a valid date in milliseconds (1970-02-16T00:00:00).
Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
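The seconds-versus-milliseconds heuristic can be sketched as follows; the exact MAX_TIMESTAMP_S value is an assumption (roughly 2099-12-31 in seconds), not taken from the implementation:

```python
MAX_TIMESTAMP_S = 4102358400  # assumed: ~2099-12-31T00:00:00 UTC in seconds

def to_epoch_seconds(value):
    """Sketch: magnitudes within MAX_TIMESTAMP_S are seconds, larger ones milliseconds."""
    if abs(value) <= MAX_TIMESTAMP_S:
        return value          # treated as seconds
    return value // 1000      # treated as milliseconds
```

The ambiguous window mentioned above falls out of this rule: 3974400000 is below MAX_TIMESTAMP_S and is therefore treated as seconds, even though it would also be a plausible millisecond value.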
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"), Row(input_string="1597069446"), Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_to_timestamp")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.datetime(2020, 8, 12, 12, 43, 14)), Row(output_value=datetime.datetime(2020, 8, 10, 14, 24, 6)), Row(output_value=datetime.datetime(2020, 8, 12, 0, 0))]
_generate_select_expression_for_extended_string_to_date(source_column, name)[source]¶
More robust conversion from StringType to DateType. It is assumed that the timezone is already set to UTC in Spark / Java to avoid implicit timezone conversions.
The conversion can handle unix timestamps in seconds and in milliseconds:
- Timestamps in the range [-MAX_TIMESTAMP_S, MAX_TIMESTAMP_S] are treated as seconds
- Timestamps in the ranges [-inf, -MAX_TIMESTAMP_S) and (MAX_TIMESTAMP_S, inf] are treated as milliseconds
- There is a time interval (1970-01-01 +- ~2.5 months) where we cannot distinguish correctly between seconds and milliseconds. For example, 3974400000 would be treated as seconds (2095-12-11T00:00:00) because the value is smaller than MAX_TIMESTAMP_S, but it could also be a valid date in milliseconds (1970-02-16T00:00:00).
Is able to additionally handle (compared to implicit Spark conversion):
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"), Row(input_string="1597069446"), Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_to_date")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.date(2020, 8, 12)), Row(output_value=datetime.date(2020, 8, 10)), Row(output_value=datetime.date(2020, 8, 12))]
_generate_select_expression_for_extended_string_unix_timestamp_ms_to_timestamp(source_column, name)[source]¶
More robust conversion from StringType to TimestampType. It is assumed that the timezone is already set to UTC in Spark / Java to avoid implicit timezone conversions. Is able to additionally handle (compared to implicit Spark conversion):
- Unix timestamps in milliseconds
- Preceding whitespace
- Trailing whitespace
- Preceding and trailing whitespace
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"), Row(input_string="1597069446000"), Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_unix_timestamp_ms_to_timestamp")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.datetime(2020, 8, 12, 12, 43, 14)), Row(output_value=datetime.datetime(2020, 8, 10, 14, 24, 6)), Row(output_value=datetime.datetime(2020, 8, 12, 0, 0))]
_generate_select_expression_for_extended_string_unix_timestamp_ms_to_date(source_column, name)[source]¶
More robust conversion from StringType to DateType. It is assumed that the timezone is already set to UTC in Spark / Java to avoid implicit timezone conversions and that unix timestamps are in milliseconds.
Hint
Please have a look at the tests to get a better feeling for how it behaves: tests/unit/transformer/test_mapper_custom_data_types.py::TestExtendedStringConversions and tests/data/test_fixtures/mapper_custom_data_types_fixtures.py
Example
>>> from spooq2.transformer import Mapper
>>>
>>> input_df.head(3)
[Row(input_string="2020-08-12T12:43:14+0000"), Row(input_string="1597069446000"), Row(input_string="2020-08-12")]
>>> mapping = [("output_value", "input_string", "extended_string_unix_timestamp_ms_to_date")]
>>> output_df = Mapper(mapping).transform(input_df)
>>> output_df.head(3)
[Row(output_value=datetime.date(2020, 8, 12)), Row(output_value=datetime.date(2020, 8, 10)), Row(output_value=datetime.date(2020, 8, 12))]