Hive Database¶

class HiveLoader(db_name, table_name, partition_definitions=[{'column_name': 'dt', 'column_type': 'IntegerType', 'default_value': None}], clear_partition=True, repartition_size=40, auto_create_table=True, overwrite_partition_value=True)[source]¶

    Bases: spooq2.loader.loader.Loader

    Persists a PySpark DataFrame into a Hive Table.
Examples

>>> HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="friends_partitioned",
>>>     partition_definitions=[{
>>>         "column_name": "dt",
>>>         "column_type": "IntegerType",
>>>         "default_value": 20200201}],
>>>     clear_partition=True,
>>>     repartition_size=10,
>>>     overwrite_partition_value=False,
>>>     auto_create_table=False,
>>> ).load(input_df)

>>> HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="all_friends",
>>>     partition_definitions=[],
>>>     repartition_size=200,
>>>     auto_create_table=True,
>>> ).load(input_df)
Parameters:
- db_name (str) – The database name to load the data into.
- table_name (str) – The table name to load the data into. The database name must not be included in this parameter, as it is already defined by the db_name parameter.
- partition_definitions (list of dict) – (Defaults to [{"column_name": "dt", "column_type": "IntegerType", "default_value": None}]).
  - column_name (str) – The name of the column to partition by.
  - column_type (str) – The PySpark SQL DataType for the partition value, given as a string. This should normally be either "IntegerType" or "StringType".
  - default_value (str or int) – If column_name does not contain a value or overwrite_partition_value is set, this value will be used for the partitioning.
- clear_partition (bool, (Defaults to True)) – Tells the Loader to delete the defined partitions before inserting the input DataFrame into the target table. Has no effect if no partitions are defined.
- repartition_size (int, (Defaults to 40)) – The DataFrame will be repartitioned on the Spark level before inserting into the table. This affects the number of output files on which the Hive table is based.
- auto_create_table (bool, (Defaults to True)) – Whether the target table will be created if it does not yet exist.
- overwrite_partition_value (bool, (Defaults to True)) – Whether the values of the columns defined in partition_definitions should be explicitly overwritten by their default_value.
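The interaction between default_value and overwrite_partition_value can be sketched with plain dicts in place of DataFrame rows. This is an illustrative sketch of the documented semantics only, not HiveLoader's actual implementation; the helper name is made up.

```python
# Hypothetical helper that mimics, on plain dicts, how a partition column
# is filled according to the documented default_value semantics.


def apply_partition_defaults(rows, partition_definitions, overwrite_partition_value=True):
    """Fill (or forcibly overwrite) partition columns with their default_value."""
    result = []
    for row in rows:
        row = dict(row)  # do not mutate the caller's data
        for definition in partition_definitions:
            name = definition["column_name"]
            # With overwrite_partition_value=True every row gets the default;
            # otherwise only rows missing a value are filled.
            if overwrite_partition_value or row.get(name) is None:
                row[name] = definition["default_value"]
        result.append(row)
    return result


rows = [{"user_id": 1, "dt": None}, {"user_id": 2, "dt": 20200105}]
defs = [{"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}]

# overwrite_partition_value=True: both rows end up in partition dt=20200201
print(apply_partition_defaults(rows, defs, overwrite_partition_value=True))
# overwrite_partition_value=False: only the missing dt is filled with the default
print(apply_partition_defaults(rows, defs, overwrite_partition_value=False))
```

This also shows why clear_partition requires overwrite_partition_value: only when every row is forced onto the default partition value is it safe to drop that partition up front.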
Raises:
- exceptions.AssertionError – partition_definitions has to be a list containing dicts. Expected dict content: "column_name", "column_type", "default_value" per partition_definitions item.
- exceptions.AssertionError – Items of partition_definitions have to be dictionaries.
- exceptions.AssertionError – No column name set!
- exceptions.AssertionError – Not a valid (PySpark) datatype for the partition column {name} | {type}.
- exceptions.AssertionError – clear_partition is only supported if overwrite_partition_value is also enabled. Otherwise partitions would be cleared on the basis of dynamic values (taken from the DataFrame) instead of the explicitly defined partition(s) to clear.
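The checks behind these AssertionErrors can be sketched as a standalone validator. This is a reconstruction from the documented error conditions, not HiveLoader's actual code; the function name and the set of accepted datatypes are assumptions.

```python
# Sketch of the argument checks implied by the documented AssertionErrors.
# VALID_COLUMN_TYPES is an assumed whitelist; the real loader may accept more.
VALID_COLUMN_TYPES = {"IntegerType", "StringType"}


def validate_partition_definitions(partition_definitions, clear_partition,
                                   overwrite_partition_value):
    assert isinstance(partition_definitions, list), (
        "partition_definitions has to be a list containing dicts. Expected dict "
        "content: 'column_name', 'column_type', 'default_value' per item."
    )
    for item in partition_definitions:
        assert isinstance(item, dict), \
            "Items of partition_definitions have to be dictionaries."
        assert item.get("column_name"), "No column name set!"
        assert item.get("column_type") in VALID_COLUMN_TYPES, (
            "Not a valid (PySpark) datatype for the partition column "
            f"{item.get('column_name')} | {item.get('column_type')}."
        )
    if clear_partition:
        # Clearing partitions based on dynamic per-row values would be unsafe,
        # so clear_partition requires the explicit default partition value.
        assert overwrite_partition_value, (
            "clear_partition is only supported if overwrite_partition_value "
            "is also enabled."
        )
```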
load(input_df)[source]¶

    Persists data from a PySpark DataFrame to a target table.

    Parameters: input_df (pyspark.sql.DataFrame) – Input DataFrame which has to be loaded to a target destination.

    Note

    This method takes only a single DataFrame as an input parameter. All other needed parameters are defined in the initialization of the Loader object.
Activity Diagram¶