| name | kafka-connector-v4 |
| description | Set up, configure, and troubleshoot the Snowflake Kafka Connector V4 (Snowpipe Streaming high-performance architecture). Covers fresh installations, connector property configuration for default pipe and user-defined pipe modes, server-side and client-side validation, JMX monitoring, migration from V3, and common error diagnosis. Triggers: kafka connector v4, kafka connector setup, kafka connector config, kafka connector troubleshoot, snowflake kafka connector, configure kafka connector, kafka connector help, kafka streaming connector. |
When to use
Use this skill when the user wants to:
- Fresh setup -- Deploy a new Kafka Connector V4 from scratch on an existing Kafka Connect cluster
- Configure -- Choose between pipe modes, validation modes, topic-to-table mapping, converters, error handling, or performance tuning
- Troubleshoot -- Diagnose connector failures, ingestion errors, lag, schema evolution issues, or authentication problems
- Migrate from V3 -- Upgrade from the classic Kafka connector (V3) to V4 with correct compatibility flags and offset migration
When NOT to use this skill:
- If the user wants a custom Kafka consumer using the Snowpipe Streaming SDK directly (redirect to the custom-kafka-consumer skill)
- If the user wants to try Snowpipe Streaming without Kafka (redirect to the snowpipe-streaming-quickstart skill)
What this skill provides
- Fresh Setup Workflow (Steps 0-5) -- Confirms prerequisites, generates RSA key-pair, creates Snowflake role/user/grants, generates a ready-to-deploy connector configuration file, and guides connector startup and verification.
- Configuration Advisor (Steps 6-7) -- Helps the user choose the right pipe mode (default vs user-defined), validation mode (server-side vs client-side), and generates the appropriate connector properties.
- Troubleshooting Guide (Step 8) -- A symptom-based diagnostic that identifies common issues and provides targeted fixes.
- V3 Migration Assistant (Step 9) -- Generates a migration-safe configuration with correct compatibility flags and offset migration settings.
Critical concepts
Architecture
- Connector class: com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
- Underlying technology: Snowpipe Streaming V2 (Rust-based SDK, off-heap memory management)
- Throughput: Up to 10 GB/s with 5-10 second end-to-end latency
- Pricing: Flat, throughput-based pricing (per GB ingested)
- Authentication: Key-pair only (no OAuth in V4)
- Java: 11+ required
Two pipe modes
Default pipe mode (simplest):
- Auto-creates a default pipe named {tableName}-STREAMING
- Maps Kafka record JSON keys to table columns by name (case-insensitive)
- No explicit CREATE PIPE needed
- Supports schema evolution when ENABLE_SCHEMA_EVOLUTION=TRUE is set on the table
- Works with both server-side and client-side validation
User-defined pipe mode (flexible):
- User creates a PIPE with COPY INTO ... FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING')) syntax
- Full control over transformations: renaming columns, type casting, filtering, masking
- Access to RECORD_METADATA fields via $1:RECORD_METADATA.topic, $1:RECORD_METADATA.offset, etc.
- Pipe name must match the table name
- Works with server-side validation only (client-side validation fails with ERROR_5030)
Two validation modes
Server-side validation (default, snowflake.validation=server_side):
- Snowflake backend performs validation -- maximum throughput
- Invalid records captured in Error Tables
- Works with both default pipe and user-defined pipe modes
- DLQ is NOT used for ingestion validation errors (only for converter deserialization errors)
Client-side validation (snowflake.validation=client_side):
- Connector validates before sending to Snowflake
- Supports Dead Letter Queue (DLQ) for invalid records
- Better type inference for schema evolution
- Only works with default pipe mode
V4 default changes vs V3
| Configuration | V3 Default | V4 Default |
|---|---|---|
| snowflake.enable.schematization | false (RECORD_CONTENT/RECORD_METADATA VARIANT columns) | true (record fields mapped to individual columns) |
| snowflake.validation | Client-side equivalent | server_side |
| snowflake.compatibility.enable.autogenerated.table.name.sanitization | true equivalent (invalid chars replaced, names uppercased) | false (topic names used as-is, case preserved) |
| snowflake.compatibility.enable.column.identifier.normalization | true equivalent (column names uppercased) | false (column identifiers preserve case) |
Warning about topic names with special characters: With V4 defaults (table name sanitization disabled), a topic named orders-prod creates a table literally named "orders-prod" which requires double-quoting in all SQL. To avoid this, either use snowflake.topic2table.map to map to a clean table name, or set snowflake.compatibility.enable.autogenerated.table.name.sanitization=true to replicate V3 behavior (uppercased, invalid chars replaced).
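As a rough illustration of the warning above, the following sketch flags topic names that would produce a table needing double quotes and proposes a snowflake.topic2table.map value for them. The helper names are hypothetical, and the cleaning rule (uppercase, replace disallowed characters with an underscore) is an assumption for illustration, not the connector's own sanitization algorithm. It also assumes that a lowercase topic name creates a case-sensitive table, which follows from "case preserved" but is worth verifying on your account. Topics containing ':' or ',' would need the quoted map form and are not handled here.

```python
import re

# Unquoted Snowflake identifiers start with a letter or underscore and contain
# only letters, digits, underscores, or dollar signs. With case preserved,
# only all-uppercase names resolve without double quotes.
_UNQUOTED = re.compile(r"[A-Z_][A-Z0-9_$]*")


def needs_quoting(topic: str) -> bool:
    """True if a table named exactly like the topic must be double-quoted in SQL."""
    return _UNQUOTED.fullmatch(topic) is None


def suggest_table_name(topic: str) -> str:
    """Hypothetical cleaning rule (NOT the connector's own algorithm):
    uppercase the topic and replace every disallowed character with '_'."""
    name = re.sub(r"[^A-Z0-9_$]", "_", topic.upper())
    # An unquoted identifier may not start with a digit or '$'.
    return name if re.match(r"[A-Z_]", name) else "_" + name


def topic2table_map(topics: list[str]) -> str:
    """Build a snowflake.topic2table.map value for topics that need a clean name."""
    pairs = [f"{t}:{suggest_table_name(t)}" for t in topics if needs_quoting(t)]
    return ",".join(pairs)
```

For example, topic2table_map(["orders-prod", "ORDERS"]) yields orders-prod:ORDERS_PROD and leaves ORDERS unmapped because it is already a clean identifier.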
RECORD_METADATA structure
| Field | Type | Description |
|---|---|---|
| topic | String | Kafka topic name |
| partition | String | Kafka partition number |
| offset | Number | Offset in the partition |
| CreateTime / LogAppendTime | Number | Kafka record timestamp (ms since epoch) |
| SnowflakeConnectorPushTime | Number | When connector buffered the record (ms since epoch) |
| key | String | Kafka message key (requires StringConverter for key.converter) |
| headers | Object | User-defined key-value pairs |
RECORD_METADATA adds ~150 bytes overhead per record. Set snowflake.metadata.all=false to disable if not needed.
JVM memory guidance
The Rust-based Snowpipe Streaming SDK allocates off-heap (system) memory for buffering. Limit JVM heap to approximately 50% of available RAM. For example, on a worker with 8 GB RAM, set -Xmx4g. Budget at least 5 MB of off-heap memory per Kafka partition.
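The sizing rule above can be turned into a quick sanity check. This is a minimal sketch under the two figures stated in this section (heap at about 50% of RAM, at least 5 MB off-heap per partition); the function name and the returned keys are made up for illustration, and the check ignores memory used by the OS and other processes.

```python
def memory_budget(total_ram_mb: int, partitions: int) -> dict:
    """Estimate JVM heap and off-heap needs for one Kafka Connect worker."""
    heap_mb = total_ram_mb // 2            # ~50% of RAM for the JVM heap
    off_heap_available_mb = total_ram_mb - heap_mb
    off_heap_minimum_mb = 5 * partitions   # at least 5 MB per Kafka partition
    return {
        "xmx_flag": f"-Xmx{heap_mb}m",
        "off_heap_available_mb": off_heap_available_mb,
        "off_heap_minimum_mb": off_heap_minimum_mb,
        # False means the partitions handled by this worker do not fit.
        "fits": off_heap_minimum_mb <= off_heap_available_mb,
    }
```

With 8192 MB of RAM and 200 partitions on the worker, the sketch suggests -Xmx4096m and reports that the 1000 MB off-heap minimum fits in the remaining 4096 MB. If "fits" is False, spread partitions across more workers.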
Supported platforms
| Package | Tested Versions |
|---|---|
| Apache Kafka | 2.8.2, 3.7.2, 4.1.1 |
| Confluent | 6.2.15, 7.8.2, 8.2.0 |
| Java | 11+ (SE recommended) |
Instructions
IMPORTANT EXECUTION GUIDELINES:
- Announce each step clearly -- Before executing each step, print a clear header like "Step X -- [Step Name]" so the user knows where they are.
- Batch commands aggressively -- Minimize permission prompts. Run independent operations in parallel.
- Log all SQL to kafka_connector_v4_sql.log -- Every time you execute SQL, immediately append it to the log file with a step header and timestamp.
- Private keys: NEVER display in chat. Use <REDACTED> in all logs and output.
- Detect user intent first (Step 0) -- Branch to the appropriate workflow based on what the user needs.
Step 0 -- Detect intent and branch
Before doing anything, determine what the user needs:
- If they say "setup" / "install" / "deploy" / "new connector" --> proceed through Steps 1-5 (Fresh Setup)
- If they say "configure" / "config" / "properties" / "settings" --> skip to Step 6 (Configuration Advisor)
- If they say "troubleshoot" / "debug" / "error" / "failing" / "lag" --> skip to Step 8 (Troubleshooting)
- If they say "migrate" / "v3 to v4" / "upgrade" --> skip to Step 9 (Migration)
If intent is unclear, ask:
What would you like help with?
- Fresh setup -- Deploy a new Kafka Connector V4
- Configure -- Help with connector properties for your use case
- Troubleshoot -- Diagnose an issue with an existing connector
- Migrate from V3 -- Upgrade from the classic connector
Step 1 -- Gather prerequisites (Fresh Setup)
Goal: Verify the user has everything needed before starting.
Ask the user for (in a single prompt):
- Kafka distribution: Confluent or OSS Apache Kafka?
- Kafka Connect mode: Distributed or Standalone?
- Java version (must be 11+)
- Snowflake account URL (e.g., myorg-myaccount.snowflakecomputing.com)
- Snowflake user name for the connector
- Snowflake role name for the connector
- Target database and schema
- Topic name(s)
- Whether they already have RSA key-pair auth configured
Run in parallel:
- Bash: Check Java version:
java -version 2>&1
- SQL:
SELECT CURRENT_USER(), CURRENT_ROLE(), CURRENT_DATABASE(), CURRENT_SCHEMA(), CURRENT_WAREHOUSE(), CURRENT_ACCOUNT();
Error handling:
- Java version < 11 --> tell user to upgrade
- No warehouse set --> tell user to set one with USE WAREHOUSE
- Role lacks privileges --> guide to Step 2
Step 2 -- Configure Snowflake (role, user, grants)
Goal: Create or configure the Snowflake role and necessary grants.
Execute as one single SQL call:
USE ROLE SECURITYADMIN;
CREATE ROLE IF NOT EXISTS {{KAFKA_ROLE}};
GRANT USAGE ON DATABASE {{DATABASE}} TO ROLE {{KAFKA_ROLE}};
GRANT USAGE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE TABLE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE PIPE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT ROLE {{KAFKA_ROLE}} TO USER {{KAFKA_USER}};
Note: If user already has a role, skip creation and verify grants. If ingesting into an existing table, grant INSERT on that specific table.
Step 3 -- Generate RSA key-pair (if needed)
Goal: Generate key-pair for connector authentication.
If user already has key-pair auth, skip this step.
Single Bash call (generate key-pair and extract public key body):
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt 2>/dev/null \
&& chmod 600 rsa_key.p8 \
&& openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 2>/dev/null \
&& echo "RSA key-pair generated successfully"
Then silently read rsa_key.pub, strip the header/footer lines, and store the base64 body.
SECURITY: NEVER display any key content in chat. Use <REDACTED> in logs.
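Stripping the header/footer lines can be sketched as below. The function name pem_body is hypothetical; it treats every line starting with five dashes as a PEM boundary and joins the rest. The same helper works for rsa_key.p8 when preparing the snowflake.private.key value, in which case the result must never be printed or logged.

```python
def pem_body(pem_text: str) -> str:
    """Return the base64 body of a PEM block as one line, without header/footer."""
    lines = [line.strip() for line in pem_text.splitlines()]
    # Drop blank lines and the -----BEGIN ...----- / -----END ...----- markers.
    body = [line for line in lines if line and not line.startswith("-----")]
    return "".join(body)


# Typical use (not executed here):
#   from pathlib import Path
#   public_key_body = pem_body(Path("rsa_key.pub").read_text())
```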
Register the public key via SQL:
ALTER USER {{KAFKA_USER}} SET RSA_PUBLIC_KEY='{{PUBLIC_KEY_BODY}}';
DESC USER {{KAFKA_USER}};
Step 4 -- Generate connector configuration
Goal: Generate a ready-to-deploy connector configuration file.
Ask the user:
- Do you want the default pipe mode (simplest, auto-creates tables/pipes) or user-defined pipe mode (custom transformations)?
- Do you want server-side validation (default, max throughput, Error Tables) or client-side validation (DLQ support)?
- Do your topic names match your desired table names? If not, provide mappings for snowflake.topic2table.map.
- Is your data format JSON or Avro (with Schema Registry)?
- How many Kafka partitions? (Recommend: set tasks.max equal to partition count)
Based on answers, generate the config file using the appropriate template from the Templates section.
Write the config file:
- Distributed mode: Write kafka-connector-v4-config.json
- Standalone mode: Write kafka-connector-v4-config.properties
For the private key value, instruct the user:
Open rsa_key.p8, remove the -----BEGIN PRIVATE KEY----- and -----END PRIVATE KEY----- header/footer lines, remove all newlines, and paste the single base64 string as the value for snowflake.private.key. In production, use a ConfigProvider to externalize this secret (AWS KMS, Azure Key Vault, HashiCorp Vault).
If user chose user-defined pipe mode, also generate the pipe SQL (see Step 7).
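One way to produce the distributed-mode file is to fill the Template 1 keys from the answers gathered so far. The sketch below is illustrative only: the function name and its parameters are made up, and the private key defaults to the <REDACTED> placeholder so that no key material passes through the generator (the user pastes the key, or a ConfigProvider reference, afterwards).

```python
import json


def build_connector_config(name, topics, tasks_max, account_url, user,
                           database, schema, role, private_key="<REDACTED>"):
    """Return a Kafka Connect REST payload using the keys from Template 1."""
    return {
        "name": name,
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
            "topics": ",".join(topics),
            "tasks.max": str(tasks_max),  # Connect expects string values
            "snowflake.url.name": account_url,
            "snowflake.user.name": user,
            "snowflake.private.key": private_key,
            "snowflake.database.name": database,
            "snowflake.schema.name": schema,
            "snowflake.role.name": role,
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "snowflake.streaming.validate.compatibility.with.classic": "false",
        },
    }


def to_json(config: dict) -> str:
    """Serialize the payload for kafka-connector-v4-config.json."""
    return json.dumps(config, indent=2)
```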
Step 5 -- Deploy and verify
Goal: Start the connector and verify data is flowing.
Provide the startup command:
Distributed mode:
curl -X POST -H "Content-Type: application/json" --data @kafka-connector-v4-config.json http://localhost:8083/connectors
Standalone mode:
<kafka_dir>/bin/connect-standalone.sh <kafka_dir>/config/connect-standalone.properties kafka-connector-v4-config.properties
Verification steps (guide the user):
- Check connector status:
curl -s http://localhost:8083/connectors/{{CONNECTOR_NAME}}/status | python3 -m json.tool
- Wait a few seconds for data to arrive, then query the table:
SELECT COUNT(*) FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}};
SELECT * FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}} LIMIT 10;
- Estimate end-to-end latency (see Template 8 in Templates section).
Present success summary with row count and latency estimate.
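To interpret the status response without reading raw JSON, something like the following can help. It is a sketch that assumes the usual shape of the Kafka Connect status payload (a "connector" object with a "state", and a "tasks" list whose entries carry "id", "state", and, for failed tasks, "trace"); the function names are hypothetical.

```python
def first_line(text) -> str:
    """Return the first line of a stack trace, or '' if there is none."""
    lines = (text or "").splitlines()
    return lines[0] if lines else ""


def summarize_status(status: dict) -> dict:
    """Condense a /connectors/<name>/status response into a few fields."""
    tasks = status.get("tasks", [])
    connector_state = status.get("connector", {}).get("state")
    failed = {
        task["id"]: first_line(task.get("trace"))
        for task in tasks
        if task.get("state") == "FAILED"
    }
    return {
        "connector_state": connector_state,
        "task_states": {task["id"]: task.get("state") for task in tasks},
        "failed": failed,  # task id -> first line of its stack trace
        "healthy": (
            connector_state == "RUNNING"
            and bool(tasks)
            and all(task.get("state") == "RUNNING" for task in tasks)
        ),
    }
```

Feed it the parsed output of the curl command above (for example via json.load on the response body). A non-empty "failed" entry points to the troubleshooting branches in Step 8.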
Step 6 -- Configuration advisor (standalone entry point)
Goal: Help users who need configuration guidance for a specific topic.
Ask what they need help with:
- Pipe mode selection -- Default (simple, auto-creates) vs user-defined (custom transformations). If user-defined, proceed to Step 7.
- Validation mode -- Server-side (default, max throughput, Error Tables) vs client-side (DLQ support). Note: client-side only works with default pipe.
- Topic-to-table mapping -- Static (topic name = table name), explicit (snowflake.topic2table.map), regex patterns, many-to-one. Examples:
- Explicit: snowflake.topic2table.map=orders:ORDER_TABLE,customers:CUSTOMER_TABLE
- Many-to-one: snowflake.topic2table.map=topic1:SHARED_TABLE,topic2:SHARED_TABLE,topic3:SHARED_TABLE
- Regex: snowflake.topic2table.map=.*_events:ALL_EVENTS
- Quoted (special chars): snowflake.topic2table.map="my:topic":"My_Table"
- Schema evolution -- Set ENABLE_SCHEMA_EVOLUTION=TRUE on the table. Adds new columns automatically. Server-side validation may infer wrong types; pre-create the table if exact types matter.
- Iceberg tables -- Must pre-create the table, grant USAGE on external volume, no schema evolution support.
- Converters -- JSON (JsonConverter), Avro (AvroConverter + Schema Registry), Protobuf (ProtobufConverter). Note: StringConverter and ByteArrayConverter are not supported with snowflake.enable.schematization=true.
- Performance tuning -- tasks.max = partition count, JVM heap ~50% RAM, same-region deployment, cache settings.
- Error handling -- errors.tolerance (none vs all), DLQ config, Error Tables for server-side.
- Monitoring -- JMX metrics setup, key alerting metrics (see Template 10 in Templates section).
For each sub-topic, provide the relevant configuration snippet and a brief explanation.
Step 7 -- User-defined pipe setup
Goal: Guide user through creating a user-defined pipe for custom transformations.
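- Create the destination table (columns must match the pipe's SELECT list):
CREATE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} (
    column_name TYPE,
    source_topic STRING,
    kafka_offset NUMBER
);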
- Create a pipe with the same name as the table:
CREATE PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} AS
COPY INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
FROM (
SELECT
$1:field_name::TYPE AS column_name,
$1:RECORD_METADATA.topic::STRING AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
- Grant OPERATE on the pipe:
GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
Field accessor syntax:
- Simple fields: $1:field_name
- Fields with spaces or special characters: $1['field name']
- Nested fields: $1:parent.child or $1:parent['child field']
- Metadata fields: $1:RECORD_METADATA.topic, $1:RECORD_METADATA.offset, $1:RECORD_METADATA.SnowflakeConnectorPushTime
Important: Client-side validation does NOT work with user-defined pipes. The connector will fail with ERROR_5030. Use snowflake.validation=server_side (the default). This means you cannot use both DLQ and user-defined pipes -- if you need custom transformations, use server-side validation with Error Tables for error handling instead of DLQ.
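When generating the pipe's SELECT list from a list of record fields, the accessor rules above can be applied mechanically. The sketch below uses hypothetical helper names. Once a key needs bracket notation, it keeps using brackets for the remaining keys, which is a conservative choice rather than a requirement. Keys containing a single quote are rejected instead of escaped.

```python
import re

# Keys that are safe to write with the ':' / '.' path notation.
_SIMPLE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def accessor(path: list[str]) -> str:
    """Build a $1 accessor expression for a (possibly nested) record field."""
    if not path:
        raise ValueError("path must contain at least one key")
    expr = "$1"
    bracketed = False
    for index, key in enumerate(path):
        if "'" in key:
            raise ValueError(f"unsupported key with single quote: {key!r}")
        if not bracketed and _SIMPLE.fullmatch(key):
            expr += (":" if index == 0 else ".") + key
        else:
            expr += f"['{key}']"
            bracketed = True
    return expr


def select_item(path: list[str], sql_type: str, alias: str) -> str:
    """Render one SELECT-list entry: accessor, cast, and column alias."""
    return f"{accessor(path)}::{sql_type} AS {alias}"
```

For instance, select_item(["RECORD_METADATA", "offset"], "NUMBER", "kafka_offset") renders the same expression used in the pipe definition above.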
Step 8 -- Troubleshooting (standalone entry point)
Goal: Diagnose and fix common connector issues.
Ask the user to describe their symptom, then branch:
Symptom: Connector fails at startup
"Unrecognized configuration" errors:
- You may be using V3 properties that were removed in V4. Removed properties include:
- snowflake.ingestion.method
- buffer.flush.time, buffer.size.bytes, buffer.count.records
- snowflake.streaming.max.client.lag, snowflake.streaming.max.memory.limit.bytes
- snowflake.snowpipe.*
- snowflake.authenticator, snowflake.oauth.*
- snowflake.streaming.iceberg.enabled
- All Snowflake-provided custom converters (SnowflakeJsonConverter, etc.)
- Remove these properties and use V4 equivalents.
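A quick way to spot leftovers is to scan the connector config for the removed property names listed above. This is a sketch with a hypothetical function name; the patterns are copied from the list, with '*' matched as a wildcard, and custom converters are not covered because they appear as values rather than keys.

```python
import fnmatch

# Property names (or wildcard patterns) removed in V4, per the list above.
REMOVED_V3_PATTERNS = [
    "snowflake.ingestion.method",
    "buffer.flush.time",
    "buffer.size.bytes",
    "buffer.count.records",
    "snowflake.streaming.max.client.lag",
    "snowflake.streaming.max.memory.limit.bytes",
    "snowflake.snowpipe.*",
    "snowflake.authenticator",
    "snowflake.oauth.*",
    "snowflake.streaming.iceberg.enabled",
]


def removed_v3_properties(config: dict) -> list[str]:
    """Return the config keys that match a removed V3 property, sorted."""
    return sorted(
        key
        for key in config
        if any(fnmatch.fnmatchcase(key, pattern) for pattern in REMOVED_V3_PATTERNS)
    )
```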
"Compatibility validator" errors:
- New installation: Set snowflake.streaming.validate.compatibility.with.classic=false
- Migration from V3: Explicitly set all required compatibility properties:
- snowflake.validation=client_side
- snowflake.compatibility.enable.column.identifier.normalization=true
- snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
- snowflake.enable.schematization=true or false
- snowflake.streaming.classic.offset.migration=skip or best_effort or strict
"Authentication failed":
- Verify snowflake.private.key is valid Base64-encoded PKCS#8 (no header/footer lines, no newlines)
- If key is encrypted, set snowflake.private.key.passphrase
- Verify the role has required privileges (see Step 2)
"Unsupported converter" with schematization:
- StringConverter and ByteArrayConverter are not supported when snowflake.enable.schematization=true (the default)
- Use JsonConverter, AvroConverter, or ProtobufConverter
Symptom: Connector running but no data in Snowflake
- Check connector status: curl -s http://localhost:8083/connectors/<name>/status
- Check if topics have data: use kafka-console-consumer --topic <topic> --bootstrap-server <server> --from-beginning --max-messages 5
- Check role has INSERT on the target table
- Check cache: if table/pipe was created after connector started, cache may be stale. Reduce snowflake.cache.table.exists.expire.ms or restart the connector.
Symptom: ERROR_5030 (ingestion error)
Common causes:
- Data type mismatch between Kafka record and table schema
- Client-side validation with user-defined pipe (not supported -- switch to server_side)
- Schema changes that can't auto-evolve
To investigate:
- Check Error Table for server-side validation:
SHOW TABLES LIKE '%ERRORS%' IN SCHEMA {{DATABASE}}.{{SCHEMA}};
SELECT * FROM {{DATABASE}}.{{SCHEMA}}."{{TABLE}}_ERRORS" ORDER BY ERROR_TIMESTAMP DESC LIMIT 20;
- Check DLQ for client-side validation: consume from your DLQ topic to inspect failed records
- Enable errors.log.enable=true for verbose logging
- Review connector logs for specific error details
Symptom: Ingestion lag growing
Check JMX metrics: latest-consumer-offset minus persisted-in-snowflake-offset
If JMX is unavailable, estimate lag from the Snowflake side using Template 8 (latency estimation query). High latency_ms values indicate data is taking a long time from connector buffering to queryability.
Fixes:
- Increase tasks: Set tasks.max closer to the total number of Kafka partitions
- Check backpressure: If backpressure-rewind-count is increasing, the SDK is at capacity -- scale out Kafka Connect cluster
- Review JVM memory: Heap should be ~50% of available RAM, rest for Rust SDK off-heap memory
- Same region: Ensure Kafka Connect cluster is in the same cloud region as Snowflake account
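The lag calculation described in this section (latest-consumer-offset minus persisted-in-snowflake-offset) can be sketched as follows once the two gauges have been read per channel. How the values are collected from JMX is out of scope here; the input shape (a dict keyed by channel name) and the function names are assumptions for illustration.

```python
def ingestion_lag(channels: dict) -> dict:
    """Per-channel lag: latest-consumer-offset minus persisted-in-snowflake-offset."""
    return {
        name: metrics["latest-consumer-offset"] - metrics["persisted-in-snowflake-offset"]
        for name, metrics in channels.items()
    }


def growing_channels(previous_lag: dict, current_lag: dict) -> list[str]:
    """Channels whose lag increased between two samples (new channels count from 0)."""
    return sorted(
        name
        for name, lag in current_lag.items()
        if lag > previous_lag.get(name, 0)
    )
```

A single high lag value is less informative than the trend: sample twice a few minutes apart and act on the channels returned by growing_channels.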
Symptom: Duplicate records after migration
Use the deduplication query (see Template 9 in Templates section) based on RECORD_METADATA topic, partition, and offset fields.
Important: Deduplication requires snowflake.metadata.topic and snowflake.metadata.offset.and.partition to be enabled (they are by default).
Symptom: Schema evolution producing wrong types
Server-side validation can't always infer the correct data type (e.g., interprets "2026-04-13" as DATE not TEXT, can't infer binary columns).
Fixes:
- Pre-create the table with the correct schema before starting the connector
- Use client-side validation (snowflake.validation=client_side) for better type inference
- Avoid running DDL on tables while the connector is actively ingesting
Symptom: Channel recovery count increasing
Possible causes:
- Schema changes on target table conflicting with connector's cached schema
- Permission changes affecting the connector's role
- Network instability between Kafka Connect and Snowflake
Review connector logs for specific recovery reasons. Reduce SDK log noise with export SS_LOG_LEVEL=warn.
Symptom: Out of memory (OOM) errors
The V4 connector uses a Rust-based SDK that allocates off-heap (system) memory outside the JVM heap. OOM can occur at both levels:
- JVM heap OOM: Size the heap at ~50% of available RAM (e.g., -Xmx4g on an 8 GB machine). The connector itself needs less heap than V3.
- System memory OOM: The Rust SDK allocates ~5 MB per Kafka partition for buffering. If total partition count is very high, the system may run out of physical memory. Scale out to more Kafka Connect workers to spread partitions across machines.
- Both: If using other connectors on the same cluster, account for their memory needs too.
Symptom: Connector task repeatedly restarting
If a task fails and Kafka Connect auto-restarts it in a loop:
- Check the task failure reason: curl -s http://localhost:8083/connectors/<name>/status
- Look for the trace field in failed tasks for the stack trace
- Common causes: authentication failure (key expired), table dropped while connector running, schema incompatibility
- Set enable.task.fail.on.authorization.errors=true to fail fast on auth issues rather than retry indefinitely
Symptom: Connection failures through proxy
If the connector cannot reach Snowflake through a corporate proxy:
jvm.proxy.host={{PROXY_HOST}}
jvm.proxy.port={{PROXY_PORT}}
jvm.proxy.username={{PROXY_USER}}
jvm.proxy.password={{PROXY_PASSWORD}}
Symptom: SDK client count growing unbounded
If the sdk-client-count JMX metric grows continuously, there may be a client leak. Each distinct target table should have one SDK client. If the count exceeds the number of distinct tables, contact Snowflake Support.
Step 9 -- V3 migration assistant (standalone entry point)
Goal: Generate a migration-safe V4 configuration from an existing V3 deployment.
Ask the user:
- Was your V3 connector using Snowpipe mode (default, snowflake.ingestion.method=SNOWPIPE) or Snowpipe Streaming mode (SNOWPIPE_STREAMING)?
- What was your V3 connector name?
- (If Streaming mode) Did your V3 config include snowflake.streaming.channel.name.include.connector.name=true?
- Do you want to adopt V4 defaults (recommended for new behavior) or reproduce V3 behavior exactly?
Based on answers, generate the migration config:
From V3 Snowpipe mode: Use Template 6 (offset migration = skip)
From V3 Snowpipe Streaming mode: Use Template 7 (offset migration = best_effort or strict)
Migration procedure:
- Stop the V3 connector.
- (Snowpipe mode only) Wait for all staged data to be ingested into Snowflake before proceeding. Snowpipe stages files asynchronously.
- Deploy the V4 configuration using the same connector name as V3 (same Kafka consumer group).
- Start the V4 connector.
- Verify data flow (see Step 5).
- Complete the migration within 7 days (the default for the broker setting offsets.retention.minutes). After 7 days, Kafka consumer group offsets expire.
If your V3 connector uses OAuth authentication, you must switch to key-pair auth before migrating. V4 does not support OAuth. Generate an RSA key-pair (see Step 3), register the public key on the Snowflake user, and update the connector config to use snowflake.private.key instead of snowflake.oauth.* properties.
If using Snowflake custom converters in V3, replace them with community equivalents:
SnowflakeJsonConverter --> org.apache.kafka.connect.json.JsonConverter
SnowflakeAvroConverter --> io.confluent.connect.avro.AvroConverter
SnowflakeAvroConverterWithoutSchemaRegistry --> io.confluent.connect.avro.AvroConverter
SnowflakeProtobufConverter --> io.confluent.connect.protobuf.ProtobufConverter
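The converter swap above can be applied to an existing V3 config with a small rewrite step. The sketch matches on the simple class name (the part after the last dot), so it does not depend on the exact package of the V3 converters; the function name is hypothetical. Note that switching to AvroConverter also requires a Schema Registry URL (see Template 12), which this sketch does not add.

```python
# Simple class name of the V3 converter -> community replacement (from the list above).
CONVERTER_REPLACEMENTS = {
    "SnowflakeJsonConverter": "org.apache.kafka.connect.json.JsonConverter",
    "SnowflakeAvroConverter": "io.confluent.connect.avro.AvroConverter",
    "SnowflakeAvroConverterWithoutSchemaRegistry": "io.confluent.connect.avro.AvroConverter",
    "SnowflakeProtobufConverter": "io.confluent.connect.protobuf.ProtobufConverter",
}


def replace_converters(config: dict) -> dict:
    """Return a copy of the config with Snowflake custom converters replaced."""
    updated = dict(config)
    for key in ("key.converter", "value.converter"):
        simple_name = updated.get(key, "").rsplit(".", 1)[-1]
        if simple_name in CONVERTER_REPLACEMENTS:
            updated[key] = CONVERTER_REPLACEMENTS[simple_name]
    return updated
```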
Downgrade path: Reverse migration is possible but expect duplicates. Use the deduplication query (Template 9) to clean up.
Best practices
- Never persist private keys in chat or logs. Use <REDACTED> when logging SQL containing key material. In production, use a ConfigProvider (AWS KMS, Azure Key Vault, HashiCorp Vault) to externalize secrets.
- Set tasks.max equal to the number of Kafka partitions for optimal throughput. Do not exceed the number of CPU cores across the Kafka Connect cluster.
- Limit JVM heap to ~50% of available RAM to leave room for the Rust-based SDK's off-heap memory.
- Run Kafka Connect in the same cloud region as your Snowflake account for best throughput.
- For new installations, always set snowflake.streaming.validate.compatibility.with.classic=false.
- Pre-create tables when you need exact control over column types.
- Enable ENABLE_SCHEMA_EVOLUTION=TRUE on tables if your Kafka record schema may change over time. All connector-created tables have this enabled by default.
- Set snowflake.metadata.all=false if you don't need RECORD_METADATA (~150 bytes savings per record).
- For testing, set snowflake.cache.table.exists=false and snowflake.cache.pipe.exists=false to avoid cache staleness. Use default cache settings (5 min) in production.
- Enable MDC logging (enable.mdc.logging=true) when running multiple connector instances to correlate log entries.
- Reduce SDK log noise with export SS_LOG_LEVEL=warn on Kafka Connect workers.
Examples
Sample user queries and how the skill responds:
- "Set up Kafka connector V4" --> Full fresh setup flow (Steps 1-5)
- "Help me configure my Kafka connector properties" --> Configuration Advisor (Step 6)
- "My Kafka connector is failing with ERROR_5030" --> Troubleshooting, ERROR_5030 branch (Step 8)
- "Migrate my V3 connector to V4" --> V3 Migration Assistant (Step 9)
- "I need a user-defined pipe with custom transformations" --> Steps 6-7
- "How do I monitor my Kafka connector?" --> Configuration Advisor, monitoring sub-topic (Step 6)
- "Set up DLQ for my Kafka connector" --> Configuration Advisor, error handling sub-topic (Step 6)
- "My connector is lagging behind" --> Troubleshooting, ingestion lag branch (Step 8)
Templates
Template 1: Minimal V4 config (Distributed mode, JSON)
{
"name": "{{CONNECTOR_NAME}}",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"topics": "{{TOPICS}}",
"tasks.max": "{{TASKS_MAX}}",
"snowflake.url.name": "{{ACCOUNT_URL}}",
"snowflake.user.name": "{{USER}}",
"snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
"snowflake.database.name": "{{DATABASE}}",
"snowflake.schema.name": "{{SCHEMA}}",
"snowflake.role.name": "{{ROLE}}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"snowflake.streaming.validate.compatibility.with.classic": "false"
}
}
Template 2: Minimal V4 config (Standalone mode, Properties)
name={{CONNECTOR_NAME}}
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
snowflake.streaming.validate.compatibility.with.classic=false
Template 3: Full V4 config with common options (Distributed mode, JSON)
{
"name": "{{CONNECTOR_NAME}}",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"topics": "{{TOPICS}}",
"tasks.max": "{{TASKS_MAX}}",
"snowflake.url.name": "{{ACCOUNT_URL}}",
"snowflake.user.name": "{{USER}}",
"snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
"snowflake.database.name": "{{DATABASE}}",
"snowflake.schema.name": "{{SCHEMA}}",
"snowflake.role.name": "{{ROLE}}",
"snowflake.topic2table.map": "{{TOPIC_TABLE_MAP}}",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"snowflake.enable.schematization": "true",
"snowflake.validation": "server_side",
"errors.tolerance": "none",
"errors.log.enable": "true",
"snowflake.metadata.all": "true",
"jmx": "true",
"enable.mdc.logging": "false",
"snowflake.streaming.validate.compatibility.with.classic": "false"
}
}
Template 4: Snowflake role/user/grants SQL
USE ROLE SECURITYADMIN;
CREATE ROLE IF NOT EXISTS {{KAFKA_ROLE}};
GRANT USAGE ON DATABASE {{DATABASE}} TO ROLE {{KAFKA_ROLE}};
GRANT USAGE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE TABLE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE PIPE ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT CREATE VIEW ON SCHEMA {{DATABASE}}.{{SCHEMA}} TO ROLE {{KAFKA_ROLE}};
GRANT INSERT ON TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
GRANT ROLE {{KAFKA_ROLE}} TO USER {{KAFKA_USER}};
Template 5: User-defined pipe with transformations
CREATE TABLE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} (
order_id NUMBER,
customer_name VARCHAR,
order_total NUMBER,
source_topic VARCHAR,
kafka_offset NUMBER
);
CREATE PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} AS
COPY INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
FROM (
SELECT
$1:order_id::NUMBER,
$1:customer_name,
$1:order_total::NUMBER,
$1:RECORD_METADATA.topic::STRING AS source_topic,
$1:RECORD_METADATA.offset::NUMBER AS kafka_offset
FROM TABLE(DATA_SOURCE(TYPE => 'STREAMING'))
);
GRANT OPERATE ON PIPE {{DATABASE}}.{{SCHEMA}}.{{TABLE}} TO ROLE {{KAFKA_ROLE}};
Template 6: V3-to-V4 migration config (from Snowpipe mode)
# Required: new connector class
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
# Your existing connection settings
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
# Community converter (replaces Snowflake converters)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
# Compatibility flags to reproduce V3 behavior
snowflake.validation=client_side
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.compatibility.enable.column.identifier.normalization=true
# Set to 'false' to keep V3 RECORD_CONTENT/RECORD_METADATA columns,
# or 'true' to adopt V4 schematized columns (recommended for new tables).
snowflake.enable.schematization=false
# Offset migration: skip SSv1 migration (not needed for Snowpipe mode)
snowflake.streaming.classic.offset.migration=skip
# Disable compatibility validator after confirming config
snowflake.streaming.validate.compatibility.with.classic=false
Template 7: V3-to-V4 migration config (from Snowpipe Streaming mode)
# Required: new connector class
connector.class=com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector
# Your existing connection settings
topics={{TOPICS}}
tasks.max={{TASKS_MAX}}
snowflake.url.name={{ACCOUNT_URL}}
snowflake.user.name={{USER}}
snowflake.private.key={{PRIVATE_KEY_BASE64}}
snowflake.database.name={{DATABASE}}
snowflake.schema.name={{SCHEMA}}
snowflake.role.name={{ROLE}}
# Community converter (replaces Snowflake converters)
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.storage.StringConverter
# Compatibility flags to reproduce V3 behavior
snowflake.validation=client_side
snowflake.compatibility.enable.autogenerated.table.name.sanitization=true
snowflake.compatibility.enable.column.identifier.normalization=true
# Set to 'false' to keep V3 RECORD_CONTENT/RECORD_METADATA columns,
# or 'true' to adopt V4 schematized columns (recommended for new tables).
snowflake.enable.schematization=false
# Offset migration: recover offsets from V3 SSv1 channels
# Use 'strict' to fail if SSv1 channels aren't found,
# or 'best_effort' to fall back to Kafka consumer group offsets.
snowflake.streaming.classic.offset.migration=best_effort
# Must match your V3 snowflake.streaming.channel.name.include.connector.name setting
snowflake.streaming.classic.offset.migration.include.connector.name={{MATCH_V3_SETTING}}
# Disable compatibility validator after confirming config
snowflake.streaming.validate.compatibility.with.classic=false
Template 8: Latency estimation query
SELECT
RECORD_METADATA:topic::STRING AS topic,
RECORD_METADATA:partition::NUMBER AS partition,
RECORD_METADATA:offset::NUMBER AS offset,
TIMESTAMPDIFF('millisecond',
TO_TIMESTAMP(RECORD_METADATA:SnowflakeConnectorPushTime::BIGINT, 3),
CONVERT_TIMEZONE('UTC', CURRENT_TIMESTAMP())
) AS latency_ms
FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
ORDER BY latency_ms DESC
LIMIT 10;
Template 9: Deduplication query (post-migration)
Important: Test this query on a small dataset first. It rewrites the table in place, keeping exactly one row per (topic, partition, offset) tuple and leaving rows without RECORD_METADATA untouched. For large tables, consider using INSERT INTO ... SELECT DISTINCT into a new table as a safer alternative.
INSERT OVERWRITE INTO {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
SELECT *
FROM {{DATABASE}}.{{SCHEMA}}.{{TABLE}}
-- Keep rows without metadata, plus the first row of each (topic, partition, offset) group
QUALIFY RECORD_METADATA IS NULL
    OR ROW_NUMBER() OVER (
        PARTITION BY RECORD_METADATA:topic, RECORD_METADATA:partition, RECORD_METADATA:offset
        ORDER BY RECORD_METADATA:SnowflakeConnectorPushTime
    ) = 1;
Template 10: JMX monitoring setup
Add these JVM properties to your Kafka Connect worker startup:
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port={{JMX_PORT}} -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
Key MBeans to monitor (domain: snowflake.kafka.connector):
Task-level:
put-records (Meter) -- records received, use rates for throughput
put-duration (Timer) -- put call duration, high values indicate bottlenecks
backpressure-rewind-count (Counter) -- rewinds due to SDK backpressure
assigned-partitions (Gauge) -- partitions assigned to this task
Channel-level (per partition):
processed-offset (Gauge) -- latest offset buffered by connector
persisted-in-snowflake-offset (Gauge) -- latest offset durably committed in Snowflake
latest-consumer-offset (Gauge) -- latest offset available from Kafka
channel-recovery-count (Gauge) -- channel recovery events
Alerting recommendations:
- Ingestion lag: latest-consumer-offset - persisted-in-snowflake-offset growing
- Backpressure: backpressure-rewind-count increasing over time
- Channel recovery: channel-recovery-count increasing
- Put duration: put-duration p99 exceeding acceptable threshold
Template 11: DLQ configuration (client-side validation)
# Enable client-side validation (required for DLQ)
snowflake.validation=client_side
# Continue on errors (route invalid records to DLQ instead of failing)
errors.tolerance=all
# DLQ topic name
errors.deadletterqueue.topic.name={{DLQ_TOPIC}}
# Enable error logging
errors.log.enable=true
Warning: Setting errors.tolerance=all without configuring a DLQ topic causes invalid records to be silently dropped. This can cause data loss.
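The warning above, together with the validation-mode rules from Critical concepts, lends itself to a simple pre-deployment check. The sketch below is illustrative: the function name is made up, the message wording is ours, and the defaults it assumes (errors.tolerance=none, snowflake.validation=server_side) are the ones stated in this document and in standard Kafka Connect behavior.

```python
def config_warnings(config: dict) -> list[str]:
    """Return human-readable warnings about risky error-handling settings."""
    warnings = []
    tolerance = config.get("errors.tolerance", "none")
    dlq_topic = config.get("errors.deadletterqueue.topic.name", "")
    validation = config.get("snowflake.validation", "server_side")

    if tolerance == "all" and not dlq_topic:
        warnings.append(
            "errors.tolerance=all without a DLQ topic: invalid records are silently dropped"
        )
    if dlq_topic and validation != "client_side":
        warnings.append(
            "DLQ configured with server-side validation: ingestion validation errors "
            "go to Error Tables, the DLQ only receives converter errors"
        )
    return warnings
```

An empty list means none of the two conditions applies; it does not prove the rest of the configuration is correct.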
Template 12: Avro with Schema Registry config (Distributed mode, JSON)
{
"name": "{{CONNECTOR_NAME}}",
"config": {
"connector.class": "com.snowflake.kafka.connector.SnowflakeStreamingSinkConnector",
"topics": "{{TOPICS}}",
"tasks.max": "{{TASKS_MAX}}",
"snowflake.url.name": "{{ACCOUNT_URL}}",
"snowflake.user.name": "{{USER}}",
"snowflake.private.key": "{{PRIVATE_KEY_BASE64}}",
"snowflake.database.name": "{{DATABASE}}",
"snowflake.schema.name": "{{SCHEMA}}",
"snowflake.role.name": "{{ROLE}}",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "{{SCHEMA_REGISTRY_URL}}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"snowflake.streaming.validate.compatibility.with.classic": "false"
}
}
Note: For secure Schema Registry access, also set value.converter.basic.auth.credentials.source=USER_INFO and value.converter.basic.auth.user.info={{REGISTRY_USER}}:{{REGISTRY_PASSWORD}}.