Output
kafka
Output plugin that produces events to Apache Kafka topics.
Produces events to Apache Kafka topics using an asynchronous producer. Supports SASL/SSL authentication, compression, and idempotent and transactional delivery.
Parameters
Connection
| Parameter | Type | Default | Constraints | Description |
|---|---|---|---|---|
| bootstrap_servers | list of strings | — | Required. Min 1 item. | Kafka broker addresses in host:port format. |
| client_id | string or null | null | Non-empty if set. | Client name passed in each request to brokers. |
| metadata_max_age_ms | integer | 300000 | >= 0 | Period after which metadata is force-refreshed (ms). |
| request_timeout_ms | integer | 40000 | >= 1 | Produce request timeout (ms). |
| connections_max_idle_ms | integer | 540000 | >= 0 | Close idle connections after this time (ms). |
Topic & Message
| Parameter | Type | Default | Constraints | Description |
|---|---|---|---|---|
| topic | string | — | Required. Non-empty. | Target Kafka topic. |
| key | string or null | null | Non-empty if set. | Message key applied to all produced messages. |
| encoding | string | "utf-8" | Non-empty. | Encoding for converting event strings and keys to bytes. |
Performance & Reliability
| Parameter | Type | Default | Constraints | Description |
|---|---|---|---|---|
| acks | integer | 1 | 0, 1, or -1 | Acknowledgments: 0=fire-and-forget, 1=leader, -1=all in-sync replicas. |
| compression_type | string or null | null | "gzip", "snappy", "lz4", or "zstd" | Compression codec. |
| max_batch_size | integer | 16384 | >= 1 | Max buffered data per partition (bytes). |
| max_request_size | integer | 1048576 | >= 1 | Max produce request size (bytes). |
| linger_ms | integer | 0 | >= 0 | Artificial delay for batching (ms). |
| retry_backoff_ms | integer | 100 | >= 0 | Backoff between retries (ms). |
| enable_idempotence | boolean | false | — | Enable idempotent delivery, preventing duplicates on producer retries (requires acks=-1). |
| transactional_id | string or null | null | Non-empty if set. | Transactional producer identifier. |
| transaction_timeout_ms | integer | 60000 | >= 1 | Transaction timeout (ms). |
Security
| Parameter | Type | Default | Constraints | Description |
|---|---|---|---|---|
| security_protocol | string | "PLAINTEXT" | "PLAINTEXT", "SSL", "SASL_PLAINTEXT", or "SASL_SSL" | Broker communication protocol. |
| sasl_mechanism | string or null | null | "PLAIN", "SCRAM-SHA-256", or "SCRAM-SHA-512" | SASL authentication mechanism. |
| sasl_plain_username | string or null | null | Non-empty if set. Must pair with sasl_plain_password. | SASL username. |
| sasl_plain_password | string or null | null | Non-empty if set. Must pair with sasl_plain_username. | SASL password. |
| sasl_kerberos_service_name | string | "kafka" | Non-empty. | Kerberos service name. |
| sasl_kerberos_domain_name | string or null | null | Non-empty if set. | Kerberos domain name. |
SSL/TLS
| Parameter | Type | Default | Constraints | Description |
|---|---|---|---|---|
| ssl_cafile | path or null | null | — | Path to CA certificate. |
| ssl_certfile | path or null | null | Must pair with ssl_keyfile. | Path to client certificate. |
| ssl_keyfile | path or null | null | Must pair with ssl_certfile. | Path to client key. |
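As an illustration of the pairing constraint on `ssl_certfile` and `ssl_keyfile`, a client-certificate (mutual TLS) configuration might look like the following sketch; the broker address, topic, and file paths are placeholders:

```yaml
output:
  - kafka:
      bootstrap_servers:
        - kafka.internal:9093       # placeholder broker address
      topic: audit-events           # placeholder topic
      security_protocol: SSL
      ssl_cafile: certs/ca.pem
      ssl_certfile: certs/client.pem  # must be set together with ssl_keyfile
      ssl_keyfile: certs/client.key   # must be set together with ssl_certfile
```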
Formatter
| Parameter | Type | Default | Description |
|---|---|---|---|
| formatter | formatter | json | How events are serialized before producing. |
Behavior
- Events are produced to the specified Kafka topic asynchronously.
- The default `json` formatter serializes each event as a single-line JSON string, then encodes it to bytes using the configured `encoding`.
- When `enable_idempotence` is `true`, the producer prevents duplicate writes caused by internal retries.
- The producer batches messages internally based on `linger_ms` and `max_batch_size` for throughput optimization.
Examples
Basic production to a topic:

```yaml
output:
  - kafka:
      bootstrap_servers:
        - broker1:9092
        - broker2:9092
      topic: events
```

SASL_SSL authentication:
```yaml
output:
  - kafka:
      bootstrap_servers:
        - kafka.prod:9093
      topic: security-events
      security_protocol: SASL_SSL
      sasl_mechanism: SCRAM-SHA-256
      sasl_plain_username: ${params.kafka_user}
      sasl_plain_password: ${secrets.kafka_password}
      ssl_cafile: certs/ca.pem
```

High-throughput with compression and batching:
```yaml
output:
  - kafka:
      bootstrap_servers:
        - broker1:9092
        - broker2:9092
        - broker3:9092
      topic: high-volume-events
      key: my-partition-key
      acks: -1              # required when enable_idempotence is true
      compression_type: lz4
      linger_ms: 50
      max_batch_size: 65536
      enable_idempotence: true
```
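Transactional delivery:

A sketch combining the transactional parameters documented above, assuming the brokers support transactions; the topic and `transactional_id` value are placeholders:

```yaml
output:
  - kafka:
      bootstrap_servers:
        - broker1:9092
      topic: billing-events              # placeholder topic
      acks: -1
      enable_idempotence: true
      transactional_id: billing-producer-1  # placeholder; must be unique per producer
      transaction_timeout_ms: 60000
```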