database_query
Run queries against PostgreSQL, MySQL, or MongoDB, with parameterized queries, polling support, and row-level assertions.
The database_query action executes a query against a relational or document database and exposes the result rows, row count, and duration.
Minimal example
- id: get_user
  action: database_query
  config:
    query: "SELECT * FROM users WHERE id = ?"
    params: ["${user_id}"]
  assert:
    - result.count == 1
    - result.rows[0].email exists
Config fields
connection
The database connection. If omitted, the default connection configured in the flow's environment is used.
config:
  connection:
    type: "postgresql" # "postgresql", "mysql", or "mongodb"
    host: "localhost"
    port: 5432
    database: "testdb"
    username: "${DB_USER}"
    password: "${DB_PASS}"
    ssl: false
    timeout: "10s"
    pool_size: 5
Connection strings can also be passed as a single connection_string field: connection_string: "postgres://${DB_USER}:${DB_PASS}@localhost:5432/testdb".
query (required)
The SQL query or MongoDB query document.
config:
  query: "SELECT id, email, status FROM users WHERE status = ?"
For multi-line queries:
config:
  query: |
    SELECT u.id, u.email, p.plan
    FROM users u
    JOIN subscriptions p ON u.id = p.user_id
    WHERE u.created_at > ?
    ORDER BY u.created_at DESC
    LIMIT 10
params
Positional parameters for parameterized queries. Always use parameterized queries rather than string interpolation to avoid SQL injection in test data.
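The injection risk mentioned above can be illustrated outside the tool. The following is a minimal Python sketch using the standard-library sqlite3 module and a made-up users table; it is not how database_query is implemented, only a demonstration of why a bound parameter is safer than text spliced into the query.

```python
import sqlite3

# Hypothetical in-memory table, used only for this illustration.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany(
    "INSERT INTO users (id, status) VALUES (?, ?)",
    [(1, "active"), (2, "inactive")],
)

# Hostile "test data" that smuggles extra SQL into the value.
user_id = "1 OR 1=1"

# String interpolation: the value becomes part of the SQL text,
# so the WHERE clause is true for every row.
interpolated = conn.execute(
    f"SELECT id FROM users WHERE id = {user_id}"
).fetchall()

# Parameterized: the value is bound as a single piece of data,
# so it is compared as a whole and matches no row.
parameterized = conn.execute(
    "SELECT id FROM users WHERE id = ?", (user_id,)
).fetchall()

print(len(interpolated))   # 2
print(len(parameterized))  # 0
```

The same idea applies to the params list: each entry is sent to the database as a value and is never parsed as SQL.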
config:
  query: "SELECT * FROM users WHERE id = ? AND status = ?"
  params:
    - "${user_id}"
    - "active"
named_params
Named parameters as an alternative to positional ? placeholders.
config:
  query: "SELECT * FROM users WHERE id = :user_id AND status = :status"
  named_params:
    user_id: "${user_id}"
    status: "active"
transaction
Wrap the query in a database transaction. Defaults to false.
config:
  transaction: true
  query: |
    INSERT INTO orders (user_id, total) VALUES (?, ?)
    RETURNING id
  params: ["${user_id}", "${order_total}"]
poll
Poll until the query returns a result matching the assertions. Useful for testing eventual consistency in async systems.
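Conceptually, polling is a retry loop bounded by a timeout. The Python sketch below shows that loop under stated assumptions: poll_until, run_query, and matches are hypothetical names, and raising TimeoutError when the deadline passes is a choice made for this example, not documented behavior of database_query.

```python
import time


def poll_until(run_query, matches, timeout_s=30.0, interval_s=2.0):
    """Re-run run_query until matches(result) is true or timeout_s elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        result = run_query()
        if matches(result):
            return result
        # Stop if another full interval would overshoot the deadline.
        if time.monotonic() + interval_s > deadline:
            raise TimeoutError(f"no matching result within {timeout_s}s")
        time.sleep(interval_s)


# Stand-in for a real query: reports "confirmed" on the third attempt.
attempts = []


def fake_query():
    attempts.append(1)
    status = "confirmed" if len(attempts) >= 3 else "pending"
    return {"rows": [{"status": status}], "count": 1}


result = poll_until(
    fake_query,
    lambda r: r["rows"][0]["status"] == "confirmed",
    timeout_s=1.0,
    interval_s=0.01,
)
print(len(attempts))                # 3
print(result["rows"][0]["status"])  # confirmed
```

In this sketch, timeout_s and interval_s play the roles of the timeout and interval fields, and matches stands in for the step's assertions.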
config:
  query: "SELECT status FROM orders WHERE id = ?"
  params: ["${order_id}"]
  poll:
    enabled: true
    timeout: "30s" # Maximum time to wait
    interval: "2s" # How often to retry the query
Response data
| Path | Description |
|---|---|
| result.rows | Array of result rows (each row is an object) |
| result.count | Number of rows returned |
| result.rows[0] | First row |
| result.rows[0].field | Field value in first row |
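To make the shape in the table concrete, here is a small Python sketch that builds a comparable structure from a sqlite3 query. The run_query helper and the users table are hypothetical and exist only for this illustration; the sketch assumes that each row is keyed by column name and that count is the number of returned rows.

```python
import sqlite3


def run_query(conn, query, params=()):
    """Return a dict shaped like the result paths: rows (list of dicts) and count."""
    cur = conn.execute(query, params)
    # cur.description is None for statements that return no rows.
    columns = [d[0] for d in cur.description] if cur.description else []
    rows = [dict(zip(columns, row)) for row in cur.fetchall()]
    return {"rows": rows, "count": len(rows)}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, email TEXT, status TEXT)")
conn.execute("INSERT INTO users VALUES (1, 'a@example.com', 'active')")

result = run_query(
    conn, "SELECT id, email, status FROM users WHERE id = ?", (1,)
)
print(result["count"])             # 1
print(result["rows"][0]["email"])  # a@example.com
```

Here result["rows"][0]["email"] corresponds to the path result.rows[0].email.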
Output extraction
output:
  rows: "result.rows"
  row_count: "result.count"
  first_row: "result.rows[0]"
  user_email: "result.rows[0].email"
  user_status: "result.rows[0].status"
  log_id: "result.rows[0].id"
Assertions
assert:
  - result.count > 0 # At least one row
  - result.count == 1 # Exactly one row
  - result.rows[0].email exists # Field present
  - result.rows[0].status == "active" # Field equality
  - result.rows[0].email_verified == false # Boolean field
  - result.rows[0].created_at exists # Timestamp exists
Examples
- id: get_user
  action: database_query
  config:
    query: "SELECT id, email, status FROM users WHERE id = ?"
    params: ["${user_id}"]
  output:
    user_email: "result.rows[0].email"
    user_status: "result.rows[0].status"
  assert:
    - result.count == 1
    - result.rows[0].status == "active"

- id: create_log_entry
  action: database_query
  config:
    query: |
      INSERT INTO logs (user_id, action, created_at)
      VALUES (?, ?, NOW())
      RETURNING id, created_at
    params:
      - "${user_id}"
      - "login"
  output:
    log_id: "result.rows[0].id"
    created_at: "result.rows[0].created_at"
  assert:
    - result.count == 1
    - result.rows[0].id exists

- id: activate_user
  action: database_query
  config:
    query: |
      UPDATE users
      SET status = 'active', updated_at = NOW()
      WHERE id = ?
    params: ["${user_id}"]
  assert:
    - result.count == 1
- id: verify_update
  action: database_query
  config:
    query: "SELECT status FROM users WHERE id = ?"
    params: ["${user_id}"]
  assert:
    - result.rows[0].status == "active"

- id: delete_user
  action: database_query
  config:
    query: "DELETE FROM users WHERE id = ?"
    params: ["${user_id}"]
- id: confirm_deleted
  action: database_query
  config:
    query: "SELECT COUNT(*) as count FROM users WHERE id = ?"
    params: ["${user_id}"]
  assert:
    - result.rows[0].count == 0

- id: find_active_users
  action: database_query
  config:
    connection:
      type: mongodb
      host: "localhost"
      port: 27017
      database: "myapp"
    query: |
      {
        "collection": "users",
        "operation": "find",
        "filter": { "status": "active" },
        "limit": 10,
        "sort": { "created_at": -1 }
      }
  output:
    users: "result.rows"
  assert:
    - result.count > 0
Polling for eventual consistency
- id: create_order_event
  action: http_request
  config:
    method: POST
    url: "${ORDER_SERVICE}/orders"
    body:
      user_id: "${user_id}"
      items: ["item_1", "item_2"]
  output:
    order_id: "response.body.id"
- id: wait_for_order_in_db
  action: database_query
  config:
    query: "SELECT status FROM orders WHERE id = ?"
    params: ["${create_order_event.order_id}"]
    poll:
      enabled: true
      timeout: "30s"
      interval: "2s"
  assert:
    - result.count == 1
    - result.rows[0].status == "confirmed"
Join query with multiple assertions
- id: verify_order_with_items
  action: database_query
  config:
    query: |
      SELECT o.id, o.status, o.total, COUNT(oi.id) as item_count
      FROM orders o
      JOIN order_items oi ON o.id = oi.order_id
      WHERE o.id = ?
      GROUP BY o.id, o.status, o.total
    params: ["${order_id}"]
  assert:
    - result.count == 1
    - result.rows[0].status == "confirmed"
    - result.rows[0].item_count == 3
    - result.rows[0].total > 0
http_request
Make HTTP requests — GET, POST, PUT, PATCH, DELETE and more — with headers, authentication, query parameters, body, and response assertions.
kafka_publish / kafka_consume
Publish messages to and consume messages from Kafka topics, with filtering, SASL authentication, and multi-message support.