# kafka_publish / kafka_consume
Publish messages to and consume messages from Kafka topics, with filtering, SASL authentication, and multi-message support.
TestMesh provides two Kafka actions: kafka_publish for producing messages and kafka_consume for consuming and asserting on messages. Together they let you verify event-driven flows end-to-end.
## kafka_publish
Publishes one message to a Kafka topic and captures the resulting offset and partition.
### Minimal example

```yaml
- id: publish_order_event
  action: kafka_publish
  config:
    brokers:
      - "localhost:9092"
    topic: "order-events"
    value:
      event_type: "order.created"
      order_id: "${order_id}"
      user_id: "${user_id}"
```

### Config fields
| Field | Required | Description |
|---|---|---|
| brokers | Yes | Array of broker addresses |
| topic | Yes | Target topic name |
| value | Yes | Message payload (object or string) |
| key | No | Message key for partition routing |
| headers | No | Kafka message headers |
| partition | No | Specific partition number |
| compression | No | none, gzip, snappy, or lz4 |
| sasl | No | SASL authentication config |
### Full producer example

```yaml
- id: publish_user_event
  action: kafka_publish
  config:
    brokers:
      - "kafka:9092"
      - "kafka-replica:9092"
    topic: "user-events"
    key: "${user_id}"
    value:
      event_type: "user.created"
      user_id: "${user_id}"
      email: "${user_email}"
      timestamp: "${TIMESTAMP}"
    headers:
      correlation-id: "${EXECUTION_ID}"
      source: "testmesh"
    compression: "gzip"
    sasl:
      mechanism: "SCRAM-SHA-256"
      username: "${KAFKA_USER}"
      password: "${KAFKA_PASS}"
  output:
    message_offset: "result.offset"
    message_partition: "result.partition"
```

### Response data
| Path | Description |
|---|---|
| result.offset | Offset assigned to the published message |
| result.partition | Partition the message was written to |
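Both values can also be asserted on directly in the publishing step. A minimal sketch (assuming zero-based offsets and partitions, as in standard Kafka):

```yaml
- id: publish_order_event
  action: kafka_publish
  config:
    brokers:
      - "localhost:9092"
    topic: "order-events"
    value:
      event_type: "order.created"
  assert:
    - result.offset >= 0
    - result.partition >= 0
```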
## kafka_consume
Waits for messages on a Kafka topic and exposes them for assertions and output extraction.
### Minimal example

```yaml
- id: consume_order_event
  action: kafka_consume
  config:
    brokers:
      - "localhost:9092"
    topic: "order-events"
    group_id: "test-consumer-${RANDOM_ID}"
    timeout: "30s"
  assert:
    - result.count > 0
```

### Config fields
| Field | Required | Description |
|---|---|---|
| brokers | Yes | Array of broker addresses |
| topic | Yes | Topic to consume from |
| group_id | Yes | Consumer group ID |
| timeout | Yes | How long to wait for messages |
| max_messages | No | Maximum number of messages to consume (default: 1) |
| from_beginning | No | Start from the earliest offset (default: false) |
| match | No | Wait for a message matching specific criteria |
| filter | No | Only return messages matching a key or header |
Use a unique group_id per test execution — for example, "test-${RANDOM_ID}" — so parallel or repeated runs do not share offset state.
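To inspect a backlog of existing messages rather than wait for a single new one, max_messages and from_beginning can be combined. A sketch (the topic name is illustrative):

```yaml
- id: read_event_backlog
  action: kafka_consume
  config:
    brokers:
      - "localhost:9092"
    topic: "order-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
    max_messages: 10
    from_beginning: true
  assert:
    - result.count > 0
```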
### Filtering: wait for a specific message
The match block makes the consumer wait until a message matching all specified conditions arrives, ignoring other messages on the topic.
```yaml
- id: wait_for_user_event
  action: kafka_consume
  config:
    brokers: ["kafka:9092"]
    topic: "user-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
    match:
      key: "${user_id}"
      json_path:
        - "$.event_type == 'user.created'"
        - "$.user_id == '${user_id}'"
```

### Simple filter

Where match waits for a matching message to arrive, filter only returns messages whose key or headers match:
```yaml
filter:
  key: "expected_key"
  header:
    correlation-id: "${EXECUTION_ID}"
```

### Response data
| Path | Description |
|---|---|
| result.messages | Array of received messages |
| result.count | Number of messages received |
| result.messages[0].key | Message key |
| result.messages[0].value | Parsed message value |
| result.messages[0].headers | Message headers object |
| result.messages[0].offset | Message offset |
| result.messages[0].partition | Message partition |
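Any of these paths can feed an output or an assertion. A sketch that extracts the first message's key and offset for use in later steps (paths follow the response-data table; the topic name is illustrative):

```yaml
- id: consume_and_extract
  action: kafka_consume
  config:
    brokers:
      - "localhost:9092"
    topic: "user-events"
    group_id: "test-${RANDOM_ID}"
    timeout: "30s"
  output:
    first_key: "result.messages[0].key"
    first_offset: "result.messages[0].offset"
  assert:
    - result.count > 0
```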
## End-to-end Kafka pattern
A typical event-driven test triggers an action, then consumes the resulting event to verify the system reacted correctly.
```yaml
flow:
  name: "User Created Event Flow"
  env:
    API_URL: "http://user-service:5001"
    KAFKA_BROKERS: "kafka:9092"

steps:
  # Trigger the action that should produce an event
  - id: create_user
    action: http_request
    config:
      method: POST
      url: "${API_URL}/users"
      body:
        email: "test-${RANDOM_ID}@example.com"
        name: "Test User"
    output:
      user_id: "response.body.id"
    assert:
      - status == 201

  # Wait for the event to appear on the topic
  - id: verify_user_created_event
    action: kafka_consume
    config:
      brokers: ["${KAFKA_BROKERS}"]
      topic: "user-events"
      group_id: "test-${RANDOM_ID}"
      timeout: "30s"
      match:
        key: "${create_user.user_id}"
        json_path:
          - "$.event_type == 'user.created'"
    output:
      event: "result.messages[0].value"
    assert:
      - result.count > 0
      - event.event_type == "user.created"
      - event.user_id == "${create_user.user_id}"

  # Publish a downstream event and verify the consumer handles it
  - id: publish_verification_email_request
    action: kafka_publish
    config:
      brokers: ["${KAFKA_BROKERS}"]
      topic: "email-requests"
      key: "${create_user.user_id}"
      value:
        type: "verification"
        user_id: "${create_user.user_id}"
        email: "test-${RANDOM_ID}@example.com"

  - id: verify_email_queued
    action: kafka_consume
    config:
      brokers: ["${KAFKA_BROKERS}"]
      topic: "email-sent-confirmations"
      group_id: "test-${RANDOM_ID}"
      timeout: "20s"
      filter:
        key: "${create_user.user_id}"
    assert:
      - result.count > 0
```

## SASL authentication
For clusters requiring authentication:
```yaml
sasl:
  mechanism: "PLAIN"  # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
  username: "${KAFKA_USER}"
  password: "${KAFKA_PASS}"
```