Previous
Manage pipelines
MQL patterns for common robotics data pipeline use cases. Each example can be passed to --mql or --mql-path when creating a pipeline.
_id in $projectThe $group stage produces documents with an _id field. If your pipeline runs multiple times and produces documents with the same _id, you get a duplicate key error. Always follow $group with $project to rename _id and set it to 0:
{ "$project": { "location": "$_id", "avg_temp": 1, "count": 1, "_id": 0 } }
Before creating a pipeline, run the MQL query manually in the query editor using MQL mode. This lets you verify the query returns the results you expect. The pipeline runs the same query with an automatic time constraint prepended.
$match earlyPlace $match stages at the beginning of your pipeline to reduce the amount of data processed by later stages. Filter by component_name, component_type, or other indexed fields for best performance. See indexed fields.
A sensor capturing at 10 Hz produces 36,000 readings per hour. This pipeline computes 1-minute averages, reducing the data to 60 summary documents per hour:
[
{ "$match": { "component_name": "accel-sensor" } },
{
"$group": {
"_id": {
"$dateTrunc": {
"date": "$time_received",
"unit": "minute"
}
},
"avg_x": { "$avg": "$data.readings.x" },
"avg_y": { "$avg": "$data.readings.y" },
"avg_z": { "$avg": "$data.readings.z" },
"sample_count": { "$sum": 1 }
}
},
{
"$project": {
"minute": "$_id",
"avg_x": 1,
"avg_y": 1,
"avg_z": 1,
"sample_count": 1,
"_id": 0
}
},
{ "$sort": { "minute": 1 } }
]
A vision service captures detection results. This pipeline counts how many detections of each class occurred per machine per hour:
[
{ "$match": { "component_type": "rdk:service:vision" } },
{ "$unwind": "$data.detections" },
{
"$group": {
"_id": {
"machine": "$robot_id",
"hour": {
"$dateTrunc": {
"date": "$time_received",
"unit": "hour"
}
},
"class": "$data.detections.class_name"
},
"count": { "$sum": 1 },
"avg_confidence": { "$avg": "$data.detections.confidence" }
}
},
{
"$project": {
"machine": "$_id.machine",
"hour": "$_id.hour",
"class": "$_id.class",
"count": 1,
"avg_confidence": { "$round": ["$avg_confidence", 2] },
"_id": 0
}
}
]
A sensor reports voltage and current. This pipeline computes power (voltage * current) and summarizes per hour:
[
{ "$match": { "component_name": "power-monitor" } },
{
"$addFields": {
"power_watts": {
"$multiply": ["$data.readings.voltage", "$data.readings.current"]
}
}
},
{
"$group": {
"_id": {
"$dateTrunc": {
"date": "$time_received",
"unit": "hour"
}
},
"avg_power": { "$avg": "$power_watts" },
"max_power": { "$max": "$power_watts" },
"readings": { "$sum": 1 }
}
},
{
"$project": {
"hour": "$_id",
"avg_power_watts": { "$round": ["$avg_power", 2] },
"max_power_watts": { "$round": ["$max_power", 2] },
"readings": 1,
"_id": 0
}
}
]
Aggregate sensor readings across all machines at each location to compare performance across sites:
[
{ "$match": { "component_name": "temperature-sensor" } },
{
"$group": {
"_id": "$location_id",
"avg_temp": { "$avg": "$data.readings.temperature" },
"min_temp": { "$min": "$data.readings.temperature" },
"max_temp": { "$max": "$data.readings.temperature" },
"machine_count": { "$addToSet": "$robot_id" },
"reading_count": { "$sum": 1 }
}
},
{
"$project": {
"location": "$_id",
"avg_temp": { "$round": ["$avg_temp", 1] },
"min_temp": 1,
"max_temp": 1,
"machines_reporting": { "$size": "$machine_count" },
"reading_count": 1,
"_id": 0
}
}
]
Was this page helpful?
Glad to hear it! If you have any other feedback please let us know:
We're sorry about that. To help us improve, please tell us what we can do better:
Thank you!