Rules
Rules define how to transform and analyze data in your security lakehouse. They specify SQL-based transformations that run on your data to create security insights and detect threats.
Overview
Rules in DASL:
Transform data: Convert raw data into structured formats
Detect patterns: Identify security events and anomalies
Enrich data: Add context and calculated fields
Schedule execution: Run continuously or periodically
The primary components of a rule are:
Schedule: When and how often the rule runs
Input: Source tables and SQL transformation logic
Output: Summary and context for results
Metadata: Name, description, and configuration
Rule Structure
Rules have a complex nested structure with several required components:
from dasl_client import Rule, Schedule, Metadata

rule = Rule(
    schedule=Schedule(...),                # Required: when to run
    input=Rule.Input(...),                 # Required: data sources and SQL
    output=Rule.Output(...),               # Required: output configuration
    metadata=Metadata(...),                # Optional: rule metadata
    rule_metadata=Rule.RuleMetadata(...),  # Optional: detection metadata
    observables=[...],                     # Optional: threat indicators
)
Quick Reference
Common Operations:
from dasl_client import Client
client = Client.for_workspace()
# List all rules
rules = list(client.list_rules())
# Get specific rule
rule = client.get_rule("My Rule")
# Create new rule
client.create_rule("New Rule", rule_object)
# Update existing
client.replace_rule("My Rule", updated_rule)
# Delete
client.delete_rule("Old Rule")
Listing Rules
View all rules in your workspace:
from dasl_client import Client

client = Client.for_workspace()

# List all rules
for rule in client.list_rules():
    print(f"Rule: {rule.metadata.name}")
    print(f"Schedule: {rule.schedule.at_least_every}")
    print(f"Enabled: {rule.schedule.enabled}")
    if rule.input.stream and rule.input.stream.tables:
        print(f"Input tables: {[t.name for t in rule.input.stream.tables]}")
    print(f"Output: {rule.output.summary}")
    print("---")
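The listing above prints each rule's settings, but an inventory is more useful when it flags rules that quietly reduce detection coverage. The following added example (not part of the DASL client or its documentation) walks a list of rule objects exposing the same attributes used above (metadata.name, schedule.at_least_every, schedule.enabled, input.stream.tables) and reports rules that are disabled, run less often than a chosen limit, or have no input tables. The <number><unit> format for at_least_every ("2h", "30m") is an assumption, and the sample rules are constructed stand-ins rather than objects returned by client.list_rules().

```python
import re
from datetime import timedelta
from types import SimpleNamespace as NS

_UNITS = {"s": 1, "m": 60, "h": 3600, "d": 86400}

def parse_interval(value):
    """Parse an at_least_every string such as "2h" or "30m".
    The <number><unit> form is an assumption of this example."""
    m = re.fullmatch(r"\s*(\d+)\s*([smhd])\s*", value)
    if not m:
        raise ValueError(f"unrecognised interval: {value!r}")
    return timedelta(seconds=int(m.group(1)) * _UNITS[m.group(2)])

def audit_rules(rules, max_interval=timedelta(hours=24)):
    """Return {rule name: [findings]} for rules whose settings reduce
    detection coverage. Works on any objects exposing metadata.name,
    schedule.at_least_every, schedule.enabled and input.stream.tables."""
    findings = {}
    for rule in rules:
        issues = []
        if not rule.schedule.enabled:
            issues.append("disabled")
        try:
            if parse_interval(rule.schedule.at_least_every) > max_interval:
                issues.append(f"interval {rule.schedule.at_least_every} exceeds limit")
        except ValueError as e:
            issues.append(str(e))
        stream = rule.input.stream
        if not (stream and stream.tables):
            issues.append("no input tables")
        if issues:
            findings[rule.metadata.name] = issues
    return findings

def _rule(name, every, enabled=True, tables=(), filter_expr=None):
    # Constructed stand-in with the attribute shape of a dasl_client Rule
    stream = NS(tables=[NS(name=t) for t in tables], filter=filter_expr) if tables else None
    return NS(metadata=NS(name=name),
              schedule=NS(at_least_every=every, enabled=enabled),
              input=NS(stream=stream))

# Constructed sample inventory, not data from a real workspace
SAMPLE_RULES = [
    _rule("Detect Blocked HTTP Activity", "2h", tables=("http_activity",), filter_expr="disposition = 'Blocked'"),
    _rule("Failed Login Detection", "15m", enabled=False, tables=("authentication",)),
    _rule("Weekly DNS Rollup", "7d", tables=("dns_activity",)),
    _rule("Draft Rule", "1h"),
]
```

Against a live workspace, pass `list(client.list_rules())` to audit_rules() in place of SAMPLE_RULES.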
Getting Individual Rules
Retrieve a specific rule by name:
# Get a single rule
rule = client.get_rule("Failed Login Detection")
print(f"Rule name: {rule.metadata.name}")
print(f"Schedule: {rule.schedule.at_least_every}")
print(f"Input tables: {[t.name for t in rule.input.stream.tables] if rule.input.stream else 'None'}")
print(f"SQL content: {rule.input.stream.sql if rule.input.stream else 'None'}")
print(f"Output summary: {rule.output.summary}")
Create a Detection Rule
Let's create a detection rule that flags blocked HTTP activity:
from datetime import datetime
from dasl_client import Client, Rule, Schedule

client = Client.for_workspace()

# Create a simple rule that flags blocked HTTP activity
rule = Rule(
    schedule=Schedule(
        at_least_every="2h",
        enabled=True,
    ),
    input=Rule.Input(
        stream=Rule.Input.Stream(
            tables=[
                Rule.Input.Stream.Table(name="http_activity"),
            ],
            filter="disposition = 'Blocked'",
            # Only process records from this point in time onwards
            starting_timestamp=datetime(2025, 7, 8, 16, 47, 30),
        ),
    ),
    output=Rule.Output(
        summary="record was blocked",
    ),
)

try:
    created_rule = client.create_rule("Detect Blocked HTTP Activity", rule)
    print(f"Successfully created rule: {created_rule.metadata.name}")
except Exception as e:
    print(f"Error creating rule: {e}")
Retrieve and Update a Detection Rule
You can retrieve and modify rules after creation:
# Get the rule we just created
my_rule = client.get_rule("Detect Blocked HTTP Activity")
print(f"Retrieved: {my_rule.metadata.name}")
print(f"Status: {my_rule.status}")
# Update the rule content
my_rule.input.stream.filter = "disposition = 'Denied'"
# Update the rule (this replaces the entire rule)
updated_rule = client.replace_rule("Detect Blocked HTTP Activity", my_rule)
print("Updated rule")
Deleting Rules
Remove rules that are no longer needed:
# Delete a rule
client.delete_rule("Old Test Rule")
print("Rule deleted")
Note: Deleting a rule stops its execution but doesn’t remove data already processed by the rule.
Next Steps
Workspace configuration: Workspace Configuration to manage rule execution settings
API reference: Client for detailed rule methods