Annotator (Load and Update Column Comments)

class AnnotatorMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Possible values: [‘insert’, ‘upsert’]

class MissingColumnHandling(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Possible values: [‘raise_error’, ‘skip’]

exception ColumnNotFound[source]
load_comments_from_metastore_table(sql_source_table_identifier: str) Dict[str, str][source]

Extracts comments from metadata stored in table (loaded by path or metastore).

Parameters

sql_source_table_identifier (str) –

This is the fully qualified table name used for the DESCRIBE <sql_source_table_identifier> SQL command. Some examples:

  • delta.`/path/to/table/files.delta`

  • my_db.my_table

  • my_catalog.my_db.my_table

Returns

Dictionary containing all extracted, non-null comments per column. ({<column_name>: <comment>})

Return type

dict

update_comment(df: DataFrame, column_name: str, comment: str, annotator_mode: AnnotatorMode = AnnotatorMode.upsert, missing_column_handling: MissingColumnHandling = MissingColumnHandling.raise_error, logger: Logger = None) DataFrame[source]

Updates a single column’s comment within the provided dataframe.

Parameters
  • df (DataFrame) – The dataframe that contains the column to be updated.

  • column_name (str) – The name of the column to be commented.

  • comment (str) – The comment to apply to the referenced column.

  • annotator_mode (AnnotatorMode, defaults to AnnotatorMode.upsert) – Defines if existing columns get overwritten.

  • missing_column_handling (MissingColumnHandling, defaults to MissingColumnHandling.raise_error) – Defines how to behave when the column_name is not found in the dataframe.

Raises

ColumnNotFound – If column is missing from the dataframe and missing_column_handling is set to raise_error

Returns

Dataframe with updated metadata (comment) for the specified column.

Return type

DataFrame

class Annotator(comments_mapping: Dict[str, str] = None, mode: AnnotatorMode = AnnotatorMode.upsert, missing_column_handling: MissingColumnHandling = MissingColumnHandling.raise_error, sql_source_table_identifier: str = None)[source]

Inserts or upserts column comments to dataframes. Can also be used just to fetch column comments from an existing table.

Parameters
  • comments_mapping (dict, Defaults to {}) – Dictionary consisting of column names and comments

  • mode (AnnotatorMode, Defaults to AnnotatorMode.upsert) – This mode defines how the transformer should react to existing column comments. insert will leave existing untouched while upsert overwrites them if a new comment is provided. Existing columns are defined as comments already defined in the dataframe or the sql_source_table!

  • missing_column_handling (MissingColumnHandling, Defaults to MissingColumnHandling.raise_error) – This mode defines how the transformer should react to missing columns that are referenced in the comments_mapping.

  • sql_source_table_identifier (str, Defaults to None) – The transformer will load any existing comments from the defined source table to the comments_mapping

Example

>>> '''Fetching comments from an existing source table and applying those plus explicitely defined comments'''
>>> # Schema of sql_source_table (/tmp/path/to/my/silver_table.delta):
>>> #     col_A string COMMENT "Comment from sql_source_table",
>>> #     col_B string COMMENT "Comment from sql_source_table",
>>> #     col_Z string COMMENT "Comment from sql_source_table",
>>>
>>> # Schema of input dataframe
>>> #     col_A string,
>>> #     col_B string,
>>> #     col_Y string,
>>>
>>> from spooq.transformer import Annotator
>>> from spooq.transformer.annotator import AnnotatorMode, MissingColumnHandling
>>>
>>> spark.createDataFrame(
>>>     [],
>>>     schema='''
>>>         col_A string COMMENT "Comment from sql_source_table",
>>>         col_B string COMMENT "Comment from sql_source_table",
>>>         col_Z string COMMENT "Comment from sql_source_table"
>>>     '''
>>> ).write.format("delta").mode("overwrite").options(mergeSchema=True).save("/tmp/path/to/my/silver_table.delta")
>>> input_df = spark.createDataFrame([], "col_A string, col_B string, col_Y string")
>>> comments_mapping = {
>>>     "col_A": "Updated comment from comments_mapping",
>>>     "col_Y": "New comment from comments_mapping",
>>> }
>>>
>>> output_df = Annotator(
>>>     comments_mapping=comments_mapping,
>>>     mode=AnnotatorMode.upsert,
>>>     missing_column_handling=MissingColumnHandling.raise_error,
>>>     sql_source_table_identifier="delta.`/tmp/path/to/my/silver_table.delta`",
>>> ).transform(input_df)
>>>
>>> print(json.dumps({col["name"]: col["metadata"]["comment"] for col in output_df.schema.J["fields"]}, indent=2))
{
"col_A": "Updated comment from comments_mapping",
"col_B": "Comment from sql_source_table",
"col_Y": "New comment from comments_mapping"
}

Note

Here are some use cases to use this transformer:
  • Add explicitely defined comments from comments_mapping to dataframe within silver pipeline

  • Apply comments from existing silver table to dataframe within gold pipeline

  • Apply comments from columns_mapping within the Mapper transformer

  • Apply comments from existing silver table to gold dataframe within the Mapper transformer

  • Combine any of them

Raises

ColumnNotFound – If column is missing from the dataframe and missing_column_handling is set to raise_error:

transform(input_df: DataFrame) DataFrame[source]

Performs a transformation on a DataFrame.

Parameters

input_df (DataFrame) – Input DataFrame

Returns

Transformed DataFrame.

Return type

DataFrame

Note

This method does only take the Input DataFrame as a parameters. Any other needed parameters are defined in the initialization of the Transformator Object.