
Kafka Connect SMT

Validate every Kafka record against a ContractGate semantic contract in real time, before it reaches your data warehouse or AI systems. By default, invalid records go to a dead-letter topic and valid records continue unchanged.

Java 11+ · Kafka Connect 2.8+ · Apache 2.0 · v0.1.0

Quick Start

Three steps from zero to validated records.

1. Get your credentials

Sign up for a ContractGate account and create a contract. Copy your API key from the Account page and the contract UUID from the Contracts page.

2. Install the connector

Via Confluent Hub CLI:

confluent-hub install datacontractgate/kafka-connect-contractgate:latest

Or install manually by extracting the ZIP into your Connect plugin path:

unzip kafka-connect-contractgate-0.1.0.zip \
  -d /usr/share/confluent-hub-components/
# Restart Connect workers after installing

3. Add the SMT to your connector config

Add these lines to an existing sink connector's properties file (Kafka Connect supports dead-letter queues for sink connectors only):

transforms=contractgate
transforms.contractgate.type=io.datacontractgate.connect.smt.ContractGateValidator
transforms.contractgate.contractgate.api.url=https://contractgate-api.fly.dev
transforms.contractgate.contractgate.api.key=cg_live_YOUR_API_KEY
transforms.contractgate.contractgate.contract.id=YOUR_CONTRACT_UUID

# Route invalid records to a dead-letter topic.
# errors.tolerance=all is required; with the default (none), the task fails instead.
errors.tolerance=all
errors.deadletterqueue.topic.name=your-topic.dlq
errors.deadletterqueue.context.headers.enable=true

That's it. Every record is now validated before reaching its destination.

Installation

Confluent Hub CLI

confluent-hub install datacontractgate/kafka-connect-contractgate:latest

Manual (Self-Managed Kafka)

Download the ZIP from the Confluent Hub listing and extract it into your plugin path:

# Typical plugin path locations:
# Confluent Platform: /usr/share/confluent-hub-components/
# Self-managed:       /opt/kafka/plugins/

unzip kafka-connect-contractgate-0.1.0.zip -d /usr/share/confluent-hub-components/

# Add to connect-distributed.properties if it is not already set, then restart the workers:
plugin.path=/usr/share/confluent-hub-components

Confluent Cloud (Custom Connector)

Upload the JAR through the Confluent Cloud UI under Connectors → Add plugin, then reference the SMT class in your connector config as shown above.

Configuration Reference

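All settings are namespaced under contractgate.* to avoid conflicts with other SMTs in a chain. In a connector config, each key is additionally prefixed with transforms.<alias>., which is why full property names such as transforms.contractgate.contractgate.api.url repeat the word.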

| Key | Default | Description |
|---|---|---|
| contractgate.api.url | (required) | Base URL of the ContractGate API, with no trailing slash. Use https://contractgate-api.fly.dev for the hosted service. |
| contractgate.contract.id | (required) | UUID of the contract to validate against. Copy it from the Contracts page. |
| contractgate.api.key | "" | Your API key, sent in the x-api-key header. Leave blank only for local development with auth disabled. |
| contractgate.contract.version | "" (latest) | Pins a specific contract version, e.g. 1.2.0. Leave blank to always use the latest stable version, which is recommended for most pipelines. When set, the version is sent as the X-Contract-Version request header, which takes highest precedence in the server's resolution order. |
| contractgate.on.failure | DLQ | What to do when a record fails validation. DLQ: throw a DataException so Kafka Connect can route the record to the dead-letter topic. TAG_AND_PASS: add violation headers and pass the record through. |
| contractgate.dry.run | false | When true, validates without writing to the audit log. Useful for high-throughput pipelines where you want enforcement without database write pressure. |
| contractgate.connect.timeout.ms | 5000 | TCP connection timeout to the ContractGate API, in milliseconds. |
| contractgate.request.timeout.ms | 10000 | Total HTTP request/response timeout, in milliseconds. Keep it well below Kafka Connect's task timeouts (for sink connectors, the consumer's max.poll.interval.ms). |
| contractgate.add.result.headers | true | Stamps contractgate.* metadata headers onto every record, whether it passes or fails. See Result Headers below. |
| contractgate.max.violation.headers | 5 | Maximum number of per-violation detail headers to add. High violation counts can bloat headers, so cap this at a useful number. |
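Because the connector config prefixes every SMT setting with transforms.<alias>., the full property names can be hard to read. The following Python sketch shows how the alias prefix and the contractgate.* keys compose, and fills in the defaults from the table above. The helper `expand_smt_config` is hypothetical and not part of the connector; the connector applies its own defaults, so listing them explicitly is optional.

```python
# Defaults copied from the configuration reference; required keys have none.
DEFAULTS = {
    "contractgate.api.key": "",
    "contractgate.contract.version": "",
    "contractgate.on.failure": "DLQ",
    "contractgate.dry.run": "false",
    "contractgate.connect.timeout.ms": "5000",
    "contractgate.request.timeout.ms": "10000",
    "contractgate.add.result.headers": "true",
    "contractgate.max.violation.headers": "5",
}
REQUIRED = ("contractgate.api.url", "contractgate.contract.id")
SMT_CLASS = "io.datacontractgate.connect.smt.ContractGateValidator"


def expand_smt_config(alias: str, settings: dict[str, str]) -> dict[str, str]:
    """Hypothetical helper: turn SMT-level settings into connector-level properties."""
    missing = [key for key in REQUIRED if not settings.get(key)]
    if missing:
        raise ValueError(f"missing required settings: {missing}")
    if settings["contractgate.api.url"].endswith("/"):
        raise ValueError("contractgate.api.url must not end with a slash")

    merged = {**DEFAULTS, **settings}  # explicit settings override defaults
    prefix = f"transforms.{alias}."
    config = {"transforms": alias, prefix + "type": SMT_CLASS}
    for key, value in merged.items():
        config[prefix + key] = value  # e.g. transforms.contractgate.contractgate.api.url
    return config
```

Called with the alias "contractgate" and the two required settings, this produces the same property names as the Quick Start snippet. When chaining other SMTs, set `transforms` to the full comma-separated alias list instead of the single alias used here.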

Dead-Letter Queue Setup

When contractgate.on.failure=DLQ (the default), the SMT throws a DataException on validation failure. With errors.tolerance=all set, Kafka Connect's built-in error handling then routes the original record to a dead-letter topic (sink connectors only).

Full DLQ connector config

# ── ContractGate SMT ──────────────────────────────────────
transforms=contractgate
transforms.contractgate.type=io.datacontractgate.connect.smt.ContractGateValidator
transforms.contractgate.contractgate.api.url=https://contractgate-api.fly.dev
transforms.contractgate.contractgate.api.key=cg_live_YOUR_API_KEY
transforms.contractgate.contractgate.contract.id=YOUR_CONTRACT_UUID
transforms.contractgate.contractgate.on.failure=DLQ

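# ── Kafka Connect DLQ settings ─────────────────────────────
# Required: tolerate errors instead of failing the task
errors.tolerance=all

# The topic to route failed records to (Connect creates it if it does not
# exist, using the replication factor below)
errors.deadletterqueue.topic.name=orders.dlq
errors.deadletterqueue.topic.replication.factor=3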

# Include the full violation summary in DLQ record headers
errors.deadletterqueue.context.headers.enable=true

# Log every DLQ-routed record (set to false at high error rates)
errors.log.enable=true
errors.log.include.messages=true

# Retry retriable (transient) errors before giving up; validation failures are not retried
errors.retry.timeout=60000
errors.retry.delay.max.ms=5000

Reading violation details from the DLQ

With errors.deadletterqueue.context.headers.enable=true, each DLQ record carries a header like:

__connect.errors.exception.message:
  ContractGate validation failed — topic=orders partition=3 offset=1042
  contract=3fa85f64-... version=1.2.0
  2 violation(s): user_id [missing_required_field]: required field missing;
  amount [range_violation]: value -5 below minimum 0
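When you consume the DLQ topic, the individual violations can be pulled out of this header text. A minimal Python sketch, assuming the message keeps the `field [kind]: message;` layout shown in the example above (the pattern is inferred from that sample, not from a published format) and that your Kafka client exposes headers as (key, bytes) pairs, as confluent-kafka's `Message.headers()` does. `parse_violations` and `violations_from_headers` are illustrative names.

```python
import re
from typing import NamedTuple, Optional

# One "field [kind]: message" entry; entries are separated by ";".
# Inferred from the sample header text, not a documented format.
VIOLATION_RE = re.compile(r"(\S+) \[(\w+)\]: ([^;]+)")
ERROR_HEADER = "__connect.errors.exception.message"


class Violation(NamedTuple):
    field: str
    kind: str
    message: str


def parse_violations(text: str) -> list[Violation]:
    """Extract (field, kind, message) triples, collapsing wrapped whitespace."""
    return [
        Violation(field, kind, " ".join(message.split()))
        for field, kind, message in VIOLATION_RE.findall(text)
    ]


def violations_from_headers(headers: Optional[list[tuple[str, bytes]]]) -> list[Violation]:
    """Find the Connect error header on a DLQ record and parse it."""
    for key, value in headers or []:
        if key == ERROR_HEADER and value is not None:
            return parse_violations(value.decode("utf-8"))
    return []
```

Grouping the parsed `kind` values across many DLQ records is a quick way to see which contract rules fail most often.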

Result Headers

When contractgate.add.result.headers=true (the default), the following headers are added to every record, whether it passes or fails.

| Header | Value |
|---|---|
| contractgate.passed | "true" or "false" |
| contractgate.contract.version | Resolved version string, e.g. "1.2.0" |
| contractgate.violations.count | Number of violations (0 on pass) |
| contractgate.violation.0.field | Dot-path of the field, e.g. "customer.address.country" |
| contractgate.violation.0.kind | One of: missing_required_field, type_mismatch, enum_violation, range_violation, pattern_mismatch, length_violation, undeclared_field |
| contractgate.violation.0.message | Human-readable explanation |

Violation headers repeat for indices 0, 1, 2, and so on, up to contractgate.max.violation.headers entries (default 5, so indices 0 through 4).
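The relationship between the count header and the capped detail headers can be sketched in Python as follows. This assumes contractgate.violations.count reports the total number of violations even when only the first max.violation.headers entries get detail headers; that is a reading of the table, not confirmed connector behavior. `build_result_headers` is a hypothetical name, not the connector's Java code.

```python
def build_result_headers(passed: bool, version: str,
                         violations: list[dict[str, str]],
                         max_violation_headers: int = 5) -> list[tuple[str, str]]:
    """Hypothetical sketch of the result-header layout, with the detail cap applied."""
    headers = [
        ("contractgate.passed", "true" if passed else "false"),
        ("contractgate.contract.version", version),
        # Assumption: the count covers all violations, not only the detailed ones.
        ("contractgate.violations.count", str(len(violations))),
    ]
    # Detail headers stop after max_violation_headers entries (indices 0..max-1).
    for i, violation in enumerate(violations[:max_violation_headers]):
        prefix = f"contractgate.violation.{i}"
        headers.append((f"{prefix}.field", violation["field"]))
        headers.append((f"{prefix}.kind", violation["kind"]))
        headers.append((f"{prefix}.message", violation["message"]))
    return headers
```

With seven violations and the default cap of 5, a record would carry detail headers for indices 0 through 4 only.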

Tag & Pass Mode

Set contractgate.on.failure=TAG_AND_PASS so that records are never dropped. Invalid records get violation headers and continue downstream; consumers can inspect contractgate.passed and decide what to do.

Use this when you want observability without enforcement, for example when shadowing a new contract version before promoting it to stable.

transforms.contractgate.contractgate.on.failure=TAG_AND_PASS
# Records now flow through regardless of violations.
# Downstream consumers can branch on contractgate.passed=false.
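On the consumer side, branching on the tag only needs the header values. A minimal Python sketch, assuming headers arrive as (key, bytes) pairs (the shape confluent-kafka's `Message.headers()` returns, or None when a record has no headers). `route_record` and the "valid" / "quarantine" / "unvalidated" outcomes are hypothetical placeholders for whatever your consumer does next.

```python
from typing import Optional

Headers = Optional[list[tuple[str, bytes]]]


def header_value(headers: Headers, key: str) -> Optional[str]:
    """Return the last value for `key`, decoded as UTF-8, or None if absent."""
    found = None
    for k, v in headers or []:
        if k == key and v is not None:
            found = v.decode("utf-8")
    return found


def route_record(headers: Headers) -> str:
    """Decide what to do with a record based on the contractgate.passed header."""
    passed = header_value(headers, "contractgate.passed")
    if passed == "true":
        return "valid"
    if passed == "false":
        return "quarantine"
    # No tag at all: e.g. contractgate.add.result.headers=false, or the SMT
    # failed open and passed the record through unchanged.
    return "unvalidated"
```

Treating a missing header as its own outcome, rather than as a pass, keeps records that were never validated distinguishable from records that passed.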

Full Examples

S3 Sink with DLQ

{
  "name": "s3-sink-validated",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "orders",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-data-lake",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1000",

    "transforms": "contractgate",
    "transforms.contractgate.type": "io.datacontractgate.connect.smt.ContractGateValidator",
    "transforms.contractgate.contractgate.api.url": "https://contractgate-api.fly.dev",
    "transforms.contractgate.contractgate.api.key": "${file:/secrets/cg.properties:api.key}",
    "transforms.contractgate.contractgate.contract.id": "YOUR_CONTRACT_UUID",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "orders.dlq",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.retry.timeout": "60000"
  }
}
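Payloads in this {"name": ..., "config": {...}} shape are what Kafka Connect's REST API accepts on POST /connectors. A minimal Python sketch using only the standard library; it assumes the worker listens on http://localhost:8083 (Connect's default REST port), and `build_create_request` / `create_connector` are illustrative names.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: default Connect REST listener


def build_create_request(connector: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build a POST /connectors request for a {"name": ..., "config": {...}} payload."""
    if "name" not in connector or "config" not in connector:
        raise ValueError("payload needs 'name' and 'config'")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(connector: dict, base_url: str = CONNECT_URL) -> dict:
    """Submit the connector; urlopen raises HTTPError on 4xx/5xx responses."""
    with urllib.request.urlopen(build_create_request(connector, base_url)) as resp:
        return json.load(resp)
```

To update an existing connector, Connect instead takes PUT /connectors/<name>/config with only the inner config object. The ${file:...} placeholder for the API key is resolved only if the worker has a FileConfigProvider configured (config.providers=file with config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider).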

JDBC Sink (high-throughput, dry run with no audit-log writes)

{
  "name": "postgres-sink-validated",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "8",
    "topics": "user_events",
    "connection.url": "jdbc:postgresql://db:5432/analytics",

    "transforms": "contractgate",
    "transforms.contractgate.type": "io.datacontractgate.connect.smt.ContractGateValidator",
    "transforms.contractgate.contractgate.api.url": "https://contractgate-api.fly.dev",
    "transforms.contractgate.contractgate.api.key": "${file:/secrets/cg.properties:api.key}",
    "transforms.contractgate.contractgate.contract.id": "YOUR_CONTRACT_UUID",
    "transforms.contractgate.contractgate.dry.run": "true",
    "transforms.contractgate.contractgate.on.failure": "DLQ",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "user_events.dlq",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}

FAQ

Does this work with Avro / Protobuf / Schema Registry records?
Yes. The SMT converts any record value to JSON before sending it to ContractGate: Struct (Avro/Protobuf), Map, String, and byte[] values are all handled automatically. Your Schema Registry setup is untouched.
What happens if the ContractGate API is unreachable?
The SMT fails open: it logs a warning and passes the record through unchanged, so a transient API outage does not halt your pipeline. Because no exception is raised in this case, Kafka Connect's errors.retry.* and dead-letter settings do not apply to it; watch the connector logs for these warnings to detect outages.
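The per-record outcomes (pass, DLQ, tag and pass, fail open) can be summarized as a small decision function. This Python sketch illustrates the documented behavior under stated assumptions and is not the connector's Java source: `validate` stands in for the HTTP call to ContractGate, `ValidationFailed` stands in for DataException, the record is modeled as a plain dict, and result headers on passing records are omitted for brevity.

```python
import logging
from typing import Callable

log = logging.getLogger("contractgate.sketch")


class ValidationFailed(Exception):
    """Stand-in for org.apache.kafka.connect.errors.DataException."""


def apply_smt(record: dict, validate: Callable[[dict], list[str]],
              on_failure: str = "DLQ") -> dict:
    """Sketch of the per-record decision; `validate` returns a list of violations."""
    if on_failure not in ("DLQ", "TAG_AND_PASS"):
        raise ValueError(f"unknown on.failure mode: {on_failure}")
    try:
        violations = validate(record)
    except OSError as exc:  # connection errors and timeouts are OSError subclasses
        # Fail open: log a warning and pass the record through unchanged.
        log.warning("ContractGate API unreachable, passing record through: %s", exc)
        return record
    if not violations:
        return record
    if on_failure == "DLQ":
        # Connect routes this record to the DLQ only when errors.tolerance=all.
        raise ValidationFailed("; ".join(violations))
    # TAG_AND_PASS: keep the record but mark it as failed (a copy, not in place).
    headers = {**record.get("headers", {}),
               "contractgate.passed": "false",
               "contractgate.violations.count": str(len(violations))}
    return {**record, "headers": headers}
```

The key point the sketch makes visible is that only the DLQ branch raises; an unreachable API returns the record, which is why Connect's error handling never sees an outage.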
Does it add latency to my pipeline?
The validation engine itself runs in under 50 µs per record. End-to-end latency depends on the network topology between your Kafka Connect workers and the ContractGate API; for same-datacenter deployments this is typically sub-millisecond. We publish benchmark results as we gather them from production deployments.
Can I use it with Confluent Cloud managed connectors?
Yes. Upload the JAR as a custom connector plugin through the Confluent Cloud UI, then reference the SMT class in your connector config as shown in the examples above.
How do I pin a contract version?
Set contractgate.contract.version=1.2.0 (in a connector config, transforms.contractgate.contractgate.contract.version=1.2.0). The connector sends this as the X-Contract-Version request header, which the server treats with highest precedence. Leave it blank (the default) to always resolve to the latest stable version; this lets you promote new contract versions without redeploying connectors.
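In terms of the HTTP request, the pinning rule reduces to which headers are sent. A minimal Python sketch, assuming (as the configuration reference suggests) that a blank setting means the header is simply omitted; `contractgate_headers` is a hypothetical helper, and the endpoint path the connector calls is not shown because it is not documented here.

```python
def contractgate_headers(api_key: str = "", contract_version: str = "") -> dict[str, str]:
    """Hypothetical sketch: request headers derived from the SMT settings."""
    headers: dict[str, str] = {}
    if api_key:
        headers["x-api-key"] = api_key  # contractgate.api.key
    if contract_version:
        # contractgate.contract.version; highest precedence on the server.
        headers["X-Contract-Version"] = contract_version
    # No version header: the server resolves the latest stable version.
    return headers
```

Because the unpinned case sends no version at all, promoting a new stable version on the server takes effect on the next request without touching the connector.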
Can I chain this with other SMTs?
Yes. List multiple transform aliases, which run in the order listed: transforms=contractgate,maskPii,routeByField. The ContractGate SMT works anywhere in the chain; put it first to validate raw records before any transforms mutate them.
