Annotator (Load and Update Column Comments)
- class AnnotatorMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Possible values: [‘insert’, ‘upsert’]
- class MissingColumnHandling(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Possible values: [‘raise_error’, ‘skip’]
- load_comments_from_metastore_table(sql_source_table_identifier: str) Dict[str, str][source]
Extracts column comments from the metadata stored in a table (loaded by path or from the metastore).
- Parameters
sql_source_table_identifier (str) – The fully qualified table name used for the DESCRIBE <sql_source_table_identifier> SQL command. Some examples:
delta.`/path/to/table/files.delta`
my_db.my_table
my_catalog.my_db.my_table
- Returns
Dictionary containing all extracted, non-null comments per column. ({<column_name>: <comment>})
- Return type
Dict[str, str]
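The "non-null comments per column" rule can be illustrated without a Spark session. The following is a minimal sketch, not the library's implementation: it assumes the rows returned by DESCRIBE have the shape (col_name, data_type, comment), and the helper name comments_from_describe_rows is hypothetical. Extra rows that DESCRIBE may emit (for example partition information) are not handled here.

```python
from typing import Dict, Iterable, Optional, Tuple

# Assumed row shape of a DESCRIBE result: (col_name, data_type, comment)
DescribeRow = Tuple[str, str, Optional[str]]


def comments_from_describe_rows(rows: Iterable[DescribeRow]) -> Dict[str, str]:
    """Hypothetical helper: keep only columns that carry a non-null comment."""
    return {name: comment for name, _data_type, comment in rows if comment is not None}


# Hand-written rows standing in for the output of DESCRIBE my_db.my_table
rows = [
    ("col_A", "string", "Comment from sql_source_table"),
    ("col_B", "string", None),  # no comment, so it is dropped from the result
    ("col_Z", "string", "Comment from sql_source_table"),
]

comments = comments_from_describe_rows(rows)
print(comments)
```

The resulting dictionary has the same {<column_name>: <comment>} shape as the documented return value.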
- update_comment(df: DataFrame, column_name: str, comment: str, annotator_mode: AnnotatorMode = AnnotatorMode.upsert, missing_column_handling: MissingColumnHandling = MissingColumnHandling.raise_error, logger: Logger = None) DataFrame[source]
Updates a single column’s comment within the provided dataframe.
- Parameters
df (DataFrame) – The dataframe that contains the column to be updated.
column_name (str) – The name of the column to be commented.
comment (str) – The comment to apply to the referenced column.
annotator_mode (AnnotatorMode, defaults to AnnotatorMode.upsert) – Defines whether existing comments get overwritten.
missing_column_handling (MissingColumnHandling, defaults to MissingColumnHandling.raise_error) – Defines how to behave when the column_name is not found in the dataframe.
- Raises
ColumnNotFound – If the column is missing from the dataframe and missing_column_handling is set to raise_error.
- Returns
Dataframe with updated metadata (comment) for the specified column.
- Return type
DataFrame
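The interplay of annotator_mode and missing_column_handling can be sketched in plain Python. This is an illustration under stated assumptions, not the library's code: the schema is modelled as a dict of column name to comment (None meaning "no comment"), and Mode, OnMissing, ColumnNotFound and update_comment_sketch are local stand-ins for the real classes and function.

```python
from enum import Enum
from typing import Dict, Optional


class Mode(Enum):  # stand-in for AnnotatorMode
    insert = "insert"
    upsert = "upsert"


class OnMissing(Enum):  # stand-in for MissingColumnHandling
    raise_error = "raise_error"
    skip = "skip"


class ColumnNotFound(Exception):
    """Stand-in for the exception named in the documentation."""


def update_comment_sketch(
    schema: Dict[str, Optional[str]],
    column_name: str,
    comment: str,
    mode: Mode = Mode.upsert,
    on_missing: OnMissing = OnMissing.raise_error,
) -> Dict[str, Optional[str]]:
    """Return a copy of `schema` with the comment applied according to the modes."""
    if column_name not in schema:
        if on_missing is OnMissing.raise_error:
            raise ColumnNotFound(column_name)
        return dict(schema)  # skip: leave everything unchanged
    updated = dict(schema)
    # upsert always writes; insert writes only when no comment exists yet
    if mode is Mode.upsert or updated[column_name] is None:
        updated[column_name] = comment
    return updated


schema = {"col_A": "old comment", "col_B": None}
print(update_comment_sketch(schema, "col_A", "new comment"))
print(update_comment_sketch(schema, "col_A", "new comment", mode=Mode.insert))
```

In this sketch, insert only fills in comments that are missing, while upsert replaces whatever is there, which mirrors the description of the two modes in the Annotator parameters.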
- class Annotator(comments_mapping: Dict[str, str] = None, mode: AnnotatorMode = AnnotatorMode.upsert, missing_column_handling: MissingColumnHandling = MissingColumnHandling.raise_error, sql_source_table_identifier: str = None)[source]
Inserts or upserts column comments to dataframes. Can also be used just to fetch column comments from an existing table.
- Parameters
comments_mapping (dict, defaults to {}) – Dictionary of column names and their comments.
mode (AnnotatorMode, defaults to AnnotatorMode.upsert) – Defines how the transformer reacts to existing column comments. insert leaves existing comments untouched, while upsert overwrites them if a new comment is provided. Existing comments are those already defined in the dataframe or in the sql_source_table.
missing_column_handling (MissingColumnHandling, defaults to MissingColumnHandling.raise_error) – Defines how the transformer reacts to columns that are referenced in the comments_mapping but missing from the dataframe.
sql_source_table_identifier (str, defaults to None) – The transformer loads any existing comments from the defined source table into the comments_mapping.
Example
>>> '''Fetching comments from an existing source table and applying those plus explicitly defined comments'''
>>> # Schema of sql_source_table (/tmp/path/to/my/silver_table.delta):
>>> #   col_A string COMMENT "Comment from sql_source_table",
>>> #   col_B string COMMENT "Comment from sql_source_table",
>>> #   col_Z string COMMENT "Comment from sql_source_table",
>>>
>>> # Schema of input dataframe
>>> #   col_A string,
>>> #   col_B string,
>>> #   col_Y string,
>>>
>>> import json
>>> from spooq.transformer import Annotator
>>> from spooq.transformer.annotator import AnnotatorMode, MissingColumnHandling
>>>
>>> # Create the source table that carries the existing comments
>>> spark.createDataFrame(
>>>     [],
>>>     schema='''
>>>         col_A string COMMENT "Comment from sql_source_table",
>>>         col_B string COMMENT "Comment from sql_source_table",
>>>         col_Z string COMMENT "Comment from sql_source_table"
>>>     '''
>>> ).write.format("delta").mode("overwrite").options(mergeSchema=True).save("/tmp/path/to/my/silver_table.delta")
>>>
>>> input_df = spark.createDataFrame([], "col_A string, col_B string, col_Y string")
>>> comments_mapping = {
>>>     "col_A": "Updated comment from comments_mapping",
>>>     "col_Y": "New comment from comments_mapping",
>>> }
>>>
>>> output_df = Annotator(
>>>     comments_mapping=comments_mapping,
>>>     mode=AnnotatorMode.upsert,
>>>     missing_column_handling=MissingColumnHandling.raise_error,
>>>     sql_source_table_identifier="delta.`/tmp/path/to/my/silver_table.delta`",
>>> ).transform(input_df)
>>>
>>> # Print the resulting comment of each column
>>> print(json.dumps({col["name"]: col["metadata"]["comment"] for col in output_df.schema.jsonValue()["fields"]}, indent=2))
{
  "col_A": "Updated comment from comments_mapping",
  "col_B": "Comment from sql_source_table",
  "col_Y": "New comment from comments_mapping"
}
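The outcome of the example above can be reproduced with plain dictionaries. This is only a sketch of the apparent precedence in upsert mode, inferred from the example output and not from the library's source: explicitly mapped comments win over comments loaded from the source table, and source-table comments for columns that the dataframe does not contain (col_Z) are left out. The function name resolve_comments is hypothetical.

```python
import json
from typing import Dict, List


def resolve_comments(
    df_columns: List[str],
    explicit: Dict[str, str],
    from_source_table: Dict[str, str],
) -> Dict[str, str]:
    """Hypothetical helper mirroring the upsert outcome of the example."""
    # Assumption: explicit comments override those loaded from the source table
    merged = {**from_source_table, **explicit}
    # Assumption: only columns present in the dataframe receive a comment
    return {col: merged[col] for col in df_columns if col in merged}


from_source_table = {
    "col_A": "Comment from sql_source_table",
    "col_B": "Comment from sql_source_table",
    "col_Z": "Comment from sql_source_table",
}
explicit = {
    "col_A": "Updated comment from comments_mapping",
    "col_Y": "New comment from comments_mapping",
}

resolved = resolve_comments(["col_A", "col_B", "col_Y"], explicit, from_source_table)
print(json.dumps(resolved, indent=2))
```

The printed mapping matches the three comments shown in the example output: col_A from comments_mapping, col_B from the source table, and col_Y from comments_mapping.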
Note
- Some use cases for this transformer:
Add explicitly defined comments from comments_mapping to a dataframe within a silver pipeline
Apply comments from an existing silver table to a dataframe within a gold pipeline
Apply comments from columns_mapping within the Mapper transformer
Apply comments from an existing silver table to a gold dataframe within the Mapper transformer
Combine any of them
- Raises
ColumnNotFound – If a column is missing from the dataframe and missing_column_handling is set to raise_error.
- transform(input_df: DataFrame) DataFrame[source]
Performs a transformation on a DataFrame.
- Parameters
input_df (DataFrame) – Input DataFrame
- Returns
Transformed DataFrame.
- Return type
DataFrame
Note
This method takes only the input DataFrame as a parameter. Any other required parameters are defined when the transformer object is initialized.