TestMesh > YAML Reference > Actions

kafka_publish / kafka_consume

Publish messages to and consume messages from Kafka topics, with filtering, SASL authentication, and multi-message support.

TestMesh provides two Kafka actions: kafka_publish for producing messages and kafka_consume for consuming and asserting on messages. Together they let you verify event-driven flows end-to-end.

kafka_publish

Publishes one message to a Kafka topic and captures the resulting offset and partition.

Minimal example

publish-event.yaml
- id: publish_order_event
  action: kafka_publish
  config:
    brokers:
      - "localhost:9092"
    topic: "order-events"
    value:
      event_type: "order.created"
      order_id: "${order_id}"
      user_id: "${user_id}"

Config fields

| Field | Required | Description |
| --- | --- | --- |
| brokers | Yes | Array of broker addresses |
| topic | Yes | Target topic name |
| value | Yes | Message payload (object or string) |
| key | No | Message key for partition routing |
| headers | No | Kafka message headers |
| partition | No | Specific partition number |
| compression | No | none, gzip, snappy, or lz4 |
| sasl | No | SASL authentication config |

Full producer example

- id: publish_user_event
  action: kafka_publish
  config:
    brokers:
      - "kafka:9092"
      - "kafka-replica:9092"
    topic: "user-events"
    key: "${user_id}"
    value:
      event_type: "user.created"
      user_id: "${user_id}"
      email: "${user_email}"
      timestamp: "${TIMESTAMP}"
    headers:
      correlation-id: "${EXECUTION_ID}"
      source: "testmesh"
    compression: "gzip"
    sasl:
      mechanism: "SCRAM-SHA-256"
      username: "${KAFKA_USER}"
      password: "${KAFKA_PASS}"
  output:
    message_offset: "result.offset"
    message_partition: "result.partition"

Response data

| Path | Description |
| --- | --- |
| result.offset | Offset assigned to the published message |
| result.partition | Partition the message was written to |
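
The captured values can be checked or reused in later steps. A minimal sketch, assuming assert blocks are supported on kafka_publish the same way they are shown on kafka_consume elsewhere in this reference:

```yaml
- id: publish_order_event
  action: kafka_publish
  config:
    brokers: ["localhost:9092"]
    topic: "order-events"
    value:
      event_type: "order.created"
  output:
    message_offset: "result.offset"
    message_partition: "result.partition"
  assert:
    # A successfully published message receives a non-negative offset
    - result.offset >= 0
```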

kafka_consume

Waits for messages on a Kafka topic and exposes them for assertions and output extraction.

Minimal example

consume-event.yaml
- id: consume_order_event
  action: kafka_consume
  config:
    brokers:
      - "localhost:9092"
    topic: "order-events"
    group_id: "test-consumer-${RANDOM_ID}"
    timeout: "30s"
  assert:
    - result.count > 0

Config fields

| Field | Required | Description |
| --- | --- | --- |
| brokers | Yes | Array of broker addresses |
| topic | Yes | Topic to consume from |
| group_id | Yes | Consumer group ID |
| timeout | Yes | How long to wait for messages |
| max_messages | No | Maximum number of messages to consume (default: 1) |
| from_beginning | No | Start from the earliest offset (default: false) |
| match | No | Wait for a message matching specific criteria |
| filter | No | Only return messages matching a key or header |

Use a unique group_id per test execution — for example, "test-${RANDOM_ID}" — so parallel or repeated runs do not share offset state.

Filtering: wait for a specific message

The match block makes the consumer wait until a message matching all specified conditions arrives, ignoring other messages on the topic.

- id: wait_for_user_event
  action: kafka_consume
  config:
    brokers: ["kafka:9092"]
    topic: "user-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
    match:
      key: "${user_id}"
      json_path:
        - "$.event_type == 'user.created'"
        - "$.user_id == '${user_id}'"

Simple filter

The filter block restricts the returned messages to those matching a key or specific headers:
filter:
  key: "expected_key"
  header:
    correlation-id: "${EXECUTION_ID}"
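
Put together, a filtered consume step might look like the following sketch, which pairs the filter fragment above with the required consumer fields; the correlation-id header name is an example carried over from the producer example:

```yaml
- id: consume_correlated_event
  action: kafka_consume
  config:
    brokers: ["kafka:9092"]
    topic: "user-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
    filter:
      # Only messages published with this execution's correlation-id are returned
      header:
        correlation-id: "${EXECUTION_ID}"
  assert:
    - result.count > 0
```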

Response data

| Path | Description |
| --- | --- |
| result.messages | Array of received messages |
| result.count | Number of messages received |
| result.messages[0].key | Message key |
| result.messages[0].value | Parsed message value |
| result.messages[0].headers | Message headers object |
| result.messages[0].offset | Message offset |
| result.messages[0].partition | Message partition |
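
These per-message paths can be used in output and assert blocks. A sketch using the paths from the table above (the source header is assumed to have been set by the producer, as in the full producer example):

```yaml
- id: inspect_consumed_message
  action: kafka_consume
  config:
    brokers: ["localhost:9092"]
    topic: "order-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
  output:
    # Extract pieces of the first received message for later steps
    first_key: "result.messages[0].key"
    first_value: "result.messages[0].value"
    source_header: "result.messages[0].headers.source"
  assert:
    - result.count > 0
```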

End-to-end Kafka pattern

A typical event-driven test triggers an action, then consumes the resulting event to verify the system reacted correctly.

event-driven-test.yaml
flow:
  name: "User Created Event Flow"
  env:
    API_URL: "http://user-service:5001"
    KAFKA_BROKERS: "kafka:9092"

  steps:
    # Trigger the action that should produce an event
    - id: create_user
      action: http_request
      config:
        method: POST
        url: "${API_URL}/users"
        body:
          email: "test-${RANDOM_ID}@example.com"
          name: "Test User"
      output:
        user_id: "response.body.id"
      assert:
        - status == 201

    # Wait for the event to appear on the topic
    - id: verify_user_created_event
      action: kafka_consume
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "user-events"
        group_id: "test-${RANDOM_ID}"
        timeout: "30s"
        match:
          key: "${create_user.user_id}"
          json_path:
            - "$.event_type == 'user.created'"
      output:
        event: "result.messages[0].value"
      assert:
        - result.count > 0
        - event.event_type == "user.created"
        - event.user_id == "${create_user.user_id}"

    # Publish a downstream event and verify the consumer handles it
    - id: publish_verification_email_request
      action: kafka_publish
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "email-requests"
        key: "${create_user.user_id}"
        value:
          type: "verification"
          user_id: "${create_user.user_id}"
          email: "test-${RANDOM_ID}@example.com"

    - id: verify_email_queued
      action: kafka_consume
      config:
        brokers: ["${KAFKA_BROKERS}"]
        topic: "email-sent-confirmations"
        group_id: "test-${RANDOM_ID}"
        timeout: "20s"
        filter:
          key: "${create_user.user_id}"
      assert:
        - result.count > 0

SASL authentication

For clusters requiring authentication:

sasl:
  mechanism: "PLAIN"              # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
  username: "${KAFKA_USER}"
  password: "${KAFKA_PASS}"
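
The sasl block is shown here standalone; assuming it is accepted in kafka_consume config the same way it is in kafka_publish, an authenticated consume step might look like this sketch:

```yaml
- id: consume_secure_topic
  action: kafka_consume
  config:
    brokers: ["kafka:9092"]
    topic: "order-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
    # Credentials supplied via environment variables, as in the producer example
    sasl:
      mechanism: "SCRAM-SHA-512"
      username: "${KAFKA_USER}"
      password: "${KAFKA_PASS}"
  assert:
    - result.count > 0
```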
