Datasources

Datasources define how external data flows into your security lakehouse. They specify source information, ingestion details, processing pipelines, and scheduling.

Overview

A datasource specifies:

  • Source information: Where the data comes from (AWS, GCP, etc.)

  • Ingestion details: How to read the data (S3 paths, API endpoints, etc.)

  • Processing pipeline: How to transform raw data through bronze, silver, and gold layers

  • Scheduling: When and how often to run the ingestion

Listing Datasources

View all datasources in your workspace:

from dasl_client import Client

client = Client.for_workspace()

# Get all datasources
for datasource in client.list_datasources():
    print(f"Name: {datasource.metadata.name}")
    print(f"Source: {datasource.source} - {datasource.source_type}")
    print(f"Enabled: {datasource.schedule.enabled}")
    print("---")

Getting Individual Datasources

Retrieve a specific datasource by name:

# Get a single datasource
datasource = client.get_datasource("My AWS WAF Logs")
print(f"Bronze table: {datasource.bronze.bronze_table}")

# Check if using a preset
if datasource.use_preset:
    print(f"Uses preset: {datasource.use_preset}")

Creating Datasources with Presets

The easiest way to create a datasource is to use a preset:

from dasl_client import Client

client = Client.for_workspace()

# Create a datasource using a preset
datasource = client.create_datasource(
    name="My CloudTrail Logs",
    preset="aws-cloudtrail",
    source="my-cloudtrail-bucket",
    source_type="s3"
)

print(f"Created datasource: {datasource.metadata.name}")

Creating Custom Datasources

For more control, create datasources with custom specifications:

from dasl_client import DataSource, Autoloader, Schedule, BronzeSpec, SilverSpec, GoldSpec

# Define the datasource structure
datasource = DataSource(
    source="aws",
    source_type="cloudtrail",
    autoloader=Autoloader(
        enabled=True,
        schedule=Schedule(
            at_least_every="1h",
            enabled=True
        )
    ),
    bronze=BronzeSpec(
        bronze_table="security_logs_bronze",
        skip_bronze_loading=False
    ),
    silver=SilverSpec(
        # Configure silver layer here, see the API reference for more details
    ),
    gold=GoldSpec(
        # Configure gold layer here, see the API reference for more details
    )
)

# Create the datasource
created_datasource = client.create_datasource("Custom Security Logs", datasource)

Updating Datasources

Modify existing datasources:

# Get existing datasource
datasource = client.get_datasource("My CloudTrail Logs")

# Update the schedule to run more frequently
datasource.schedule.at_least_every = "30m"

# Update the source location
datasource.source = "s3://new-cloudtrail-bucket/logs/"

# Replace the datasource
updated_datasource = client.replace_datasource("My CloudTrail Logs", datasource)
print("Datasource updated successfully")

Enabling and Disabling Datasources

Control datasource execution:

# Disable a datasource
datasource = client.get_datasource("My Datasource")
datasource.schedule.enabled = False
client.replace_datasource("My Datasource", datasource)

# Enable a datasource
datasource = client.get_datasource("My Datasource")
datasource.schedule.enabled = True
client.replace_datasource("My Datasource", datasource)

Deleting Datasources

Remove datasources that are no longer needed:

# Delete a datasource
client.delete_datasource("Old Test Datasource")
print("Datasource deleted")

Note: Deleting a datasource stops data ingestion but doesn’t remove data already processed.
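
If you also want to remove the processed data, drop the underlying tables separately. This is a hypothetical sketch, assuming you are running in a Databricks notebook with an active spark session and that you capture the table name before deleting the datasource:

# Read the bronze table name before the datasource definition is removed
datasource = client.get_datasource("Old Test Datasource")
bronze_table = datasource.bronze.bronze_table

client.delete_datasource("Old Test Datasource")

# Remove the ingested data as well (assumes an active Spark session)
spark.sql(f"DROP TABLE IF EXISTS {bronze_table}")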

Scheduling and Automation

Configure when and how often datasources run:

Schedule Options

from dasl_client import Schedule

# Run every hour
schedule = Schedule(
    at_least_every="1h",
    enabled=True
)

# Run every 15 minutes
schedule = Schedule(
    at_least_every="15m",
    enabled=True
)

# Run daily at a specific time
schedule = Schedule(
    exactly="0 2 * * *",  # Daily at 2 AM
    enabled=True
)

# Continuous streaming
schedule = Schedule(
    continuous=True,
    enabled=True
)
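
Any of these schedules can be applied to an existing datasource with the same get-and-replace pattern used elsewhere on this page; a brief sketch:

# Switch an existing datasource to a daily cron schedule
datasource = client.get_datasource("My CloudTrail Logs")
datasource.schedule = Schedule(
    exactly="0 2 * * *",  # Daily at 2 AM
    enabled=True
)
client.replace_datasource("My CloudTrail Logs", datasource)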

Compute Configuration

Configure compute resources for datasource processing:

datasource = DataSource(
    # ... other configuration ...
    compute_mode="streaming",  # or "batch"
    schedule=Schedule(
        at_least_every="1h",
        enabled=True,
        compute_group="high-memory"  # Optional: specify compute group
    )
)

Error Handling

Handle common errors when working with datasources:

from dasl_client.errors import ConflictError, BadRequestError, NotFoundError

try:
    datasource = client.create_datasource("My Datasource", datasource_spec)
except ConflictError:
    print("A datasource with this name already exists")
except BadRequestError as e:
    print(f"Invalid datasource specification: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

try:
    datasource = client.get_datasource("Non-existent Datasource")
except NotFoundError:
    print("Datasource not found")

Best Practices

  • Use presets when possible: They provide tested configurations for common data sources

  • Test with small data first: Validate your configuration before processing large datasets

  • Monitor performance: Watch ingestion run times and resource usage so a datasource doesn't degrade the rest of the workspace

  • Use appropriate scheduling: Don't run more frequently than needed (see the audit sketch after this list)

  • Handle errors gracefully: Implement proper error handling in your scripts
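
To check how aggressively datasources are scheduled across the workspace, you can audit them in one pass; a small sketch using only the listing attributes shown earlier:

# Print each datasource's schedule so overly frequent runs stand out
for ds in client.list_datasources():
    print(f"{ds.metadata.name}: every {ds.schedule.at_least_every} (enabled: {ds.schedule.enabled})")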

Quick Reference

Common Operations:

from dasl_client import Client

client = Client.for_workspace()

# List all datasources
datasources = list(client.list_datasources())

# Get specific datasource
ds = client.get_datasource("My Datasource")

# Create with preset
client.create_datasource("New DS", preset="aws-cloudtrail")

# Update existing
client.replace_datasource("My DS", updated_datasource)

# Delete
client.delete_datasource("Old DS")

Next Steps

  • Rules: Use Rules to process your datasource data

  • Workspace configuration: Use Workspace Configuration to manage datasource settings

  • API reference: See Client for detailed datasource methods