Extractor Base Class
Extractors are used to fetch, extract, and convert a source data set into a PySpark DataFrame. Typical extraction sources are JSON files on file systems like HDFS, DBFS, or EXT4, and relational database systems accessed via JDBC.
Create your own Extractor
Let your extractor class inherit from the Extractor base class. This provides the name, string representation, and logger attributes from the superclass.
All configuration and parameterization should be done while initializing the class instance.
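Spooq already provides this base class, so you do not have to write it yourself. Purely for orientation, a minimal sketch of what it provides could look like the following; the attribute names match the unit tests shown further below, but the implementation details and the logger name are assumptions:

import logging


class Extractor(object):
    """Illustrative sketch only; Spooq's real base class may differ in detail."""

    def __init__(self):
        # name defaults to the name of the concrete subclass, e.g. "CSVExtractor"
        self.name = type(self).__name__
        # the logger name "spooq" is an assumption
        self.logger = logging.getLogger("spooq")

    def __str__(self):
        return "Extractor Object of Class {nm}".format(nm=self.name)

    def extract(self):
        # concrete extractors override this method and return a DataFrame
        raise NotImplementedError("extract() must be implemented by a subclass")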
Here is a simple example of a CSV extractor:
Sample Code
from pyspark.sql import SparkSession
from extractor import Extractor
class CSVExtractor(Extractor):
    """
    This is a simplified example of how to implement a new extractor class.
    Please take your time to write proper docstrings as they are automatically
    parsed via Sphinx to build the HTML and PDF documentation.
    Docstrings use the style of NumPy (via the napoleon plug-in).

    This class uses the :meth:`pyspark.sql.DataFrameReader.csv` method internally.

    Examples
    --------
    extracted_df = CSVExtractor(
        input_file='data/input_data.csv'
    ).extract()

    Parameters
    ----------
    input_file: :any:`str`
        The explicit file path for the input data set. Globbing support depends
        on the implementation of Spark's csv reader!

    Raises
    ------
    :any:`exceptions.TypeError`:
        path can be only string, list or RDD
    """

    def __init__(self, input_file):
        super(CSVExtractor, self).__init__()
        self.input_file = input_file
        self.spark = SparkSession.builder\
            .enableHiveSupport()\
            .appName('spooq.extractor: {nm}'.format(nm=self.name))\
            .getOrCreate()

    def extract(self):
        self.logger.info('Loading Raw CSV Files from: ' + self.input_file)
        output_df = self.spark.read.load(
            self.input_file,
            format="csv",
            sep=";",
            inferSchema="true",
            header="true"
        )
        return output_df
References to include
Add a reference to your new extractor class in the subpackage's __init__.py module so it can be imported directly:
--- original
+++ adapted
@@ -1,8 +1,10 @@
 from jdbc import JDBCExtractorIncremental, JDBCExtractorFullLoad
 from json_files import JSONExtractor
+from csv_extractor import CSVExtractor

 __all__ = [
     "JDBCExtractorIncremental",
     "JDBCExtractorFullLoad",
     "JSONExtractor",
+    "CSVExtractor",
 ]
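Afterwards, your new extractor can be imported directly from the subpackage (the file path below is purely illustrative):

from spooq.extractor import CSVExtractor

extractor = CSVExtractor(input_file='data/input_data.csv')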
Tests
One of Spooq’s features is to provide tested code for multiple data pipelines. Please take your time to write sufficient unit tests! You can reuse test data from tests/data or create a new schema / data set if needed. A SparkSession is provided as a global fixture called spark_session.
import pytest

from spooq.extractor import CSVExtractor


@pytest.fixture()
def default_extractor():
    return CSVExtractor(input_file="data/input_data.csv")


class TestBasicAttributes(object):

    def test_logger_should_be_accessible(self, default_extractor):
        assert hasattr(default_extractor, "logger")

    def test_name_is_set(self, default_extractor):
        assert default_extractor.name == "CSVExtractor"

    def test_str_representation_is_correct(self, default_extractor):
        assert str(default_extractor) == "Extractor Object of Class CSVExtractor"


class TestCSVExtraction(object):

    def test_count(self, default_extractor):
        """Converted DataFrame has the same count as the input data"""
        expected_count = 312
        actual_count = default_extractor.extract().count()
        assert expected_count == actual_count

    def test_schema(self, default_extractor):
        """Converted DataFrame has the expected schema"""
        # The column names and types below are placeholders;
        # adapt them to your actual test data set.
        expected = [("id", "int"), ("name", "string")]
        actual = default_extractor.extract().dtypes
        assert expected == actual
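If you want to run these tests stand-alone, outside of Spooq's test suite, a minimal sketch of such a spark_session fixture in a conftest.py could look like the following (master and app name are assumptions, not Spooq's actual configuration):

import pytest
from pyspark.sql import SparkSession


@pytest.fixture(scope="session")
def spark_session():
    # Hypothetical stand-in for the globally provided fixture;
    # a local session is sufficient for unit tests.
    spark = (SparkSession.builder
             .master("local[*]")
             .appName("spooq-tests")
             .getOrCreate())
    yield spark
    spark.stop()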
Documentation
You need to create an rst file for your extractor which contains, at a minimum, the automodule or the autoclass directive.
CSV Extractor
=============
Some text if you like...
.. automodule:: spooq.extractor.csv_extractor
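If you prefer to document only the class itself, or if your autodoc defaults do not include members automatically, an autoclass variant could look like this (the directive options are suggestions, not Spooq requirements):

.. autoclass:: spooq.extractor.csv_extractor.CSVExtractor
    :members: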
To automatically include your new extractor in the HTML documentation, you need to add it to a toctree directive. Just refer to your newly created csv.rst file within the extractor overview page.
--- original
+++ adapted
@@ -7,8 +7,9 @@
 .. toctree::
     json
     jdbc
+    csv

 Class Diagram of Extractor Subpackage
 ------------------------------------------------

 .. uml:: ../diagrams/from_thesis/class_diagram/extractors.puml
That should be all!