Hive Database

class HiveLoader(db_name, table_name, partition_definitions=[{'column_name': 'dt', 'column_type': 'IntegerType', 'default_value': None}], clear_partition=True, repartition_size=40, auto_create_table=True, overwrite_partition_value=True)[source]

Bases: spooq2.loader.loader.Loader

Persists a PySpark DataFrame into a Hive Table.

Examples

>>> HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="friends_partitioned",
>>>     partition_definitions=[{
>>>         "column_name": "dt",
>>>         "column_type": "IntegerType",
>>>         "default_value": 20200201}],
>>>     clear_partition=True,
>>>     repartition_size=10,
>>>     overwrite_partition_value=False,
>>>     auto_create_table=False,
>>> ).load(input_df)
>>> HiveLoader(
>>>     db_name="users_and_friends",
>>>     table_name="all_friends",
>>>     partition_definitions=[],
>>>     repartition_size=200,
>>>     auto_create_table=True,
>>> ).load(input_df)
Parameters:
  • db_name (str) – The database name to load the data into.
  • table_name (str) – The table name to load the data into. The database name must not be included in this parameter as it is already defined in the db_name parameter.
  • partition_definitions (list of dict) –

    (Defaults to [{"column_name": "dt", "column_type": "IntegerType", "default_value": None}]).

    • column_name (str) - Name of the column to partition by.
    • column_type (str) - The PySpark SQL data type of the partition value, given as a string. This should normally be either 'IntegerType()' or 'StringType()'.
    • default_value (str or int) - If the column referenced by column_name is missing from the DataFrame or overwrite_partition_value is set, this value will be used for the partitioning.
  • clear_partition (bool, (Defaults to True)) – This flag tells the Loader to delete the defined partitions before inserting the input DataFrame into the target table. Has no effect if no partitions are defined.
  • repartition_size (int, (Defaults to 40)) – The DataFrame will be repartitioned at the Spark level before being inserted into the table. This affects the number of output files the Hive table is based on.
  • auto_create_table (bool, (Defaults to True)) – Whether the target table will be created if it does not yet exist.
  • overwrite_partition_value (bool, (Defaults to True)) – Defines whether the values of the columns defined in partition_definitions should be explicitly set to their default_value. The interplay of these parameters is illustrated in the sketch after the Raises list below.
Raises:
  • exceptions.AssertionError: – partition_definitions has to be a list containing dicts. Expected dict content: ‘column_name’, ‘column_type’, ‘default_value’ per partition_definitions item.
  • exceptions.AssertionError: – Items of partition_definitions have to be dictionaries.
  • exceptions.AssertionError: – No column name set!
  • exceptions.AssertionError: – Not a valid (PySpark) datatype for the partition column {name} | {type}.
  • exceptions.AssertionError: – clear_partition is only supported if overwrite_partition_value is also enabled. Otherwise partitions would be cleared on the basis of dynamic values (taken from the DataFrame) instead of explicitly defined partition(s).
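
The interplay of partition_definitions, overwrite_partition_value and default_value roughly follows the logic sketched below. This is a hedged illustration only; the helper name _prepare_partition_columns and its exact behaviour are assumptions, not the spooq2 implementation.

from pyspark.sql import functions as F, types as T

def _prepare_partition_columns(df, partition_definitions, overwrite_partition_value):
    # Hypothetical helper -- approximates how partition columns could be
    # added/overwritten with their default_value and cast before the insert.
    for definition in partition_definitions:
        name = definition["column_name"]
        data_type = getattr(T, definition["column_type"])()  # e.g. "IntegerType" -> IntegerType()
        if name not in df.columns or overwrite_partition_value:
            df = df.withColumn(name, F.lit(definition["default_value"]))
        df = df.withColumn(name, F.col(name).cast(data_type))
    return df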
load(input_df)[source]

Persists data from a PySpark DataFrame to a target table.

Parameters: input_df (pyspark.sql.DataFrame) – Input DataFrame that has to be loaded into the target table.

Note

This method takes only a single DataFrame as an input parameter. All other needed parameters are defined in the initialization of the Loader object.

Activity Diagram

@startuml

skinparam monochrome true
skinparam defaultFontname Bitstream Vera Sans Mono
skinparam defaultTextAlignment center

start
  :repartition DataFrame to **""repartition_size""**;
  while (for **""partition_definition""** in **""partition_definitions""**)
    if (**""column_name""** not in DataFrame //or// **""overwrite_partition_value""**) then (true)
      :add/overwrite **""column_name""** with **""default_value""**;
      :cast **""column_name""** as **""column_type""**;
    else (false)
    endif
  endwhile
  if (**""full_table_name""** already exists) then (true)
    :assert DataFrame schema equals
    **""full_table_name""** schema;
    if (**""clear_partition""**) then (true)
      :drop partition in **""full_table_name""**;
    else (false)
    endif
  else (false)
    if (**""auto_create_table""**) then (true)
      :partition DataFrame
      by **""partition_definitions""**;
      :create **""full_table_name""**;
    else (false)
      end
    endif
  endif
  :insert into **""full_table_name""**;

stop
@enduml
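
Read as plain PySpark, the diagram corresponds roughly to the following sketch. The schema assertion, the table lookup, the DROP PARTITION statement and the write calls are assumptions chosen for illustration; they are not the library's actual code.

def load_sketch(spark, input_df, db_name, table_name, partition_definitions,
                clear_partition=True, repartition_size=40, auto_create_table=True):
    # Hypothetical end-to-end sketch of the activity diagram above.
    full_table_name = "{}.{}".format(db_name, table_name)
    partition_columns = [d["column_name"] for d in partition_definitions]

    df = input_df.repartition(repartition_size)
    # partition columns would be added/overwritten and cast here,
    # as sketched after the Raises list above

    table_exists = table_name in [t.name for t in spark.catalog.listTables(db_name)]
    if table_exists:
        assert df.schema == spark.table(full_table_name).schema, "Schemas do not match!"
        if clear_partition and partition_definitions:
            # string default values would additionally need quoting here
            spec = ", ".join(
                "{}={}".format(d["column_name"], d["default_value"]) for d in partition_definitions
            )
            spark.sql("ALTER TABLE {} DROP IF EXISTS PARTITION ({})".format(full_table_name, spec))
    elif auto_create_table:
        # create an empty, correctly partitioned table with the DataFrame's schema
        df.limit(0).write.partitionBy(*partition_columns).saveAsTable(full_table_name)
    else:
        return  # table is missing and automatic creation is disabled

    # insertInto matches columns by position; for partitioned Hive tables,
    # dynamic partitioning may additionally need to be enabled in the session.
    df.write.insertInto(full_table_name)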