# script

Runs a Python function to produce events, with full programmatic control. Use this plugin when your event logic requires control flow, external API calls, or computation that doesn't fit Jinja2 templates.
For most use cases, the template plugin is simpler and sufficient. Reach for script only when you need full Python expressiveness.
## Parameters
| Parameter | Type | Constraints | Description |
|---|---|---|---|
| path | path | Required | Path to the Python script file. |
## Function signature
The script must define a `produce` function. It receives a dictionary with the current timestamp and tags and returns one or more event strings:

```python
from datetime import datetime


def produce(params: dict) -> str | list[str]:
    timestamp: datetime = params['timestamp']
    tags: tuple[str, ...] = params['tags']
    return f'{{"timestamp": "{timestamp.isoformat()}", "tags": {list(tags)}}}'
```

The `params` dictionary contains:
| Key | Type | Description |
|---|---|---|
| timestamp | datetime | Timezone-aware datetime of the current event. |
| tags | tuple[str, ...] | Tags attached by the input plugin. |
Return type: a single `str` (treated as one event) or a `list[str]` (each element is a separate event). An empty list means no events are produced for this timestamp.
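The empty-list case makes sampling and filtering straightforward. A minimal sketch (the 50% drop rate is an illustrative assumption, not a plugin default):

```python
import json
import random
from datetime import datetime


def produce(params: dict) -> list[str]:
    ts: datetime = params['timestamp']
    # Assumption for illustration: skip roughly half of all timestamps
    # by returning an empty list, which emits nothing for this tick.
    if random.random() < 0.5:
        return []
    return [json.dumps({'timestamp': ts.isoformat(), 'sampled': True})]
```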
## Examples
### Basic — single JSON event
```yaml
event:
  script:
    path: scripts/produce.py
```

```python
import json
from datetime import datetime


def produce(params: dict) -> str:
    ts: datetime = params['timestamp']
    event = {
        'timestamp': ts.isoformat(),
        'level': 'INFO',
        'message': 'Request processed',
    }
    return json.dumps(event)
```

For a timestamp 2026-02-20T10:00:00+00:00, this produces:

```json
{"timestamp": "2026-02-20T10:00:00+00:00", "level": "INFO", "message": "Request processed"}
```

### Multiple events per timestamp
Return a list to emit multiple events for each timestamp. This is useful when a single "tick" should produce a batch of related records.
```python
import json
import random
from datetime import datetime

SERVICES = ['api-gateway', 'auth-service', 'order-service', 'payment-service']


def produce(params: dict) -> list[str]:
    ts: datetime = params['timestamp']
    events = []
    for service in SERVICES:
        event = {
            'timestamp': ts.isoformat(),
            'service': service,
            'cpu_percent': round(random.uniform(5, 95), 1),
            'memory_mb': random.randint(128, 2048),
            'request_count': random.randint(0, 500),
        }
        events.append(json.dumps(event))
    return events
```

Each timestamp produces 4 events — one metric per service:

```json
{"timestamp": "2026-02-20T10:00:00+00:00", "service": "api-gateway", "cpu_percent": 42.3, "memory_mb": 512, "request_count": 187}
{"timestamp": "2026-02-20T10:00:00+00:00", "service": "auth-service", "cpu_percent": 12.7, "memory_mb": 256, "request_count": 43}
{"timestamp": "2026-02-20T10:00:00+00:00", "service": "order-service", "cpu_percent": 67.8, "memory_mb": 1024, "request_count": 312}
{"timestamp": "2026-02-20T10:00:00+00:00", "service": "payment-service", "cpu_percent": 23.1, "memory_mb": 384, "request_count": 89}
```

### Using tags for conditional logic
Tags from the input plugin let you vary behavior based on which schedule triggered the event.
```yaml
input:
  - cron:
      expression: "*/5 * * * * *"
      count: 1
      tags: [regular]
  - cron:
      expression: "0 * * * *"
      count: 1
      tags: [hourly-summary]

event:
  script:
    path: scripts/tagged.py
```

```python
import json
import random
from datetime import datetime


def produce(params: dict) -> str:
    ts: datetime = params['timestamp']
    tags: tuple[str, ...] = params['tags']
    if 'hourly-summary' in tags:
        return json.dumps({
            'timestamp': ts.isoformat(),
            'type': 'summary',
            'total_requests': random.randint(1000, 5000),
            'error_rate': round(random.uniform(0.01, 0.1), 3),
            'p99_latency_ms': random.randint(50, 500),
        })
    return json.dumps({
        'timestamp': ts.isoformat(),
        'type': 'request',
        'method': random.choice(['GET', 'POST', 'PUT', 'DELETE']),
        'path': random.choice(['/api/users', '/api/orders', '/api/products']),
        'status': random.choice([200, 200, 200, 201, 400, 404, 500]),
    })
```

### Stateful script — accumulating data across calls
Module-level variables persist across `produce()` calls, letting you build up state over time. The script is loaded once at startup, so top-level variables act as persistent storage.
```python
import json
import random
from datetime import datetime
from uuid import uuid4

# Module-level state — persists across all produce() calls
active_sessions: dict[str, dict] = {}
session_counter = 0


def produce(params: dict) -> list[str]:
    global session_counter
    ts: datetime = params['timestamp']
    events = []

    # Randomly start new sessions
    if random.random() < 0.3:
        session_counter += 1
        session_id = str(uuid4())
        active_sessions[session_id] = {
            'user': f'user_{session_counter}',
            'started': ts.isoformat(),
            'page_views': 0,
        }
        events.append(json.dumps({
            'timestamp': ts.isoformat(),
            'event': 'session_start',
            'session_id': session_id,
            'user': active_sessions[session_id]['user'],
        }))

    # Active sessions generate page views
    for sid, session in list(active_sessions.items()):
        session['page_views'] += 1
        events.append(json.dumps({
            'timestamp': ts.isoformat(),
            'event': 'page_view',
            'session_id': sid,
            'user': session['user'],
            'page': random.choice(['/home', '/products', '/cart', '/checkout']),
            'total_views': session['page_views'],
        }))

        # End session after enough page views
        if session['page_views'] > random.randint(5, 20):
            events.append(json.dumps({
                'timestamp': ts.isoformat(),
                'event': 'session_end',
                'session_id': sid,
                'user': session['user'],
                'total_views': session['page_views'],
            }))
            del active_sessions[sid]

    return events
```

Over time, sessions overlap and interact:

```json
{"timestamp": "...", "event": "session_start", "session_id": "a1b2...", "user": "user_1"}
{"timestamp": "...", "event": "page_view", "session_id": "a1b2...", "user": "user_1", "page": "/home", "total_views": 1}
{"timestamp": "...", "event": "page_view", "session_id": "a1b2...", "user": "user_1", "page": "/products", "total_views": 2}
{"timestamp": "...", "event": "session_start", "session_id": "c3d4...", "user": "user_2"}
{"timestamp": "...", "event": "page_view", "session_id": "a1b2...", "user": "user_1", "page": "/cart", "total_views": 3}
{"timestamp": "...", "event": "page_view", "session_id": "c3d4...", "user": "user_2", "page": "/home", "total_views": 1}
```

### Distributed tracing — correlated microservice spans
This example generates realistic OpenTelemetry-style distributed traces across a microservice architecture. Each incoming timestamp triggers a full request flow through multiple services, producing properly correlated spans with parent-child relationships, realistic latencies, and occasional error propagation.
This is a scenario where script shines — managing trace context, propagating errors through a service graph, and computing dependent latencies requires logic that would be difficult to express in templates.
```yaml
input:
  - cron:
      expression: "*/2 * * * * *"  # a request every 2 seconds
      count: 1

event:
  script:
    path: scripts/tracing.py

output:
  - stdout:
      formatter:
        format: plain
```

```python
import json
import random
from datetime import datetime, timedelta
from uuid import uuid4

# Service dependency graph: service → list of downstream calls
TOPOLOGY = {
    'api-gateway': ['auth-service', 'order-service'],
    'auth-service': ['user-db'],
    'order-service': ['inventory-service', 'payment-service'],
    'inventory-service': ['product-db'],
    'payment-service': ['payment-provider'],
    'user-db': [],
    'product-db': [],
    'payment-provider': [],
}

# Base latencies per service (ms)
BASE_LATENCY = {
    'api-gateway': 2, 'auth-service': 5,
    'order-service': 3, 'inventory-service': 4,
    'payment-service': 8, 'user-db': 10,
    'product-db': 8, 'payment-provider': 50,
}

# Error probability per service
ERROR_RATE = {
    'payment-provider': 0.05,
    'product-db': 0.02,
}

METHODS = ['POST /orders', 'GET /orders', 'POST /checkout', 'GET /products']


def _make_span(
    trace_id: str,
    span_id: str,
    parent_id: str | None,
    service: str,
    start: datetime,
    duration_ms: float,
    error: bool,
    method: str,
) -> str:
    span = {
        'trace_id': trace_id,
        'span_id': span_id,
        'parent_span_id': parent_id,
        'service': service,
        'operation': method,
        'start_time': start.isoformat(),
        'duration_ms': round(duration_ms, 2),
        'status': 'ERROR' if error else 'OK',
    }
    if error:
        span['error_message'] = f'{service}: internal error'
    return json.dumps(span)


def _traverse(
    service: str,
    trace_id: str,
    parent_id: str | None,
    start: datetime,
    method: str,
) -> tuple[list[str], float, bool]:
    """Walk the service graph depth-first, collecting spans."""
    spans = []
    # Generate this span's ID up front so child spans reference the real parent
    span_id = uuid4().hex[:16]
    own_latency = max(1, random.gauss(BASE_LATENCY[service], 2))
    cursor = start + timedelta(milliseconds=own_latency * 0.3)
    child_error = False
    for downstream in TOPOLOGY[service]:
        child_spans, child_dur, errored = _traverse(
            downstream, trace_id, span_id, cursor, method,
        )
        spans.extend(child_spans)
        cursor += timedelta(milliseconds=child_dur)
        child_error = child_error or errored
    total = (cursor - start).total_seconds() * 1000 + own_latency * 0.7
    is_error = child_error or (random.random() < ERROR_RATE.get(service, 0))
    spans.append(
        _make_span(trace_id, span_id, parent_id, service, start, total, is_error, method)
    )
    return spans, total, is_error


def produce(params: dict) -> list[str]:
    ts: datetime = params['timestamp']
    trace_id = uuid4().hex
    method = random.choice(METHODS)
    spans, _, _ = _traverse('api-gateway', trace_id, None, ts, method)
    return spans
```

Each timestamp produces a full trace — 8 correlated spans across the service graph:

```json
{"trace_id": "a1b2c3d4...", "span_id": "f1e2d3c4...", "parent_span_id": "9a8b7c6d...", "service": "user-db", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.003...", "duration_ms": 12.34, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "9a8b7c6d...", "parent_span_id": "5e6f7a8b...", "service": "auth-service", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.001...", "duration_ms": 18.56, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "1c2d3e4f...", "parent_span_id": "7a8b9c0d...", "service": "product-db", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.022...", "duration_ms": 9.87, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "7a8b9c0d...", "parent_span_id": "3c4d5e6f...", "service": "inventory-service", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.020...", "duration_ms": 15.23, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "2e3f4a5b...", "parent_span_id": "6a7b8c9d...", "service": "payment-provider", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.038...", "duration_ms": 53.41, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "6a7b8c9d...", "parent_span_id": "3c4d5e6f...", "service": "payment-service", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.036...", "duration_ms": 62.18, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "3c4d5e6f...", "parent_span_id": "5e6f7a8b...", "service": "order-service", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00.019...", "duration_ms": 80.92, "status": "OK"}
{"trace_id": "a1b2c3d4...", "span_id": "5e6f7a8b...", "parent_span_id": null, "service": "api-gateway", "operation": "POST /orders", "start_time": "2026-02-20T10:00:00+00:00", "duration_ms": 102.45, "status": "OK"}
```

Key behaviors:
- All spans in a request share the same `trace_id`.
- Each span records its `parent_span_id`, forming a tree.
- Latencies accumulate realistically — a parent span's duration includes its children.
- Errors in leaf services (e.g., `payment-provider`) propagate up to parent spans.
- The service topology, latencies, and error rates are all configurable by editing the module-level constants.
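The tree invariant is easy to sanity-check on emitted lines: every non-root `parent_span_id` must match the `span_id` of another span in the same trace. A standalone sketch (the three sample spans here are hypothetical, hand-written stand-ins for real output):

```python
import json

# Hypothetical sample: three spans from one trace, as emitted JSON lines
lines = [
    '{"trace_id": "t1", "span_id": "a1", "parent_span_id": null, "service": "api-gateway"}',
    '{"trace_id": "t1", "span_id": "b2", "parent_span_id": "a1", "service": "auth-service"}',
    '{"trace_id": "t1", "span_id": "c3", "parent_span_id": "b2", "service": "user-db"}',
]

spans = [json.loads(line) for line in lines]
ids = {s['span_id'] for s in spans}

# Every non-root span must reference a parent that exists in the trace
for s in spans:
    parent = s['parent_span_id']
    assert parent is None or parent in ids, f"orphan span: {s['span_id']}"
```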
Pipe this into an output plugin that writes to Elasticsearch or a file, then visualize the traces in Jaeger or Grafana Tempo.