Get Started

This section guides you through a simple ETL pipeline built with Spooq to show how the library is used.

Sample Input Data:

{
  "id": 18,
  "guid": "b12b59ba-5c78-4057-a998-469497005c1f",
  "attributes": {
    "first_name": "Jeannette",
    "last_name": "O'Loghlen",
    "gender": "F",
    "email": "gpirri3j@oracle.com",
    "ip_address": "64.19.237.154",
    "university": "",
    "birthday": "1972-05-16T22:17:41Z",
    "friends": [
      {
        "first_name": "Noémie",
        "last_name": "Tibbles",
        "id": "9952"
      },
      {
        "first_name": "Bérangère",
        "last_name": null,
        "id": "3391"
      },
      {
        "first_name": "Danièle",
        "last_name": null,
        "id": "9637"
      },
      {
        "first_name": null,
        "last_name": null,
        "id": "9939"
      },
      {
        "first_name": "Anaëlle",
        "last_name": null,
        "id": "18994"
      }
    ]
  },
  "meta": {
    "created_at_sec": "1547371284",
    "created_at_ms": "1547204429000",
    "version": "24"
  }
}
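
To see how Spark interprets this nested structure, the record can be loaded with plain PySpark (a minimal sketch; the file name sample_user.json and the SparkSession setup are illustrative assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# multiLine=True because the sample record spans several lines
sample_df = spark.read.json("sample_user.json", multiLine=True)

# Prints the nested struct/array layout that the mappings below address via dot notation
sample_df.printSchema()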

Application Code for Creating a Users Table

from pyspark.sql import functions as F, types as T
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup
from spooq.loader import HiveLoader
from spooq.transformer import mapper_transformations as spq

users_mapping = [
    ("id",              "id",                     spq.to_num),
    ("guid",            "guid",                   "string"),
    ("forename",        "attributes.first_name",  "string"),
    ("surename",        "attributes.last_name",   "string"),
    ("gender",          "attributes.gender",      spq.apply(func=F.lower)),
    ("has_email",       "attributes.email",       spq.has_value),
    ("has_university",  "attributes.university",  spq.has_value),
    ("created_at",      "meta.created_at_ms",     spq.to_timestamp),
]

# Extract
source_df = JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles").extract()

# Transform
mapped_df = Mapper(users_mapping).transform(source_df)
cleaned_df = ThresholdCleaner(
    thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
    column_to_log_cleansed_values="cleansed_values",
    store_as_map=True,
).transform(mapped_df)
deduplicated_df = NewestByGroup(group_by="id", order_by="created_at").transform(cleaned_df)

# Load
HiveLoader(
    db_name="users_and_friends",
    table_name="users",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=10,
).load(deduplicated_df)

Table “users”

id | guid                                 | forename | surname | gender | has_email | has_university | created_at | cleansed_values
1  | 799eb359-2e98-4526-a2ec-455b03e57b5c | Orran    | Haug    | m      | false     | false          | null       | {created_at -> 2018-11-30 11:19:43}
...
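
For orientation: each entry in users_mapping is a tuple of (target column name, source path, transformation). In plain PySpark, a few of the columns above would be built roughly like this (a sketch of the semantics only, not Spooq's actual implementation; the millisecond-to-timestamp conversion and the numeric target type are assumptions about spq.to_timestamp and spq.to_num):

from pyspark.sql import functions as F

# Mapper semantics (sketch): pick the source path, apply the transformation, rename
manual_df = source_df.select(
    F.col("id").cast("long").alias("id"),                             # spq.to_num (assumed numeric target)
    F.col("attributes.first_name").cast("string").alias("forename"),
    F.lower(F.col("attributes.gender")).alias("gender"),              # spq.apply(func=F.lower)
    (F.col("meta.created_at_ms") / 1000).cast("timestamp").alias("created_at"),
)

# ThresholdCleaner semantics (sketch): values outside [min, max] are replaced by
# the default; F.when without otherwise yields null, matching default=None
in_range = F.col("created_at").between(F.to_timestamp(F.lit("2019-01-01")), F.current_date())
manual_df = manual_df.withColumn("created_at", F.when(in_range, F.col("created_at")))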

Application Code for Creating a Friends_Mapping Table

from pyspark.sql import functions as F, types as T
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup, Exploder
from spooq.loader import HiveLoader
from spooq.transformer import mapper_transformations as spq

friends_mapping = [
    ("id",          "id",                  spq.to_num),
    ("guid",        "guid",                "string"),
    ("friend_id",   "friend.id",           spq.to_num),
    ("created_at",  "meta.created_at_ms",  spq.to_timestamp),
]

# Extract
source_df = JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles").extract()

# Transform
deduplicated_friends_df = NewestByGroup(group_by="id", order_by="meta.created_at_ms").transform(source_df)
exploded_friends_df = Exploder(path_to_array="attributes.friends", exploded_elem_name="friend").transform(deduplicated_friends_df)
mapped_friends_df = Mapper(mapping=friends_mapping).transform(exploded_friends_df)
cleaned_friends_df = ThresholdCleaner(
    thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
    column_to_log_cleansed_values="cleansed_values",
    store_as_map=True,
).transform(mapped_friends_df)

# Load
HiveLoader(
    db_name="users_and_friends",
    table_name="friends_mapping",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=20,
).load(cleaned_friends_df)

Table “friends_mapping”

id | guid                                 | friend_id | created_at
26 | bd666d9d-9bb2-494e-8dc3-ab1c29a67ab2 | 8001      | 2019-05-29 23:25:27
26 | bd666d9d-9bb2-494e-8dc3-ab1c29a67ab2 | 5623      | 2019-05-29 23:25:27
26 | bd666d9d-9bb2-494e-8dc3-ab1c29a67ab2 | 17713     | 2019-05-29 23:25:27
65 | 2a5fd4e4-2cfa-41c6-9771-46c666e7c2eb | 4428      | null
65 | 2a5fd4e4-2cfa-41c6-9771-46c666e7c2eb | 13011     | null
...
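
Conceptually, NewestByGroup and Exploder correspond to the following plain PySpark (a sketch of the semantics only, not Spooq's actual implementation):

from pyspark.sql import Window, functions as F

# NewestByGroup (sketch): keep only the newest record per id
w = Window.partitionBy("id").orderBy(F.col("meta.created_at_ms").desc())
newest_df = (
    source_df
    .withColumn("_row_nr", F.row_number().over(w))
    .where(F.col("_row_nr") == 1)
    .drop("_row_nr")
)

# Exploder (sketch): one output row per element of the attributes.friends array
exploded_df = newest_df.withColumn("friend", F.explode(F.col("attributes.friends")))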

Application Code for Updating Both the Users and Friends_Mapping Tables at Once

This script extracts and transforms the activities common to both tables, since they share the same input data set. Caching the DataFrame avoids redundant processing and reloading each time an action (e.g., a load step) is executed. The same logic could have been written with pipeline objects (by providing the Pipeline an input_df and/or output_df to bypass extractors and loaders), but that would have led to unnecessary verbosity. This example also demonstrates Spooq's flexibility for activities and steps that it does not directly support.
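
For comparison, the pipeline-object variant for the users table could look roughly like this (a minimal sketch, assuming a Pipeline class with set_loader() and execute() and an input_df parameter that bypasses the extractor, as described above; the import path is an assumption as well):

from spooq.loader import HiveLoader
from spooq.pipeline import Pipeline  # assumed import path

# Assumption: input_df bypasses the extractor; common_df is the cached
# DataFrame built in the script below.
users_pipeline = Pipeline(input_df=common_df.drop("friends"))
users_pipeline.set_loader(
    HiveLoader(
        db_name="users_and_friends",
        table_name="users",
        partition_definitions=[
            {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
        ],
        repartition_size=10,
    )
)
users_pipeline.execute()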

from pyspark.sql import functions as F, types as T
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup, Exploder
from spooq.loader import HiveLoader
from spooq.transformer import mapper_transformations as spq

mapping = [
    ("id",              "id",                     spq.to_num),
    ("guid",            "guid",                   "string"),
    ("forename",        "attributes.first_name",  "string"),
    ("surename",        "attributes.last_name",   "string"),
    ("gender",          "attributes.gender",      spq.apply(func=F.lower)),
    ("has_email",       "attributes.email",       spq.has_value),
    ("has_university",  "attributes.university",  spq.has_value),
    ("created_at",      "meta.created_at_ms",     spq.to_timestamp),
    ("friends",         "attributes.friends",     "as_is"),
]

# Transformations used by both output tables
common_df = JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles").extract()
common_df = Mapper(mapping=mapping).transform(common_df)
common_df = ThresholdCleaner(
    thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
    column_to_log_cleansed_values="cleansed_values",
    store_as_map=True,
).transform(common_df)
common_df = NewestByGroup(group_by="id", order_by="created_at").transform(common_df)
common_df.cache()

# Loading of users table
HiveLoader(
    db_name="users_and_friends",
    table_name="users",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=10,
).load(common_df.drop("friends"))

# Transformations for friends_mapping table
friends_df = Exploder(path_to_array="friends", exploded_elem_name="friend").transform(
    common_df
)
friends_df = Mapper(
    mapping=[
        ("id",          "id",          "string"),
        ("guid",        "guid",        "string"),
        ("friend_id",   "friend.id",   spq.to_num),
        ("created_at",  "created_at",  "TimestampType"),
    ]
).transform(friends_df)

# Loading of friends_mapping table
HiveLoader(
    db_name="users_and_friends",
    table_name="friends_mapping",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=20,
).load(friends_df)
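
To sanity-check the result, the loaded partition can be read back with plain Spark (assuming a Hive-enabled SparkSession bound to the name spark):

# Read back the partition written above (sketch)
spark.table("users_and_friends.friends_mapping").where(F.col("dt") == 20200201).show(5, truncate=False)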

Dataflow Chart

@startuml

skinparam monochrome true
skinparam defaultFontname Bitstream Vera Sans Mono
skinparam defaultFontSize 18

hide empty members

' title Spooq - Data Flow Diagram

allow_mixing

actor Client

package "Pipeline"  {
    class "Pipeline Instance" {
        execute()
    }
}

Client -> "Pipeline Instance"
note on link
execute()
end note

database "Source System"
database "Target System"

package "Extractor"  {

    "Pipeline Instance" --> "Extractor Instance"

    class "Extractor Instance" {
        extract()
    }

}

"Extractor Instance" <- "Source System"
note on link
    Raw Data
end note


package "Transformers"  {

    "Extractor Instance" --> "Transformer  Instance 1"

    note on link
        DataFrame
    end note

    class "Transformer  Instance 1" {
        transform(input_df: DataFrame)
    }

    "Transformer  Instance 1" --> "Transformer  Instance 2"

    note on link
        DataFrame
    end note

    class "Transformer  Instance 2" {
        transform(input_df: DataFrame)
    }

    "Transformer  Instance 2" --> "Transformer  Instance N"

    note on link
        DataFrame
    end note

    class "Transformer  Instance N" {
        transform(input_df: DataFrame)
    }

}

package "Loader" {

    "Transformer  Instance N" -> "Loader Instance"

    note on link
        DataFrame
    end note

    class "Loader Instance" {
        load(input_df: DataFrame)
    }
}

"Target System" <----- "Loader Instance"
note on link
    Transformed Data
end note

@enduml

Typical Data Flow of a Spooq Data Pipeline
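
In code, the chart above boils down to the following call sequence (a schematic sketch; the variable names are illustrative):

# Schematic data flow of a Spooq pipeline (sketch)
df = extractor.extract()              # raw data from the source system
for transformer in transformers:      # each transformer takes and returns a DataFrame
    df = transformer.transform(df)
loader.load(df)                       # transformed data into the target system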