# Get Started
This section will guide you through a simple ETL pipeline built with Spooq to showcase how to use this library.
Sample Input Data:
```json
{
    "id": 18,
    "guid": "b12b59ba-5c78-4057-a998-469497005c1f",
    "attributes": {
        "first_name": "Jeannette",
        "last_name": "O'Loghlen",
        "gender": "F",
        "email": "gpirri3j@oracle.com",
        "ip_address": "64.19.237.154",
        "university": "",
        "birthday": "1972-05-16T22:17:41Z",
        "friends": [
            {
                "first_name": "Noémie",
                "last_name": "Tibbles",
                "id": "9952"
            },
            {
                "first_name": "Bérangère",
                "last_name": null,
                "id": "3391"
            },
            {
                "first_name": "Danièle",
                "last_name": null,
                "id": "9637"
            },
            {
                "first_name": null,
                "last_name": null,
                "id": "9939"
            },
            {
                "first_name": "Anaëlle",
                "last_name": null,
                "id": "18994"
            }
        ]
    },
    "meta": {
        "created_at_sec": "1547371284",
        "created_at_ms": "1547204429000",
        "version": "24"
    }
}
```
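Before writing a mapping, it can help to inspect the nested structure with plain PySpark. A minimal sketch, assuming the sample record is stored as line-delimited JSON (the file path here is hypothetical; the Spooq examples below read SequenceFiles instead):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical path, for illustration only
raw_df = spark.read.json("tests/data/user_sample.json")
raw_df.printSchema()  # shows the nested attributes.* and meta.* fields referenced below
```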
## Application Code for Creating a User Table
```python
from pyspark.sql import functions as F, types as T
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup
from spooq.loader import HiveLoader
from spooq.transformer import mapper_transformations as spq

users_mapping = [
    ("id", "id", spq.to_num),
    ("guid", "guid", "string"),
    ("forename", "attributes.first_name", "string"),
    ("surname", "attributes.last_name", "string"),
    ("gender", "attributes.gender", spq.apply(func=F.lower)),
    ("has_email", "attributes.email", spq.has_value),
    ("has_university", "attributes.university", spq.has_value),
    ("created_at", "meta.created_at_ms", spq.to_timestamp),
]

# Extract
source_df = JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles").extract()

# Transform
mapped_df = Mapper(users_mapping).transform(source_df)
cleaned_df = ThresholdCleaner(
    thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
    column_to_log_cleansed_values="cleansed_values",
    store_as_map=True,
).transform(mapped_df)
deduplicated_df = NewestByGroup(group_by="id", order_by="created_at").transform(cleaned_df)

# Load
HiveLoader(
    db_name="users_and_friends",
    table_name="users",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=10,
).load(deduplicated_df)
```
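Each mapping entry is a triple of target column name, source path into the nested input, and a transformation: either a data type given as a string (such as `"string"`) or one of the `mapper_transformations` helpers. Since `spq.apply` wraps an arbitrary column function (as with `F.lower` above), custom logic plugs in the same way. A minimal sketch; the `trimmed_lower` helper is made up for illustration:

```python
from pyspark.sql import functions as F
from spooq.transformer import mapper_transformations as spq

# Hypothetical helper: chains trim and lower into one mapping transformation
def trimmed_lower(col):
    return F.lower(F.trim(col))

extended_mapping = [
    ("gender", "attributes.gender", spq.apply(func=trimmed_lower)),
]
```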
| id | guid | forename | surname | gender | has_email | has_university | created_at | cleansed_values |
|---|---|---|---|---|---|---|---|---|
| 1 | 799eb359-2e98-4526-a2ec-455b03e57b5c | Orran | Haug | m | false | false | null | {created_at -> 2018-11-30 11:19:43} |
| N | … | … | … | … | … | … | … | … |
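Note how the `created_at` value of the first row fell outside the configured threshold (older than 2019-01-01): the ThresholdCleaner replaced it with the default `None` and logged the original value in the `cleansed_values` map.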
## Application Code for Creating a Friends_Mapping Table
```python
from pyspark.sql import functions as F, types as T
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup, Exploder
from spooq.loader import HiveLoader
from spooq.transformer import mapper_transformations as spq

friends_mapping = [
    ("id", "id", spq.to_num),
    ("guid", "guid", "string"),
    ("friend_id", "friend.id", spq.to_num),
    ("created_at", "meta.created_at_ms", spq.to_timestamp),
]

# Extract
source_df = JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles").extract()

# Transform
deduplicated_friends_df = NewestByGroup(group_by="id", order_by="meta.created_at_ms").transform(source_df)
exploded_friends_df = Exploder(path_to_array="attributes.friends", exploded_elem_name="friend").transform(deduplicated_friends_df)
mapped_friends_df = Mapper(mapping=friends_mapping).transform(exploded_friends_df)
cleaned_friends_df = ThresholdCleaner(
    thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
    column_to_log_cleansed_values="cleansed_values",
    store_as_map=True,
).transform(mapped_friends_df)

# Load
HiveLoader(
    db_name="users_and_friends",
    table_name="friends_mapping",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=20,
).load(cleaned_friends_df)
```
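The Exploder flattens the `attributes.friends` array into one output row per array element, exposed under the alias `friend`. Conceptually it is close to a plain PySpark explode; a rough sketch of the equivalent operation (not Spooq's actual implementation):

```python
from pyspark.sql import functions as F

# Roughly what the Exploder step does: one row per entry of attributes.friends
exploded_manually = deduplicated_friends_df.withColumn(
    "friend", F.explode(F.col("attributes.friends"))
)
```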
| id | guid | friend_id | created_at |
|---|---|---|---|
| 26 | bd666d9d-9bb2-494e-8dc3-ab1c29a67ab2 | 8001 | 2019-05-29 23:25:27 |
| 26 | bd666d9d-9bb2-494e-8dc3-ab1c29a67ab2 | 5623 | 2019-05-29 23:25:27 |
| 26 | bd666d9d-9bb2-494e-8dc3-ab1c29a67ab2 | 17713 | 2019-05-29 23:25:27 |
| 65 | 2a5fd4e4-2cfa-41c6-9771-46c666e7c2eb | 4428 | null |
| 65 | 2a5fd4e4-2cfa-41c6-9771-46c666e7c2eb | 13011 | null |
| N | … | … | … |
## Application Code for Updating Both the Users and Friends_Mapping Tables at Once

This script extracts and transforms the activities common to both tables, since they share the same input data set. Caching the DataFrame avoids redundant recomputation and reloading each time an action (for example, a load step) is executed. The same logic could have been written with Pipeline objects as well (by providing the Pipeline an input_df and/or output_df to bypass extractors and loaders; see the sketch below), but that would have led to unnecessary verbosity. This example also demonstrates Spooq's flexibility for activities and steps that are not directly supported.
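For comparison, a Pipeline-based version of the users table job might look roughly like the following, reusing the `users_mapping` list from the first example. This is a sketch only: it assumes the Pipeline API exposes `set_extractor`, `add_transformers`, `set_loader`, and `execute`; check the Spooq documentation for the exact interface.

```python
from pyspark.sql import functions as F
from spooq.pipeline import Pipeline
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup
from spooq.loader import HiveLoader

# Sketch under the assumed Pipeline API described above
pipeline = Pipeline()
pipeline.set_extractor(JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles"))
pipeline.add_transformers([
    Mapper(users_mapping),
    ThresholdCleaner(
        thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
        column_to_log_cleansed_values="cleansed_values",
        store_as_map=True,
    ),
    NewestByGroup(group_by="id", order_by="created_at"),
])
pipeline.set_loader(HiveLoader(
    db_name="users_and_friends",
    table_name="users",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=10,
))
pipeline.execute()
```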
```python
from pyspark.sql import functions as F, types as T
from spooq.extractor import JSONExtractor
from spooq.transformer import Mapper, ThresholdCleaner, NewestByGroup, Exploder
from spooq.loader import HiveLoader
from spooq.transformer import mapper_transformations as spq

mapping = [
    ("id", "id", spq.to_num),
    ("guid", "guid", "string"),
    ("forename", "attributes.first_name", "string"),
    ("surname", "attributes.last_name", "string"),
    ("gender", "attributes.gender", spq.apply(func=F.lower)),
    ("has_email", "attributes.email", spq.has_value),
    ("has_university", "attributes.university", spq.has_value),
    ("created_at", "meta.created_at_ms", spq.to_timestamp),
    ("friends", "attributes.friends", "as_is"),
]

# Transformations used by both output tables
common_df = JSONExtractor(input_path="tests/data/schema_v1/sequenceFiles").extract()
common_df = Mapper(mapping=mapping).transform(common_df)
common_df = ThresholdCleaner(
    thresholds={"created_at": {"min": "2019-01-01", "max": F.current_date(), "default": None}},
    column_to_log_cleansed_values="cleansed_values",
    store_as_map=True,
).transform(common_df)
common_df = NewestByGroup(group_by="id", order_by="created_at").transform(common_df)
common_df.cache()

# Loading of users table
HiveLoader(
    db_name="users_and_friends",
    table_name="users",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=10,
).load(common_df.drop("friends"))

# Transformations for friends_mapping table
friends_df = Exploder(path_to_array="friends", exploded_elem_name="friend").transform(common_df)
friends_df = Mapper(
    mapping=[
        ("id", "id", spq.to_num),
        ("guid", "guid", "string"),
        ("friend_id", "friend.id", spq.to_num),
        ("created_at", "created_at", "TimestampType"),
    ]
).transform(friends_df)

# Loading of friends_mapping table
HiveLoader(
    db_name="users_and_friends",
    table_name="friends_mapping",
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    repartition_size=20,
).load(friends_df)
```
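Once both loads have completed, the cached DataFrame can be released. This is standard PySpark, not Spooq-specific:

```python
# Free the cached common_df after both tables have been written
common_df.unpersist()
```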