Hive Database

class HiveLoader(db_name, table_name, partition_definitions=[{'column_name': 'dt', 'column_type': 'IntegerType', 'default_value': None}], clear_partition=True, repartition_size=40, auto_create_table=True, overwrite_partition_value=True)

Bases: spooq2.loader.loader.Loader

Persists a PySpark DataFrame into a Hive Table.


>>> HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="friends_partitioned",
>>>     partition_definitions=[{
>>>         "column_name": "dt",
>>>         "column_type": "IntegerType",
>>>         "default_value": 20200201}],
>>>     clear_partition=True,
>>>     repartition_size=10,
>>>     overwrite_partition_value=True,
>>>     auto_create_table=False,
>>> ).load(input_df)

>>> HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="all_friends",
>>>     partition_definitions=[],
>>>     repartition_size=200,
>>>     auto_create_table=True,
>>> ).load(input_df)
Parameters:
  • db_name (str) – The database name to load the data into.
  • table_name (str) – The table name to load the data into. The database name must not be included in this parameter as it is already defined in the db_name parameter.
  • partition_definitions (list of dict) –

    (Defaults to [{"column_name": "dt", "column_type": "IntegerType", "default_value": None}]).

    • column_name (str) - The name of the column to partition by.
    • column_type (str) - The PySpark SQL DataType of the partition value, given as a string. This should normally be either ‘IntegerType’ or ‘StringType’.
    • default_value (str or int) - The value used for partitioning if column_name is missing from the DataFrame or overwrite_partition_value is set.
  • clear_partition (bool, (Defaults to True)) – This flag tells the Loader to delete the defined partitions before inserting the input DataFrame into the target table. Has no effect if no partitions are defined.
  • repartition_size (int, (Defaults to 40)) – The DataFrame is repartitioned at the Spark level before it is inserted into the table. This affects the number of output files that back the Hive table.
  • auto_create_table (bool, (Defaults to True)) – Whether the target table will be created if it does not yet exist.
  • overwrite_partition_value (bool, (Defaults to True)) – Defines whether the values of the columns defined in partition_definitions should be explicitly set to their default_value.
Raises:
  • exceptions.AssertionError: – partition_definitions has to be a list containing dicts. Expected dict content: ‘column_name’, ‘column_type’, ‘default_value’ per partition_definitions item.
  • exceptions.AssertionError: – Items of partition_definitions have to be dictionaries.
  • exceptions.AssertionError: – No column name set!
  • exceptions.AssertionError: – Not a valid (PySpark) datatype for the partition column {name} | {type}.
  • exceptions.AssertionError: – clear_partition is only supported if overwrite_partition_value is also enabled. This would otherwise result in clearing partitions on basis of dynamically values (from DataFrame) instead of explicitly defining the partition(s) to clear.
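
The checks behind these assertions can be approximated in plain Python. The following is only a sketch, not the library's implementation: the function name validate_loader_config and the fixed KNOWN_TYPE_NAMES set are assumptions made for illustration (the real check would presumably consult the PySpark type module), and the last message is shortened.

```python
# Hypothetical stand-in for "is this a PySpark SQL type name?"; the real
# check would consult pyspark.sql.types instead of a fixed set.
KNOWN_TYPE_NAMES = {"IntegerType", "LongType", "StringType", "DateType"}


def validate_loader_config(partition_definitions,
                           clear_partition=True,
                           overwrite_partition_value=True):
    """Raise AssertionError for the misconfigurations listed above."""
    assert isinstance(partition_definitions, list), (
        "partition_definitions has to be a list containing dicts."
    )
    for definition in partition_definitions:
        assert isinstance(definition, dict), (
            "Items of partition_definitions have to be dictionaries."
        )
        assert definition.get("column_name"), "No column name set!"
        assert definition.get("column_type") in KNOWN_TYPE_NAMES, (
            "Not a valid (PySpark) datatype for the partition column "
            "{name} | {type}.".format(
                name=definition["column_name"],
                type=definition.get("column_type"),
            )
        )
    # Clearing partitions requires explicitly defined partition values.
    assert overwrite_partition_value or not clear_partition, (
        "clear_partition is only supported if overwrite_partition_value "
        "is also enabled."
    )
    return True


# A well-formed configuration passes silently.
validate_loader_config(
    [{"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}]
)
```

Running the checks up front, before any Spark work starts, means a misconfigured loader fails at construction time rather than midway through a write.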

load(input_df)

Persists data from a PySpark DataFrame to a target table.

Parameters: input_df (pyspark.sql.DataFrame) – Input DataFrame to be loaded into the target table.


This method takes only a single DataFrame as an input parameter. All other needed parameters are defined in the initialization of the Loader object.
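
The split between configuration at initialization and data at load time can be illustrated without Spark. The class below is a hypothetical in-memory stand-in, not part of spooq2: InMemoryLoader, its target list and its default_dt argument are all invented for this sketch.

```python
class InMemoryLoader:
    """Hypothetical loader: settings go to __init__, data goes to load()."""

    def __init__(self, target, default_dt=None):
        self.target = target          # list standing in for the target table
        self.default_dt = default_dt  # used when a row has no "dt" value

    def load(self, rows):
        """Append every row to the target, filling in a missing "dt"."""
        for row in rows:
            self.target.append({**row, "dt": row.get("dt", self.default_dt)})


table = []
loader = InMemoryLoader(target=table, default_dt=20200201)
loader.load([{"id": 1}])                  # first batch
loader.load([{"id": 2, "dt": 20200202}])  # same configuration, second batch
```

Because load() receives nothing but the data, one configured loader object can be reused for any number of input batches.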

Activity Diagram


@startuml

skinparam monochrome true
skinparam defaultFontname Bitstream Vera Sans Mono
skinparam defaultTextAlignment center

start
  :repartition DataFrame to **""repartition_size""**;
  while (for **""partition_definition""** in **""partition_definitions""**)
    if (**""column_name""** not in DataFrame //or// **""overwrite_partition_value""**) then (true)
      :add/overwrite **""column_name""** with **""default_value""**;
      :cast **""column_name""** as **""column_type""**;
    else (false)
    endif
  endwhile
  if (**""full_table_name""** already exists) then (true)
    :assert DataFrame schema equals
    **""full_table_name""** schema;
    if (**""clear_partition""**) then (true)
      :drop partition in **""full_table_name""**;
    else (false)
    endif
  else (false)
    if (**""auto_create_table""**) then (true)
      :partition DataFrame
      by **""partition_definitions""**;
      :create **""full_table_name""**;
    else (false)
    endif
  endif
  :insert into **""full_table_name""**;
stop

@enduml
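
The branching in the activity diagram can be traced with a small pure-Python function that returns the steps as strings instead of touching Spark or Hive. This is a sketch under assumptions: plan_load and its step descriptions are hypothetical names invented here, and the function mirrors the diagram and the parameter descriptions only, not the actual implementation.

```python
def plan_load(df_columns, partition_definitions, table_exists,
              clear_partition=True, auto_create_table=True,
              overwrite_partition_value=True, repartition_size=40):
    """Return the ordered steps the diagram describes for one load call."""
    steps = ["repartition DataFrame to {}".format(repartition_size)]

    for definition in partition_definitions:
        name = definition["column_name"]
        # Default is applied when the column is missing or overwriting is on.
        if name not in df_columns or overwrite_partition_value:
            steps.append("set {} to {!r}".format(name, definition["default_value"]))
            steps.append("cast {} as {}".format(name, definition["column_type"]))

    if table_exists:
        steps.append("assert DataFrame schema equals table schema")
        # clear_partition has no effect if no partitions are defined.
        if clear_partition and partition_definitions:
            steps.append("drop partition")
    elif auto_create_table:
        steps.append("partition DataFrame by partition_definitions")
        steps.append("create table")

    steps.append("insert into table")
    return steps


plan = plan_load(
    df_columns=["id", "dt"],
    partition_definitions=[
        {"column_name": "dt", "column_type": "IntegerType", "default_value": 20200201}
    ],
    table_exists=True,
)
```

Here plan lists the repartition, the overwrite and cast of dt, the schema check, the partition drop and the final insert, in that order.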