JDBC Source
- class JDBCExtractorFullLoad(query, jdbc_options, cache=True)
Connects to a JDBC Source and fetches the data defined by the provided Query.
Examples
>>> import spooq.extractor as E
>>>
>>> extractor = E.JDBCExtractorFullLoad(
>>>     query="select id, first_name, last_name, gender, created_at from test_db.users",
>>>     jdbc_options={
>>>         "url": "jdbc:postgresql://localhost/test_db",
>>>         "driver": "org.postgresql.Driver",
>>>         "user": "read_only",
>>>         "password": "test123",
>>>     },
>>> )
>>>
>>> extracted_df = extractor.extract()
>>> type(extracted_df)
pyspark.sql.dataframe.DataFrame
- Parameters
query (str) – Defines the actual query sent to the JDBC source. This has to be a valid SQL query with respect to the source system (e.g., T-SQL for Microsoft SQL Server).
jdbc_options (dict, optional) – A set of parameters to configure the connection to the source (see the usage sketch below):
  - url (str) – A JDBC URL of the form jdbc:subprotocol:subname, e.g., jdbc:postgresql://localhost:5432/dbname
  - driver (str) – The class name of the JDBC driver to use to connect to this URL.
  - user (str) – Username to authenticate with the source database.
  - password (str) – Password to authenticate with the source database.
  See pyspark.sql.DataFrameReader.jdbc() and https://spark.apache.org/docs/2.4.3/sql-data-sources-jdbc.html for more information.
cache (bool, defaults to True) – Defines whether to cache() the DataFrame after it is loaded. Otherwise the extractor will reload all data from the source system each time an action is performed on the DataFrame.
- Raises
exceptions.AssertionError – All jdbc_options values need to be provided as strings.
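For orientation, the following is a minimal sketch of how such a query and jdbc_options could be handed to PySpark's JDBC reader. It only illustrates the underlying Spark API and is not necessarily how JDBCExtractorFullLoad is implemented internally; the sub-query alias "tmp" is assumed for illustration.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = "select id, first_name, last_name, gender, created_at from test_db.users"
jdbc_options = {
    "url": "jdbc:postgresql://localhost/test_db",
    "driver": "org.postgresql.Driver",
    "user": "read_only",
    "password": "test123",
}

# Spark's JDBC reader accepts a sub-query as the table by wrapping it
# in parentheses and giving it an alias.
df = (
    spark.read.format("jdbc")
    .options(**jdbc_options)
    .option("dbtable", "({0}) as tmp".format(query))
    .load()
)
df = df.cache()  # corresponds to cache=True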
- class JDBCExtractorIncremental(partition, jdbc_options, source_table, spooq_values_table, spooq_values_db='spooq_values', spooq_values_partition_column='updated_at', cache=True)
Connects to a JDBC Source and fetches the data with respect to boundaries. The boundaries are inferred from the partition to load and from the logs of previous loads stored in the spooq_values_table.
Examples
>>> import spooq.extractor as E
>>>
>>> # Boundaries derived from previously logged extractions => ("2020-01-31 03:29:59", False)
>>>
>>> extractor = E.JDBCExtractorIncremental(
>>>     partition="20200201",
>>>     jdbc_options={
>>>         "url": "jdbc:postgresql://localhost/test_db",
>>>         "driver": "org.postgresql.Driver",
>>>         "user": "read_only",
>>>         "password": "test123",
>>>     },
>>>     source_table="users",
>>>     spooq_values_table="spooq_jdbc_log_users",
>>> )
>>>
>>> extractor._construct_query_for_partition(extractor.partition)
select * from users where updated_at > "2020-01-31 03:29:59"
>>>
>>> extracted_df = extractor.extract()
>>> type(extracted_df)
pyspark.sql.dataframe.DataFrame
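To make the boundary derivation more tangible, here is a rough sketch under assumed conditions: the spooq_values_table is read from Hive, its highest logged value of the partition column becomes the lower boundary, and that boundary is spliced into the source query. The log-table schema and the aggregation shown here are assumptions for illustration; the class itself derives the boundaries internally (see _construct_query_for_partition above).

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

source_table = "users"
spooq_values_db = "spooq_values"
spooq_values_table = "spooq_jdbc_log_users"
spooq_values_partition_column = "updated_at"

# Assumed log schema: one row per previous load, carrying the highest
# value of the partition column that was extracted in that load.
previous_loads = spark.table("{0}.{1}".format(spooq_values_db, spooq_values_table))
lower_bound = (
    previous_loads
    .agg(F.max(spooq_values_partition_column).alias("max_value"))
    .collect()[0]["max_value"]
)

if lower_bound is None:
    query = "select * from {0}".format(source_table)
else:
    query = 'select * from {0} where {1} > "{2}"'.format(
        source_table, spooq_values_partition_column, lower_bound
    )
# e.g. select * from users where updated_at > "2020-01-31 03:29:59"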
- Parameters
partition (int or str) – Partition to extract. Needed for logging the incremental load in the spooq_values_table.
jdbc_options (dict, optional) – A set of parameters to configure the connection to the source:
  - url (str) – A JDBC URL of the form jdbc:subprotocol:subname, e.g., jdbc:postgresql://localhost:5432/dbname
  - driver (str) – The class name of the JDBC driver to use to connect to this URL.
  - user (str) – Username to authenticate with the source database.
  - password (str) – Password to authenticate with the source database.
  See pyspark.sql.DataFrameReader.jdbc() and https://spark.apache.org/docs/2.4.3/sql-data-sources-jdbc.html for more information.
source_table (str) – Defines the table name of the source to load from, for example 'purchases'. This is necessary to build the query.
spooq_values_table (str) – Defines the Hive table where previous and future loads of a specific source table are logged. This is necessary to derive the boundaries for the current partition (see the logging sketch at the end of this section).
spooq_values_db (str, optional) – Defines the database where the spooq_values_table is stored. Defaults to 'spooq_values'.
spooq_values_partition_column (str, optional) – The column name which is used for the boundaries. Defaults to 'updated_at'.
cache (bool, defaults to True) – Defines whether to cache() the DataFrame after it is loaded. Otherwise the extractor will reload all data from the source system each time an action is performed on the DataFrame.
- Raises
exceptions.AssertionError – All jdbc_options values need to be provided as strings.
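To close the loop on the spooq_values_table, the following sketch shows how a completed incremental load could be logged so that the next run can derive its lower boundary from it. The log schema (partition, updated_at), the table location spooq_values.spooq_jdbc_log_users, and the placeholder DataFrame are assumptions for illustration, not the actual spooq implementation.

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

partition = "20200201"
spooq_values_partition_column = "updated_at"

# Placeholder for the DataFrame returned by extractor.extract() in the
# example above; replace it with the real extraction result.
extracted_df = spark.createDataFrame(
    [(1, "2020-02-01 10:15:00")], ["id", "updated_at"]
)

# Highest value of the partition column that was just extracted.
max_value = (
    extracted_df
    .agg(F.max(spooq_values_partition_column).alias("max_value"))
    .collect()[0]["max_value"]
)

# Assumed log schema: (partition, updated_at); append one row per load.
log_entry = spark.createDataFrame(
    [(partition, max_value)],
    ["partition", spooq_values_partition_column],
)
log_entry.write.mode("append").saveAsTable("spooq_values.spooq_jdbc_log_users")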