# Async Patterns

Test eventual consistency and asynchronous operations using Kafka message waiting, database polling, and API polling with configurable timeouts.
Modern distributed systems operate asynchronously — an API call may trigger work that completes seconds or minutes later. TestMesh provides first-class patterns for testing these eventual consistency scenarios without flaky sleeps.
- **Kafka Consume**: wait for events to appear on a topic within a timeout.
- **Database Polling**: poll a query until data appears or a status changes.
- **API Polling**: repeatedly call an endpoint until it reaches a terminal state.
- **Combined Flows**: chain all three patterns to verify a complete async pipeline.
## Kafka Message Validation

### Wait for a Single Event

After an API call that produces a Kafka event, use `kafka_consume` with a `match` block to wait for the specific message:
```yaml
flow:
  name: "Verify Kafka Event After API Call"
  steps:
    - id: create_user
      name: "Create User via API"
      action: http_request
      config:
        method: POST
        url: "${API_URL}/users"
        body:
          email: "${FAKER.internet.email}"
          name: "${FAKER.name.fullName}"
      save:
        user_id: "response.body.id"
        user_email: "response.body.email"
      assert:
        - status == 201

    - id: verify_event
      name: "Wait for user.created Event"
      action: kafka_consume
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "user.events"
        group_id: "test-consumer-${EXECUTION_ID}"
        timeout: 10s
        match:
          key: "${create_user.user_id}"
          json_path:
            - "$.event_type == 'user.created'"
            - "$.user.id == '${create_user.user_id}'"
            - "$.user.email == '${create_user.user_email}'"
      save:
        event_data: "result.messages[0].value"
      assert:
        - result.count > 0
        - result.messages[0].value.event_type == "user.created"
```

Always use a unique `group_id` per test execution (e.g., `test-consumer-${EXECUTION_ID}`). Reusing consumer groups across runs can cause messages to be skipped if they were already committed by a previous run.
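To make the contrast concrete, here is a sketch of both choices (the fixed name in the second block is a hypothetical example):

```yaml
# Good: a fresh consumer group per run, so no committed offsets carry over
group_id: "test-consumer-${EXECUTION_ID}"

# Bad: a reused group may have already committed offsets past your message
group_id: "test-consumer-shared"
```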
### Wait for Multiple Events

Assert that an order produces all expected lifecycle events:
```yaml
- id: verify_multiple_events
  name: "Verify Order Processing Events"
  action: kafka_consume
  config:
    brokers: ["${KAFKA_BROKERS}"]
    topic: "order.events"
    group_id: "test-${EXECUTION_ID}"
    timeout: 30s
    max_messages: 5
    match:
      json_path:
        - "$.order_id == '${order_id}'"
  save:
    event_types: "result.messages[*].value.event_type"
  assert:
    - result.count >= 3
    - "'order.created' in ${event_types}"
    - "'order.paid' in ${event_types}"
    - "'order.shipped' in ${event_types}"
```

### Verify Events Across Multiple Topics in Parallel
```yaml
- id: verify_cross_service_events
  action: parallel
  config:
    steps:
      - id: verify_user_event
        action: kafka_consume
        config:
          topic: "user.events"
          timeout: 10s
          match:
            key: "${user_id}"
      - id: verify_notification_event
        action: kafka_consume
        config:
          topic: "notification.events"
          timeout: 10s
          match:
            json_path:
              - "$.user_id == '${user_id}'"
              - "$.type == 'welcome_email'"
      - id: verify_analytics_event
        action: kafka_consume
        config:
          topic: "analytics.events"
          timeout: 10s
          match:
            json_path:
              - "$.event == 'user_signup'"
```

## Database Polling
### Wait for an Async Job to Complete

When an API accepts a job and processes it asynchronously, poll the database until the expected row appears:
```yaml
flow:
  name: "Verify Database Entry After Async Processing"
  steps:
    - id: submit_job
      action: http_request
      config:
        method: POST
        url: "${API_URL}/jobs"
        body:
          type: "data_import"
          file_url: "${DATA_FILE_URL}"
      save:
        job_id: "response.body.job_id"
      assert:
        - status == 202

    - id: wait_for_completion
      name: "Wait for Job Completion in DB"
      action: database_query
      config:
        query: "SELECT * FROM jobs WHERE id = $1 AND status = 'completed'"
        params: ["${submit_job.job_id}"]
      poll:
        enabled: true
        timeout: 60s
        interval: 2s
      save:
        rows_processed: "result.rows[0].rows_processed"
      assert:
        - result.count == 1
        - result.rows[0].status == "completed"
        - result.rows[0].rows_processed > 0
```

### Wait for Status Change
Poll until a payment reaches a terminal state (not just "pending"):
```yaml
- id: wait_for_payment_processed
  action: database_query
  config:
    query: |
      SELECT status, processed_at, error_message
      FROM payments
      WHERE id = $1
    params: ["${payment_id}"]
  poll:
    enabled: true
    timeout: 30s
    interval: 1s
  save:
    payment_status: "result.rows[0].status"
  assert:
    - result.count == 1
    - result.rows[0].status in ["completed", "failed"]
    - result.rows[0].processed_at exists
```

Assert on terminal states (e.g., `completed` or `failed`) rather than only on success. This ensures your test fails fast when something goes wrong instead of timing out waiting for a state that will never arrive.
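A sketch of the two assertion styles, reusing the `payments` fields from the example above:

```yaml
# Good: any terminal state ends the poll immediately, pass or fail
assert:
  - result.rows[0].status in ["completed", "failed"]

# Bad: a failed payment never satisfies this, so the step polls until timeout
assert:
  - result.rows[0].status == "completed"
```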
## API Polling Pattern

Use `wait_until` to repeatedly execute steps until a condition is satisfied:
```yaml
flow:
  name: "Wait for Async Job via API Polling"
  steps:
    - id: start_export
      action: http_request
      config:
        method: POST
        url: "${API_URL}/exports"
        body:
          format: "csv"
          filters: { status: "active" }
      save:
        export_id: "response.body.export_id"
      assert:
        - status == 202

    - id: wait_for_export
      action: wait_until
      config:
        condition: "${check_status.status} in ['completed', 'failed']"
        max_duration: 5m
        interval: 5s
        steps:
          - id: check_status
            action: http_request
            config:
              method: GET
              url: "${API_URL}/exports/${start_export.export_id}"
            output:
              status: "response.body.status"
              progress: "response.body.progress"
        on_timeout: "fail"
      save:
        final_status: "check_status.status"
        download_url: "check_status.response.body.download_url"
      assert:
        - "${check_status.status}" == "completed"
        - "${check_status.response.body.download_url}" exists
```

## Combined Pattern: API → Kafka → Database
This is the most powerful pattern — verifying that a single API action correctly flows through your entire event-driven architecture:
```yaml
flow:
  name: "Complete Async Flow Verification"
  description: "Test API → Kafka → Database async pipeline"
  steps:
    - id: create_order
      action: http_request
      config:
        method: POST
        url: "${API_URL}/orders"
        body:
          customer_id: "${customer_id}"
          items:
            - product_id: "prod_123"
              quantity: 2
            - product_id: "prod_456"
              quantity: 1
      save:
        order_id: "response.body.order_id"
        total_amount: "response.body.total"
      assert:
        - status == 201

    # Verify the Kafka event was published
    - id: verify_order_event
      action: kafka_consume
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "order.events"
        group_id: "test-${EXECUTION_ID}"
        timeout: 5s
        match:
          key: "${create_order.order_id}"
          json_path:
            - "$.event_type == 'order.created'"
      assert:
        - result.count > 0

    # Verify the Kafka consumer updated the database
    - id: verify_inventory_updated
      action: database_query
      config:
        query: |
          SELECT product_id, quantity_available
          FROM inventory
          WHERE product_id IN ('prod_123', 'prod_456')
      poll:
        enabled: true
        timeout: 15s
        interval: 1s
      assert:
        - result.count == 2

    - id: verify_order_processed
      action: database_query
      config:
        query: |
          SELECT id, status, processed_at
          FROM orders
          WHERE id = $1 AND status IN ('processing', 'confirmed')
        params: ["${create_order.order_id}"]
      poll:
        enabled: true
        timeout: 20s
        interval: 2s
      save:
        order_status: "result.rows[0].status"
      assert:
        - result.count == 1
        - result.rows[0].status in ["processing", "confirmed"]
        - result.rows[0].processed_at exists
```

## Error Handling
### Timeout Handling Without Failing the Flow
```yaml
- id: wait_with_timeout_handling
  action: database_query
  config:
    query: "SELECT * FROM jobs WHERE id = $1 AND status = 'completed'"
    params: ["${job_id}"]
  poll:
    enabled: true
    timeout: 30s
    interval: 2s
  on_error: "continue"
  save:
    timed_out: "error.timeout"
    status: "result.rows[0].status"

- id: handle_timeout
  when: "${wait_with_timeout_handling.timed_out} == true"
  action: log
  config:
    level: warn
    message: "Job did not complete within timeout"
```

### Retry on Message Not Found
```yaml
- id: consume_with_retry
  action: kafka_consume
  config:
    brokers: ["${KAFKA_BROKERS}"]
    topic: "events"
    timeout: 5s
    match:
      key: "${expected_key}"
  retry:
    max_attempts: 3
    delay: 2s
    backoff: "exponential"
  assert:
    - result.count > 0
```

## Best Practices
| Pattern | Recommended Timeout | Poll Interval |
|---|---|---|
| Kafka consume (fast async) | 5–10s | — |
| Database poll (medium ops) | 30–60s | 1–2s |
| API poll (long-running jobs) | 2–5m | 5–10s |
| Full async pipeline | 30s–2m | 1–2s |
Avoid polling intervals below 500ms — they create unnecessary load on your services without meaningfully improving test speed. For most cases, 1–2 second intervals strike the right balance.
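For example, a database poll for a medium-length operation might combine the table's recommendations like this (the values are illustrative starting points, not requirements):

```yaml
poll:
  enabled: true
  timeout: 60s    # generous upper bound for medium-length async work
  interval: 2s    # coarse enough to avoid hammering the database
```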
### Use Specific Match Criteria
```yaml
# Good: match on a unique identifier so you don't consume the wrong message
match:
  key: "${user_id}"
  json_path:
    - "$.event_type == 'user.created'"
    - "$.user.email == '${expected_email}'"

# Bad: too generic, may match an unrelated user.created event
match:
  json_path:
    - "$.event_type == 'user.created'"
```