Source code for dasl_client.types.datasource

from pydantic import BaseModel
from typing import Dict, List, Optional

from dasl_api import (
    ContentV1DatasourcePresetAutoloaderCloudFiles,
    CoreV1DataSource,
    CoreV1DataSourceAutoloaderSpec,
    CoreV1DataSourceSpec,
    CoreV1DataSourceSpecCustom,
    CoreV1DataSourceSpecGold,
    CoreV1DataSourceSpecGoldPresetOverrides,
    CoreV1DataSourceSpecGoldPresetOverridesAddTablesInner,
    CoreV1DataSourceSpecGoldPresetOverridesAddTablesInnerCustom,
    CoreV1DataSourceSpecGoldPresetOverridesModifyTablesInner,
    CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom,
    CoreV1DataSourceSpecSilver,
    CoreV1DataSourceSpecSilverBronzeTablesInner,
    CoreV1DataSourceSpecSilverBronzeTablesInnerWatermark,
    CoreV1DataSourceSpecSilverPreTransform,
    CoreV1DataSourceSpecSilverPreTransformCustom,
    CoreV1DataSourceSpecSilverPreTransformPresetOverrides,
    CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInner,
    CoreV1DataSourceSpecSilverTransformPresetOverridesAddTablesInner,
    CoreV1DataSourceSpecSilverTransform,
    CoreV1DataSourceSpecSilverTransformPresetOverrides,
    CoreV1DataSourceSpecBronze,
    CoreV1DataSourceSpecBronzeClustering,
    CoreV1DataSourceFieldSpec,
    CoreV1DataSourceFieldSpecAssertInner,
    CoreV1DataSourceFieldSpecJoin,
    CoreV1DataSourceFieldSpecJoinWithCSV,
    CoreV1DataSourceFieldUtils,
    CoreV1DataSourceFieldUtilsUnreferencedColumns,
    CoreV1DataSourceFieldUtilsJsonExtractInner,
    CoreV1DataSourcePrimaryKeySpec,
)

from .helpers import Helpers
from .types import Metadata, ResourceStatus, Schedule


class FieldSpec(BaseModel):
    """
    Specification of one field and of the source its value is taken from.

    Attributes:
        name (Optional[str]):
            Name of the field.
        comment (Optional[str]):
            Comment to attach to the field.
        var_assert (Optional[List[FieldSpec.Assert]]):
            SQL expressions that must evaluate to true for every processed
            row. When an assertion is false, an operational alert is raised
            using 'message' for each row.
        var_from (Optional[str]):
            Name of the source column supplying this field's value. Use it
            to bring in a column from some upstream table.
        alias (Optional[str]):
            Name of the destination (transformed) column supplying this
            field's value. Use it to alias a column within the same table
            (i.e. the silver table); a column from an upstream table
            cannot be aliased.
        expr (Optional[str]):
            SQL expression supplying this field's value.
        literal (Optional[str]):
            Literal string supplying this field's value. For other data
            types, use expr.
        join (Optional[FieldSpec.Join]):
            Join to another table supplying this field's value.
    """

    class Assert(BaseModel):
        """
        One assertion attached to a FieldSpec.

        Attributes:
            expr (Optional[str]):
                SQL expression that must evaluate to true for every
                processed row.
            message (Optional[str]):
                Message included in the operational alert when the
                assertion fails.
        """

        expr: Optional[str] = None
        message: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceFieldSpecAssertInner],
        ) -> "FieldSpec.Assert":
            """Build an Assert from its API model; None maps to None."""
            return (
                None
                if obj is None
                else FieldSpec.Assert(expr=obj.expr, message=obj.message)
            )

        def to_api_obj(self) -> CoreV1DataSourceFieldSpecAssertInner:
            """Convert this assertion into its API model."""
            api_assert = CoreV1DataSourceFieldSpecAssertInner(
                expr=self.expr,
                message=self.message,
            )
            return api_assert

    class Join(BaseModel):
        """
        Join expression attached to a FieldSpec.

        Attributes:
            with_table (Optional[str]):
                Table to join to.
            with_csv (Optional[FieldSpec.Join.WithCSV]):
                CSV configuration used for the join.
            lhs (Optional[str]):
                Column in the source dataframe to join on.
            rhs (Optional[str]):
                Column in the joined table to join on.
            select (Optional[str]):
                SQL expression that creates the new field from the joined
                dataset.
        """

        class WithCSV(BaseModel):
            """
            CSV file used for joins within a FieldSpec.

            Attributes:
                path (Optional[str]):
                    Path to the CSV file.
            """

            path: Optional[str] = None

            @staticmethod
            def from_api_obj(
                obj: Optional[CoreV1DataSourceFieldSpecJoinWithCSV],
            ) -> "FieldSpec.Join.WithCSV":
                """Build a WithCSV from its API model; None maps to None."""
                return None if obj is None else FieldSpec.Join.WithCSV(path=obj.path)

            def to_api_obj(self) -> CoreV1DataSourceFieldSpecJoinWithCSV:
                """Convert this CSV reference into its API model."""
                api_csv = CoreV1DataSourceFieldSpecJoinWithCSV(path=self.path)
                return api_csv

        with_table: Optional[str] = None
        with_csv: Optional["FieldSpec.Join.WithCSV"] = None
        lhs: Optional[str] = None
        rhs: Optional[str] = None
        select: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceFieldSpecJoin],
        ) -> "FieldSpec.Join":
            """Build a Join from its API model; None maps to None."""
            if obj is None:
                return None
            csv_source = FieldSpec.Join.WithCSV.from_api_obj(obj.with_csv)
            return FieldSpec.Join(
                with_table=obj.with_table,
                with_csv=csv_source,
                lhs=obj.lhs,
                rhs=obj.rhs,
                select=obj.select,
            )

        def to_api_obj(self) -> CoreV1DataSourceFieldSpecJoin:
            """Convert this join into its API model."""
            convert = lambda o: o.to_api_obj()
            return CoreV1DataSourceFieldSpecJoin(
                with_table=self.with_table,
                with_csv=Helpers.maybe(convert, self.with_csv),
                lhs=self.lhs,
                rhs=self.rhs,
                select=self.select,
            )

    name: Optional[str] = None
    comment: Optional[str] = None
    var_assert: Optional[List["FieldSpec.Assert"]] = None
    var_from: Optional[str] = None
    alias: Optional[str] = None
    expr: Optional[str] = None
    literal: Optional[str] = None
    join: Optional["FieldSpec.Join"] = None

    @staticmethod
    def from_api_obj(obj: Optional[CoreV1DataSourceFieldSpec]) -> "FieldSpec":
        """Build a FieldSpec from its API model; None maps to None."""
        if obj is None:
            return None
        # A missing assertion list stays None instead of becoming [].
        assertions = (
            None
            if obj.var_assert is None
            else [FieldSpec.Assert.from_api_obj(entry) for entry in obj.var_assert]
        )
        return FieldSpec(
            name=obj.name,
            comment=obj.comment,
            var_assert=assertions,
            var_from=obj.var_from,
            alias=obj.alias,
            expr=obj.expr,
            literal=obj.literal,
            join=FieldSpec.Join.from_api_obj(obj.join),
        )

    def to_api_obj(self) -> CoreV1DataSourceFieldSpec:
        """Convert this field specification into its API model."""
        # A missing assertion list stays None instead of becoming [].
        assertions = (
            None
            if self.var_assert is None
            else [entry.to_api_obj() for entry in self.var_assert]
        )
        return CoreV1DataSourceFieldSpec(
            name=self.name,
            comment=self.comment,
            var_assert=assertions,
            var_from=self.var_from,
            alias=self.alias,
            expr=self.expr,
            literal=self.literal,
            join=Helpers.maybe(lambda o: o.to_api_obj(), self.join),
        )
class FieldUtils(BaseModel):
    """
    FieldUtils

    Attributes:
        unreferenced_columns (Optional[FieldUtils.UnreferencedColumns]):
            Defines whether columns not referenced in the FieldSpecs
            should be preserved or omitted.
        json_extract (Optional[List[FieldUtils.JsonExtract]]):
            A list of configurations for extracting JSON fields from a
            column.
    """

    class UnreferencedColumns(BaseModel):
        """
        Configuration related to unreferenced columns.

        Attributes:
            preserve (Optional[bool]):
                Indicates whether columns not referenced in the FieldSpecs
                should be preserved.
            embed_column (Optional[str]):
                Specifies a name for a new column to contain all
                unreferenced fields.
            omit_columns (Optional[List[str]]):
                Lists columns to exclude from the output.
            duplicate_prefix (Optional[str]):
                Adds a prefix to resolve ambiguous duplicate field names.
        """

        preserve: Optional[bool] = None
        embed_column: Optional[str] = None
        omit_columns: Optional[List[str]] = None
        duplicate_prefix: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceFieldUtilsUnreferencedColumns],
        ) -> Optional["FieldUtils.UnreferencedColumns"]:
            """
            Build an UnreferencedColumns from its API model.

            Returns:
                The converted object, or None when ``obj`` is None.
            """
            if obj is None:
                return None
            return FieldUtils.UnreferencedColumns(
                preserve=obj.preserve,
                embed_column=obj.embed_column,
                omit_columns=obj.omit_columns,
                duplicate_prefix=obj.duplicate_prefix,
            )

        def to_api_obj(self) -> CoreV1DataSourceFieldUtilsUnreferencedColumns:
            """Convert this configuration into its API model."""
            return CoreV1DataSourceFieldUtilsUnreferencedColumns(
                preserve=self.preserve,
                embed_column=self.embed_column,
                omit_columns=self.omit_columns,
                duplicate_prefix=self.duplicate_prefix,
            )

    class JsonExtract(BaseModel):
        """
        Configuration for extracting JSON fields from table columns.

        Attributes:
            source (Optional[str]):
                The column name containing JSON string(s) to extract from.
            omit_fields (Optional[List[str]]):
                Specifies high-level fields to exclude from extraction.
            duplicate_prefix (Optional[str]):
                Adds a prefix to resolve duplicate field names during
                extraction.
            embed_column (Optional[str]):
                Specifies a column name to store the extracted JSON object.
        """

        source: Optional[str] = None
        omit_fields: Optional[List[str]] = None
        duplicate_prefix: Optional[str] = None
        embed_column: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceFieldUtilsJsonExtractInner],
        ) -> Optional["FieldUtils.JsonExtract"]:
            """
            Build a JsonExtract from its API model.

            Returns:
                The converted object, or None when ``obj`` is None.
            """
            if obj is None:
                return None
            return FieldUtils.JsonExtract(
                source=obj.source,
                omit_fields=obj.omit_fields,
                duplicate_prefix=obj.duplicate_prefix,
                embed_column=obj.embed_column,
            )

        def to_api_obj(self) -> CoreV1DataSourceFieldUtilsJsonExtractInner:
            """Convert this configuration into its API model."""
            return CoreV1DataSourceFieldUtilsJsonExtractInner(
                source=self.source,
                omit_fields=self.omit_fields,
                duplicate_prefix=self.duplicate_prefix,
                embed_column=self.embed_column,
            )

    unreferenced_columns: Optional["FieldUtils.UnreferencedColumns"] = None
    json_extract: Optional[List["FieldUtils.JsonExtract"]] = None

    @staticmethod
    def from_api_obj(
        obj: Optional[CoreV1DataSourceFieldUtils],
    ) -> Optional["FieldUtils"]:
        """
        Build a FieldUtils from its API model.

        Returns:
            The converted object, or None when ``obj`` is None.
        """
        if obj is None:
            return None
        json_extract = None
        if obj.json_extract is not None:
            json_extract = [
                FieldUtils.JsonExtract.from_api_obj(item)
                for item in obj.json_extract
            ]
        return FieldUtils(
            unreferenced_columns=FieldUtils.UnreferencedColumns.from_api_obj(
                obj.unreferenced_columns
            ),
            json_extract=json_extract,
        )

    def to_api_obj(self) -> CoreV1DataSourceFieldUtils:
        """
        Convert this object into its API model.

        Both fields are optional, so an unset value is forwarded as None.
        """
        json_extract = None
        if self.json_extract is not None:
            json_extract = [item.to_api_obj() for item in self.json_extract]
        return CoreV1DataSourceFieldUtils(
            # unreferenced_columns defaults to None; calling .to_api_obj()
            # on it directly raised AttributeError, so guard the conversion
            # the same way the sibling converters in this file do.
            unreferenced_columns=Helpers.maybe(
                lambda o: o.to_api_obj(), self.unreferenced_columns
            ),
            json_extract=json_extract,
        )
class BronzeSpec(BaseModel):
    """
    Bronze table settings of a DataSource.

    Attributes:
        clustering (Optional[BronzeSpec.Clustering]):
            Optional liquid clustering configuration for the bronze table.
        bronze_table (Optional[str]):
            Name of the bronze table to create and hold the imported data.
        skip_bronze_loading (Optional[bool]):
            Whether to skip the bronze loading step.
        load_as_single_variant (Optional[bool]):
            Whether to ingest data into a single VARIANT-typed column
            called `data`.
        pre_transform (Optional[List[List[str]]]):
            Pre-transform steps to execute. The outer list forms stages
            and each inner list holds the SQL select expressions executed
            within that stage.
    """

    class Clustering(BaseModel):
        """
        Liquid clustering settings for a bronze table.

        Attributes:
            column_names (Optional[List[str]]):
                Column names to include in liquid clustering.
            time_column (Optional[str]):
                Name of the column holding 'time' information for
                clustering.
        """

        column_names: Optional[List[str]] = None
        time_column: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceSpecBronzeClustering],
        ) -> "BronzeSpec.Clustering":
            """Build a Clustering from its API model; None maps to None."""
            return (
                None
                if obj is None
                else BronzeSpec.Clustering(
                    column_names=obj.column_names,
                    time_column=obj.time_column,
                )
            )

        def to_api_obj(self) -> CoreV1DataSourceSpecBronzeClustering:
            """Convert these clustering settings into their API model."""
            api_clustering = CoreV1DataSourceSpecBronzeClustering(
                column_names=self.column_names,
                time_column=self.time_column,
            )
            return api_clustering

    clustering: Optional["BronzeSpec.Clustering"] = None
    bronze_table: Optional[str] = None
    skip_bronze_loading: Optional[bool] = None
    load_as_single_variant: Optional[bool] = None
    pre_transform: Optional[List[List[str]]] = None

    @staticmethod
    def from_api_obj(obj: Optional[CoreV1DataSourceSpecBronze]) -> "BronzeSpec":
        """Build a BronzeSpec from its API model; None maps to None."""
        if obj is None:
            return None
        clustering_cfg = BronzeSpec.Clustering.from_api_obj(obj.clustering)
        return BronzeSpec(
            clustering=clustering_cfg,
            bronze_table=obj.bronze_table,
            skip_bronze_loading=obj.skip_bronze_loading,
            load_as_single_variant=obj.load_as_single_variant,
            pre_transform=obj.pre_transform,
        )

    def to_api_obj(self) -> CoreV1DataSourceSpecBronze:
        """Convert this bronze configuration into its API model."""
        convert = lambda o: o.to_api_obj()
        return CoreV1DataSourceSpecBronze(
            clustering=Helpers.maybe(convert, self.clustering),
            bronze_table=self.bronze_table,
            skip_bronze_loading=self.skip_bronze_loading,
            load_as_single_variant=self.load_as_single_variant,
            pre_transform=self.pre_transform,
        )
class SilverSpec(BaseModel):
    """
    Silver table settings of a DataSource.

    Attributes:
        bronze_tables (Optional[List[SilverSpec.BronzeTable]]):
            Bronze tables to be joined for the silver transformation.
        pre_transform (Optional[SilverSpec.PreTransform]):
            Pre-transformation configuration.
        transform (Optional[SilverSpec.Transform]):
            Transformation configuration for silver processing.
    """

    class BronzeTable(BaseModel):
        """
        Reference from a silver table to a bronze table.

        Attributes:
            name (Optional[str]):
                Name of the bronze table.
            streaming (Optional[bool]):
                True if the input should be streamed from the bronze table.
            watermark (Optional[SilverSpec.BronzeTable.Watermark]):
                Bronze table watermark.
            alias (Optional[str]):
                Alias name for the table.
            join_type (Optional[str]):
                How to join to the preceding table.
            join_expr (Optional[str]):
                The join condition expression.
        """

        class Watermark(BaseModel):
            """
            Watermark of a bronze source table within a silver table.

            Attributes:
                event_time_column (Optional[str]):
                    Column holding the event time for the delay threshold.
                delay_threshold (Optional[str]):
                    Time duration string for the watermark delay.
                drop_duplicates (Optional[List[str]]):
                    Columns to pass to pyspark dropDuplicates.
            """

            event_time_column: Optional[str] = None
            delay_threshold: Optional[str] = None
            drop_duplicates: Optional[List[str]] = None

            @staticmethod
            def from_api_obj(
                obj: Optional[CoreV1DataSourceSpecSilverBronzeTablesInnerWatermark],
            ) -> "SilverSpec.BronzeTable.Watermark":
                """Build a Watermark from its API model; None maps to None."""
                return (
                    None
                    if obj is None
                    else SilverSpec.BronzeTable.Watermark(
                        event_time_column=obj.event_time_column,
                        delay_threshold=obj.delay_threshold,
                        drop_duplicates=obj.drop_duplicates,
                    )
                )

            def to_api_obj(
                self,
            ) -> CoreV1DataSourceSpecSilverBronzeTablesInnerWatermark:
                """Convert this watermark into its API model."""
                api_watermark = CoreV1DataSourceSpecSilverBronzeTablesInnerWatermark(
                    event_time_column=self.event_time_column,
                    delay_threshold=self.delay_threshold,
                    drop_duplicates=self.drop_duplicates,
                )
                return api_watermark

        name: Optional[str] = None
        streaming: Optional[bool] = None
        watermark: Optional["SilverSpec.BronzeTable.Watermark"] = None
        alias: Optional[str] = None
        join_type: Optional[str] = None
        join_expr: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceSpecSilverBronzeTablesInner],
        ) -> "SilverSpec.BronzeTable":
            """Build a BronzeTable from its API model; None maps to None."""
            if obj is None:
                return None
            mark = SilverSpec.BronzeTable.Watermark.from_api_obj(obj.watermark)
            return SilverSpec.BronzeTable(
                name=obj.name,
                streaming=obj.streaming,
                watermark=mark,
                alias=obj.alias,
                join_type=obj.join_type,
                join_expr=obj.join_expr,
            )

        def to_api_obj(self) -> CoreV1DataSourceSpecSilverBronzeTablesInner:
            """Convert this bronze table reference into its API model."""
            convert = lambda o: o.to_api_obj()
            return CoreV1DataSourceSpecSilverBronzeTablesInner(
                name=self.name,
                streaming=self.streaming,
                watermark=Helpers.maybe(convert, self.watermark),
                alias=self.alias,
                join_type=self.join_type,
                join_expr=self.join_expr,
            )

    class PreTransform(BaseModel):
        """
        Pre-transform step of a silver table.

        Attributes:
            use_preset (Optional[str]):
                Preset to use.
            skip_pre_transform (Optional[bool]):
                If True, skip pre-transform entirely.
            custom (Optional[SilverSpec.PreTransform.Custom]):
                Custom pre-transform function and options.
            filter (Optional[str]):
                SQL filter applied at the beginning of the preTransform
                phase.
            post_filter (Optional[str]):
                SQL filter applied at the end of the preTransform phase.
            preset_overrides (Optional[SilverSpec.PreTransform.PresetOverrides]):
                Overrides for preset filters.
            add_fields (Optional[List[FieldSpec]]):
                User defined fields to add to the transformation.
        """

        class Custom(BaseModel):
            """
            Custom pre-transform function for a silver table.

            Attributes:
                function (Optional[str]):
                options (Optional[Dict[str, str]]):
            """

            function: Optional[str] = None
            options: Optional[Dict[str, str]] = None

            @staticmethod
            def from_api_obj(
                obj: Optional[CoreV1DataSourceSpecSilverPreTransformCustom],
            ) -> "SilverSpec.PreTransform.Custom":
                """Build a Custom from its API model; None maps to None."""
                return (
                    None
                    if obj is None
                    else SilverSpec.PreTransform.Custom(
                        function=obj.function,
                        options=obj.options,
                    )
                )

            def to_api_obj(self) -> CoreV1DataSourceSpecSilverPreTransformCustom:
                """Convert this custom function into its API model."""
                api_custom = CoreV1DataSourceSpecSilverPreTransformCustom(
                    function=self.function,
                    options=self.options,
                )
                return api_custom

        class PresetOverrides(BaseModel):
            """
            Overrides applied to the chosen preset.

            Attributes:
                omit_fields (Optional[List[str]]):
                    Fields to omit from the chosen preset.
            """

            omit_fields: Optional[List[str]] = None

            @staticmethod
            def from_api_obj(
                obj: Optional[CoreV1DataSourceSpecSilverPreTransformPresetOverrides],
            ) -> "SilverSpec.PreTransform.PresetOverrides":
                """Build a PresetOverrides from its API model; None maps to None."""
                return (
                    None
                    if obj is None
                    else SilverSpec.PreTransform.PresetOverrides(
                        omit_fields=obj.omit_fields
                    )
                )

            def to_api_obj(
                self,
            ) -> CoreV1DataSourceSpecSilverPreTransformPresetOverrides:
                """Convert these overrides into their API model."""
                api_overrides = CoreV1DataSourceSpecSilverPreTransformPresetOverrides(
                    omit_fields=self.omit_fields,
                )
                return api_overrides

        use_preset: Optional[str] = None
        skip_pre_transform: Optional[bool] = None
        custom: Optional["SilverSpec.PreTransform.Custom"] = None
        filter: Optional[str] = None
        post_filter: Optional[str] = None
        preset_overrides: Optional["SilverSpec.PreTransform.PresetOverrides"] = None
        add_fields: Optional[List[FieldSpec]] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceSpecSilverPreTransform],
        ) -> "SilverSpec.PreTransform":
            """Build a PreTransform from its API model; None maps to None."""
            if obj is None:
                return None
            # A missing field list stays None instead of becoming [].
            extra_fields = (
                None
                if obj.add_fields is None
                else [FieldSpec.from_api_obj(entry) for entry in obj.add_fields]
            )
            return SilverSpec.PreTransform(
                use_preset=obj.use_preset,
                skip_pre_transform=obj.skip_pre_transform,
                custom=SilverSpec.PreTransform.Custom.from_api_obj(obj.custom),
                filter=obj.filter,
                post_filter=obj.post_filter,
                preset_overrides=SilverSpec.PreTransform.PresetOverrides.from_api_obj(
                    obj.preset_overrides
                ),
                add_fields=extra_fields,
            )

        def to_api_obj(self) -> CoreV1DataSourceSpecSilverPreTransform:
            """Convert this pre-transform into its API model."""
            convert = lambda o: o.to_api_obj()
            # A missing field list stays None instead of becoming [].
            extra_fields = (
                None
                if self.add_fields is None
                else [entry.to_api_obj() for entry in self.add_fields]
            )
            return CoreV1DataSourceSpecSilverPreTransform(
                use_preset=self.use_preset,
                skip_pre_transform=self.skip_pre_transform,
                custom=Helpers.maybe(convert, self.custom),
                filter=self.filter,
                post_filter=self.post_filter,
                preset_overrides=Helpers.maybe(convert, self.preset_overrides),
                add_fields=extra_fields,
            )

    class Transform(BaseModel):
        """
        Transform step of a silver table.

        Attributes:
            skip_silver_transform (Optional[bool]):
                If True, skip transform entirely.
            do_not_materialize (Optional[bool]):
                Not described upstream; presumably controls whether the
                silver output is materialized - TODO confirm.
            preset_overrides (Optional[SilverSpec.Transform.PresetOverrides]):
                Preset overrides for the silver transformation.
        """

        class PresetOverrides(BaseModel):
            """
            Overrides for preset transform settings.

            Attributes:
                modify_tables (Optional[List[SilverSpec.Transform.PresetOverrides.ModifyTables]]):
                    Modifications for existing tables.
                omit_tables (Optional[List[str]]):
                    Tables to omit from the preset.
                add_tables (Optional[List[SilverSpec.Transform.PresetOverrides.AddTables]]):
                    User defined tables to include in the transformation.
            """

            class Custom(BaseModel):
                """
                Custom function for use in a silver table transform.

                Attributes:
                    function (Optional[str]):
                    options (Optional[Dict[str, str]]):
                """

                function: Optional[str] = None
                options: Optional[Dict[str, str]] = None

                @staticmethod
                def from_api_obj(
                    obj: Optional[
                        CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom
                    ],
                ) -> "SilverSpec.Transform.PresetOverrides.Custom":
                    """Build a Custom from its API model; None maps to None."""
                    return (
                        None
                        if obj is None
                        else SilverSpec.Transform.PresetOverrides.Custom(
                            function=obj.function,
                            options=obj.options,
                        )
                    )

                def to_api_obj(
                    self,
                ) -> CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom:
                    """Convert this custom function into its API model."""
                    api_custom = CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom(
                        function=self.function,
                        options=self.options,
                    )
                    return api_custom

            class ModifyTables(BaseModel):
                """
                Modification of a table as part of a silver transform.

                Attributes:
                    name (Optional[str]):
                    custom (Optional[SilverSpec.Transform.PresetOverrides.Custom]):
                    omit_fields (Optional[List[str]]):
                    override_liquid_columns (Optional[List[str]]):
                    add_fields (Optional[List[FieldSpec]]):
                    filter (Optional[str]):
                    post_filter (Optional[str]):
                    utils (Optional[FieldUtils]):
                """

                name: Optional[str] = None
                custom: Optional["SilverSpec.Transform.PresetOverrides.Custom"] = None
                omit_fields: Optional[List[str]] = None
                override_liquid_columns: Optional[List[str]] = None
                add_fields: Optional[List[FieldSpec]] = None
                filter: Optional[str] = None
                post_filter: Optional[str] = None
                utils: Optional[FieldUtils] = None

                @staticmethod
                def from_api_obj(
                    obj: Optional[
                        CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInner
                    ],
                ) -> "SilverSpec.Transform.PresetOverrides.ModifyTables":
                    """Build a ModifyTables from its API model; None maps to None."""
                    if obj is None:
                        return None
                    # A missing field list stays None instead of becoming [].
                    extra_fields = (
                        None
                        if obj.add_fields is None
                        else [
                            FieldSpec.from_api_obj(entry)
                            for entry in obj.add_fields
                        ]
                    )
                    return SilverSpec.Transform.PresetOverrides.ModifyTables(
                        name=obj.name,
                        custom=SilverSpec.Transform.PresetOverrides.Custom.from_api_obj(
                            obj.custom
                        ),
                        omit_fields=obj.omit_fields,
                        override_liquid_columns=obj.override_liquid_columns,
                        add_fields=extra_fields,
                        filter=obj.filter,
                        post_filter=obj.post_filter,
                        utils=FieldUtils.from_api_obj(obj.utils),
                    )

                def to_api_obj(
                    self,
                ) -> (
                    CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInner
                ):
                    """Convert this table modification into its API model."""
                    convert = lambda o: o.to_api_obj()
                    # A missing field list stays None instead of becoming [].
                    extra_fields = (
                        None
                        if self.add_fields is None
                        else [entry.to_api_obj() for entry in self.add_fields]
                    )
                    return CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInner(
                        name=self.name,
                        custom=Helpers.maybe(convert, self.custom),
                        omit_fields=self.omit_fields,
                        override_liquid_columns=self.override_liquid_columns,
                        add_fields=extra_fields,
                        filter=self.filter,
                        post_filter=self.post_filter,
                        utils=Helpers.maybe(convert, self.utils),
                    )

            class AddTables(BaseModel):
                """
                Table to add during a silver table transform.

                Attributes:
                    custom (Optional[SilverSpec.Transform.PresetOverrides.Custom]):
                    name (Optional[str]):
                    filter (Optional[str]):
                    post_filter (Optional[str]):
                    override_liquid_columns (Optional[List[str]]):
                    fields (Optional[List[FieldSpec]]):
                    utils (Optional[FieldUtils]):
                """

                custom: Optional["SilverSpec.Transform.PresetOverrides.Custom"] = None
                name: Optional[str] = None
                filter: Optional[str] = None
                post_filter: Optional[str] = None
                override_liquid_columns: Optional[List[str]] = None
                fields: Optional[List[FieldSpec]] = None
                utils: Optional[FieldUtils] = None

                @staticmethod
                def from_api_obj(
                    obj: Optional[
                        CoreV1DataSourceSpecSilverTransformPresetOverridesAddTablesInner
                    ],
                ) -> "SilverSpec.Transform.PresetOverrides.AddTables":
                    """Build an AddTables from its API model; None maps to None."""
                    if obj is None:
                        return None
                    # A missing field list stays None instead of becoming [].
                    table_fields = (
                        None
                        if obj.fields is None
                        else [FieldSpec.from_api_obj(entry) for entry in obj.fields]
                    )
                    return SilverSpec.Transform.PresetOverrides.AddTables(
                        custom=SilverSpec.Transform.PresetOverrides.Custom.from_api_obj(
                            obj.custom
                        ),
                        name=obj.name,
                        filter=obj.filter,
                        post_filter=obj.post_filter,
                        override_liquid_columns=obj.override_liquid_columns,
                        fields=table_fields,
                        utils=FieldUtils.from_api_obj(obj.utils),
                    )

                def to_api_obj(
                    self,
                ) -> CoreV1DataSourceSpecSilverTransformPresetOverridesAddTablesInner:
                    """Convert this added table into its API model."""
                    convert = lambda o: o.to_api_obj()
                    # A missing field list stays None instead of becoming [].
                    table_fields = (
                        None
                        if self.fields is None
                        else [entry.to_api_obj() for entry in self.fields]
                    )
                    return CoreV1DataSourceSpecSilverTransformPresetOverridesAddTablesInner(
                        custom=Helpers.maybe(convert, self.custom),
                        name=self.name,
                        filter=self.filter,
                        post_filter=self.post_filter,
                        override_liquid_columns=self.override_liquid_columns,
                        fields=table_fields,
                        utils=Helpers.maybe(convert, self.utils),
                    )

            modify_tables: Optional[
                List["SilverSpec.Transform.PresetOverrides.ModifyTables"]
            ] = None
            omit_tables: Optional[List[str]] = None
            add_tables: Optional[
                List["SilverSpec.Transform.PresetOverrides.AddTables"]
            ] = None

            @staticmethod
            def from_api_obj(
                obj: Optional[CoreV1DataSourceSpecSilverTransformPresetOverrides],
            ) -> "SilverSpec.Transform.PresetOverrides":
                """Build a PresetOverrides from its API model; None maps to None."""
                if obj is None:
                    return None
                overrides_cls = SilverSpec.Transform.PresetOverrides
                # Missing table lists stay None instead of becoming [].
                modified = (
                    None
                    if obj.modify_tables is None
                    else [
                        overrides_cls.ModifyTables.from_api_obj(entry)
                        for entry in obj.modify_tables
                    ]
                )
                added = (
                    None
                    if obj.add_tables is None
                    else [
                        overrides_cls.AddTables.from_api_obj(entry)
                        for entry in obj.add_tables
                    ]
                )
                return overrides_cls(
                    modify_tables=modified,
                    omit_tables=obj.omit_tables,
                    add_tables=added,
                )

            def to_api_obj(self) -> CoreV1DataSourceSpecSilverTransformPresetOverrides:
                """Convert these overrides into their API model."""
                # Missing table lists stay None instead of becoming [].
                modified = (
                    None
                    if self.modify_tables is None
                    else [entry.to_api_obj() for entry in self.modify_tables]
                )
                added = (
                    None
                    if self.add_tables is None
                    else [entry.to_api_obj() for entry in self.add_tables]
                )
                return CoreV1DataSourceSpecSilverTransformPresetOverrides(
                    modify_tables=modified,
                    omit_tables=self.omit_tables,
                    add_tables=added,
                )

        skip_silver_transform: Optional[bool] = None
        do_not_materialize: Optional[bool] = None
        preset_overrides: Optional["SilverSpec.Transform.PresetOverrides"] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceSpecSilverTransform],
        ) -> "SilverSpec.Transform":
            """Build a Transform from its API model; None maps to None."""
            if obj is None:
                return None
            overrides = SilverSpec.Transform.PresetOverrides.from_api_obj(
                obj.preset_overrides
            )
            return SilverSpec.Transform(
                skip_silver_transform=obj.skip_silver_transform,
                do_not_materialize=obj.do_not_materialize,
                preset_overrides=overrides,
            )

        def to_api_obj(self) -> CoreV1DataSourceSpecSilverTransform:
            """Convert this transform into its API model."""
            convert = lambda o: o.to_api_obj()
            # NOTE(review): unlike the sibling converters, this call passes
            # camelCase keyword names - presumably the API model's field
            # aliases; confirm before normalizing them.
            return CoreV1DataSourceSpecSilverTransform(
                skipSilverTransform=self.skip_silver_transform,
                doNotMaterialize=self.do_not_materialize,
                presetOverrides=Helpers.maybe(convert, self.preset_overrides),
            )

    bronze_tables: Optional[List["SilverSpec.BronzeTable"]] = None
    pre_transform: Optional["SilverSpec.PreTransform"] = None
    transform: Optional["SilverSpec.Transform"] = None

    @staticmethod
    def from_api_obj(obj: Optional[CoreV1DataSourceSpecSilver]) -> "SilverSpec":
        """Build a SilverSpec from its API model; None maps to None."""
        if obj is None:
            return None
        # A missing table list stays None instead of becoming [].
        tables = (
            None
            if obj.bronze_tables is None
            else [
                SilverSpec.BronzeTable.from_api_obj(entry)
                for entry in obj.bronze_tables
            ]
        )
        return SilverSpec(
            bronze_tables=tables,
            pre_transform=SilverSpec.PreTransform.from_api_obj(obj.pre_transform),
            transform=SilverSpec.Transform.from_api_obj(obj.transform),
        )

    def to_api_obj(self) -> CoreV1DataSourceSpecSilver:
        """Convert this silver configuration into its API model."""
        convert = lambda o: o.to_api_obj()
        # A missing table list stays None instead of becoming [].
        tables = (
            None
            if self.bronze_tables is None
            else [entry.to_api_obj() for entry in self.bronze_tables]
        )
        return CoreV1DataSourceSpecSilver(
            bronze_tables=tables,
            pre_transform=Helpers.maybe(convert, self.pre_transform),
            transform=Helpers.maybe(convert, self.transform),
        )
[docs] class GoldSpec(BaseModel): """ Configuration for gold table in a DataSource. Attributes: omit_tables (Optional[List[str]]): A list of tables to omit from the preset. modify_tables (Optional[List[GoldSpec.ModifyTables]]): Modifications for existing gold table definitions. add_tables (Optional[List[GoldSpec.AddTables]]): User defined tables to add to the gold configuration. """
[docs] class ModifyTables(BaseModel): """ Modification to gold tables during transformation. Attributes: name (Optional[str]): Table name. source_table (Optional[str]): Used to match against the preset's gold stanzas input fields. custom (Optional[GoldSpec.ModifyTables.Custom]): Custom function for modifying tables. omit_fields (Optional[List[str]]): A list of fields to omit. add_fields (Optional[List[FieldSpec]]): Fields to add. filter (Optional[str]): A SQL filter to apply before processing. post_filter (Optional[str]): A SQL filter to apply after processing. """
[docs] class Custom(BaseModel): """ Custom function to use as part of a gold table modification. Attributes: function (Optional[str]): options (Optional[Dict[str, str]]): """ function: Optional[str] = None options: Optional[Dict[str, str]] = None
[docs] @staticmethod def from_api_obj( obj: Optional[ CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom ], ) -> "GoldSpec.ModifyTables.Custom": if obj is None: return None return GoldSpec.ModifyTables.Custom( function=obj.function, options=obj.options, )
[docs] def to_api_obj( self, ) -> CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom: return CoreV1DataSourceSpecSilverTransformPresetOverridesModifyTablesInnerCustom( function=self.function, options=self.options, )
name: Optional[str] = None source_table: Optional[str] = None custom: Optional["GoldSpec.ModifyTables.Custom"] = None omit_fields: Optional[List[str]] = None add_fields: Optional[List[FieldSpec]] = None filter: Optional[str] = None post_filter: Optional[str] = None
[docs] @staticmethod def from_api_obj( obj: Optional[CoreV1DataSourceSpecGoldPresetOverridesModifyTablesInner], ) -> "GoldSpec.ModifyTables": if obj is None: return None add_fields = None if obj.add_fields is not None: add_fields = [FieldSpec.from_api_obj(item) for item in obj.add_fields] return GoldSpec.ModifyTables( name=obj.name, source_table=obj.source_table, custom=GoldSpec.ModifyTables.Custom.from_api_obj(obj.custom), omit_fields=obj.omit_fields, add_fields=add_fields, filter=obj.filter, post_filter=obj.post_filter, )
[docs] def to_api_obj( self, ) -> CoreV1DataSourceSpecGoldPresetOverridesModifyTablesInner: add_fields = None if self.add_fields is not None: add_fields = [item.to_api_obj() for item in self.add_fields] to_api_obj = lambda o: o.to_api_obj() return CoreV1DataSourceSpecGoldPresetOverridesModifyTablesInner( name=self.name, source_table=self.source_table, custom=Helpers.maybe(to_api_obj, self.custom), omit_fields=self.omit_fields, add_fields=add_fields, filter=self.filter, post_filter=self.post_filter, )
[docs] class AddTables(BaseModel): """ Tables to add during gold table transformation. Attributes: custom (Optional[GoldSpec.AddTables.Custom]): Custom function for adding tables. name (Optional[str]): The name of the table to add. source_table (Optional[str]): The source table/dataframe for the gold table. filter (Optional[str]): A SQL filter to apply. post_filter (Optional[str]): A SQL filter to apply after processing. fields (Optional[List[FieldSpec]]): Field specifications for the new table. """
[docs] class Custom(BaseModel): """ Custom function for adding tables during gold transformation. Attributes: function (Optional[str]): options (Optional[Dict[str, str]]): """ function: Optional[str] = None options: Optional[Dict[str, str]] = None
[docs] @staticmethod def from_api_obj( obj: Optional[ CoreV1DataSourceSpecGoldPresetOverridesAddTablesInnerCustom ], ) -> "GoldSpec.AddTables.Custom": if obj is None: return None return GoldSpec.AddTables.Custom( function=obj.function, options=obj.options, )
[docs] def to_api_obj( self, ) -> CoreV1DataSourceSpecGoldPresetOverridesAddTablesInnerCustom: return CoreV1DataSourceSpecGoldPresetOverridesAddTablesInnerCustom( function=self.function, options=self.options, )
name: Optional[str] = None source_table: Optional[str] = None custom: Optional["GoldSpec.AddTables.Custom"] = None filter: Optional[str] = None post_filter: Optional[str] = None fields: Optional[List[FieldSpec]] = None
[docs] @staticmethod def from_api_obj( obj: Optional[CoreV1DataSourceSpecGoldPresetOverridesAddTablesInner], ) -> "GoldSpec.AddTables": if obj is None: return None fields = None if obj.fields is not None: fields = [FieldSpec.from_api_obj(item) for item in obj.fields] return GoldSpec.AddTables( custom=GoldSpec.AddTables.Custom.from_api_obj(obj.custom), name=obj.name, source_table=obj.source_table, filter=obj.filter, post_filter=obj.post_filter, fields=fields, )
[docs] def to_api_obj(self) -> CoreV1DataSourceSpecGoldPresetOverridesAddTablesInner: fields = None if self.fields is not None: fields = [item.to_api_obj() for item in self.fields] to_api_obj = lambda o: o.to_api_obj() return CoreV1DataSourceSpecGoldPresetOverridesAddTablesInner( custom=Helpers.maybe(to_api_obj, self.custom), name=self.name, source_table=self.source_table, filter=self.filter, post_filter=self.post_filter, fields=fields, )
# Declared fields of GoldSpec. GoldSpec.from_api_obj fills each one from the
# same-named attribute of the API object's ``preset_overrides``.
# List of strings read from ``preset_overrides.omit_tables``.
omit_tables: Optional[List[str]] = None
# Converted entries of ``preset_overrides.modify_tables``.
modify_tables: Optional[List["GoldSpec.ModifyTables"]] = None
# Converted entries of ``preset_overrides.add_tables``.
add_tables: Optional[List["GoldSpec.AddTables"]] = None
@staticmethod
def from_api_obj(obj: Optional[CoreV1DataSourceSpecGold]) -> Optional["GoldSpec"]:
    """
    Build a ``GoldSpec`` from its API representation.

    Args:
        obj: The API gold spec to convert, or None.

    Returns:
        The converted ``GoldSpec``, or None when ``obj`` is None. When
        ``obj.preset_overrides`` is None, all three fields are None.
    """
    if obj is None:
        return None

    omit_tables = None
    modify_tables = None
    add_tables = None

    overrides = obj.preset_overrides
    if overrides is not None:
        omit_tables = overrides.omit_tables
        # Absent lists stay None; they are not turned into [].
        if overrides.modify_tables is not None:
            modify_tables = [
                GoldSpec.ModifyTables.from_api_obj(item)
                for item in overrides.modify_tables
            ]
        if overrides.add_tables is not None:
            add_tables = [
                GoldSpec.AddTables.from_api_obj(item)
                for item in overrides.add_tables
            ]

    return GoldSpec(
        omit_tables=omit_tables,
        modify_tables=modify_tables,
        add_tables=add_tables,
    )
def to_api_obj(self) -> CoreV1DataSourceSpecGold:
    """
    Convert this ``GoldSpec`` into its API representation.

    Returns:
        An API gold spec whose ``preset_overrides`` holds the omit,
        modify and add table lists. The overrides object is always
        created, even when all three lists are None.
    """
    converted_modify = (
        None
        if self.modify_tables is None
        else [table.to_api_obj() for table in self.modify_tables]
    )
    converted_add = (
        None
        if self.add_tables is None
        else [table.to_api_obj() for table in self.add_tables]
    )
    overrides = CoreV1DataSourceSpecGoldPresetOverrides(
        omit_tables=self.omit_tables,
        modify_tables=converted_modify,
        add_tables=converted_add,
    )
    return CoreV1DataSourceSpecGold(preset_overrides=overrides)
class DataSource(BaseModel):
    """
    A DataSource resource.

    Attributes:
        metadata (Optional[Metadata]):
            Standard object metadata.
        source (Optional[str]):
            The name of the originator of the data.
        source_type (Optional[str]):
            The type of data being imported.
        schedule (Optional[Schedule]):
            The schedule for data ingestion.
        custom (Optional[DataSource.CustomNotebook]):
            A custom notebook for the datasource.
        primary_key (Optional[DataSource.PrimaryKey]):
            Primary key configuration of the datasource.
        use_preset (Optional[str]):
            The name of the preset to use for this data source.
        use_preset_version (Optional[int]):
            Preset version; copied to and from the API spec field of the
            same name.
        autoloader (Optional[DataSource.Autoloader]):
            Autoloader configuration.
        compute_mode (Optional[str]):
            The compute mode to use for this datasource's job.
        bronze (Optional[BronzeSpec]):
            Bronze table configuration.
        silver (Optional[SilverSpec]):
            Silver transformation configuration.
        gold (Optional[GoldSpec]):
            Gold transformation configuration.
        status (Optional[ResourceStatus]):
            The current status of the datasource.
    """

    class CustomNotebook(BaseModel):
        """
        A custom notebook for generating data.

        Attributes:
            notebook (Optional[str]):
                Path to the notebook in the Databricks workspace.
        """

        notebook: Optional[str] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceSpecCustom],
        ) -> Optional["DataSource.CustomNotebook"]:
            """
            Build a ``CustomNotebook`` from its API representation.

            Args:
                obj: The API object to convert, or None.

            Returns:
                The converted object, or None when ``obj`` is None.
            """
            if obj is None:
                return None
            return DataSource.CustomNotebook(
                notebook=obj.notebook,
            )

        def to_api_obj(self) -> CoreV1DataSourceSpecCustom:
            """Convert this ``CustomNotebook`` into its API representation."""
            return CoreV1DataSourceSpecCustom(
                notebook=self.notebook,
            )

    class PrimaryKey(BaseModel):
        """
        PrimaryKey configuration for DataSource.

        Attributes:
            time_column (str):
                Column name used as timestamp portion of the sortable
                synthetic key.
            additional_columns (List[str]):
                List of columns to compute hashkey over.
        """

        time_column: str
        additional_columns: List[str]

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourcePrimaryKeySpec],
        ) -> Optional["DataSource.PrimaryKey"]:
            """
            Build a ``PrimaryKey`` from its API representation.

            Args:
                obj: The API object to convert, or None.

            Returns:
                The converted object, or None when ``obj`` is None.
            """
            if obj is None:
                return None
            return DataSource.PrimaryKey(
                time_column=obj.time_column,
                additional_columns=obj.additional_columns,
            )

        def to_api_obj(self) -> CoreV1DataSourcePrimaryKeySpec:
            """Convert this ``PrimaryKey`` into its API representation."""
            # NOTE(review): these two keywords are camelCase while every
            # other API constructor call in this module uses snake_case
            # names. Presumably they are the generated model's field
            # aliases - confirm against dasl_api before normalising.
            return CoreV1DataSourcePrimaryKeySpec(
                timeColumn=self.time_column,
                additionalColumns=self.additional_columns,
            )

    class Autoloader(BaseModel):
        """
        Autoloader configuration for the DataSource.

        Attributes:
            format (Optional[str]):
                The format of the data (e.g., json, parquet, csv, etc.).
            location (str):
                External location for the volume in Unity Catalog.
            schema_file (Optional[str]):
                An optional file containing the schema of the data source.
            cloud_files (Optional[DataSource.Autoloader.CloudFiles]):
                CloudFiles configuration.
        """

        class CloudFiles(BaseModel):
            """
            CloudFiles configuration for the Autoloader.

            Attributes:
                schema_hints_file (Optional[str]):
                    Value of the API object's ``schema_hints_file``
                    field, copied unchanged in both directions.
                schema_hints (Optional[str]):
                    Value of the API object's ``schema_hints`` field,
                    copied unchanged in both directions.
            """

            schema_hints_file: Optional[str] = None
            schema_hints: Optional[str] = None

            @staticmethod
            def from_api_obj(
                obj: Optional[ContentV1DatasourcePresetAutoloaderCloudFiles],
            ) -> Optional["DataSource.Autoloader.CloudFiles"]:
                """
                Build a ``CloudFiles`` from its API representation.

                Args:
                    obj: The API object to convert, or None.

                Returns:
                    The converted object, or None when ``obj`` is None.
                """
                if obj is None:
                    return None
                return DataSource.Autoloader.CloudFiles(
                    schema_hints_file=obj.schema_hints_file,
                    schema_hints=obj.schema_hints,
                )

            def to_api_obj(self) -> ContentV1DatasourcePresetAutoloaderCloudFiles:
                """Convert this ``CloudFiles`` into its API representation."""
                return ContentV1DatasourcePresetAutoloaderCloudFiles(
                    schema_hints_file=self.schema_hints_file,
                    schema_hints=self.schema_hints,
                )

        # ``location`` is the only field without a default, so it is required.
        format: Optional[str] = None
        location: str
        schema_file: Optional[str] = None
        cloud_files: Optional["DataSource.Autoloader.CloudFiles"] = None

        @staticmethod
        def from_api_obj(
            obj: Optional[CoreV1DataSourceAutoloaderSpec],
        ) -> Optional["DataSource.Autoloader"]:
            """
            Build an ``Autoloader`` from its API representation.

            Args:
                obj: The API object to convert, or None.

            Returns:
                The converted object, or None when ``obj`` is None.
            """
            if obj is None:
                return None
            return DataSource.Autoloader(
                format=obj.format,
                location=obj.location,
                schema_file=obj.schema_file,
                cloud_files=DataSource.Autoloader.CloudFiles.from_api_obj(
                    obj.cloud_files
                ),
            )

        def to_api_obj(self) -> CoreV1DataSourceAutoloaderSpec:
            """Convert this ``Autoloader`` into its API representation."""
            return CoreV1DataSourceAutoloaderSpec(
                format=self.format,
                location=self.location,
                schema_file=self.schema_file,
                cloud_files=Helpers.maybe(lambda o: o.to_api_obj(), self.cloud_files),
            )

    # Field order is kept exactly as originally declared.
    metadata: Optional[Metadata] = None
    source: Optional[str] = None
    source_type: Optional[str] = None
    schedule: Optional[Schedule] = None
    custom: Optional["DataSource.CustomNotebook"] = None
    primary_key: Optional["DataSource.PrimaryKey"] = None
    use_preset: Optional[str] = None
    use_preset_version: Optional[int] = None
    autoloader: Optional["DataSource.Autoloader"] = None
    compute_mode: Optional[str] = None
    bronze: Optional[BronzeSpec] = None
    silver: Optional[SilverSpec] = None
    gold: Optional[GoldSpec] = None
    status: Optional[ResourceStatus] = None

    @staticmethod
    def from_api_obj(obj: CoreV1DataSource) -> "DataSource":
        """
        Build a ``DataSource`` from its API representation.

        Args:
            obj: The API datasource. Its ``spec`` attribute is read
                unconditionally, so it must not be None.

        Returns:
            A ``DataSource`` populated from ``obj.metadata``, the fields
            of ``obj.spec`` and ``obj.status``.
        """
        return DataSource(
            metadata=Metadata.from_api_obj(obj.metadata),
            source=obj.spec.source,
            source_type=obj.spec.source_type,
            schedule=Schedule.from_api_obj(obj.spec.schedule),
            custom=DataSource.CustomNotebook.from_api_obj(obj.spec.custom),
            primary_key=DataSource.PrimaryKey.from_api_obj(obj.spec.primary_key),
            use_preset=obj.spec.use_preset,
            use_preset_version=obj.spec.use_preset_version,
            autoloader=DataSource.Autoloader.from_api_obj(obj.spec.autoloader),
            compute_mode=obj.spec.compute_mode,
            bronze=BronzeSpec.from_api_obj(obj.spec.bronze),
            silver=SilverSpec.from_api_obj(obj.spec.silver),
            gold=GoldSpec.from_api_obj(obj.spec.gold),
            status=ResourceStatus.from_api_obj(obj.status),
        )

    def to_api_obj(self) -> CoreV1DataSource:
        """
        Convert this ``DataSource`` into its API representation.

        Returns:
            An API datasource with ``api_version`` "v1" and ``kind``
            "DataSource", whose spec mirrors this object's fields.
        """

        def _convert(o):
            # Delegate to the wrapped object's own converter.
            return o.to_api_obj()

        # NOTE(review): Helpers.maybe lives in .helpers; presumably it only
        # invokes the callable for a non-None value - confirm there.
        return CoreV1DataSource(
            api_version="v1",
            kind="DataSource",
            metadata=Helpers.maybe(_convert, self.metadata),
            spec=CoreV1DataSourceSpec(
                source=self.source,
                source_type=self.source_type,
                schedule=Helpers.maybe(_convert, self.schedule),
                custom=Helpers.maybe(_convert, self.custom),
                primary_key=Helpers.maybe(_convert, self.primary_key),
                use_preset=self.use_preset,
                use_preset_version=self.use_preset_version,
                autoloader=Helpers.maybe(_convert, self.autoloader),
                compute_mode=self.compute_mode,
                bronze=Helpers.maybe(_convert, self.bronze),
                silver=Helpers.maybe(_convert, self.silver),
                gold=Helpers.maybe(_convert, self.gold),
            ),
            status=Helpers.maybe(_convert, self.status),
        )