from copy import deepcopy
from datetime import datetime, timedelta, timezone
from time import sleep
from typing import Any, Callable, Iterator, List, Optional, Tuple, TypeVar
from pydantic import Field
from pyspark.sql import DataFrame
from dasl_api import (
CoreV1Api,
DbuiV1Api,
DbuiV1QueryGenerateRequest,
DbuiV1QueryGenerateRequestTimeRange,
DbuiV1QueryGenerateStatus,
DbuiV1QueryLookupRequest,
DbuiV1QueryLookupResult,
DbuiV1QueryHistogramRequest,
DbuiV1QueryHistogramResult,
DbuiV1QueryLookupRequestPagination,
ContentV1Api,
WorkspaceV1Api,
WorkspaceV1CreateWorkspaceRequest,
api,
)
from dasl_client.auth.auth import (
Authorization,
DatabricksSecretAuth,
DatabricksTokenAuth,
ServiceAccountKeyAuth,
)
from dasl_client.conn.conn import get_base_conn
from dasl_client.errors.errors import ConflictError, error_handler
from .helpers import Helpers
from .exec_rule import ExecRule
from .regions import Regions
from .types import (
AdminConfig,
DataSource,
Dbui,
Metadata,
Rule,
WorkspaceConfig,
TransformRequest,
TransformResponse,
DataSourcePresetsList,
DataSourcePreset,
)
T = TypeVar("T")
class Client:
"""
Antimatter security lakehouse client.
"""
def __init__(
self,
auth: Authorization,
):
"""
Initialize a new client. You should generally prefer to use
the new_workspace function if creating a new workspace or the
for_workspace function if connecting to an existing workspace.
:param auth: Authorization instance for authorizing requests to
the dasl control plane.
:returns: Client
"""
self.auth = auth
@staticmethod
def new_workspace(
admin_email: str,
app_client_id: str,
service_principal_id: str,
service_principal_secret: str,
workspace_url: Optional[str] = None,
region: str = Helpers.default_region,
dasl_host: Optional[str] = None,
) -> "Client":
"""
Register a new workspace and return a client for it.
:param admin_email: The email address associated with the (DASL)
workspace admin, if the workspace will be created.
:param app_client_id: The Databricks app connection client ID
to use for authentication calls related to the workspace.
:param service_principal_id: The ID of the Databricks service
principal that will interact with Databricks on your behalf.
:param service_principal_secret: An OAuth secret that entitles
the service principal to make Databricks API calls on your
behalf.
:param workspace_url: The full base URL of the Databricks workspace
being registered. If you omit this value, it will be inferred
if you are running within a Databricks notebook. Otherwise, an
exception will be raised.
:param region: The name of the DASL region.
:param dasl_host: The URL of the DASL server. This value should
not generally be specified. When specified, this value
overrides region.
:returns: Client for the newly created workspace.
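The following example sketches a typical call; every credential and
URL shown is a placeholder to be replaced with your own values.
Example:
client = Client.new_workspace(
admin_email="admin@example.com",
app_client_id="app-client-id",
service_principal_id="service-principal-id",
service_principal_secret="service-principal-secret",
workspace_url="https://my-workspace.cloud.databricks.com",
)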
"""
if dasl_host is None:
dasl_host = Regions.lookup(region)
with error_handler():
if workspace_url is None:
workspace_url = Helpers.current_workspace_url()
admin_config = AdminConfig(
workspace_url=workspace_url,
app_client_id=app_client_id,
service_principal_id=service_principal_id,
service_principal_secret=service_principal_secret,
)
workspace_name = Helpers.workspace_name_from_url(workspace_url)
api_key = (
api.WorkspaceV1Api(api_client=get_base_conn(host=dasl_host))
.workspace_v1_create_workspace(
WorkspaceV1CreateWorkspaceRequest(
admin_user=admin_email,
workspace_name=workspace_name,
admin_config=admin_config.to_api_obj().spec,
)
)
.admin_service_account.apikey
)
return Client(
ServiceAccountKeyAuth(workspace_name, api_key, host=dasl_host),
)
# TODO: should we allow user to use an API key here?
@staticmethod
def for_workspace(
workspace_url: Optional[str] = None,
service_account_token: Optional[str] = None,
region: str = Helpers.default_region,
dasl_host: Optional[str] = None,
) -> "Client":
"""
Create a client for the given workspace, if specified, or for the
current workspace if running in a Databricks notebook context.
:param workspace_url: The full base URL of the Databricks workspace
being registered. If you omit this value, it will be inferred
if you are running within a Databricks notebook. Otherwise,
an exception will be raised.
:param service_account_token: Antimatter service account token.
If provided, the client will use this token for auth instead
of (automatic) secret-based auth.
:param region: The name of the DASL region.
:param dasl_host: The URL of the DASL server. This value should
not generally be specified. When specified, this value
overrides region.
:returns: Client for the existing workspace.
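The following example sketches typical usage from within a Databricks
notebook, where the workspace URL is inferred automatically; the
explicit URL in the second call is a placeholder.
Example:
client = Client.for_workspace()
# outside a notebook, pass the workspace URL explicitly:
client = Client.for_workspace(
workspace_url="https://my-workspace.cloud.databricks.com",
)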
"""
if dasl_host is None:
dasl_host = Regions.lookup(region)
with error_handler():
if workspace_url is None:
workspace_url = Helpers.current_workspace_url()
if service_account_token is None:
return Client(
DatabricksSecretAuth(
Helpers.workspace_name_from_url(workspace_url),
host=dasl_host,
),
)
else:
return Client(
ServiceAccountKeyAuth(
Helpers.workspace_name_from_url(workspace_url),
service_account_token,
host=dasl_host,
)
)
@staticmethod
def new_or_existing(
admin_email: str,
app_client_id: str,
service_principal_id: str,
service_principal_secret: str,
workspace_url: Optional[str] = None,
service_account_token: Optional[str] = None,
region: str = Helpers.default_region,
dasl_host: Optional[str] = None,
) -> "Client":
"""
Initialize a new client for the workspace associated with the
argument Databricks workspace_url. If no such workspace exists,
one will be created for you.
:param admin_email: The email address associated with the (DASL)
workspace admin, if the workspace will be created. Ignored if
the workspace already exists.
:param app_client_id: The Databricks app connection client ID
to use for authentication calls related to the workspace. If
the workspace already exists, the existing config will be
updated to use this client ID.
:param service_principal_id: The ID of the Databricks service
principal that will interact with Databricks on your behalf.
If the workspace already exists, the existing config will be
updated to use this service principal ID.
:param service_principal_secret: An OAuth secret that entitles
the service principal to make Databricks API calls on your
behalf. If the workspace already exists, the existing config
will be updated to use this service principal secret.
:param workspace_url: The full base URL of the Databricks workspace
being registered. If you omit this value, it will be inferred
if you are running within a Databricks notebook. Otherwise, an
exception will be raised. If the workspace already exists, the
existing config will be updated to use this value.
:param service_account_token: Antimatter service account token.
If provided, the client will use this token for auth instead
of (automatic) secret-based auth. Ignored if the workspace
doesn't exist.
:param region: The name of the DASL region.
:param dasl_host: The URL of the DASL server. This value should
not generally be specified. When specified, this value
overrides region.
:returns: Client for the newly created or existing workspace.
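The following example sketches a typical call; the credentials shown
are placeholders.
Example:
client = Client.new_or_existing(
admin_email="admin@example.com",
app_client_id="app-client-id",
service_principal_id="service-principal-id",
service_principal_secret="service-principal-secret",
)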
"""
try:
return Client.new_workspace(
admin_email,
app_client_id,
service_principal_id,
service_principal_secret,
workspace_url,
region,
dasl_host,
)
except ConflictError:
result = Client.for_workspace(
workspace_url, service_account_token, region, dasl_host
)
result.put_admin_config(
AdminConfig(
workspace_url=workspace_url,
app_client_id=app_client_id,
service_principal_id=service_principal_id,
service_principal_secret=service_principal_secret,
)
)
return result
def _workspace_client(self) -> WorkspaceV1Api:
return WorkspaceV1Api(self.auth.client())
def _core_client(self) -> CoreV1Api:
return CoreV1Api(self.auth.client())
def _dbui_client(self) -> DbuiV1Api:
return DbuiV1Api(self.auth.client())
def _content_client(self) -> ContentV1Api:
return ContentV1Api(self.auth.client())
def _workspace(self) -> str:
return self.auth.workspace()
def _list_iter_paginated(
self,
list_func: Callable[..., Any],
convert: Callable[[Any], T],
cursor: Optional[str] = None,
limit: Optional[int] = None,
) -> Iterator[T]:
"""
Generic helper for paginated list functions.
"""
current_cursor = cursor
results_so_far = 0
while True:
page_limit = limit - results_so_far if limit is not None else None
if limit is not None and page_limit <= 0:
break
with error_handler():
response = list_func(
workspace=self._workspace(),
cursor=current_cursor,
limit=page_limit,
)
for item in response.items:
yield convert(item)
results_so_far += 1
current_cursor = (
response.metadata.cursor if hasattr(response, "metadata") else None
)
if current_cursor is None:
break
def get_admin_config(self) -> AdminConfig:
"""
Retrieve the AdminConfig from the DASL server. Note that the
service principal secret will be redacted server side, so if you
plan to make changes and issue a request using put_admin_config,
you will need to repopulate the service_principal_secret correctly
before passing the result back to put_admin_config.
:returns: AdminConfig containing the current settings.
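The following example sketches the get/modify/put round trip described
above; the secret value is a placeholder.
Example:
config = client.get_admin_config()
# repopulate the redacted secret before writing the config back
config.service_principal_secret = "service-principal-secret"
client.put_admin_config(config)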
"""
with error_handler():
return AdminConfig.from_api_obj(
self._workspace_client().workspace_v1_get_admin_config(
self._workspace()
)
)
def put_admin_config(
self,
config: AdminConfig,
) -> None:
"""
Update the AdminConfig stored in the DASL server. See the AdminConfig
docs for details about its contents.
:param config: AdminConfig to replace the existing. Note that the
service principal credentials will be verified server side before
the request is accepted.
"""
with error_handler():
self._workspace_client().workspace_v1_put_admin_config(
self._workspace(),
config.to_api_obj(),
)
def get_config(self) -> WorkspaceConfig:
"""
Retrieve the WorkspaceConfig from the DASL server. The returned
value can be updated directly and passed to put_config in order
to make changes.
:returns: WorkspaceConfig containing the current configuration.
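The following example sketches the get/modify/put round trip; which
fields you change depends on your configuration.
Example:
config = client.get_config()
# ... modify fields on config as needed ...
client.put_config(config)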
"""
with error_handler():
return WorkspaceConfig.from_api_obj(
self._workspace_client().workspace_v1_get_config(self._workspace())
)
def put_config(
self,
config_in: WorkspaceConfig,
) -> None:
"""
Update the WorkspaceConfig stored in the DASL server. See the
WorkspaceConfig docs for details about its contents.
:param config_in: WorkspaceConfig to replace the existing
configuration. The value is deep-copied before submission, so the
object you pass in is not modified.
"""
with error_handler():
config = deepcopy(config_in)
if config.metadata is None:
config.metadata = Metadata(
name="config",
workspace=self._workspace(),
)
# reset the version; let the server set the version for us
config.metadata.version = None
self._workspace_client().workspace_v1_put_config(
self._workspace(),
config.to_api_obj(),
)
def get_datasource(self, name: str) -> DataSource:
"""
Get the DataSource with the argument name from the DASL server. The
returned value can be updated directly and passed to replace_datasource
in order to make changes.
:param name: The unique name of the DataSource within this workspace
:returns: DataSource
"""
with error_handler():
return DataSource.from_api_obj(
self._core_client().core_v1_get_data_source(self._workspace(), name)
)
def delete_datasource(self, name: str) -> None:
"""
Delete the DataSource with the argument name from the DASL server.
The DataSource will not necessarily be deleted immediately as the
server will dispatch background tasks to clean up any allocated
resources before actually deleting the resource, so it may take
some time before its name is available for reuse.
:param name: The unique name of the DataSource within this workspace
"""
with error_handler():
self._core_client().core_v1_delete_data_source(self._workspace(), name)
def list_datasources(
self, cursor: Optional[str] = None, limit: Optional[int] = None
) -> Iterator[DataSource]:
"""
List the DataSources in this workspace. Each yielded DataSource
contains all fields in the DataSource as if it were fetched
using the get_datasource method.
:param cursor: The ID of a DataSource. If specified, the results
will contain DataSources starting (lexically) directly after
this DataSource. If not specified, then the results will begin
with the lexically least DataSource.
:param limit: The maximum number of DataSources to yield. If there
are fewer than this number of DataSources beginning directly
after `cursor`, then all such DataSources will be yielded. If
not specified, then all DataSources starting directly after
`cursor` will be returned.
:yields DataSource: One DataSource at a time in lexically
increasing order
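The following example sketches pagination, assuming each DataSource
carries its name in metadata.name and that more than ten exist.
Example:
first_page = list(client.list_datasources(limit=10))
cursor = first_page[-1].metadata.name
second_page = list(client.list_datasources(cursor=cursor, limit=10))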
"""
return self._list_iter_paginated(
list_func=self._core_client().core_v1_list_data_sources,
convert=DataSource.from_api_obj,
cursor=cursor,
limit=limit,
)
def create_datasource(self, name: str, ds_in: DataSource) -> DataSource:
"""
Create a new DataSource. The chosen name must be unique for your
workspace, and cannot refer to a DataSource that already exists
and has not been deleted. See the documentation for delete_datasource
as there are some caveats around name reuse.
:param name: The unique name of this DataSource in the workspace.
:param ds_in: The specification of the DataSource to create. See
the documentation for the DataSource type for more details.
:returns DataSource: Note that the returned value is a
clone of ds_in and may not be precisely equal to the
originally passed value.
"""
with error_handler():
ds = deepcopy(ds_in)
if ds.metadata is None:
ds.metadata = Metadata(
name=name,
workspace=self._workspace(),
)
result = self._core_client().core_v1_create_data_source(
self._workspace(), ds.to_api_obj()
)
return DataSource.from_api_obj(result)
def replace_datasource(self, name: str, ds_in: DataSource) -> DataSource:
"""
Replace an existing DataSource. The name must refer to a DataSource
that already exists in your workspace.
:param name: The name of the existing DataSource to replace.
:param ds_in: The specification of the DataSource taking the place
of the existing DataSource.
:returns DataSource: Note that the returned value is a
clone of ds_in and may not be precisely equal to the
originally passed value.
"""
with error_handler():
ds = deepcopy(ds_in)
if ds.metadata is None:
ds.metadata = Metadata(
name=name,
workspace=self._workspace(),
)
# reset the version; let the server set the version for us
ds.metadata.version = None
result = self._core_client().core_v1_replace_data_source(
self._workspace(),
name,
ds.to_api_obj(),
)
return DataSource.from_api_obj(result)
def get_rule(self, name: str) -> Rule:
"""
Get the Rule with the argument name from the DASL server. The
returned value can be updated directly and passed to replace_rule
in order to make changes.
:param name: The unique name of the Rule within this workspace
:returns: Rule
"""
with error_handler():
return Rule.from_api_obj(
self._core_client().core_v1_get_rule(self._workspace(), name)
)
def delete_rule(self, name: str) -> None:
"""
Delete the Rule with the argument name from the DASL server.
The Rule will not necessarily be deleted immediately as the
server will dispatch background tasks to clean up any allocated
resources before actually deleting the resource, so it may take
some time before its name is available for reuse.
:param name: The unique name of the Rule within this workspace
"""
with error_handler():
self._core_client().core_v1_delete_rule(self._workspace(), name)
def list_rules(
self, cursor: Optional[str] = None, limit: Optional[int] = None
) -> Iterator[Rule]:
"""
List the Rules in this workspace. Each yielded Rule contains
all fields in the Rule as if it were fetched using the
get_rule method.
:param cursor: The ID of a Rule. If specified, the results will
contain Rules starting (lexically) directly after this
Rule. If not specified, then the results will begin with the
lexically least Rule.
:param limit: The maximum number of Rules to yield. If there are
fewer than this number of Rules beginning directly after
`cursor`, then all such Rules will be yielded. If not specified,
then all Rules starting directly after `cursor` will be returned.
:yields Rule: One Rule at a time in lexically increasing order.
"""
return self._list_iter_paginated(
list_func=self._core_client().core_v1_list_rules,
convert=Rule.from_api_obj,
cursor=cursor,
limit=limit,
)
def create_rule(self, name: str, rule_in: Rule) -> Rule:
"""
Create a new Rule. The chosen name must be unique for your
workspace, and cannot refer to a Rule that already exists
and has not been deleted. See the documentation for delete_rule
as there are some caveats around name reuse.
:param name: The unique name of this Rule in the workspace.
:param rule_in: The specification of the Rule to create. See
the documentation for the Rule type for more details.
:returns Rule: Note that the returned value is a clone of
rule_in and may not be precisely equal to the originally
passed value.
"""
with error_handler():
rule = deepcopy(rule_in)
if rule.metadata is None:
rule.metadata = Metadata(
name=name,
workspace=self._workspace(),
)
result = self._core_client().core_v1_create_rule(
self._workspace(), rule.to_api_obj()
)
return Rule.from_api_obj(result)
def replace_rule(self, name: str, rule_in: Rule) -> Rule:
"""
Replace an existing Rule. The name must refer to a Rule
that already exists in your workspace.
:param name: The name of the existing Rule to replace.
:param rule_in: The specification of the Rule taking the place
of the existing Rule.
:returns Rule: Note that the returned value is a clone of
rule_in and may not be precisely equal to the originally
passed value.
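The following example sketches the get/modify/replace round trip; the
rule name is a placeholder.
Example:
rule = client.get_rule("suspicious-logins")
# ... modify fields on rule as needed ...
client.replace_rule("suspicious-logins", rule)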
"""
with error_handler():
rule = deepcopy(rule_in)
if rule.metadata is None:
rule.metadata = Metadata(
name=name,
workspace=self._workspace(),
)
# reset the version; let the server set the version for us
rule.metadata.version = None
result = self._core_client().core_v1_replace_rule(
self._workspace(),
name,
rule.to_api_obj(),
)
return Rule.from_api_obj(result)
def exec_rule(
self,
spark,
rule_in: Rule,
) -> ExecRule:
"""
Locally execute a Rule. Must be run from within a Databricks
notebook or else an exception will be raised. This is intended
to facilitate Rule development.
:param spark: Spark context from Databricks notebook. Will be
injected into the execution environment for use by the
Rule notebook.
:param rule_in: The specification of the Rule to execute.
:returns ExecRule: A class containing various information and
functionality relating to the execution. See the docs for
ExecRule for additional details, but note that you must
call its cleanup function or tables created just for this
request will leak.
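The following example sketches notebook usage, assuming ExecRule
exposes a cleanup() method as described in its docs; the rule name
is a placeholder.
Example:
rule = client.get_rule("suspicious-logins")
execution = client.exec_rule(spark, rule)
# ... inspect the execution results ...
execution.cleanup()  # required, or tables created for this request leak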
"""
Helpers.ensure_databricks()
with error_handler():
result = self._core_client().core_v1_render_rule(
self._workspace(),
rule_in.to_api_obj(),
)
try:
import notebook_utils
except ImportError as e:
raise ImportError(
"Package 'notebook_utils' not found. "
"Install it within this this notebook using "
f"%pip install {result.notebook_utils_path}"
)
exec(result.content, {"spark": spark})
return ExecRule(spark, result.tables)
def get_observable_events(
self,
warehouse: str,
kind: str,
value: str,
cursor: Optional[str] = None,
limit: Optional[int] = None,
) -> Dbui.ObservableEvents.EventsList:
"""
Get the observable events associated with a specific field and value.
:param warehouse: The warehouse id to perform the operation on
:param kind: The observable kind
:param value: The observable value
:param cursor: A cursor to be used when paginating results
:param limit: A limit of the number of results to return
:returns: EventsList
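The following example sketches a lookup for a single observable; the
warehouse ID, observable kind, and value are placeholders.
Example:
events = client.get_observable_events(
warehouse="warehouse-id",
kind="ipAddress",
value="203.0.113.7",
limit=50,
)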
"""
with error_handler():
return Dbui.ObservableEvents.EventsList.from_api_obj(
self._dbui_client().dbui_v1_get_observable_events(
self._workspace(),
warehouse=warehouse,
kind=kind,
value=value,
cursor=cursor,
limit=limit,
)
)
def list_presets(self) -> DataSourcePresetsList:
"""
List the Presets in this workspace. This will include any user defined
presets if a custom presets path has been configured in the workspace.
:returns: DataSourcePresetsList
"""
with error_handler():
return DataSourcePresetsList.from_api_obj(
self._content_client().content_v1_get_preset_data_sources(
self._workspace(),
)
)
def get_preset(self, name: str) -> DataSourcePreset:
"""
Get the preset with the argument name from the DASL server. If the preset name
begins with 'internal_' it will instead be collected from the user catalog,
provided a preset path is set in the workspace config.
:param name: The unique name of the DataSource preset within this workspace.
:returns: DataSourcePreset
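The following example sketches listing the presets and fetching one by
name; the preset name is a placeholder.
Example:
presets = client.list_presets()
preset = client.get_preset("aws_cloudtrail")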
"""
with error_handler():
return DataSourcePreset.from_api_obj(
self._content_client().content_v1_get_preset_datasource(
self._workspace(), name
)
)
def purge_preset_cache(self) -> None:
"""
Purge the data source preset cache. This will cause the DASL workspace
to fetch fresh presets from the configured sources.
"""
with error_handler():
self._content_client().content_v1_preset_purge_cache(self._workspace())
def generate_query(
self,
sql: str,
warehouse: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
) -> str:
"""
Generate a query from the given SQL.
:param sql: The SQL to use to create the query data set.
:param warehouse: The SQL warehouse to use when executing the SQL. If
omitted, the default SQL warehouse specified in the workspace
config will be used.
:param start_date: The optional starting date to filter by for
the provided sql used to create the data set. Only rows with
their time column (see the time_col parameter) greater than
or equal to this value will be included in the data set. You
must specify a value for this parameter if you wish to filter
by time. Valid values include actual timestamps and computed
timestamps (such as now()).
:param end_date: The optional ending date to filter by for the
provided sql used to create the data set. The same caveats
apply as with the start_date parameter. However, this parameter
is not required and if omitted when a start_date is provided,
the current date will be used.
:returns str: The ID of the query generation operation. This value
can be used with get_query_status to track the progress of
the generation process, and eventually to perform lookups
on the completed query.
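The following example sketches generating a query over the last week;
the table name, warehouse ID, and time expression are illustrative.
Poll get_query_status with the returned ID until generation completes.
Example:
query_id = client.generate_query(
"SELECT * FROM main.security.alerts",
warehouse="warehouse-id",
start_date="now() - interval 7 days",
)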
"""
time_range = None
if start_date is not None or end_date is not None:
time_range = DbuiV1QueryGenerateRequestTimeRange(
startDate=start_date,
endDate=end_date,
)
req = DbuiV1QueryGenerateRequest(
warehouse=warehouse,
sql=sql,
timeRange=time_range,
)
with error_handler():
return (
self._dbui_client()
.dbui_v1_query_generate(
self._workspace(),
req,
)
.id
)
def get_query_status(
self,
id: str,
) -> DbuiV1QueryGenerateStatus:
"""
Check the status of a query generation operation. Since generation
happens in the background, it is up to the caller to check the
status until the return value's status member is either equal to
"succeeded" or "failed".
:param id: The id of the query generation operation.
:returns DbuiV1QueryGenerateStatus: The important field is
status (as used in the example code).
The following example demonstrates usage of the API.
Example:
id = client.generate_query("SELECT now() as time")
result = None
while True:
time.sleep(3)
status = client.get_query_status(id)
if status.status == "failed":
raise Exception("query failed")
if status.status == "succeeded":
break
"""
with error_handler():
return self._dbui_client().dbui_v1_query_generate_status(
self._workspace(),
id,
)
def query_lookup(
self,
id: str,
warehouse: Optional[str] = None,
pagination: Optional[DbuiV1QueryLookupRequestPagination] = None,
start_value: Optional[str] = None,
row_count: Optional[int] = None,
refinements: Optional[List[str]] = None,
) -> DbuiV1QueryLookupResult:
"""
Perform a lookup on a query, which applies refinements to the
query and returns the results.
:param id: The query ID returned from query_generate and
get_query_status.
:param warehouse: The optional SQL warehouse ID to use to compute
the results. If not specified, uses the default SQL warehouse
configured for the workspace.
:param pagination: A sequence of fields and a direction that can
be applied to a lookup request. If 'fetchPreceding' is true,
the prior n rows up to the first row that matches the provided
fields will be returned. Otherwise, the n rows following the
first row that matches the provided fields will be returned.
:param start_value: An optional start value to constrain the data
being returned. This will be applied to the primary ordering
column if provided, before any refinements.
:param row_count: The maximum number of rows to include in a page.
Defaults to 1000, and must be in the range [1,1000].
:param refinements: Pipeline filters to be applied to the result.
Any SQL which is valid as a pipeline stage (i.e. coming between
|> symbols) is valid here, such as ORDER BY id, or WHERE
column = 'value'.
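The following example sketches a lookup with refinements; the column
names and values are placeholders.
Example:
result = client.query_lookup(
query_id,
row_count=100,
refinements=["WHERE severity = 'high'", "ORDER BY time DESC"],
)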
"""
with error_handler():
return self._dbui_client().dbui_v1_query_lookup(
self._workspace(),
id,
DbuiV1QueryLookupRequest(
warehouse=warehouse,
startValue=start_value,
pagination=pagination,
rowCount=row_count,
refinements=refinements,
),
)
def query_histogram(
self,
id: str,
interval: str,
warehouse: Optional[str] = None,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
refinements: Optional[List[str]] = None,
) -> DbuiV1QueryHistogramResult:
"""
Compute a histogram for a query, applying any refinements and
returning a frequency map of matching rows bucketed by time interval.
:param id: The query ID returned from query_generate and
get_query_status.
:param warehouse: The optional SQL warehouse ID to use to compute
the results. If not specified, uses the default SQL warehouse
configured for the workspace.
:param start_date: The start date filter. The resulting frequency
map will be restricted to rows where the time column value
is greater than or equal to this value. Valid values include
literal timestamps and function calls such as now().
:param end_date: The optional end date filter. If specified, the
resulting frequency map will contain only rows where the time
column value is less than or equal to this value.
:param interval: The duration of each interval in the resulting
frequency map. This must be an interval string in the format:
'1 day', '3 minutes 2 seconds', '2 weeks'.
:param refinements: Pipeline filters to be applied to the result.
Any SQL which is valid as a pipeline stage (i.e. coming between
|> symbols) is valid here, such as ORDER BY id, or WHERE
column = 'value'.
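The following example sketches a daily histogram over the last 30
days; the time expression is illustrative.
Example:
histogram = client.query_histogram(
query_id,
interval="1 day",
start_date="now() - interval 30 days",
)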
"""
with error_handler():
return self._dbui_client().dbui_v1_query_histogram(
self._workspace(),
id,
DbuiV1QueryHistogramRequest(
warehouse=warehouse,
startDate=start_date,
endDate=end_date,
interval=interval,
refinements=refinements,
),
)
def query_cancel(self, id: str) -> None:
"""
Cancel an existing query.
:param id: The query ID returned from query_generate and
get_query_status.
"""
with error_handler():
return self._dbui_client().dbui_v1_query_cancel(self._workspace(), id)