Datasources
Datasources define how external data flows into your security lakehouse. They specify source information, ingestion details, processing pipelines, and scheduling.
Overview
A datasource specifies:
Source information: Where the data comes from (AWS, GCP, etc.)
Ingestion details: How to read the data (S3 paths, API endpoints, etc.)
Processing pipeline: How to transform raw data through bronze, silver, and gold layers
Scheduling: When and how often to run the ingestion
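As a quick orientation, the sketch below shows how those four pieces map onto a datasource specification. The field values are placeholders, and the full working examples appear in the sections that follow.
from dasl_client import DataSource, Autoloader, Schedule, BronzeSpec
# Skeleton only -- each field shown here is covered in detail later on this page.
skeleton = DataSource(
    source="aws",              # source information: where the data comes from
    source_type="cloudtrail",  # ingestion details: how to read the data
    autoloader=Autoloader(     # scheduling: when and how often to run ingestion
        enabled=True,
        schedule=Schedule(at_least_every="1h", enabled=True)
    ),
    bronze=BronzeSpec(         # processing pipeline: the bronze landing table
        bronze_table="security_logs_bronze"
    )
)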
Listing Datasources
View all datasources in your workspace:
from dasl_client import Client
client = Client.for_workspace()
# Get all datasources
for datasource in client.list_datasources():
    print(f"Name: {datasource.metadata.name}")
    print(f"Source: {datasource.source} - {datasource.source_type}")
    print(f"Enabled: {datasource.schedule.enabled}")
    print("---")
Getting Individual Datasources
Retrieve a specific datasource by name:
# Get a single datasource
datasource = client.get_datasource("My AWS WAF Logs")
print(f"Bronze table: {datasource.bronze.bronze_table}")
# Check if using a preset
if datasource.use_preset:
    print(f"Uses preset: {datasource.use_preset}")
Creating Datasources with Presets
The easiest way to create datasources is using presets:
from dasl_client import Client
client = Client.for_workspace()
# Create a datasource using a preset
datasource = client.create_datasource(
    name="My CloudTrail Logs",
    preset="aws-cloudtrail",
    source="my-cloudtrail-bucket",
    source_type="s3"
)
print(f"Created datasource: {datasource.metadata.name}")
Creating Custom Datasources
For more control, create datasources with custom specifications:
from dasl_client import DataSource, Autoloader, Schedule, BronzeSpec, SilverSpec, GoldSpec
# Define the datasource structure
datasource = DataSource(
    source="aws",
    source_type="cloudtrail",
    autoloader=Autoloader(
        enabled=True,
        schedule=Schedule(
            at_least_every="1h",
            enabled=True
        )
    ),
    bronze=BronzeSpec(
        bronze_table="security_logs_bronze",
        skip_bronze_loading=False
    ),
    silver=SilverSpec(
        # Configure the silver layer here; see the API reference for more details
    ),
    gold=GoldSpec(
        # Configure the gold layer here; see the API reference for more details
    )
)
# Create the datasource
created_datasource = client.create_datasource("Custom Security Logs", datasource)
Updating Datasources
Modify existing datasources:
# Get existing datasource
datasource = client.get_datasource("My CloudTrail Logs")
# Update the schedule to run more frequently
datasource.schedule.at_least_every = "30m"
# Update the source location
datasource.source = "s3://new-cloudtrail-bucket/logs/"
# Replace the datasource
updated_datasource = client.replace_datasource("My CloudTrail Logs", datasource)
print("Datasource updated successfully")
Enabling and Disabling Datasources
Control datasource execution:
# Disable a datasource
datasource = client.get_datasource("My Datasource")
datasource.schedule.enabled = False
client.replace_datasource("My Datasource", datasource)
# Enable a datasource
datasource = client.get_datasource("My Datasource")
datasource.schedule.enabled = True
client.replace_datasource("My Datasource", datasource)
Deleting Datasources
Remove datasources that are no longer needed:
# Delete a datasource
client.delete_datasource("Old Test Datasource")
print("Datasource deleted")
Note: Deleting a datasource stops data ingestion but doesn’t remove data already processed.
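If you also want to remove the processed data, you can drop the underlying tables yourself. A minimal sketch, assuming you run it from a notebook or Spark session attached to your lakehouse and that security_logs_bronze is the bronze table from the earlier example (any silver or gold tables would be dropped the same way):
# Optional cleanup after deleting a datasource: drop the processed tables manually.
# The table name below follows the earlier bronze example and is illustrative.
spark.sql("DROP TABLE IF EXISTS security_logs_bronze")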
Scheduling and Automation
Configure when and how often datasources run:
Schedule Options
from dasl_client import Schedule
# Run every hour
schedule = Schedule(
    at_least_every="1h",
    enabled=True
)
# Run every 15 minutes
schedule = Schedule(
    at_least_every="15m",
    enabled=True
)
# Run daily at a specific time
schedule = Schedule(
    exactly="0 2 * * *",  # Daily at 2 AM
    enabled=True
)
# Continuous streaming
schedule = Schedule(
    continuous=True,
    enabled=True
)
Compute Configuration
Configure compute resources for datasource processing:
datasource = DataSource(
    # ... other configuration ...
    compute_mode="streaming",  # or "batch"
    schedule=Schedule(
        at_least_every="1h",
        enabled=True,
        compute_group="high-memory"  # Optional: specify compute group
    )
)
Error Handling
Handle common errors when working with datasources:
from dasl_client.errors import ConflictError, BadRequestError, NotFoundError
try:
    datasource = client.create_datasource("My Datasource", datasource_spec)
except ConflictError:
    print("A datasource with this name already exists")
except BadRequestError as e:
    print(f"Invalid datasource specification: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

try:
    datasource = client.get_datasource("Non-existent Datasource")
except NotFoundError:
    print("Datasource not found")
Best Practices
Use presets when possible: They provide tested configurations for common data sources
Test with small data first: Validate your configuration before processing large datasets
Monitor performance: Check that datasources don’t impact system performance
Use appropriate scheduling: Don’t run more frequently than needed
Handle errors gracefully: Implement proper error handling in your scripts
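Putting a few of these practices together, here is a minimal sketch that creates a preset-based datasource defensively, using only the calls shown earlier on this page; the name, bucket, and preset values are illustrative.
from dasl_client import Client
from dasl_client.errors import ConflictError, BadRequestError

client = Client.for_workspace()

try:
    # Start from a preset and a small test bucket before pointing at production data.
    client.create_datasource(
        name="CloudTrail - staging test",
        preset="aws-cloudtrail",
        source="my-small-test-bucket",
        source_type="s3"
    )
except ConflictError:
    print("A datasource with this name already exists")
except BadRequestError as e:
    print(f"Invalid datasource specification: {e}")

# Confirm the datasource exists and check how often it is scheduled to run.
ds = client.get_datasource("CloudTrail - staging test")
print(f"Enabled: {ds.schedule.enabled}, runs at least every: {ds.schedule.at_least_every}")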
Quick Reference
Common Operations:
from dasl_client import Client
client = Client.for_workspace()
# List all datasources
datasources = list(client.list_datasources())
# Get specific datasource
ds = client.get_datasource("My Datasource")
# Create with preset
client.create_datasource("New DS", preset="aws-cloudtrail")
# Update existing
client.replace_datasource("My DS", updated_datasource)
# Delete
client.delete_datasource("Old DS")
Next Steps
Rules: Create rules to process your datasource data
Workspace configuration: Manage datasource-related settings in Workspace Configuration
API reference: See the Client reference for detailed datasource methods