Eventum Logo

Eventum

Output

kafka

Output plugin that produces events to Apache Kafka topics.

Produces events to Apache Kafka topics using async producer. Supports SASL/SSL authentication, compression, idempotent and transactional delivery.

Parameters

Connection

ParameterTypeDefaultConstraintsDescription
bootstrap_serverslist of stringsRequired. Min 1 item.Kafka broker addresses in host:port format.
client_idstring or nullnullNon-empty if set.Client name passed in each request to brokers.
metadata_max_age_msinteger300000>= 0Period after which metadata is force-refreshed (ms).
request_timeout_msinteger40000>= 1Produce request timeout (ms).
connections_max_idle_msinteger540000>= 0Close idle connections after this time (ms).

Topic & Message

ParameterTypeDefaultConstraintsDescription
topicstringRequired. Non-empty.Target Kafka topic.
keystring or nullnullNon-empty if set.Message key applied to all produced messages.
encodingstring"utf-8"Non-empty.Encoding for converting event strings and keys to bytes.

Performance & Reliability

ParameterTypeDefaultConstraintsDescription
acksinteger10, 1, or -1Acknowledgments: 0=fire-and-forget, 1=leader, -1=all replicas.
compression_typestring or nullnull"gzip", "snappy", "lz4", or "zstd"Compression codec.
max_batch_sizeinteger16384>= 1Max buffered data per partition (bytes).
max_request_sizeinteger1048576>= 1Max produce request size (bytes).
linger_msinteger0>= 0Artificial delay for batching (ms).
retry_backoff_msinteger100>= 0Backoff between retries (ms).
enable_idempotencebooleanfalseExactly-once delivery guarantee.
transactional_idstring or nullnullNon-empty if set.Transactional producer identifier.
transaction_timeout_msinteger60000>= 1Transaction timeout (ms).

Security

ParameterTypeDefaultConstraintsDescription
security_protocolstring"PLAINTEXT""PLAINTEXT", "SSL", "SASL_PLAINTEXT", or "SASL_SSL"Broker communication protocol.
sasl_mechanismstring or nullnull"PLAIN", "SCRAM-SHA-256", or "SCRAM-SHA-512"SASL authentication mechanism.
sasl_plain_usernamestring or nullnullNon-empty if set. Must pair with sasl_plain_password.SASL username.
sasl_plain_passwordstring or nullnullNon-empty if set. Must pair with sasl_plain_username.SASL password.
sasl_kerberos_service_namestring"kafka"Non-empty.Kerberos service name.
sasl_kerberos_domain_namestring or nullnullNon-empty if set.Kerberos domain name.

SSL/TLS

ParameterTypeDefaultConstraintsDescription
ssl_cafilepath or nullnullPath to CA certificate.
ssl_certfilepath or nullnullMust pair with ssl_keyfile.Path to client certificate.
ssl_keyfilepath or nullnullMust pair with ssl_certfile.Path to client key.

Formatter

ParameterTypeDefaultDescription
formatterformatterjsonHow events are serialized before producing.

Behavior

  • Events are produced to the specified Kafka topic asynchronously.
  • The default json formatter serializes each event as a single-line JSON string, then encodes it to bytes using the configured encoding.
  • When enable_idempotence is true, the producer ensures exactly-once delivery semantics.
  • The producer batches messages internally based on linger_ms and max_batch_size for throughput optimization.

Examples

Basic production to a topic:

output:
  - kafka:
      bootstrap_servers:
        - broker1:9092
        - broker2:9092
      topic: events

SASL_SSL authentication:

output:
  - kafka:
      bootstrap_servers:
        - kafka.prod:9093
      topic: security-events
      security_protocol: SASL_SSL
      sasl_mechanism: SCRAM-SHA-256
      sasl_plain_username: ${params.kafka_user}
      sasl_plain_password: ${secrets.kafka_password}
      ssl_cafile: certs/ca.pem

High-throughput with compression and batching:

output:
  - kafka:
      bootstrap_servers:
        - broker1:9092
        - broker2:9092
        - broker3:9092
      topic: high-volume-events
      key: my-partition-key
      acks: 1
      compression_type: lz4
      linger_ms: 50
      max_batch_size: 65536
      enable_idempotence: true

On this page