Mapper

Class

class Mapper(mapping: List[Tuple[Any, Any, Any]], ignore_ambiguous_columns: bool = False, missing_column_handling: str = 'raise_error', mode: str = 'replace', **kwargs)[source]

Constructs and applies a PySpark SQL expression, based on the provided mapping.

Examples

>>> from pyspark.sql import functions as F, types as T
>>> from spooq.transformer import Mapper
>>> from spooq.transformer import mapper_transformations as spq
>>>
>>> mapping = [
>>>     ("id",            "data.relationships.food.data.id",  spq.to_str),
>>>     ("version",       "data.version",                     spq.to_int),
>>>     ("type",          "elem.attributes.type",             "string"),
>>>     ("created_at",    "elem.attributes.created_at",       spq.to_timestamp),
>>>     ("created_on",    "elem.attributes.created_at",       spq.to_timestamp(cast="date")),
>>>     ("processed_at",  F.current_timestamp(),              spq.as_is,
>>> ]
>>> mapper = Mapper(mapping=mapping)
>>> mapper.transform(input_df).printSchema()
root
 |-- id: string (nullable = true)
 |-- version: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- created_at: timestamp (nullable = true)
 |-- created_on: date (nullable = true)
 |-- processed_at: timestamp (nullable = false)
Parameters
  • mapping (list of tuples, each containing three elements) – This is the main parameter for this transformation. It defines the column names for the output DataFrame, the column names (or paths) from the input DataFrame, and their data types. Custom data types are also supported, which can clean, pivot, anonymize, … the data itself. Please have a look at the spooq.transformer.mapper_custom_data_types module for more information.

  • missing_column_handling (str, Defaults to ‘raise_error’) –

    Specifies how to proceed in case a source column does not exist in the source DataFrame (see the sketch after this parameter list):
    • raise_error (default)

      Raise an exception

    • nullify

      Create source column filled with null

    • skip

      Skip the mapping transformation

  • ignore_ambiguous_columns (bool, Defaults to False) – The input DataFrame can have ambiguous column names (like “Key” vs “key”), which raise an exception in Spark when read. This flag suppresses that exception and skips the affected columns.

  • mode (str, Defaults to “replace”) –

    Defines whether the mapping should fully replace the schema of the input DataFrame or just add to it (see the sketch after this parameter list). The following modes are supported:

    • replace

      The output schema is the same as the provided mapping. => output schema: new columns

    • append

      The columns provided in the mapping are added at the end of the input schema. If a column already exists in the input DataFrame, its position is kept. => output schema: input columns + new columns

    • prepend

      The columns provided in the mapping are added at the beginning of the input schema. If a column already exists in the input DataFrame, its position is kept. => output schema: new columns + input columns
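
A minimal sketch of how these parameters fit together; the column names and paths are purely illustrative:

>>> mapping = [
>>>     ("id",         "data.id",         spq.to_str),
>>>     ("updated_at", "data.updated_at", spq.to_timestamp),
>>> ]
>>> mapper = Mapper(
>>>     mapping=mapping,
>>>     missing_column_handling="nullify",  # missing source columns are created as null instead of raising
>>>     mode="append",                      # keep the input schema and add the mapped columns at the end
>>> )
>>> output_df = mapper.transform(input_df)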

Keyword Arguments

ignore_missing_columns (bool, Defaults to False) – DEPRECATED: please use missing_column_handling instead!

Note

Let’s talk about Mappings:

The mapping should be a list of tuples, each containing all necessary information for one column; a short sketch follows this list.

  • Column Name: str

    Sets the name of the column in the resulting output DataFrame.

  • Source Path / Name / Column / Function: str, Column or functions

    Points to the name of the column in the input DataFrame. If the input is a flat DataFrame, it will essentially be the column name. If it is of complex type, it will point to the path of the actual value. For example: data.relationships.sample.data.id, where id is the value we want. It is also possible to directly pass a PySpark Column which will get evaluated. This can contain arbitrary logic supported by Spark. For example: F.current_date() or F.when(F.col("size") == 180, F.lit("tall")).otherwise(F.lit("tiny")).

  • DataType: str, DataType or mapper_transformations

    DataTypes can be types from pyspark.sql.types (like T.StringType()), simple strings supported by PySpark (like “string”) and custom transformations provided by spooq (like spq.to_timestamp). You can find more information about the transformations at https://spooq.rtfd.io/en/latest/transformer/mapper.html#module-spooq.transformer.mapper_transformations.
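
To make this concrete, here is a hypothetical mapping that combines the source and data type variants described above (all column names are made up):

>>> mapping = [
>>>     ("sample_id",  "data.relationships.sample.data.id",  spq.to_str),      # nested path, spooq transformation
>>>     ("loaded_on",  F.current_date(),                     "date"),          # PySpark Column, simple string type
>>>     ("size_class", F.when(F.col("size") == 180, F.lit("tall"))
>>>                     .otherwise(F.lit("tiny")),           T.StringType()),  # arbitrary Column logic, pyspark.sql.types
>>> ]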

Note

The available input columns can vary from batch to batch if you use schema inference (e.g. on JSON data) for the extraction. The missing_column_handling parameter lets you specify how to handle columns that are missing from the input DataFrame. It is advised to use the ‘raise_error’ option, as it can uncover typos and bugs.

Custom Transformations

This is a collection of module-level functions to be applied to a DataFrame. They can be used with the Mapper transformer or directly within a select or withColumn statement.

All functions support the following generic functionality:

  • alt_src_cols: Alternative source columns that will be used within a coalesce function if provided

  • cast: Explicit casting after the transformation (sane defaults are set for each function)

to_str, to_int, to_long, to_float, to_double are convenience methods with a hardcoded cast that cannot be changed.
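
For illustration, a single mapping entry could combine both generic arguments. This is only a sketch: the column names are made up, and whether alt_src_cols accepts a single column or a list is not shown here.

>>> mapping = [
>>>     # fall back to "meta.created_at" if "attributes.created_at" is missing or null, then cast to date
>>>     ("created_on", "attributes.created_at", spq.to_timestamp(alt_src_cols="meta.created_at", cast="date")),
>>> ]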

All examples assume the following code has been executed beforehand:

>>> from pyspark.sql import Row
>>> from pyspark.sql import functions as F, types as T
>>> from spooq.transformer import Mapper
>>> from spooq.transformer import mapper_transformations as spq

spooq.transformer.mapper_transformations.as_is([...])

Returns a renamed column without any casting.

spooq.transformer.mapper_transformations.to_num([...])

More robust conversion to number data types (Default: LongType).
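
A short sketch of to_num within a mapping; the default cast is LongType, and the cast keyword follows the same pattern as for to_timestamp above (column names are illustrative):

>>> mapping = [
>>>     ("amount", "data.amount_as_string", spq.to_num),                # defaults to LongType
>>>     ("ratio",  "data.ratio_as_string",  spq.to_num(cast="double")), # equivalent to spq.to_double
>>> ]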

spooq.transformer.mapper_transformations.to_bool([...])

More robust conversion to BooleanType.

spooq.transformer.mapper_transformations.to_timestamp([...])

More robust conversion to TimestampType (or as a formatted string).

spooq.transformer.mapper_transformations.str_to_array([...])

Splits a string into a list (ArrayType).
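
A hedged sketch of str_to_array in a mapping, assuming a comma-separated input string (the default separator is an assumption):

>>> mapping = [
>>>     ("tags", "attributes.tags_csv", spq.str_to_array),  # e.g. "a,b,c" -> ["a", "b", "c"]
>>> ]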

spooq.transformer.mapper_transformations.map_values([...])

Maps input values to specified output values.

spooq.transformer.mapper_transformations.meters_to_cm([...])

Converts meters to cm and casts the result to an IntegerType.

spooq.transformer.mapper_transformations.has_value([...])

Returns True if the source_column contains a value, otherwise False.

spooq.transformer.mapper_transformations.apply([...])

Applies a function or partial to the source column.

spooq.transformer.mapper_transformations.to_json_string([...])

Returns a column as a JSON-compatible string.
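
A sketch of serializing a nested struct into a string column via the mapping (the struct path is illustrative):

>>> mapping = [
>>>     ("attributes_json", "elem.attributes", spq.to_json_string),
>>> ]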

spooq.transformer.mapper_transformations.to_str([...])

Convenience transformation that only casts to string.

spooq.transformer.mapper_transformations.to_int([...])

Syntactic sugar for calling to_num(cast="int")

spooq.transformer.mapper_transformations.to_long([...])

Syntactic sugar for calling to_num(cast="long")

spooq.transformer.mapper_transformations.to_float([...])

Syntactic sugar for calling to_num(cast="float")

spooq.transformer.mapper_transformations.to_double([...])

Syntactic sugar for calling to_num(cast="double")

Custom Mapping Functions as Strings [DEPRECATED]

This is a collection of module-level methods that construct specific PySpark DataFrame query expressions for custom-defined data types.

These methods are not meant to be called directly but via the Mapper transformer. Please see that class on how to apply custom data types.

To inject your own custom data types, please have a look at the add_custom_data_type() method!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_IntBoolean(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_IntNull(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_StringBoolean(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_StringNull(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_TimestampMonth(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_as_is(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_boolean(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_date(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_double(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_float(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_int(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_long(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_to_timestamp(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_unix_timestamp_ms_to_date(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_extended_string_unix_timestamp_ms_to_timestamp(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_has_value(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_json_string(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_keep(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_meters_to_cm(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_no_change(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_timestamp_ms_to_ms(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_timestamp_ms_to_s(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_timestamp_s_to_ms(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_timestamp_s_to_s(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_for_unix_timestamp_ms_to_spark_timestamp(...)

Deprecated!

spooq.transformer.mapper_custom_data_types._generate_select_expression_without_casting(...)

Deprecated!