Manage data pipelines

Monitor and manage your data pipelines after creation. To create a pipeline, see Create a pipeline.

List pipelines

CLI:

viam datapipelines list --org-id=<org-id>

Python:

pipelines = await data_client.list_data_pipelines(organization_id=ORG_ID)
for p in pipelines:
    print(f"{p.id}: {p.name} (enabled={p.enabled}, schedule={p.schedule})")

Go:

pipelines, err := dataClient.ListDataPipelines(ctx, orgID)
if err != nil {
    logger.Fatal(err)
}
for _, p := range pipelines {
    fmt.Printf("%s: %s (enabled=%v, schedule=%s)\n", p.ID, p.Name, p.Enabled, p.Schedule)
}

Get pipeline details

CLI:

viam datapipelines describe --id=<pipeline-id>

Python:

pipeline = await data_client.get_data_pipeline(id="YOUR-PIPELINE-ID")
print(f"Name: {pipeline.name}")
print(f"Schedule: {pipeline.schedule}")
print(f"Enabled: {pipeline.enabled}")
print(f"Data source: {pipeline.data_source_type}")
print(f"Created: {pipeline.created_on}")

Go:

pipeline, err := dataClient.GetDataPipeline(ctx, "YOUR-PIPELINE-ID")
if err != nil {
    logger.Fatal(err)
}
fmt.Printf("Name: %s\nSchedule: %s\nEnabled: %v\n", pipeline.Name, pipeline.Schedule, pipeline.Enabled)

Monitor pipeline runs

Each pipeline run has a status and an associated time window showing which data it processed.

Python:

# Returns a page of runs (default page size: 10)
page = await data_client.list_data_pipeline_runs(id="YOUR-PIPELINE-ID")
for run in page.runs:
    print(f"Run {run.id}: {run.status}")
    print(f"  Data window: {run.data_start_time} to {run.data_end_time}")
    if run.error_message:
        print(f"  Error: {run.error_message}")

# Get the next page if there are more runs
if page.next_page_token:
    next_page = await page.next_page()

Go:

// Returns a page of runs (default page size: 10)
page, err := dataClient.ListDataPipelineRuns(ctx, "YOUR-PIPELINE-ID", 10)
if err != nil {
    logger.Fatal(err)
}
for _, run := range page.Runs {
    fmt.Printf("Run %s: %d\n", run.ID, run.Status)
    fmt.Printf("  Data window: %s to %s\n", run.DataStartTime, run.DataEndTime)
    if run.ErrorMessage != "" {
        fmt.Printf("  Error: %s\n", run.ErrorMessage)
    }
}

// Get the next page
nextPage, err := page.NextPage(ctx)
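The token-based pagination pattern above can be sketched generically. This is a minimal illustration using a stub client, not the Viam SDK; StubClient, list_runs, and the token format are assumptions made for the example:

```python
# Minimal sketch of token-based pagination over pipeline runs.
# StubClient stands in for a real data client; names are illustrative.

class StubClient:
    def __init__(self, runs, page_size=10):
        self._runs = runs
        self._page_size = page_size

    def list_runs(self, pipeline_id, token=None):
        """Return one page of runs plus a token for the next page, or None."""
        start = int(token) if token else 0
        end = start + self._page_size
        next_token = str(end) if end < len(self._runs) else None
        return self._runs[start:end], next_token

def all_runs(client, pipeline_id):
    """Collect every run by following next-page tokens until exhausted."""
    runs, token = [], None
    while True:
        page, token = client.list_runs(pipeline_id, token)
        runs.extend(page)
        if token is None:
            return runs

client = StubClient([{"id": f"run-{i}", "status": "COMPLETED"} for i in range(25)])
runs = all_runs(client, "YOUR-PIPELINE-ID")
print(len(runs))  # 25, gathered across three pages of 10
```

The same loop shape works with the real SDK by swapping the stub call for list_data_pipeline_runs and the page's next-page accessor.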

Run statuses:

SCHEDULED: The run is queued and waiting to execute (2-minute delay before execution starts).
STARTED: The run is executing the MQL aggregation against the data source.
COMPLETED: The run finished and results are in the pipeline sink.
FAILED: The run encountered an error. Check the error_message field.

If a run stays in STARTED for more than 10 minutes, it is automatically marked as failed and a new run is created for that time window.
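The 10-minute rule can also be checked client-side when monitoring runs. A minimal sketch, assuming a run exposes its status string and a timezone-aware start time (the field names here are illustrative, not the SDK's):

```python
from datetime import datetime, timedelta, timezone

STUCK_THRESHOLD = timedelta(minutes=10)

def is_stuck(status, start_time, now=None):
    """Flag a run that has been in STARTED longer than 10 minutes; the
    platform will mark such a run failed and schedule a replacement."""
    now = now or datetime.now(timezone.utc)
    return status == "STARTED" and (now - start_time) > STUCK_THRESHOLD

now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(is_stuck("STARTED", now - timedelta(minutes=12), now))    # True
print(is_stuck("STARTED", now - timedelta(minutes=5), now))     # False
print(is_stuck("COMPLETED", now - timedelta(hours=1), now))     # False
```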

Enable a pipeline

CLI:

viam datapipelines enable --id=<pipeline-id>

Go:

err = dataClient.EnableDataPipeline(ctx, "YOUR-PIPELINE-ID")

Disable a pipeline

CLI:

viam datapipelines disable --id=<pipeline-id>

Go:

err = dataClient.DisableDataPipeline(ctx, "YOUR-PIPELINE-ID")

Disabling a pipeline stops future scheduled runs but does not delete existing results. When you re-enable a pipeline, it resumes from the next scheduled time window. It does not backfill windows it missed while disabled.
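To see which windows will be skipped for a given disabled interval, you can enumerate the scheduled runs that would have fired while the pipeline was off. A sketch under simplifying assumptions: a fixed-interval schedule aligned to midnight, with each window's run firing at the window's end (real schedules are cron expressions and need a cron parser):

```python
from datetime import datetime, timedelta

def missed_windows(interval, disabled_at, enabled_at):
    """(start, end) windows whose scheduled run fell in [disabled_at,
    enabled_at). Assumes windows aligned to `interval` from midnight."""
    day = disabled_at.replace(hour=0, minute=0, second=0, microsecond=0)
    step = int(interval.total_seconds())
    elapsed = int((disabled_at - day).total_seconds())
    n = -(-elapsed // step)  # ceil division: first run boundary >= disabled_at
    end = day + timedelta(seconds=n * step)
    windows = []
    while end < enabled_at:
        windows.append((end - interval, end))
        end += interval
    return windows

gaps = missed_windows(
    timedelta(hours=1),
    datetime(2024, 1, 1, 9, 10),   # disabled mid-morning
    datetime(2024, 1, 1, 12, 0),   # re-enabled at noon
)
for start, end in gaps:
    print(start.time(), "to", end.time())  # two hourly windows are missed
```

Here the 09:00-10:00 and 10:00-11:00 windows are lost; the 11:00-12:00 window's run fires at or after re-enablement and proceeds normally.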

Rename a pipeline

CLI:

viam datapipelines rename --id=<pipeline-id> --name=new-name

Python:

await data_client.rename_data_pipeline(id="YOUR-PIPELINE-ID", name="new-name")

Go:

err = dataClient.RenameDataPipeline(ctx, "YOUR-PIPELINE-ID", "new-name")

Delete a pipeline

CLI:

viam datapipelines delete --id=<pipeline-id>

Python:

await data_client.delete_data_pipeline(id="YOUR-PIPELINE-ID")

Go:

err = dataClient.DeleteDataPipeline(ctx, "YOUR-PIPELINE-ID")

Deleting a pipeline removes the pipeline configuration, its execution history, and all output data in the pipeline sink. This is not reversible. If you need to preserve pipeline results, export them before deleting the pipeline.
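One way to preserve results is to snapshot the sink's documents to a file before deleting. A minimal sketch: how you fetch the documents (for example, a tabular query against the sink) is left abstract, and the document shape is hypothetical:

```python
import json

def export_results(documents, path):
    """Write pipeline output documents to a JSON Lines file, one document
    per line, so they survive deletion of the pipeline and its sink."""
    with open(path, "w") as f:
        for doc in documents:
            # default=str handles datetimes and other non-JSON-native values
            f.write(json.dumps(doc, default=str) + "\n")
    return len(documents)

# In practice these would come from querying the pipeline sink.
docs = [{"_id": "w1", "avg_temp": 21.4}, {"_id": "w2", "avg_temp": 22.0}]
count = export_results(docs, "pipeline-results.jsonl")
print(f"exported {count} documents")
```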

Troubleshooting

Pipeline consistently fails
  1. Check the error message in the run details (list_data_pipeline_runs or describe).
  2. Run the same MQL query manually in the query editor using MQL mode against the same data source. This isolates whether the issue is in the query or the pipeline configuration.
  3. Common failure causes:
    • Invalid MQL stage or syntax error
    • Query timeout (5-minute limit) on large datasets. Add a $match filter to reduce data.
    • Output exceeds 10,000 documents. Add $limit or make the $group less granular.
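As an illustration of the mitigations above, here is an aggregation shape that filters before grouping and caps its output. The collection and field names (component_name, location_id, data.readings.temperature) are hypothetical, chosen only for the example:

```python
# MQL stages expressed as Python dicts. $match narrows the scanned data
# (reduces the chance of hitting the 5-minute timeout); a coarse $group
# plus $limit keeps output well under the 10,000-document ceiling.
mql = [
    {"$match": {"component_name": "temp-sensor"}},
    {"$group": {
        "_id": "$location_id",   # coarser grouping key -> fewer output docs
        "avg_temp": {"$avg": "$data.readings.temperature"},
        "count": {"$sum": 1},
    }},
    {"$limit": 1000},
]
print(len(mql), "stages")
```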
Re-enabled pipeline has gaps in results
This is expected. When you disable a pipeline, scheduled runs do not execute. When you re-enable it, it resumes from the next scheduled window. Missed windows are not retroactively processed, even if backfill is enabled. Backfill only applies to late-arriving data within windows the pipeline was active for.
Hot data store query returns no data
  • Verify the hot data store is enabled on the component. See Hot data store.
  • Check that data falls within the configured stored_hours window. Older data is removed hourly.
  • Verify data has been captured and synced within the retention window.
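The stored_hours check in the second bullet can be done client-side before debugging further. A minimal sketch, where stored_hours mirrors the component's configured value; note the hourly removal sweep means data may linger slightly past the cutoff:

```python
from datetime import datetime, timedelta, timezone

def in_retention(captured_at, stored_hours, now=None):
    """True if a capture timestamp is inside the hot data store's
    stored_hours window and should still be queryable."""
    now = now or datetime.now(timezone.utc)
    return captured_at >= now - timedelta(hours=stored_hours)

now = datetime(2024, 1, 2, 0, 0, tzinfo=timezone.utc)
print(in_retention(now - timedelta(hours=3), stored_hours=24, now=now))   # True
print(in_retention(now - timedelta(hours=30), stored_hours=24, now=now))  # False
```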