TestMesh
Features

Plugins

Extend TestMesh with custom action handlers, importers, exporters, and more using the HTTP-based plugin protocol.

TestMesh has a plugin system that lets you extend the test runner with custom capabilities. Plugins run as separate processes and communicate with TestMesh over a simple HTTP protocol — you can write them in any language.

Built-in Integrations

TestMesh ships several native integrations that work without any installation. They use the same action system as external plugins but run inside the API process — no HTTP overhead, no separate process to manage.

Built-in integrations appear in the dashboard under Plugins → Built-in Integrations and are always available in flows using dot-notation action names (e.g. kafka.produce, neo4j.query).

PluginActionsDocs
Apache Kafkakafka.produce, kafka.consume, kafka.admin.topics +2Reference
PostgreSQLpostgresql.query, postgresql.insert, postgresql.update +3Reference
Redisredis.get, redis.set, redis.del, redis.existsReference
Neo4jneo4j.query, neo4j.assertReference
MinIO / S3minio.put, minio.get, minio.delete, minio.assertReference
OTel / Tempootel.inject, otel.assertReference
Grafana Lokiloki.query, loki.assertReference
Prometheusprometheus.query, prometheus.assertReference

External plugins (the rest of this page) use an HTTP-based protocol and can be written in any language. Built-in integrations are Go implementations that ship with TestMesh — use them when you need reliability and performance without external dependencies.

Plugin Types

TestMesh supports five categories of plugins:

TypeDescription
actionCustom action handlers used as steps in flows
authCustom authentication providers
importerImport test flows from external formats
exporterExport flows to external formats
reporterGenerate custom report formats

Currently, action plugins are fully supported for flow execution. The other types are supported in the plugin registry but their invocation hooks are coming in upcoming releases.


Plugin Manifest

Every plugin requires a manifest.json file at its root:

manifest.json
{
  "id": "my-scraper",
  "name": "Web Scraper",
  "version": "1.0.0",
  "description": "Scrape and extract data from web pages",
  "author": "Your Name",
  "homepage": "https://github.com/yourname/testmesh-scraper",
  "type": "action",
  "entry_point": "server.py",
  "permissions": ["network", "filesystem"],
  "config": {
    "timeout": 30
  }
}
FieldRequiredDescription
idyesUnique identifier used as the action name in flows
nameyesHuman-readable name
versionyesSemver version string
typeyesOne of action, auth, exporter, importer, reporter
entry_pointyesFile to execute (.py, .js, .sh, or binary)
descriptionnoShort description
authornoAuthor name or email
homepagenoProject URL
permissionsnoDeclared capabilities (informational)
confignoDefault configuration values

HTTP Protocol

Action plugins run as an HTTP server on a dynamically assigned port. TestMesh starts your plugin process and communicates with it over localhost. Your plugin must implement three endpoints:

GET /health

Called after startup. TestMesh polls this until it returns {"status": "healthy"}.

{
  "status": "healthy",
  "version": "1.0.0",
  "uptime_seconds": 42
}

GET /info

Returns metadata about the plugin and the actions it provides. Use this to register sub-actions (e.g., scraper.navigate, scraper.extract).

{
  "id": "scraper",
  "name": "Web Scraper",
  "version": "1.0.0",
  "description": "Scrape web pages",
  "actions": [
    {
      "id": "scraper.navigate",
      "name": "Navigate",
      "description": "Navigate to a URL",
      "schema": {
        "type": "object",
        "properties": {
          "url": { "type": "string" }
        },
        "required": ["url"]
      }
    },
    {
      "id": "scraper.extract",
      "name": "Extract",
      "description": "Extract data using CSS selector",
      "schema": {
        "type": "object",
        "properties": {
          "selector": { "type": "string" },
          "attribute": { "type": "string" }
        }
      }
    }
  ]
}

POST /execute

Called when a flow step uses this plugin's action. Receives step config and execution context.

Request:

{
  "action": "scraper.extract",
  "config": {
    "selector": ".product-price",
    "attribute": "text"
  },
  "context": {
    "execution_id": "exec-123",
    "flow_id": "flow-456",
    "step_id": "step-789",
    "variables": {
      "base_url": "https://example.com"
    },
    "step_outputs": {
      "navigate": {
        "url": "https://example.com/products"
      }
    }
  }
}

Response:

{
  "success": true,
  "output": {
    "text": "$29.99",
    "count": 1
  },
  "logs": [
    {
      "level": "info",
      "message": "Found 1 element matching .product-price",
      "timestamp": "2024-01-01T12:00:00Z"
    }
  ],
  "metrics": {
    "duration_ms": 145
  }
}

On failure:

{
  "success": false,
  "output": {},
  "error": {
    "code": "SELECTOR_NOT_FOUND",
    "message": "No element found matching selector: .product-price",
    "details": {
      "selector": ".product-price",
      "url": "https://example.com/products"
    }
  }
}

POST /shutdown

Optional. Called when TestMesh wants to stop the plugin gracefully. If not implemented, the process is sent SIGKILL after 5 seconds.


Writing a Plugin

Python Plugin

server.py
#!/usr/bin/env python3
import os
import json
import time
from http.server import HTTPServer, BaseHTTPRequestHandler

START_TIME = time.time()

ACTIONS = [
    {
        "id": "my-plugin.hello",
        "name": "Say Hello",
        "description": "Returns a greeting",
        "schema": {
            "type": "object",
            "properties": {
                "name": {"type": "string"}
            },
            "required": ["name"]
        }
    }
]

class PluginHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.respond(200, {
                "status": "healthy",
                "version": "1.0.0",
                "uptime_seconds": int(time.time() - START_TIME)
            })
        elif self.path == "/info":
            self.respond(200, {
                "id": "my-plugin",
                "name": "My Plugin",
                "version": "1.0.0",
                "description": "Example plugin",
                "actions": ACTIONS
            })
        else:
            self.respond(404, {"error": "not found"})

    def do_POST(self):
        if self.path == "/execute":
            length = int(self.headers.get("Content-Length", 0))
            body = json.loads(self.rfile.read(length))

            action = body.get("action")
            config = body.get("config", {})

            if action == "my-plugin.hello":
                name = config.get("name", "World")
                self.respond(200, {
                    "success": True,
                    "output": {
                        "greeting": f"Hello, {name}!",
                        "timestamp": time.time()
                    },
                    "logs": [
                        {"level": "info", "message": f"Greeted {name}"}
                    ]
                })
            else:
                self.respond(400, {
                    "success": False,
                    "output": {},
                    "error": {
                        "code": "UNKNOWN_ACTION",
                        "message": f"Unknown action: {action}"
                    }
                })
        elif self.path == "/shutdown":
            self.respond(200, {"message": "shutting down"})
            os._exit(0)
        else:
            self.respond(404, {"error": "not found"})

    def respond(self, status, data):
        body = json.dumps(data).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", len(body))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # Suppress default access logs

port = int(os.environ.get("PLUGIN_PORT", 8080))
server = HTTPServer(("127.0.0.1", port), PluginHandler)
print(f"Plugin listening on port {port}", flush=True)
server.serve_forever()

Node.js Plugin

server.js
const http = require('http');

const PORT = parseInt(process.env.PLUGIN_PORT || '8080');
const START_TIME = Date.now();

const server = http.createServer((req, res) => {
  const send = (status, data) => {
    const body = JSON.stringify(data);
    res.writeHead(status, {
      'Content-Type': 'application/json',
      'Content-Length': Buffer.byteLength(body),
    });
    res.end(body);
  };

  if (req.method === 'GET' && req.url === '/health') {
    send(200, {
      status: 'healthy',
      version: '1.0.0',
      uptime_seconds: Math.floor((Date.now() - START_TIME) / 1000),
    });
  } else if (req.method === 'GET' && req.url === '/info') {
    send(200, {
      id: 'my-js-plugin',
      name: 'My JS Plugin',
      version: '1.0.0',
      description: 'Example Node.js plugin',
      actions: [
        {
          id: 'my-js-plugin.transform',
          name: 'Transform JSON',
          description: 'Apply a jq-like transformation',
          schema: { type: 'object', properties: { input: {}, expression: { type: 'string' } } },
        },
      ],
    });
  } else if (req.method === 'POST' && req.url === '/execute') {
    let body = '';
    req.on('data', chunk => (body += chunk));
    req.on('end', () => {
      const { action, config, context } = JSON.parse(body);

      if (action === 'my-js-plugin.transform') {
        // Example: just echo the input back with a timestamp
        send(200, {
          success: true,
          output: {
            result: config.input,
            transformed_at: new Date().toISOString(),
          },
          logs: [{ level: 'info', message: 'Transformation complete' }],
          metrics: { duration_ms: 1 },
        });
      } else {
        send(400, {
          success: false,
          output: {},
          error: { code: 'UNKNOWN_ACTION', message: `Unknown action: ${action}` },
        });
      }
    });
  } else if (req.method === 'POST' && req.url === '/shutdown') {
    send(200, { message: 'shutting down' });
    process.exit(0);
  } else {
    send(404, { error: 'not found' });
  }
});

server.listen(PORT, '127.0.0.1', () => {
  console.log(`Plugin listening on port ${PORT}`);
});

Plugin Directory Structure

plugins/
└── my-plugin/
    ├── manifest.json   # Required: plugin metadata
    ├── server.py       # Entry point (as specified in manifest)
    └── requirements.txt

By default TestMesh looks for plugins in a plugins/ directory relative to the API binary. This is configurable via the PLUGINS_DIR environment variable.


Installing Plugins

Via API

# Install from a local directory
curl -X POST http://localhost:5016/api/v1/plugins/install \
  -H "Content-Type: application/json" \
  -d '{"source": "/path/to/my-plugin"}'

# List installed plugins
curl http://localhost:5016/api/v1/plugins

# Enable a plugin
curl -X POST http://localhost:5016/api/v1/plugins/my-plugin/enable

# Disable a plugin
curl -X POST http://localhost:5016/api/v1/plugins/my-plugin/disable

# Uninstall
curl -X DELETE http://localhost:5016/api/v1/plugins/my-plugin

Auto-Discovery

On startup, TestMesh automatically scans the plugins/ directory for any folder containing a manifest.json and loads enabled plugins:

# Trigger a rescan at runtime
curl -X POST http://localhost:5016/api/v1/plugins/discover

Using Plugin Actions in Flows

Once a plugin is installed and loaded, its action IDs become available in flows like any built-in action:

flow-with-plugin.yaml
flow:
  name: "Product Price Check"

  steps:
    - id: greet
      action: my-plugin.hello
      config:
        name: "TestMesh"
      output:
        message: "$.greeting"

    - id: log_greeting
      action: log
      config:
        message: "Got greeting: ${greet.message}"

If a plugin registers sub-actions via /info, each sub-action ID can be used directly:

steps:
  - id: scrape_price
    action: scraper.extract      # Sub-action from the "scraper" plugin
    config:
      selector: ".price"
      attribute: "text"
    output:
      price: "$.text"

Native Plugins

TestMesh ships with three built-in native plugins that are loaded automatically:

PluginActionsDescription
kafkakafka_producer, kafka_consumerKafka message broker integration
postgresqldatabase_queryPostgreSQL database queries
redisredis_get, redis_setRedis key-value operations

These are implemented directly in Go for performance and do not use the HTTP protocol.


Environment Variables

The plugin process receives two environment variables from TestMesh:

VariableDescription
PLUGIN_PORTThe localhost TCP port to listen on
PLUGIN_IDThe plugin's ID from manifest.json

Plugin Lifecycle

TestMesh starts → Discover plugins → For each enabled plugin:
  1. Spawn process (node/python/bash/binary based on entry_point extension)
  2. Set PLUGIN_PORT, PLUGIN_ID environment variables
  3. Poll GET /health every 100ms until {"status":"healthy"} (30s timeout)
  4. Call GET /info to discover sub-actions
  5. Register all action IDs in the executor

On test step execution:
  → POST /execute with action name, config, and context
  ← Receive output, logs, metrics

On shutdown:
  → POST /shutdown (graceful, 5s timeout)
  → SIGKILL if process hasn't exited

Plugins are currently installed from local directories. Remote installation from URLs is planned for a future release along with a community plugin registry.

What's Next

On this page