Source code for dasl_client.types.rule
from pydantic import BaseModel
from typing import Dict, List, Optional, Union
from datetime import datetime, timezone
from dasl_api import (
CoreV1Rule,
CoreV1RuleSpec,
CoreV1RuleSpecMetadata,
CoreV1RuleSpecMetadataMitreInner,
CoreV1RuleSpecMetadataResponse,
CoreV1RuleSpecMetadataResponsePlaybooksInner,
CoreV1RuleSpecInput,
CoreV1RuleSpecInputBatch,
CoreV1RuleSpecInputStream,
CoreV1RuleSpecInputStreamCustom,
CoreV1RuleSpecInputStreamTablesInner,
CoreV1RuleSpecInputStreamTablesInnerWatermark,
CoreV1RuleObservable,
CoreV1RuleObservableRisk,
CoreV1RuleSpecOutput,
CoreV1RuleSpecCollate,
CoreV1RuleSpecInputBatchCustom,
)
from .helpers import Helpers
from .types import Metadata, ResourceStatus, Schedule
[docs]
class Rule(BaseModel):
"""
Rules define how to generate notables from input data.
Attributes:
metadata (Optional[Metadata]):
Standard object metadata.
rule_metadata (Optional[Rule.RuleMetadata]):
The rule configuration metadata.
schedule (Schedule):
The rule schedule.
input (Rule.Input):
The rule input configuration.
observables (Optional[List[Rule.Observable]]):
A list of observables.
output (Rule.Output):
The rule output configuration.
collate (Optional[Rule.Collate]):
The collate configuration.
status (Optional[ResourceStatus]):
The current status of the rule.
"""
[docs]
class RuleMetadata(BaseModel):
"""
RuleMetadata object wrapping CoreV1RuleSpecMetadata.
Attributes:
version (Optional[Union[float, int]]):
The current version of the rule.
category (Optional[str]):
The category of this detection. The available values are
configured in workspace config.
severity (Optional[str]):
The threat level associated with the notable.
fidelity (Optional[str]):
Fidelity is used to capture the maturity of a rule. Newly
created, untested rules should be marked as Investigative
fidelity. Older, more well-tested rules should be marked as
High fidelity. This helps an analyst determine how likely a
notable is a false positive.
mitre (Optional[List[Rule.RuleMetadata.Mitre]]):
Mitre ATT&CK tactic information.
objective (Optional[str]):
A longer form description of what this rule is attempting to
detect (objectMeta.comment is a summary).
response (Optional[Rule.RuleMetadata.Response]):
Response configuration for the rule.
"""
[docs]
class Mitre(BaseModel):
"""
Mitre ATT&CK details associated with a Rule.
Attributes:
taxonomy (Optional[str]):
Mitre ATT&CK taxonomy.
tactic (Optional[str]):
Mitre ATT&CK tactic.
technique_id (Optional[str]):
The Mitre ATT&CK technique identifier.
technique (Optional[str]):
The Mitre ATT&CK technique human-readable name.
sub_technique_id (Optional[str]):
The Mitre ATT&CK sub-technique identifier.
sub_technique (Optional[str]):
The Mitre ATT&CK sub-technique human-readable name.
"""
taxonomy: Optional[str] = None
tactic: Optional[str] = None
technique_id: Optional[str] = None
technique: Optional[str] = None
sub_technique_id: Optional[str] = None
sub_technique: Optional[str] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecMetadataMitreInner],
) -> "Rule.RuleMetadata.Mitre":
if obj is None:
return None
return Rule.RuleMetadata.Mitre(
taxonomy=obj.taxonomy,
tactic=obj.tactic,
technique_id=obj.technique_id,
technique=obj.technique,
sub_technique_id=obj.sub_technique_id,
sub_technique=obj.sub_technique,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecMetadataMitreInner:
return CoreV1RuleSpecMetadataMitreInner(
taxonomy=self.taxonomy,
tactic=self.tactic,
technique_id=self.technique_id,
technique=self.technique,
sub_technique_id=self.sub_technique_id,
sub_technique=self.sub_technique,
)
[docs]
class Response(BaseModel):
"""
Details regarding how to respond to a notable generated
by the Rule.
Attributes:
guidelines (Optional[str]):
Response guidelines.
playbooks (Optional[List[Rule.RuleMetadata.Response.Playbook]]):
Suggested response playbooks.
"""
[docs]
class Playbook(BaseModel):
"""
Databricks notebook and template values to generate a playbook
for the analyst from the notable.
Attributes:
notebook (Optional[str]):
Notebook to run.
options (Optional[Dict[str, str]]):
These are templated, if they contain ${} this will be
filled in using the notable.
"""
notebook: Optional[str] = None
options: Optional[Dict[str, str]] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecMetadataResponsePlaybooksInner],
) -> "Rule.RuleMetadata.Response.Playbook":
if obj is None:
return None
return Rule.RuleMetadata.Response.Playbook(
notebook=obj.notebook,
options=obj.options,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecMetadataResponsePlaybooksInner:
return CoreV1RuleSpecMetadataResponsePlaybooksInner(
notebook=self.notebook,
options=self.options,
)
guidelines: Optional[str] = None
playbooks: Optional[List["Rule.RuleMetadata.Response.Playbook"]] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecMetadataResponse],
) -> "Rule.RuleMetadata.Response":
if obj is None:
return None
playbooks = None
if obj.playbooks is not None:
playbooks = [
Rule.RuleMetadata.Response.Playbook.from_api_obj(item)
for item in obj.playbooks
]
return Rule.RuleMetadata.Response(
guidelines=obj.guidelines,
playbooks=playbooks,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecMetadataResponse:
playbooks = None
if self.playbooks is not None:
playbooks = [item.to_api_obj() for item in self.playbooks]
return CoreV1RuleSpecMetadataResponse(
guidelines=self.guidelines,
playbooks=playbooks,
)
version: Optional[Union[float, int]] = None
category: Optional[str] = None
severity: Optional[str] = None
fidelity: Optional[str] = None
mitre: Optional[List["Rule.RuleMetadata.Mitre"]] = None
objective: Optional[str] = None
response: Optional["Rule.RuleMetadata.Response"] = None
[docs]
@staticmethod
def from_api_obj(obj: Optional[CoreV1RuleSpecMetadata]) -> "Rule.RuleMetadata":
if obj is None:
return None
mitre = None
if obj.mitre is not None:
mitre = [
Rule.RuleMetadata.Mitre.from_api_obj(item) for item in obj.mitre
]
return Rule.RuleMetadata(
version=obj.version,
category=obj.category,
severity=obj.severity,
fidelity=obj.fidelity,
mitre=mitre,
objective=obj.objective,
response=Rule.RuleMetadata.Response.from_api_obj(obj.response),
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecMetadata:
mitre = None
if self.mitre is not None:
mitre = [item.to_api_obj() for item in self.mitre]
return CoreV1RuleSpecMetadata(
version=self.version,
category=self.category,
severity=self.severity,
fidelity=self.fidelity,
mitre=mitre,
objective=self.objective,
response=Helpers.maybe(lambda o: o.to_api_obj(), self.response),
)
[docs]
class Input(BaseModel):
"""
Specification of input data for the Rule.
Attributes:
stream (Optional[Rule.Input.Stream]):
Input if the Rule should operate on streaming input data.
batch (Optional[Rule.Input.Batch]):
Input if the rule should operate on batched input data.
"""
[docs]
class CustomStream(BaseModel):
"""
Specification of a stream custom notebook for generating input to
the Rule.
Attributes:
notebook (Optional[str]):
options (Optional[Dict[str, str]]):
"""
notebook: Optional[str] = None
options: Optional[Dict[str, str]] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecInputStreamCustom],
) -> "Rule.Input.CustomStream":
if obj is None:
return None
return Rule.Input.CustomStream(
notebook=obj.notebook,
options=obj.options,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecInputStreamCustom:
return CoreV1RuleSpecInputStreamCustom(
notebook=self.notebook,
options=self.options,
)
[docs]
class CustomBatch(BaseModel):
"""
Specification of a batch custom notebook for generating input to
the Rule.
Attributes:
notebook (Optional[str]):
options (Optional[Dict[str, str]]):
"""
notebook: Optional[str] = None
options: Optional[Dict[str, str]] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecInputBatchCustom],
) -> "Rule.Input.CustomBatch":
if obj is None:
return None
return Rule.Input.CustomBatch(
notebook=obj.notebook,
options=obj.options,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecInputBatchCustom:
return CoreV1RuleSpecInputBatchCustom(
notebook=self.notebook,
options=self.options,
)
[docs]
class Stream(BaseModel):
"""
Specification for streaming input from a table or tables,
from a custom notebook, or directly from a SQL statement.
Attributes:
tables (Optional[List[Rule.Input.Stream.Table]]):
List of input tables and join rules.
filter (Optional[str]):
A filter expression to be applied to the input stream.
Note that this cannot be used in conjunction with a
custom SQL expression (i.e. the sql field).
sql (Optional[str]):
A custom SQL expression to apply to the input stream
before matching. Note that this cannot be used in
conjunction with a filter expression (i.e. the
filter member).
custom (Optional[Rule.Input.CustomStream]):
starting_timestamp (Optional[datetime]):
Starting timestamp for streaming input data. If this
value is not specified, then the timestamp when this rule
was created will be used. This setting is used to determine
the starting point for streaming historical data, and only
applies on the first run of the rule. Once some data has
been streamed and a checkpoint has been created, this
setting no longer has any impact.
"""
[docs]
class Table(BaseModel):
"""
Specification of a table for streaming input.
Attributes:
name (Optional[str]):
Name of the table.
watermark (Optional[Rule.Input.Stream.Table.Watermark]):
Watermark configuration.
alias (Optional[str]):
Alias name for the table.
join_type (Optional[str]):
For tables other than the first, how to join to the
preceding table.
join_expr (Optional[str]):
For tables other than the first, the join condition
expression to join with the preceding table.
streaming (Optional[bool]):
For tables other than the first, is this a streaming
join or static. Default is false, except on the first
table.
"""
[docs]
class Watermark(BaseModel):
"""
Watermark for a streaming input table.
Attributes:
event_time_column (str):
Which column is the event time for the delay
threshold.
delay_threshold (str):
A time duration string for the watermark delay.
drop_duplicates (Optional[List[str]]):
Pass into pyspark dropDuplicates (effectively
columns for group by).
"""
event_time_column: str
delay_threshold: str
drop_duplicates: Optional[List[str]] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecInputStreamTablesInnerWatermark],
) -> "Rule.Input.Stream.Table.Watermark":
if obj is None:
return None
return Rule.Input.Stream.Table.Watermark(
event_time_column=obj.event_time_column,
delay_threshold=obj.delay_threshold,
drop_duplicates=obj.drop_duplicates,
)
[docs]
def to_api_obj(
self,
) -> CoreV1RuleSpecInputStreamTablesInnerWatermark:
return CoreV1RuleSpecInputStreamTablesInnerWatermark(
event_time_column=self.event_time_column,
delay_threshold=self.delay_threshold,
drop_duplicates=self.drop_duplicates,
)
name: Optional[str] = None
watermark: Optional["Rule.Input.Stream.Table.Watermark"] = None
alias: Optional[str] = None
join_type: Optional[str] = None
join_expr: Optional[str] = None
streaming: Optional[bool] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecInputStreamTablesInner],
) -> "Rule.Input.Stream.Table":
if obj is None:
return None
return Rule.Input.Stream.Table(
name=obj.name,
watermark=Rule.Input.Stream.Table.Watermark.from_api_obj(
obj.watermark
),
alias=obj.alias,
join_type=obj.join_type,
join_expr=obj.join_expr,
streaming=obj.streaming,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecInputStreamTablesInner:
return CoreV1RuleSpecInputStreamTablesInner(
name=self.name,
watermark=Helpers.maybe(
lambda o: o.to_api_obj(), self.watermark
),
alias=self.alias,
join_type=self.join_type,
join_expr=self.join_expr,
streaming=self.streaming,
)
tables: Optional[List["Rule.Input.Stream.Table"]] = None
filter: Optional[str] = None
sql: Optional[str] = None
custom: Optional["Rule.Input.CustomStream"] = None
starting_timestamp: Optional[datetime] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecInputStream],
) -> "Rule.Input.Stream":
if obj is None:
return None
tables = None
if obj.tables is not None:
tables = [
Rule.Input.Stream.Table.from_api_obj(item)
for item in obj.tables
]
starting_timestamp = obj.starting_timestamp
if starting_timestamp is not None and starting_timestamp.tzinfo is None:
starting_timestamp = starting_timestamp.replace(tzinfo=timezone.utc)
return Rule.Input.Stream(
tables=tables,
filter=obj.filter,
sql=obj.sql,
custom=Rule.Input.CustomStream.from_api_obj(obj.custom),
starting_timestamp=starting_timestamp,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecInputStream:
tables = None
if self.tables is not None:
tables = [item.to_api_obj() for item in self.tables]
# tzinfo must be attached to the starting timestamp or else
# the serialization (without trailing time zone) will be
# rejected by the server.
starting_timestamp = self.starting_timestamp
if starting_timestamp is not None and starting_timestamp.tzinfo is None:
starting_timestamp = starting_timestamp.replace(tzinfo=timezone.utc)
return CoreV1RuleSpecInputStream(
tables=tables,
filter=self.filter,
sql=self.sql,
custom=Helpers.maybe(lambda o: o.to_api_obj(), self.custom),
starting_timestamp=starting_timestamp,
)
[docs]
class Batch(BaseModel):
"""
Specification for batch input to a Rule, either from a
SQL statement or a custom notebook.
Attributes:
sql (Optional[str]):
custom (Optional[Rule.Input.CustomBatch]):
"""
sql: Optional[str] = None
custom: Optional["Rule.Input.CustomBatch"] = None
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleSpecInputBatch],
) -> "Rule.Input.Batch":
if obj is None:
return None
return Rule.Input.Batch(
sql=obj.sql,
custom=Rule.Input.CustomBatch.from_api_obj(obj.custom),
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecInputBatch:
return CoreV1RuleSpecInputBatch(
sql=self.sql,
custom=Helpers.maybe(lambda o: o.to_api_obj(), self.custom),
)
stream: Optional["Rule.Input.Stream"] = None
batch: Optional["Rule.Input.Batch"] = None
[docs]
@staticmethod
def from_api_obj(obj: Optional[CoreV1RuleSpecInput]) -> "Rule.Input":
return Rule.Input(
stream=Rule.Input.Stream.from_api_obj(obj.stream),
batch=Rule.Input.Batch.from_api_obj(obj.batch),
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecInput:
to_api_obj = lambda o: o.to_api_obj()
return CoreV1RuleSpecInput(
stream=Helpers.maybe(to_api_obj, self.stream),
batch=Helpers.maybe(to_api_obj, self.batch),
)
[docs]
class Observable(BaseModel):
"""
Observable associated with a Rule.
Attributes:
kind (str):
value (str):
relationship (str):
risk (Rule.Observable.Risk):
"""
[docs]
class Risk(BaseModel):
"""
Risk level associated with an Observable.
Attributes:
impact (str):
A SQL expression indicating the impact (should evaluate to
a number between 0-100).
confidence (str):
A SQL expression indicating the confidence (should
evaluate to a number between 0-100).
"""
impact: str
confidence: str
[docs]
@staticmethod
def from_api_obj(
obj: Optional[CoreV1RuleObservableRisk],
) -> "Rule.Observable.Risk":
if obj is None:
return None
return Rule.Observable.Risk(
impact=obj.impact,
confidence=obj.confidence,
)
[docs]
def to_api_obj(self) -> CoreV1RuleObservableRisk:
return CoreV1RuleObservableRisk(
impact=self.impact,
confidence=self.confidence,
)
kind: str
value: str
relationship: str
risk: "Rule.Observable.Risk"
[docs]
@staticmethod
def from_api_obj(obj: Optional[CoreV1RuleObservable]) -> "Rule.Observable":
if obj is None:
return None
return Rule.Observable(
kind=obj.kind,
value=obj.value,
relationship=obj.relationship,
risk=Rule.Observable.Risk.from_api_obj(obj.risk),
)
[docs]
def to_api_obj(self) -> CoreV1RuleObservable:
return CoreV1RuleObservable(
kind=self.kind,
value=self.value,
relationship=self.relationship,
risk=self.risk.to_api_obj(),
)
[docs]
class Output(BaseModel):
"""
Output from a Rule after matching.
Attributes:
summary (Optional[str]):
context (Optional[Dict[str, str]]):
"""
summary: Optional[str] = None
context: Optional[Dict[str, str]] = None
[docs]
@staticmethod
def from_api_obj(obj: Optional[CoreV1RuleSpecOutput]) -> "Rule.Output":
if obj is None:
return None
return Rule.Output(
summary=obj.summary,
context=obj.context,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecOutput:
return CoreV1RuleSpecOutput(
summary=self.summary,
context=self.context,
)
[docs]
class Collate(BaseModel):
"""
Configuration for collating multiple notables generated from a Rule
within a given timeframe.
Attributes:
collate_on (Optional[List[str]]):
SQL column(s) to collate notables on. This must be either
'summary', 'context', or a JSON key within 'context'. Only
one level of nesting is currently supported.
within (Optional[str]):
A time duration string for the window to collate within.
action (Optional[str]):
append | updateStats | inhibit.
"""
collate_on: Optional[List[str]] = None
within: Optional[str] = None
action: Optional[str] = None
[docs]
@staticmethod
def from_api_obj(obj: Optional[CoreV1RuleSpecCollate]) -> "Rule.Collate":
if obj is None:
return None
return Rule.Collate(
collate_on=obj.collate_on,
within=obj.within,
action=obj.action,
)
[docs]
def to_api_obj(self) -> CoreV1RuleSpecCollate:
return CoreV1RuleSpecCollate(
collate_on=self.collate_on,
within=self.within,
action=self.action,
)
metadata: Optional[Metadata] = None
rule_metadata: Optional["Rule.RuleMetadata"] = None
schedule: Schedule
input: "Rule.Input"
observables: Optional[List["Rule.Observable"]] = None
output: "Rule.Output"
collate: Optional["Rule.Collate"] = None
status: Optional[ResourceStatus] = None
[docs]
@staticmethod
def from_api_obj(obj: CoreV1Rule) -> "Rule":
observables = None
if obj.spec.observables is not None:
observables = [
Rule.Observable.from_api_obj(item) for item in obj.spec.observables
]
return Rule(
metadata=Metadata.from_api_obj(obj.metadata),
rule_metadata=Rule.RuleMetadata.from_api_obj(obj.spec.metadata),
schedule=Schedule.from_api_obj(obj.spec.schedule),
input=Rule.Input.from_api_obj(obj.spec.input),
observables=observables,
output=Rule.Output.from_api_obj(obj.spec.output),
collate=Rule.Collate.from_api_obj(obj.spec.collate),
status=ResourceStatus.from_api_obj(obj.status),
)
[docs]
def to_api_obj(self) -> CoreV1Rule:
observables = None
if self.observables is not None:
observables = [item.to_api_obj() for item in self.observables]
to_api_obj = lambda o: o.to_api_obj()
return CoreV1Rule(
api_version="v1",
kind="Rule",
metadata=Helpers.maybe(to_api_obj, self.metadata),
spec=CoreV1RuleSpec(
metadata=Helpers.maybe(to_api_obj, self.rule_metadata),
schedule=self.schedule.to_api_obj(),
input=self.input.to_api_obj(),
observables=observables,
output=self.output.to_api_obj(),
collate=Helpers.maybe(to_api_obj, self.collate),
),
status=Helpers.maybe(to_api_obj, self.status),
)