JDBC Source¶

class JDBCExtractorFullLoad(query, jdbc_options, cache=True)[source]¶

Bases: spooq2.extractor.jdbc.JDBCExtractor

Connects to a JDBC source and fetches the data defined by the provided query.
Examples

>>> import spooq2.extractor as E
>>>
>>> extractor = E.JDBCExtractorFullLoad(
>>>     query="select id, first_name, last_name, gender, created_at from test_db.users",
>>>     jdbc_options={
>>>         "url": "jdbc:postgresql://localhost/test_db",
>>>         "driver": "org.postgresql.Driver",
>>>         "user": "read_only",
>>>         "password": "test123",
>>>     },
>>> )
>>>
>>> extracted_df = extractor.extract()
>>> type(extracted_df)
pyspark.sql.dataframe.DataFrame
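Spark's JDBC reader expects a table or subquery via its options rather than a raw SQL string, so an extractor like the one above typically wraps the query as a subquery before handing it to Spark. The following is a minimal, hypothetical sketch of that assembly step; the function name `build_jdbc_reader_options` and the subquery alias are illustrative and not part of Spooq2's actual internals.

```python
# Hypothetical sketch: combine a SQL query with jdbc_options into the
# options dict that could be passed to Spark's JDBC reader. Spark's JDBC
# source accepts a query wrapped as a subquery via the "dbtable" option.

def build_jdbc_reader_options(query, jdbc_options):
    """Merge connection options with the query, wrapped as an aliased subquery."""
    options = dict(jdbc_options)  # copy so the caller's dict is untouched
    options["dbtable"] = "({0}) as spooq_query".format(query)
    return options

opts = build_jdbc_reader_options(
    "select id, first_name from test_db.users",
    {"url": "jdbc:postgresql://localhost/test_db", "driver": "org.postgresql.Driver"},
)
print(opts["dbtable"])
# (select id, first_name from test_db.users) as spooq_query
```

With a real SparkSession, such an options dict could then be consumed via `spark.read.format("jdbc").options(**opts).load()`.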
Parameters:

- query (str) – Defines the actual query sent to the JDBC source. This has to be a valid SQL query with respect to the source system (e.g., T-SQL for Microsoft SQL Server).
- jdbc_options (dict, optional) – A set of parameters to configure the connection to the source:
    - url (str) – A JDBC URL of the form jdbc:subprotocol:subname, e.g., jdbc:postgresql://localhost:5432/dbname
    - driver (str) – The class name of the JDBC driver to use to connect to this URL.
    - user (str) – Username to authenticate with the source database.
    - password (str) – Password to authenticate with the source database.
  See pyspark.sql.DataFrameReader.jdbc() and https://spark.apache.org/docs/2.4.3/sql-data-sources-jdbc.html for more information.
- cache (bool, defaults to True) – Defines whether to cache() the DataFrame after it is loaded. Otherwise the extractor will reload all data from the source system each time an action is performed on the DataFrame.
Raises: exceptions.AssertionError – All jdbc_options values need to be present as string variables.

extract()[source]¶

This is the public API method to be called for all classes of extractors.

Returns: PySpark DataFrame from the input JDBC connection.
Return type: pyspark.sql.DataFrame
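The AssertionError mentioned above fires when any jdbc_options value is not a string. A minimal sketch of such a check, assuming a plain loop over the dict (the helper name is illustrative, not Spooq2's actual internal API):

```python
# Hypothetical sketch of the documented assertion: every value in
# jdbc_options must be a string, otherwise an AssertionError is raised.

def assert_jdbc_options_are_strings(jdbc_options):
    for key, value in jdbc_options.items():
        assert isinstance(value, str), (
            "jdbc_option '{0}' must be a string, got {1}".format(key, type(value).__name__)
        )

# All values are strings -> passes silently
assert_jdbc_options_are_strings({"url": "jdbc:postgresql://localhost/test_db", "user": "read_only"})

# A non-string value (an int port) -> AssertionError
try:
    assert_jdbc_options_are_strings({"url": "jdbc:postgresql://localhost/test_db", "port": 5432})
except AssertionError as exc:
    print(exc)
# jdbc_option 'port' must be a string, got int
```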
class JDBCExtractorIncremental(partition, jdbc_options, source_table, spooq2_values_table, spooq2_values_db='spooq2_values', spooq2_values_partition_column='updated_at', cache=True)[source]¶

Bases: spooq2.extractor.jdbc.JDBCExtractor

Connects to a JDBC source and fetches the data with respect to boundaries. The boundaries are inferred from the partition to load and logs from previous loads stored in the spooq2_values_table.

Examples

>>> import spooq2.extractor as E
>>>
>>> # Boundaries derived from previously logged extractions => ("2020-01-31 03:29:59", False)
>>>
>>> extractor = E.JDBCExtractorIncremental(
>>>     partition="20200201",
>>>     jdbc_options={
>>>         "url": "jdbc:postgresql://localhost/test_db",
>>>         "driver": "org.postgresql.Driver",
>>>         "user": "read_only",
>>>         "password": "test123",
>>>     },
>>>     source_table="users",
>>>     spooq2_values_table="spooq2_jdbc_log_users",
>>> )
>>>
>>> extractor._construct_query_for_partition(extractor.partition)
select * from users where updated_at > "2020-01-31 03:29:59"
>>>
>>> extracted_df = extractor.extract()
>>> type(extracted_df)
pyspark.sql.dataframe.DataFrame
Parameters:

- partition (int or str) – Partition to extract. Needed for logging the incremental load in the spooq2_values_table.
- jdbc_options (dict, optional) – A set of parameters to configure the connection to the source:
    - url (str) – A JDBC URL of the form jdbc:subprotocol:subname, e.g., jdbc:postgresql://localhost:5432/dbname
    - driver (str) – The class name of the JDBC driver to use to connect to this URL.
    - user (str) – Username to authenticate with the source database.
    - password (str) – Password to authenticate with the source database.
  See pyspark.sql.DataFrameReader.jdbc() and https://spark.apache.org/docs/2.4.3/sql-data-sources-jdbc.html for more information.
- source_table (str) – Defines the table name of the source to be loaded from, for example 'purchases'. This is necessary to build the query.
- spooq2_values_table (str) – Defines the Hive table where previous and future loads of a specific source table are logged. This is necessary to derive boundaries for the current partition.
- spooq2_values_db (str, optional) – Defines the database where the spooq2_values_table is stored. Defaults to 'spooq2_values'.
- spooq2_values_partition_column (str, optional) – The column name which is used for the boundaries. Defaults to 'updated_at'.
- cache (bool, defaults to True) – Defines whether to cache() the DataFrame after it is loaded. Otherwise the extractor will reload all data from the source system each time an action is performed on the DataFrame.
Raises: exceptions.AssertionError – All jdbc_options values need to be present as string variables.

extract()[source]¶

Extracts data from a source and converts it into a PySpark DataFrame.

Return type: pyspark.sql.DataFrame

Note

This method does not take ANY input parameters. All needed parameters are defined in the initialization of the extractor object.
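The boundary query shown in the incremental example above (filtering on the partition column with the last logged upper bound as the new lower bound) can be sketched in plain Python. This is a hypothetical reconstruction of what a helper like _construct_query_for_partition produces, assuming optional lower and upper boundaries; the function signature is illustrative, not Spooq2's actual internal API.

```python
# Hypothetical sketch: build an incremental boundary query from a source
# table, a partition column, and boundaries derived from previous loads.

def construct_query_for_partition(source_table, partition_column,
                                  lower_bound=None, upper_bound=None):
    """Return a SELECT restricted to the given boundaries (if any)."""
    query = "select * from {0}".format(source_table)
    predicates = []
    if lower_bound is not None:
        # rows newer than the last logged load
        predicates.append('{0} > "{1}"'.format(partition_column, lower_bound))
    if upper_bound is not None:
        # rows up to and including the current partition's cut-off
        predicates.append('{0} <= "{1}"'.format(partition_column, upper_bound))
    if predicates:
        query += " where " + " and ".join(predicates)
    return query

print(construct_query_for_partition("users", "updated_at",
                                    lower_bound="2020-01-31 03:29:59"))
# select * from users where updated_at > "2020-01-31 03:29:59"
```

With no logged boundaries (e.g., the very first load of a table), the helper would fall back to a full `select * from users`, which matches the full-load behavior of JDBCExtractorFullLoad.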