Extractor Base Class

Extractors are used to fetch, extract and convert a source data set into a PySpark DataFrame. Exemplary extraction sources are JSON Files on file systems like HDFS, DBFS or EXT4 and relational database systems via JDBC.

Create your own Extractor

Let your extractor class inherit from the extractor base class. This includes the name, string representation and logger attributes from the superclass.

The only mandatory thing is to provide an extract() method which
takes
=> no input parameters
and returns a
=> PySpark DataFrame!

All configuration and parameterization should be done while initializing the class instance.

Here would be a simple example for a CSV Extractor:

Exemplary Sample Code

spooq/extractor/csv_extractor.py:
from pyspark.sql import SparkSession

from extractor import Extractor

class CSVExtractor(Extractor):
    """
    This is a simplified example on how to implement a new extractor class.
    Please take your time to write proper docstrings as they are automatically
    parsed via Sphinx to build the HTML and PDF documentation.
    Docstrings use the style of Numpy (via the napoleon plug-in).

    This class uses the :meth:`pyspark.sql.DataFrameReader.csv` method internally.

    Examples
    --------
    extracted_df = CSVExtractor(
        input_file='data/input_data.csv'
    ).extract()

    Parameters
    ----------
    input_file: :any:`str`
        The explicit file path for the input data set. Globbing support depends
        on implementation of Spark's csv reader!

    Raises
    ------
    :any:`exceptions.TypeError`:
        path can be only string, list or RDD
    """

    def __init__(self, input_file):
        super(CSVExtractor, self).__init__()
        self.input_file = input_file
        self.spark = SparkSession.Builder()\
            .enableHiveSupport()\
            .appName('spooq.extractor: {nm}'.format(nm=self.name))\
            .getOrCreate()

    def extract(self):
        self.logger.info('Loading Raw CSV Files from: ' + self.input_file)
        output_df = self.spark.read.load(
            input_file,
            format="csv",
            sep=";",
            inferSchema="true",
            header="true"
        )

        return output_df

References to include

spooq/extractor/__init__.py:
--- original 
+++ adapted 
@@ -1,8 +1,10 @@
 from jdbc import JDBCExtractorIncremental, JDBCExtractorFullLoad
 from json_files import JSONExtractor
+from csv_extractor import CSVExtractor
 
 __all__ = [
     "JDBCExtractorIncremental",
     "JDBCExtractorFullLoad",
     "JSONExtractor",
+    "CSVExtractor",
 ]

Tests

One of Spooq’s features is to provide tested code for multiple data pipelines. Please take your time to write sufficient unit tests! You can reuse test data from tests/data or create a new schema / data set if needed. A SparkSession is provided as a global fixture called spark_session.

tests/unit/extractor/test_csv.py:
import pytest

from spooq.extractor import CSVExtractor

@pytest.fixture()
def default_extractor():
    return CSVExtractor(input_path="data/input_data.csv")


class TestBasicAttributes(object):

    def test_logger_should_be_accessible(self, default_extractor):
        assert hasattr(default_extractor, "logger")

    def test_name_is_set(self, default_extractor):
        assert default_extractor.name == "CSVExtractor"

    def test_str_representation_is_correct(self, default_extractor):
        assert unicode(default_extractor) == "Extractor Object of Class CSVExtractor"

class TestCSVExtraction(object):

    def test_count(default_extractor):
        """Converted DataFrame has the same count as the input data"""
        expected_count = 312
        actual_count = default_extractor.extract().count()
        assert expected_count == actual_count

    def test_schema(default_extractor):
        """Converted DataFrame has the expected schema"""
        do_some_stuff()
        assert expected == actual

Documentation

You need to create a rst for your extractor which needs to contain at minimum the automodule or the autoclass directive.

docs/source/extractor/csv.rst:
CSV Extractor
=============

Some text if you like...

.. automodule:: spooq.extractor.csv_extractor

To automatically include your new extractor in the HTML documentation you need to add it to a toctree directive. Just refer to your newly created csv.rst file within the extractor overview page.

docs/source/extractor/overview.rst:
--- original 
+++ adapted 
@@ -7,8 +7,9 @@
 .. toctree::
 
     json
     jdbc
+    csv
 
 Class Diagram of Extractor Subpackage
 ------------------------------------------------
 .. uml:: ../diagrams/from_thesis/class_diagram/extractors.puml

That should be all!