Kafka Connect SMT
Validate every Kafka record against a ContractGate semantic contract in real time, before it reaches your data warehouse or AI systems. Invalid records go to a dead-letter topic; valid records continue with their key and value unchanged.
Quick Start
Three steps from zero to validated records.
1. Sign up for a ContractGate account, create a contract, and copy your API key and contract UUID from the Account page.
2. Install the SMT via the Confluent Hub CLI:
```bash
confluent-hub install datacontractgate/kafka-connect-contractgate:latest
```
Or install it manually by extracting the ZIP into your Connect plugin path:
```bash
unzip kafka-connect-contractgate-0.1.0.zip \
  -d /usr/share/confluent-hub-components/
# Restart Connect workers after installing
```
3. Add these lines to an existing sink connector's properties file (dead-letter topics apply only to sink connectors):
```properties
transforms=contractgate
transforms.contractgate.type=io.datacontractgate.connect.smt.ContractGateValidator
transforms.contractgate.contractgate.api.url=https://contractgate-api.fly.dev
transforms.contractgate.contractgate.api.key=cg_live_YOUR_API_KEY
transforms.contractgate.contractgate.contract.id=YOUR_CONTRACT_UUID
# Route invalid records to a dead-letter topic instead of failing the task
errors.tolerance=all
errors.deadletterqueue.topic.name=your-topic.dlq
errors.deadletterqueue.context.headers.enable=true
```
That's it. Every record is now validated before reaching its destination.
Installation
Confluent Hub CLI
```bash
confluent-hub install datacontractgate/kafka-connect-contractgate:latest
```
Manual (Self-Managed Kafka)
Download the ZIP from the Confluent Hub listing and extract it into your plugin path:
```bash
# Typical plugin path locations:
#   Confluent Platform: /usr/share/confluent-hub-components/
#   Self-managed:       /opt/kafka/plugins/
unzip kafka-connect-contractgate-0.1.0.zip -d /usr/share/confluent-hub-components/
```
If it is not already set, add the plugin path to connect-distributed.properties (or connect-standalone.properties) on every worker, then restart the workers:
```properties
plugin.path=/usr/share/confluent-hub-components
```
Confluent Cloud (Custom Connector)
Upload the JAR via the Confluent Cloud UI under Connectors → Add plugin, then reference the SMT class in your connector config as shown above.
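Whichever route you take, the SMT class has to be visible to every Connect worker. On a self-managed cluster, one way to confirm this is the worker's REST API. On Kafka 3.2 and later, `GET /connector-plugins?connectorsOnly=false` lists transformations as well as connectors. The sketch below assumes a worker on the default REST port 8083 at localhost. The helper names are illustrative and are not part of the ContractGate package.

```python
import json
import urllib.request

# Fully qualified class name of the SMT, as used in the connector config.
SMT_CLASS = "io.datacontractgate.connect.smt.ContractGateValidator"


def fetch_plugins(worker_url: str = "http://localhost:8083") -> list:
    """Return the plugin list reported by one Connect worker.

    connectorsOnly=false (Kafka 3.2+) makes the worker include SMTs,
    predicates and converters, not just connectors.
    """
    url = worker_url.rstrip("/") + "/connector-plugins?connectorsOnly=false"
    with urllib.request.urlopen(url, timeout=10) as resp:
        return json.load(resp)


def smt_installed(plugins: list) -> bool:
    """True if the ContractGate SMT appears in a /connector-plugins response."""
    return any(p.get("class") == SMT_CLASS for p in plugins)
```

Run `smt_installed(fetch_plugins(url))` against each worker. A `False` result usually means the ZIP was extracted outside `plugin.path`, or the worker has not been restarted since installation.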
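Configuration Reference
All settings are namespaced under contractgate.* to avoid conflicts with other SMTs in a chain. In a connector config, each key also carries the transforms.<alias>. prefix. With the alias contractgate, for example, the API URL is set as transforms.contractgate.contractgate.api.url.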
| Key | Default | Description |
|---|---|---|
| contractgate.api.url (required) | none | Base URL of the ContractGate API, without a trailing slash. Use https://contractgate-api.fly.dev for the hosted service. |
| contractgate.contract.id (required) | none | UUID of the contract to validate against. Copy it from the Contracts page. |
| contractgate.api.key | "" | Your API key, sent as the x-api-key header. Leave blank only for local development with auth disabled. |
| contractgate.contract.version | "" (latest) | Pin to a specific contract version, e.g. 1.2.0. Leave blank to always use the latest stable version, which is recommended for most pipelines. When set, the version is sent as the X-Contract-Version request header, which takes highest precedence in the server's resolution order. |
| contractgate.on.failure | DLQ | What to do when a record fails validation. DLQ: throw a DataException so Kafka Connect's error handling can route the record to the dead-letter topic. TAG_AND_PASS: add violation headers and pass the record through. |
| contractgate.dry.run | false | When true, validates without writing to the audit log. Useful for high-throughput pipelines where you want enforcement without database write pressure. |
| contractgate.connect.timeout.ms | 5000 | TCP connection timeout to the ContractGate API, in milliseconds. |
| contractgate.request.timeout.ms | 10000 | Total HTTP request/response timeout, in milliseconds. Keep it well below the time Kafka Connect allows a task to process a batch (for sink connectors, the consumer's max.poll.interval.ms). |
| contractgate.add.result.headers | true | Stamp contractgate.* metadata headers onto every record (pass or fail). See the headers reference below. |
| contractgate.max.violation.headers | 5 | Maximum number of individual violation detail headers to add. A record with many violations can bloat its headers, so cap this at a useful number. |
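The table above is enough to check a connector config before you submit it. The following sketch is a hypothetical pre-flight check, not code from the SMT. It does four things:

- pulls the contractgate.* keys out of a flat connector config, assuming the transform alias contractgate;
- fills in the documented defaults;
- rejects the mistakes the table calls out;
- shows which request headers the key and version settings map to.

The integer and boolean checks are assumptions about reasonable input, not documented rules.

```python
from __future__ import annotations

# Documented defaults for the optional settings (string values, like Connect configs).
DEFAULTS = {
    "contractgate.api.key": "",
    "contractgate.contract.version": "",
    "contractgate.on.failure": "DLQ",
    "contractgate.dry.run": "false",
    "contractgate.connect.timeout.ms": "5000",
    "contractgate.request.timeout.ms": "10000",
    "contractgate.add.result.headers": "true",
    "contractgate.max.violation.headers": "5",
}
REQUIRED = ("contractgate.api.url", "contractgate.contract.id")
BOOLEANS = ("contractgate.dry.run", "contractgate.add.result.headers")
INTEGERS = (
    "contractgate.connect.timeout.ms",
    "contractgate.request.timeout.ms",
    "contractgate.max.violation.headers",
)


def resolve_settings(connector_config: dict[str, str], alias: str = "contractgate") -> dict[str, str]:
    """Return effective contractgate.* settings, or raise ValueError listing problems."""
    prefix = f"transforms.{alias}."
    own = {
        key[len(prefix):]: value
        for key, value in connector_config.items()
        if key.startswith(prefix + "contractgate.")
    }
    settings = {**DEFAULTS, **own}

    problems = [f"{key} is required" for key in REQUIRED if not settings.get(key)]
    if settings.get("contractgate.api.url", "").endswith("/"):
        problems.append("contractgate.api.url must not end with a slash")
    if settings["contractgate.on.failure"] not in ("DLQ", "TAG_AND_PASS"):
        problems.append("contractgate.on.failure must be DLQ or TAG_AND_PASS")
    for key in BOOLEANS:
        if settings[key].lower() not in ("true", "false"):
            problems.append(f"{key} must be true or false")
    for key in INTEGERS:
        if not settings[key].isdigit():
            problems.append(f"{key} must be a non-negative integer")
    if problems:
        raise ValueError("; ".join(problems))
    return settings


def request_headers(settings: dict[str, str]) -> dict[str, str]:
    """HTTP headers implied by the api.key and contract.version settings."""
    headers = {}
    if settings["contractgate.api.key"]:
        headers["x-api-key"] = settings["contractgate.api.key"]
    if settings["contractgate.contract.version"]:
        headers["X-Contract-Version"] = settings["contractgate.contract.version"]
    return headers
```

Leaving contract.version blank yields no X-Contract-Version header, which matches the "latest stable" behavior described in the table.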
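Dead-Letter Queue Setup
When contractgate.on.failure=DLQ (the default), the SMT throws a DataException on validation failure, and Kafka Connect's built-in error handling routes the original record to a dead-letter topic. Kafka Connect supports dead-letter topics for sink connectors only. It also keeps a task running past a failed record only when errors.tolerance=all; with the default, errors.tolerance=none, the first validation failure stops the task.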
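A consumer of the dead-letter topic can recover the failure details from the `__connect.errors.exception.message` header. That header is present when `errors.deadletterqueue.context.headers.enable=true`; a sample appears under Reading violation details from the DLQ. The parser below makes two assumptions about the message, both inferred from that sample rather than a documented format:

- context fields are written as `key=value`;
- violations are written as `field [kind]: message` and separated by semicolons.

Headers are taken as a list of `(key, bytes)` pairs, which is the shape confluent-kafka's `Message.headers()` returns.

```python
from __future__ import annotations

import re
from dataclasses import dataclass

# Context header Kafka Connect adds when
# errors.deadletterqueue.context.headers.enable=true.
EXCEPTION_HEADER = "__connect.errors.exception.message"

# key=value context fields seen in the sample message.
_CONTEXT_RE = re.compile(r"\b(topic|partition|offset|contract|version)=(\S+)")
# One violation: "<field> [<kind>]: <message>".
_VIOLATION_RE = re.compile(r"(\S+) \[(\w+)\]: (.+)", re.DOTALL)


@dataclass
class Violation:
    field: str
    kind: str
    message: str


def exception_message(headers: list[tuple[str, bytes | None]] | None) -> str | None:
    """Return the decoded exception-message header, if present."""
    for key, value in headers or []:
        if key == EXCEPTION_HEADER and value is not None:
            return value.decode("utf-8")
    return None


def parse_dlq_message(text: str) -> tuple[dict[str, str], list[Violation]]:
    """Split a ContractGate failure message into context fields and violations."""
    head, marker, tail = text.partition("violation(s):")
    context = dict(_CONTEXT_RE.findall(head))
    violations = []
    if marker:
        # Assumes violations are separated by ';' and messages contain no ';'.
        for chunk in tail.split(";"):
            match = _VIOLATION_RE.fullmatch(chunk.strip())
            if match:
                field, kind, message = match.groups()
                # Collapse line breaks and repeated spaces inside the message.
                violations.append(Violation(field, kind, " ".join(message.split())))
    return context, violations
```

In a consumer loop, call `parse_dlq_message(...)` only after `exception_message(msg.headers())` has returned a string.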
Full DLQ connector config
```properties
# ── ContractGate SMT ──────────────────────────────────────
transforms=contractgate
transforms.contractgate.type=io.datacontractgate.connect.smt.ContractGateValidator
transforms.contractgate.contractgate.api.url=https://contractgate-api.fly.dev
transforms.contractgate.contractgate.api.key=cg_live_YOUR_API_KEY
transforms.contractgate.contractgate.contract.id=YOUR_CONTRACT_UUID
transforms.contractgate.contractgate.on.failure=DLQ
# ── Kafka Connect DLQ settings ─────────────────────────────
# Keep the task running and route failed records instead of failing the task
errors.tolerance=all
# The topic to route failed records to (Connect creates it if it does not exist)
errors.deadletterqueue.topic.name=orders.dlq
errors.deadletterqueue.topic.replication.factor=3
# Include the full violation summary in DLQ record headers
errors.deadletterqueue.context.headers.enable=true
# Log every DLQ-routed record (set to false at high error rates)
errors.log.enable=true
errors.log.include.messages=true
# Retry transient errors before sending to DLQ
errors.retry.timeout=60000
errors.retry.delay.max.ms=5000
```
Reading violation details from the DLQ
With errors.deadletterqueue.context.headers.enable=true, each DLQ record carries a header like:
```text
__connect.errors.exception.message:
ContractGate validation failed - topic=orders partition=3 offset=1042
contract=3fa85f64-... version=1.2.0
2 violation(s): user_id [missing_required_field]: required field missing;
amount [range_violation]: value -5 below minimum 0
```
Result Headers
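When contractgate.add.result.headers=true (the default), the following headers are added to every record, passing or failing. In practice, failing records only carry them downstream in TAG_AND_PASS mode. In DLQ mode, Kafka Connect writes the original consumed record to the dead-letter topic, so read the __connect.errors.* context headers there instead.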
| Header | Value |
|---|---|
| contractgate.passed | "true" or "false" |
| contractgate.contract.version | Resolved version string, e.g. "1.2.0" |
| contractgate.violations.count | Number of violations (0 on pass) |
| contractgate.violation.0.field | Dot-path of the field, e.g. "customer.address.country" |
| contractgate.violation.0.kind | One of missing_required_field · type_mismatch · enum_violation · range_violation · pattern_mismatch · length_violation · undeclared_field |
| contractgate.violation.0.message | Human-readable explanation |
Violation headers repeat for indices 0…N, up to contractgate.max.violation.headers (default 5).
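Downstream code can rebuild a validation result from these headers. The sketch below makes two assumptions about the input:

- header values arrive as UTF-8 text, which is how Kafka Connect's default SimpleHeaderConverter serializes strings, booleans and numbers;
- headers come as a list of `(key, bytes)` pairs, as in confluent-kafka's `Message.headers()`.

`stamp_headers` is a hypothetical helper that mimics the documented layout, so consumer logic can be unit-tested without a running pipeline. It is not the SMT's own code. It also assumes the cap limits how many violations get field/kind/message headers, while `violations.count` still reports the total.

```python
from __future__ import annotations

PREFIX = "contractgate."


def stamp_headers(passed: bool, version: str, violations: list[tuple[str, str, str]],
                  max_violation_headers: int = 5) -> list[tuple[str, bytes]]:
    """Build headers in the documented layout (hypothetical test helper)."""
    headers = [
        (PREFIX + "passed", b"true" if passed else b"false"),
        (PREFIX + "contract.version", version.encode("utf-8")),
        # The count reflects every violation, even those beyond the header cap.
        (PREFIX + "violations.count", str(len(violations)).encode("utf-8")),
    ]
    for i, (field, kind, message) in enumerate(violations[:max_violation_headers]):
        headers.append((f"{PREFIX}violation.{i}.field", field.encode("utf-8")))
        headers.append((f"{PREFIX}violation.{i}.kind", kind.encode("utf-8")))
        headers.append((f"{PREFIX}violation.{i}.message", message.encode("utf-8")))
    return headers


def read_result(headers: list[tuple[str, bytes | None]] | None) -> dict | None:
    """Decode contractgate.* headers; None if the record carries no result."""
    values = {
        key: value.decode("utf-8")
        for key, value in (headers or [])
        if key.startswith(PREFIX) and value is not None
    }
    if PREFIX + "passed" not in values:
        return None
    violations = []
    i = 0
    # Walk indices 0, 1, 2, ... until the first missing field header.
    while f"{PREFIX}violation.{i}.field" in values:
        violations.append((
            values[f"{PREFIX}violation.{i}.field"],
            values.get(f"{PREFIX}violation.{i}.kind", ""),
            values.get(f"{PREFIX}violation.{i}.message", ""),
        ))
        i += 1
    return {
        "passed": values[PREFIX + "passed"] == "true",
        "version": values.get(PREFIX + "contract.version", ""),
        "count": int(values.get(PREFIX + "violations.count", "0")),
        "violations": violations,
    }
```

A consumer can branch on `result["passed"]`. When `count` is larger than `len(result["violations"])`, the details were truncated by contractgate.max.violation.headers.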
Tag & Pass Mode
Set contractgate.on.failure=TAG_AND_PASS to never drop records. Invalid records get violation headers and continue downstream, where consumers can inspect contractgate.passed and decide what to do.
Use this when you want observability without enforcement, for example to shadow a new contract version before promoting it to stable.
```properties
transforms.contractgate.contractgate.on.failure=TAG_AND_PASS
# Records now flow through regardless of violations.
# Downstream consumers can branch on contractgate.passed=false.
```
Full Examples
S3 Sink with DLQ
```json
{
  "name": "s3-sink-validated",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "orders",
    "s3.region": "us-east-1",
    "s3.bucket.name": "my-data-lake",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
    "flush.size": "1000",
    "transforms": "contractgate",
    "transforms.contractgate.type": "io.datacontractgate.connect.smt.ContractGateValidator",
    "transforms.contractgate.contractgate.api.url": "https://contractgate-api.fly.dev",
    "transforms.contractgate.contractgate.api.key": "${file:/secrets/cg.properties:api.key}",
    "transforms.contractgate.contractgate.contract.id": "YOUR_CONTRACT_UUID",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "orders.dlq",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.retry.timeout": "60000"
  }
}
```
The ${file:/secrets/cg.properties:api.key} reference keeps the API key out of the connector config. Kafka's FileConfigProvider resolves it, and that provider must be enabled in the worker configuration (config.providers=file and config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider).
JDBC Sink (high-throughput, dry run without audit-log writes)
```json
{
  "name": "postgres-sink-validated",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "8",
    "topics": "user_events",
    "connection.url": "jdbc:postgresql://db:5432/analytics",
    "transforms": "contractgate",
    "transforms.contractgate.type": "io.datacontractgate.connect.smt.ContractGateValidator",
    "transforms.contractgate.contractgate.api.url": "https://contractgate-api.fly.dev",
    "transforms.contractgate.contractgate.api.key": "${file:/secrets/cg.properties:api.key}",
    "transforms.contractgate.contractgate.contract.id": "YOUR_CONTRACT_UUID",
    "transforms.contractgate.contractgate.dry.run": "true",
    "transforms.contractgate.contractgate.on.failure": "DLQ",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "user_events.dlq",
    "errors.deadletterqueue.context.headers.enable": "true"
  }
}
```
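If you manage connectors through the Kafka Connect REST API rather than by hand, you can merge the SMT settings into an existing connector config programmatically. `PUT /connectors/{name}/config` is the standard Connect endpoint for creating or updating a connector from a flat config map. The helper names below are hypothetical, and the API key default reuses the FileConfigProvider reference from the examples above. Treat the sketch as one possible approach.

```python
from __future__ import annotations

import json
import urllib.parse
import urllib.request

SMT_CLASS = "io.datacontractgate.connect.smt.ContractGateValidator"


def with_contractgate(base: dict[str, str], contract_id: str, dlq_topic: str, *,
                      alias: str = "contractgate",
                      api_url: str = "https://contractgate-api.fly.dev",
                      api_key: str = "${file:/secrets/cg.properties:api.key}",
                      extra: dict[str, str] | None = None) -> dict[str, str]:
    """Return a copy of a sink connector config with the validator first in the chain."""
    cfg = dict(base)
    prefix = f"transforms.{alias}."
    chain = [t.strip() for t in cfg.get("transforms", "").split(",") if t.strip()]
    # Validate the record before any other SMT reshapes it.
    cfg["transforms"] = ",".join([alias] + [t for t in chain if t != alias])
    cfg[prefix + "type"] = SMT_CLASS
    cfg[prefix + "contractgate.api.url"] = api_url
    cfg[prefix + "contractgate.api.key"] = api_key
    cfg[prefix + "contractgate.contract.id"] = contract_id
    # Optional settings from the configuration reference, e.g. {"dry.run": "true"}.
    for key, value in (extra or {}).items():
        cfg[prefix + "contractgate." + key] = value
    # Dead-letter routing needs errors.tolerance=all on a sink connector.
    cfg["errors.tolerance"] = "all"
    cfg["errors.deadletterqueue.topic.name"] = dlq_topic
    cfg["errors.deadletterqueue.context.headers.enable"] = "true"
    return cfg


def put_connector(worker_url: str, name: str, config: dict[str, str]) -> dict:
    """Create or update a connector via PUT /connectors/{name}/config."""
    url = f"{worker_url.rstrip('/')}/connectors/{urllib.parse.quote(name)}/config"
    request = urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=30) as resp:
        return json.load(resp)
```

For example: `put_connector("http://localhost:8083", "postgres-sink-validated", with_contractgate(existing_config, "YOUR_CONTRACT_UUID", "user_events.dlq"))`. Putting the validator first means it checks each record as it arrives from the topic. If your contract describes the shape produced by another transform, place the alias after that transform instead.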