TestMesh
Features

Async Patterns

Test eventual consistency and asynchronous operations using Kafka message waiting, database polling, and API polling with configurable timeouts.

Modern distributed systems operate asynchronously — an API call may trigger work that completes seconds or minutes later. TestMesh provides first-class patterns for testing these eventual consistency scenarios without flaky sleeps.


Kafka Message Validation

Wait for a Single Event

After an API call that produces a Kafka event, use kafka_consume with a match block to wait for the specific message:

user-created-event.yaml
flow:
  name: "Verify Kafka Event After API Call"

  steps:
    - id: create_user
      name: "Create User via API"
      action: http_request
      config:
        method: POST
        url: "${API_URL}/users"
        body:
          email: "${FAKER.internet.email}"
          name: "${FAKER.name.fullName}"
      save:
        user_id: "response.body.id"
        user_email: "response.body.email"
      assert:
        - status == 201

    - id: verify_event
      name: "Wait for user.created Event"
      action: kafka_consume
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "user.events"
        group_id: "test-consumer-${EXECUTION_ID}"
        timeout: 10s
        match:
          key: "${create_user.user_id}"
          json_path:
            - "$.event_type == 'user.created'"
            - "$.user.id == '${create_user.user_id}'"
            - "$.user.email == '${create_user.user_email}'"
      save:
        event_data: "result.messages[0].value"
      assert:
        - result.count > 0
        - result.messages[0].value.event_type == "user.created"

Always use a unique group_id per test execution (e.g., test-consumer-${EXECUTION_ID}). Reusing consumer groups across runs can cause messages to be skipped if they were already committed by a previous run.

Wait for Multiple Events

Assert that an order produces all expected lifecycle events:

- id: verify_multiple_events
  name: "Verify Order Processing Events"
  action: kafka_consume
  config:
    brokers: ["${KAFKA_BROKERS}"]
    topic: "order.events"
    group_id: "test-${EXECUTION_ID}"
    timeout: 30s
    max_messages: 5
    match:
      json_path:
        - "$.order_id == '${order_id}'"
  save:
    event_types: "result.messages[*].value.event_type"
  assert:
    - result.count >= 3
    - "'order.created' in ${event_types}"
    - "'order.paid' in ${event_types}"
    - "'order.shipped' in ${event_types}"

Verify Events Across Multiple Topics in Parallel

- id: verify_cross_service_events
  action: parallel
  config:
    steps:
      - id: verify_user_event
        action: kafka_consume
        config:
          topic: "user.events"
          timeout: 10s
          match:
            key: "${user_id}"

      - id: verify_notification_event
        action: kafka_consume
        config:
          topic: "notification.events"
          timeout: 10s
          match:
            json_path:
              - "$.user_id == '${user_id}'"
              - "$.type == 'welcome_email'"

      - id: verify_analytics_event
        action: kafka_consume
        config:
          topic: "analytics.events"
          timeout: 10s
          match:
            json_path:
              - "$.event == 'user_signup'"

Database Polling

Wait for an Async Job to Complete

When an API accepts a job and processes it asynchronously, poll the database until the expected row appears:

async-job.yaml
flow:
  name: "Verify Database Entry After Async Processing"

  steps:
    - id: submit_job
      action: http_request
      config:
        method: POST
        url: "${API_URL}/jobs"
        body:
          type: "data_import"
          file_url: "${DATA_FILE_URL}"
      save:
        job_id: "response.body.job_id"
      assert:
        - status == 202

    - id: wait_for_completion
      name: "Wait for Job Completion in DB"
      action: database_query
      config:
        query: "SELECT * FROM jobs WHERE id = $1 AND status = 'completed'"
        params: ["${submit_job.job_id}"]
        poll:
          enabled: true
          timeout: 60s
          interval: 2s
      save:
        rows_processed: "result.rows[0].rows_processed"
      assert:
        - result.count == 1
        - result.rows[0].status == "completed"
        - result.rows[0].rows_processed > 0

Wait for Status Change

Poll until a payment reaches a terminal state (not just "pending"):

- id: wait_for_payment_processed
  action: database_query
  config:
    query: |
      SELECT status, processed_at, error_message
      FROM payments
      WHERE id = $1
    params: ["${payment_id}"]
    poll:
      enabled: true
      timeout: 30s
      interval: 1s
  save:
    payment_status: "result.rows[0].status"
  assert:
    - result.count == 1
    - result.rows[0].status in ["completed", "failed"]
    - result.rows[0].processed_at exists

Assert on terminal states (e.g., completed or failed) rather than only on success. This ensures your test fails fast when something goes wrong instead of timing out waiting for a state that will never arrive.


API Polling Pattern

Use wait_until to repeatedly execute steps until a condition is satisfied:

export-job.yaml
flow:
  name: "Wait for Async Job via API Polling"

  steps:
    - id: start_export
      action: http_request
      config:
        method: POST
        url: "${API_URL}/exports"
        body:
          format: "csv"
          filters: { status: "active" }
      save:
        export_id: "response.body.export_id"
      assert:
        - status == 202

    - id: wait_for_export
      action: wait_until
      config:
        condition: "${check_status.status} in ['completed', 'failed']"
        max_duration: 5m
        interval: 5s
        steps:
          - id: check_status
            action: http_request
            config:
              method: GET
              url: "${API_URL}/exports/${start_export.export_id}"
            output:
              status: "response.body.status"
              progress: "response.body.progress"
        on_timeout: "fail"
      save:
        final_status: "check_status.status"
        download_url: "check_status.response.body.download_url"
      assert:
        - "${check_status.status}" == "completed"
        - "${check_status.response.body.download_url}" exists

Combined Pattern: API → Kafka → Database

This is the most powerful pattern — verifying that a single API action correctly flows through your entire event-driven architecture:

order-flow-async.yaml
flow:
  name: "Complete Async Flow Verification"
  description: "Test API → Kafka → Database async pipeline"

  steps:
    - id: create_order
      action: http_request
      config:
        method: POST
        url: "${API_URL}/orders"
        body:
          customer_id: "${customer_id}"
          items:
            - product_id: "prod_123"
              quantity: 2
            - product_id: "prod_456"
              quantity: 1
      save:
        order_id: "response.body.order_id"
        total_amount: "response.body.total"
      assert:
        - status == 201

    # Verify the Kafka event was published
    - id: verify_order_event
      action: kafka_consume
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "order.events"
        group_id: "test-${EXECUTION_ID}"
        timeout: 5s
        match:
          key: "${create_order.order_id}"
          json_path:
            - "$.event_type == 'order.created'"
      assert:
        - result.count > 0

    # Verify the Kafka consumer updated the database
    - id: verify_inventory_updated
      action: database_query
      config:
        query: |
          SELECT product_id, quantity_available
          FROM inventory
          WHERE product_id IN ('prod_123', 'prod_456')
        poll:
          enabled: true
          timeout: 15s
          interval: 1s
      assert:
        - result.count == 2

    - id: verify_order_processed
      action: database_query
      config:
        query: |
          SELECT id, status, processed_at
          FROM orders
          WHERE id = $1 AND status IN ('processing', 'confirmed')
        params: ["${create_order.order_id}"]
        poll:
          enabled: true
          timeout: 20s
          interval: 2s
      save:
        order_status: "result.rows[0].status"
      assert:
        - result.count == 1
        - result.rows[0].status in ["processing", "confirmed"]
        - result.rows[0].processed_at exists

Error Handling

Timeout Handling Without Failing the Flow

- id: wait_with_timeout_handling
  action: database_query
  config:
    query: "SELECT * FROM jobs WHERE id = $1 AND status = 'completed'"
    params: ["${job_id}"]
    poll:
      enabled: true
      timeout: 30s
      interval: 2s
  on_error: "continue"
  save:
    timed_out: "error.timeout"
    status: "result.rows[0].status"

- id: handle_timeout
  when: "${wait_with_timeout_handling.timed_out} == true"
  action: log
  config:
    level: warn
    message: "Job did not complete within timeout"

Retry on Message Not Found

- id: consume_with_retry
  action: kafka_consume
  config:
    brokers: ["${KAFKA_BROKERS}"]
    topic: "events"
    timeout: 5s
    match:
      key: "${expected_key}"
  retry:
    max_attempts: 3
    delay: 2s
    backoff: "exponential"
  assert:
    - result.count > 0

Best Practices

PatternRecommended TimeoutPoll Interval
Kafka consume (fast async)5–10s
Database poll (medium ops)30–60s1–2s
API poll (long-running jobs)2–5m5–10s
Full async pipeline30s–2m1–2s

Avoid polling intervals below 500ms — they create unnecessary load on your services without meaningfully improving test speed. For most cases, 1–2 second intervals strike the right balance.

Use Specific Match Criteria

# Good: match on a unique identifier so you don't consume the wrong message
match:
  key: "${user_id}"
  json_path:
    - "$.event_type == 'user.created'"
    - "$.user.email == '${expected_email}'"

# Bad: too generic, may match an unrelated user.created event
match:
  json_path:
    - "$.event_type == 'user.created'"

What's Next

On this page