
Neo4j Kafka Skill
Configure Neo4j Connector for Kafka sink/source strategies, CDC, Schema Registry, EOS, and DLQ for graph streaming pipelines.
Install
npx skills add https://github.com/neo4j-contrib/neo4j-skills --skill neo4j-kafka-skill
What is this skill?
- Sink strategies: Cypher, Pattern, CDC (schema + source-id), CUD, plus exactly-once semantics and DLQ error handling.
- Source modes: CDC-based (Neo4j 5.13+) and query-based for any edition.
- Native Neo4j CDC API (db.cdc.query) with CDC cursor-loop examples in Python and Java.
- Confluent Cloud managed Neo4j Sink and Schema Registry support for Avro and JSON Schema.
- Does not cover the legacy Neo4j Streams plugin; points instead to the Connector for Kafka for Neo4j 5.0+.
Adoption & trust: 1 install on skills.sh; 80 GitHub stars; 3/3 security scanners passed (skills.sh audits); trending (+100% hot-view momentum).
Journey fit
The canonical shelf is Build integrations because the skill is primarily about connector configuration and streaming-topology design wired to Neo4j. The Integrations subphase fits its Kafka Connect, Confluent Cloud managed connector, and CDC cursor-loop patterns between Neo4j and event buses.
Common Questions / FAQ
Is Neo4j Kafka Skill safe to install?
skills.sh reports 3 of 3 security scanners passed. Review the Security Audits panel on this page before installing in production.
SKILL.md
# neo4j-kafka-skill

## What it covers

Configure and operate the **Neo4j Connector for Kafka** (sink and source) and the **native Neo4j CDC API**.

| Component | Covered |
|---|---|
| Sink: Cypher strategy | ✅ |
| Sink: Pattern strategy | ✅ |
| Sink: CDC strategy (schema + source-id) | ✅ |
| Sink: CUD strategy | ✅ |
| Sink: Exactly-once semantics (EOS) | ✅ |
| Sink: Error handling / DLQ | ✅ |
| Source: CDC-based (Neo4j 5.13+) | ✅ |
| Source: Query-based (any edition) | ✅ |
| Native CDC API (`db.cdc.query`) | ✅ |
| Confluent Cloud managed connector | ✅ |
| Schema Registry (Avro / JSON Schema) | ✅ |
| CDC cursor-loop pattern (Python + Java) | ✅ |

## Not covered

- Cypher query authoring → [neo4j-cypher-skill](../neo4j-cypher-skill/)
- Bulk CSV/JSON file import → [neo4j-import-skill](../neo4j-import-skill/)
- GDS algorithms → [neo4j-gds-skill](../neo4j-gds-skill/)
- Legacy Neo4j Streams plugin (deprecated; use the Connector for Kafka ≥ 5.0)

## Install

```bash
# Self-managed Kafka Connect: download the JAR from Confluent Hub or neo4j.com
confluent-hub install neo4j/kafka-connect-neo4j:latest

# Or download directly
curl -L https://github.com/neo4j/neo4j-kafka-connector/releases/latest/download/neo4j-kafka-connector.zip \
  -o neo4j-kafka-connector.zip
```

Confluent Cloud: Neo4j Sink is available as a fully managed connector, so no JAR install is required. Select it from the Confluent Cloud connector catalog.
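On a self-managed worker, a connector instance is created after installation through the Kafka Connect REST API (`POST /connectors`). The Python sketch below builds such a request for a Cypher-strategy sink with an optional dead letter queue. The connector class and the `neo4j.*` property names are assumptions based on the 5.x connector docs and should be checked against `references/sink-config.md`; the `errors.*` keys are standard Kafka Connect sink error-handling settings. The connector name, topic, credentials, and Cypher statement are hypothetical, and the statement assumes the message value is bound as `__value`.

```python
import json
import urllib.request

# Assumption: class name of the 5.x sink connector; verify in references/sink-config.md.
SINK_CLASS = "org.neo4j.connectors.kafka.sink.Neo4jConnector"


def build_cypher_sink(name, uri, user, password, topic_cypher, dlq_topic=None):
    """Build a Kafka Connect registration payload for a Cypher-strategy sink.

    topic_cypher maps each Kafka topic to the Cypher statement run per message.
    """
    config = {
        "connector.class": SINK_CLASS,
        # Kafka Connect reads the subscribed topics as a comma-separated list.
        "topics": ",".join(topic_cypher),
        # Assumed neo4j.* connection/auth keys (see references/sink-config.md).
        "neo4j.uri": uri,
        "neo4j.authentication.type": "BASIC",
        "neo4j.authentication.basic.username": user,
        "neo4j.authentication.basic.password": password,
    }
    for topic, cypher in topic_cypher.items():
        # Assumed key pattern: one Cypher statement per topic.
        config[f"neo4j.cypher.topic.{topic}"] = cypher
    if dlq_topic:
        # Standard Kafka Connect sink error handling: tolerate bad records and
        # route them, with error-context headers, to a dead letter queue topic.
        config["errors.tolerance"] = "all"
        config["errors.deadletterqueue.topic.name"] = dlq_topic
        config["errors.deadletterqueue.context.headers.enable"] = "true"
    return {"name": name, "config": config}


def register(connect_url, payload):
    """POST the payload to the Kafka Connect REST API and return its JSON reply."""
    req = urllib.request.Request(
        connect_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Hypothetical connector name, topic, and credentials.
    payload = build_cypher_sink(
        name="neo4j-person-sink",
        uri="neo4j://localhost:7687",
        user="neo4j",
        password="password",
        topic_cypher={
            "people": "MERGE (p:Person {id: __value.id}) SET p.name = __value.name",
        },
        dlq_topic="people-dlq",
    )
    print(json.dumps(payload, indent=2))
    # register("http://localhost:8083", payload)  # requires a running Connect worker
```

Keeping payload construction separate from the HTTP call lets the config be reviewed or versioned before it is posted to a worker (8083 is the Kafka Connect default REST port).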
## References

- `references/sink-config.md`: complete sink connector property reference
- `references/cdc-api.md`: CDC procedure details, event schema, cursor-loop examples
- [Neo4j Connector for Kafka docs](https://neo4j.com/docs/kafka/current/)
- [Neo4j CDC docs](https://neo4j.com/docs/cdc/current/)

# Neo4j Native CDC API: Patterns, Event Structure, Cursor Loop

Source: [neo4j.com/docs/cdc/current/](https://neo4j.com/docs/cdc/current/)

## Requirements

| Requirement | Detail |
|---|---|
| Neo4j version | 5.13+ |
| Edition | Enterprise Edition, AuraDB Business Critical, AuraDB VDC |
| Self-managed config | `db.cdc.enabled=true` in `neo4j.conf` |
| Aura | Enabled by default on BC/VDC tiers |

CDC is NOT available on Community Edition or AuraDB Free/Professional.

---

## Procedures

### `db.cdc.current()`

Returns the cursor for the last committed transaction. The cursor is **exclusive**: it does not include changes from that transaction.

```cypher
CALL db.cdc.current() YIELD id
RETURN id AS cursor;
```

Use it as the starting point to stream from now forward.

### `db.cdc.earliest()`

Returns the cursor for the earliest change still available in the CDC buffer.

```cypher
CALL db.cdc.earliest() YIELD id
RETURN id AS cursor;
```

Use it to replay the full available CDC history.

### `db.cdc.query(from, selectors)`

| Parameter | Type | Default | Description |
|---|---|---|---|
| `from` | STRING | `""` (= current) | Starting cursor (exclusive) |
| `selectors` | LIST OF MAP | `[]` (= all) | Filter criteria |

Returns: `id`, `txId`, `seq`, `metadata`, `event`

---

## Selector Reference

Selectors are ANDed within one map and ORed across list items.
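Neo4j evaluates selectors server-side, but the combining rule is easy to model. The Python sketch below checks a change event against a selector list: every field within one map must match (AND), and any one map in the list may match (OR). The flat event shape (`entity`, `operation`, `labels`, `type`) is a hypothetical simplification, not the real CDC event schema, and only the `select`, `operation`, `labels`, and `type` fields are modeled.

```python
# Hypothetical flat event shape used only for this sketch:
# {"entity": "n" | "r", "operation": "c" | "u" | "d", "labels": [...], "type": "..."}


def matches_selector(event, selector):
    """Return True if the event satisfies every field of one selector map (AND)."""
    select = selector.get("select", "e")
    if select != "e" and event["entity"] != select:
        return False
    if "operation" in selector and event["operation"] != selector["operation"]:
        return False
    if "labels" in selector:
        # Only nodes carry labels, and the node must have ALL listed labels.
        if event["entity"] != "n":
            return False
        if not set(selector["labels"]) <= set(event.get("labels", [])):
            return False
    if "type" in selector:
        # Only relationships have a type.
        if event["entity"] != "r" or event.get("type") != selector["type"]:
            return False
    return True


def matches(event, selectors):
    """An empty list matches every event; otherwise any one map may match (OR)."""
    return not selectors or any(matches_selector(event, s) for s in selectors)


# Mirrors the OR example in this section: Person creates OR Organization updates.
SELECTORS = [
    {"select": "n", "labels": ["Person"], "operation": "c"},
    {"select": "n", "labels": ["Organization"], "operation": "u"},
]

if __name__ == "__main__":
    events = [
        {"entity": "n", "operation": "c", "labels": ["Person", "Employee"]},
        {"entity": "n", "operation": "c", "labels": ["Organization"]},
        {"entity": "r", "operation": "c", "type": "WORKS_FOR"},
    ]
    for e in events:
        print(e, matches(e, SELECTORS))
```

Note that an `Organization` create is rejected here: it satisfies the label of the second map but not its `operation`, and the AND within that map fails.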
```cypher
// AND: node labeled Person AND operation is CREATE
[{select: 'n', labels: ['Person'], operation: 'c'}]

// OR: Person creates OR Organization updates
[
  {select: 'n', labels: ['Person'], operation: 'c'},
  {select: 'n', labels: ['Organization'], operation: 'u'}
]
```

| Field | Values | Scope | Description |
|---|---|---|---|
| `select` | `'e'` (all), `'n'` (nodes), `'r'` (rels) | both | Entity type filter |
| `operation` | `'c'` (create), `'u'` (update), `'d'` (delete) | both | Operation type |
| `labels` | `['Label1', 'Label2']` | nodes | Node must have ALL listed labels |
| `type` | `'REL_TYPE'` | rels | Relationship type |
| `elementId` | element ID string | both | Specific entity by ID |
| `key` | `{prop: value}` | both | Match by key property (requires key constraint) |
| `changesTo` | `['prop1', 'prop2']` | both | ALL listed properties must change (AND) |
| `authenticatedUser` | username string | both | Filter