JDBC Source

class JDBCExtractor(jdbc_options, cache=True)[source]
class JDBCExtractorFullLoad(query, jdbc_options, cache=True)[source]

Connects to a JDBC Source and fetches the data defined by the provided Query.

Examples

>>> import spooq.extractor as E
>>>
>>> extractor = E.JDBCExtractorFullLoad(
>>>     query="select id, first_name, last_name, gender, created_at from test_db.users",
>>>     jdbc_options={
>>>         "url": "jdbc:postgresql://localhost/test_db",
>>>         "driver": "org.postgresql.Driver",
>>>         "user": "read_only",
>>>         "password": "test123",
>>>     },
>>> )
>>>
>>> extracted_df = extractor.extract()
>>> type(extracted_df)
pyspark.sql.dataframe.DataFrame

Parameters
  • query (str) – The SQL query sent to the source server, as shown in the example above.

  • jdbc_options (dict, optional) –

    A set of parameters to configure the connection to the source:

    See pyspark.sql.DataFrameReader.jdbc() and https://spark.apache.org/docs/2.4.3/sql-data-sources-jdbc.html for more information.

  • cache (bool, defaults to True) – Defines whether to cache() the dataframe after it is loaded. Otherwise the Extractor will reload all data from the source system if a second action is performed on the dataframe.

Raises

exceptions.AssertionError – All jdbc_options values need to be provided as strings.
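
Because every value in jdbc_options is asserted to be a string, numeric connection settings have to be passed as strings as well. A minimal sketch (fetchsize is a standard Spark JDBC option and is only included here for illustration):

>>> jdbc_options = {
>>>     "url": "jdbc:postgresql://localhost/test_db",
>>>     "driver": "org.postgresql.Driver",
>>>     "user": "read_only",
>>>     "password": "test123",
>>>     "fetchsize": "10000",  # numeric options must also be passed as strings
>>> }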

extract()[source]

This is the public API method to be called for all classes of Extractors.

Returns

PySpark dataframe from the input JDBC connection.

Return type

DataFrame
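
The returned object is a regular PySpark DataFrame, so any DataFrame operation can follow the extraction. A short sketch, reusing extracted_df and the columns from the example query above:

>>> extracted_df.select("id", "first_name", "last_name").show(3)
>>> extracted_df.createOrReplaceTempView("users_extracted")
>>> extracted_df.count()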

class JDBCExtractorIncremental(partition, jdbc_options, source_table, spooq_values_table, spooq_values_db='spooq_values', spooq_values_partition_column='updated_at', cache=True)[source]

Connects to a JDBC Source and fetches the data with respect to boundaries. The boundaries are inferred from the partition to load and from the logs of previous loads stored in the spooq_values_table.

Examples

>>> import spooq.extractor as E
>>>
>>> # Boundaries derived from previously logged extractions => ("2020-01-31 03:29:59", False)
>>>
>>> extractor = E.JDBCExtractorIncremental(
>>>     partition="20200201",
>>>     jdbc_options={
>>>         "url": "jdbc:postgresql://localhost/test_db",
>>>         "driver": "org.postgresql.Driver",
>>>         "user": "read_only",
>>>         "password": "test123",
>>>     },
>>>     source_table="users",
>>>     spooq_values_table="spooq_jdbc_log_users",
>>> )
>>>
>>> extractor._construct_query_for_partition(extractor.partition)
select * from users where updated_at > "2020-01-31 03:29:59"
>>>
>>> extracted_df = extractor.extract()
>>> type(extracted_df)
pyspark.sql.dataframe.DataFrame
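
The following sketch only illustrates how the derived lower boundary could translate into the query shown above; the variable names are hypothetical and spooq's internal query construction may differ:

>>> # the upper boundary is False in this example, so only a lower bound is applied
>>> lower_bound = "2020-01-31 03:29:59"
>>> query = 'select * from {table} where {column} > "{lower}"'.format(
>>>     table="users", column="updated_at", lower=lower_bound
>>> )
>>> query
select * from users where updated_at > "2020-01-31 03:29:59"
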
Parameters
  • partition (int or str) – Partition to extract. Needed for logging the incremental load in the spooq_values_table.

  • jdbc_options (dict, optional) –

    A set of parameters to configure the connection to the source:

    See pyspark.sql.DataFrameReader.jdbc() and https://spark.apache.org/docs/2.4.3/sql-data-sources-jdbc.html for more information.

  • source_table (str) – Defines the table name of the source to load from, for example ‘purchases’. This is necessary to build the query.

  • spooq_values_table (str) – Defines the Hive table where previous and future loads of a specific source table are logged. This is necessary to derive boundaries for the current partition.

  • spooq_values_db (str, optional) – Defines the Database where the spooq_values_table is stored. Defaults to ‘spooq_values’.

  • spooq_values_partition_column (str, optional) – The column name which is used for the boundaries. Defaults to ‘updated_at’.

  • cache (bool, defaults to True) – Defines whether to cache() the dataframe after it is loaded. Otherwise the Extractor will reload all data from the source system if a second action is performed on the dataframe.

Raises

exceptions.AssertionError – All jdbc_options values need to be provided as strings.

extract()[source]

Extracts Data from a Source and converts it into a PySpark DataFrame.

Return type

DataFrame

Note

This method does not take ANY input parameters. All needed parameters are defined in the initialization of the Extractor Object.
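
With the default cache=True, the extracted dataframe is cached after loading, so a second action does not trigger another read from the JDBC source. A minimal sketch of that behaviour, assuming the extractor from the example above:

>>> extracted_df = extractor.extract()  # builds the incremental query and reads via JDBC
>>> extracted_df.count()                # first action materializes (and caches) the data
>>> extracted_df.show(5)                # served from the cache, no second JDBC read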