
MongoDB Atlas Stream Processing
Configure MongoDB Atlas Stream Processing pipelines with the correct source, sink, and mid-pipeline connection types when wiring change streams, Kafka, S3, HTTPS, and Lambda stages.
Overview
MongoDB Atlas Stream Processing is an agent skill, used mainly in the Build phase and also in Operate. It maps Atlas connection types to the stream pipeline sources, sinks, and mid-pipeline stages that each one supports.
Install
npx skills add https://github.com/mongodb/agent-skills --skill mongodb-atlas-stream-processing
What is this skill?
- Connection capability matrix for 8 connection types: Cluster, Kafka, Sample Stream, S3, Https, AWSLambda, AWS Kinesis, and SchemaRegistry
- Clarifies where $source, $merge, $emit, $lookup, $https, and $externalFunction are valid for each connection type
- Documents that a Kafka $source must include a topic field, and that AWSLambda sinks are async-only while mid-pipeline Lambda calls can be sync or async
- Points to the official examples repository, mongodb/ASP_example, for quickstarts, example processors, and Terraform examples
- Flags common mistakes, such as using S3 as a source (it is sink-only) or using SchemaRegistry as a source or sink (it is metadata-only)
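The capability matrix and placement rules summarized above can be written as a lint over a pipeline. The Python sketch below is an illustration, not part of the skill. The validate and connection_of helpers and all connection names are hypothetical. The stage field layouts (connectionName, into, from, topic, execution) follow Atlas Stream Processing syntax as we understand it, so check them against the ASP_example processors before relying on them. The sketch treats the first stage as the $source and the last stage as the sink.

```python
from typing import Any, Optional

# Capability matrix transcribed from the Connection Capabilities table.
SOURCE_TYPES = {"Cluster", "Kafka", "Sample Stream", "AWS Kinesis"}
SINK_OPERATORS = {  # terminal operator -> connection types it may write to
    "$merge": {"Cluster"},
    "$emit": {"Kafka", "S3", "AWS Kinesis"},
    "$https": {"Https"},
    "$externalFunction": {"AWSLambda"},  # async only, checked below
}
MID_OPERATORS = {  # connection-backed operators allowed between source and sink
    "$lookup": {"Cluster"},
    "$https": {"Https"},
    "$externalFunction": {"AWSLambda"},  # sync or async
}


def connection_of(op: str, spec: Any) -> Optional[str]:
    """Return the connectionName a stage references, or None if it uses none."""
    if not isinstance(spec, dict):
        return None  # e.g. {"$limit": 10}
    nested = {"$merge": "into", "$lookup": "from"}.get(op)  # assumed nesting
    target = spec.get(nested) if nested else spec
    return target.get("connectionName") if isinstance(target, dict) else None


def validate(pipeline: list, conn_types: dict) -> list:
    """Return capability-matrix violations; an empty list means none were found."""
    if not pipeline or "$source" not in pipeline[0]:
        return ["pipeline must start with a $source stage"]
    errors = []
    last = len(pipeline) - 1
    for i, stage in enumerate(pipeline):
        op, spec = next(iter(stage.items()))  # a stage is a one-key document
        name = connection_of(op, spec)
        if name is None:
            continue  # $match, $project, etc. need no connection
        ctype = conn_types.get(name)
        if ctype is None:
            errors.append(f"stage {i}: unknown connection {name!r}")
        elif ctype == "SchemaRegistry":
            errors.append(f"stage {i}: SchemaRegistry is metadata only")
        elif op == "$source":
            if i != 0:
                errors.append(f"stage {i}: $source must be the first stage")
            elif ctype not in SOURCE_TYPES:
                errors.append(f"stage {i}: {ctype} cannot be a $source")
            elif ctype == "Kafka" and "topic" not in spec:
                errors.append(f"stage {i}: a Kafka $source must include 'topic'")
        elif i == last:
            if ctype not in SINK_OPERATORS.get(op, set()):
                errors.append(f"stage {i}: {op} cannot use a {ctype} connection as a sink")
            elif op == "$externalFunction" and spec.get("execution") != "async":
                errors.append(f"stage {i}: a Lambda sink requires execution: 'async'")
        elif ctype not in MID_OPERATORS.get(op, set()):
            errors.append(f"stage {i}: {op} on {ctype} is not valid mid-pipeline")
    return errors


# Hypothetical connection name -> type mapping (e.g. gathered by inspecting connections).
CONN_TYPES = {
    "prod-atlas-main": "Cluster",
    "prod-kafka-orders": "Kafka",
    "prod-https-api": "Https",
    "prod-lambda-notify": "AWSLambda",
}
GOOD = [
    {"$source": {"connectionName": "prod-atlas-main", "db": "shop", "coll": "orders"}},
    {"$match": {"operationType": "insert"}},
    {"$emit": {"connectionName": "prod-kafka-orders", "topic": "orders"}},
]
BAD = [
    {"$source": {"connectionName": "prod-https-api"}},  # Https is never a source
    {"$externalFunction": {"connectionName": "prod-lambda-notify",
                           "functionName": "notify", "execution": "sync"}},  # sync sink
]

if __name__ == "__main__":
    print(validate(GOOD, CONN_TYPES))  # []
    for err in validate(BAD, CONN_TYPES):
        print(err)
```

Running it prints an empty list for the valid pipeline and two errors for the invalid one: an Https connection used as a $source, and a Lambda sink with execution "sync".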
Adoption & trust: 613 installs on skills.sh; 131 GitHub stars; 2/3 security scanners passed (skills.sh audits).
What problem does it solve?
You need a real-time Atlas pipeline but are unsure which connections can serve as a $source, as a $merge or $emit sink, or as a mid-pipeline enrichment stage. You also want to avoid invalid stage combinations.
Who is it for?
Indie builders adding CDC, Kafka bridge, or webhook/Lambda enrichment on MongoDB Atlas Stream Processing.
Skip if: your team only needs static CRUD on a single collection, with no stream processors or external sinks.
When should I use this skill?
Use it when you are designing or editing MongoDB Atlas Stream Processing pipelines and need authoritative source/sink rules for each connection type.
What do I get? / Deliverables
You pick supported connection patterns and operator chains aligned with ASP examples so pipelines deploy with fewer connection-capability errors.
- Valid ASP connection and stage layout aligned with capability matrix
- Processor configuration referencing ASP_example quickstarts or Terraform patterns
Journey fit
Spans multiple journey phases - primary shelf plus alternate fits below.
Stream pipeline design belongs on the Build shelf under integrations. It wires Atlas to Kafka, S3, HTTPS endpoints, and AWS services partway through a product. Atlas Stream Processing is cross-system plumbing (change streams, $emit, $externalFunction), so it maps to integrations rather than to app UI or pure schema modeling.
Where it fits
Map cluster change streams to a $merge sink and a Kafka $emit topic for a new notification service.
Choose between Https mid-pipeline enrichment and a final $https sink before locking the processor JSON.
Debug a rejected pipeline after adding an AWSLambda $externalFunction sink with execution "sync" instead of the required "async".
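The three scenarios above can be sketched as pipeline definitions. They are written here as Python dicts so they can be dumped to processor JSON. All connection names, database and collection names, topics, paths, and Lambda function names are hypothetical. The $https and $externalFunction fields other than connectionName and execution are assumptions, so verify them against the ASP reference. Scenario 1 assumes that a processor ends in exactly one terminal stage, so fanning out to a collection and a Kafka topic takes two processors.

```python
import json

# Hypothetical connection names; substitute the ones registered in your workspace.
CLUSTER = "prod-atlas-main"
KAFKA = "prod-kafka-notifications"
API = "prod-https-profile-api"
LAMBDA = "prod-lambda-notify"


def change_stream_source(db: str, coll: str) -> dict:
    """$source stage that reads the change stream of one Atlas collection."""
    return {"$source": {"connectionName": CLUSTER, "db": db, "coll": coll}}


def merge_into(db: str, coll: str) -> dict:
    """$merge sink that writes into an Atlas collection (Cluster connections only)."""
    return {"$merge": {"into": {"connectionName": CLUSTER, "db": db, "coll": coll}}}


# Scenario 1: change events to a collection AND a Kafka topic.
# Assumes one terminal stage per processor, so this takes two processors.
to_collection = [change_stream_source("shop", "orders"), merge_into("notify", "order_events")]
to_kafka = [
    change_stream_source("shop", "orders"),
    {"$emit": {"connectionName": KAFKA, "topic": "order-notifications"}},  # Kafka uses $emit, never $merge
]

# Scenario 2: one Https connection, used mid-pipeline for enrichment or as the final sink.
# The path/method/as fields are assumptions; check the $https reference.
enrich_then_merge = [
    change_stream_source("shop", "customers"),
    {"$https": {"connectionName": API, "path": "/profile", "method": "GET", "as": "profile"}},
    merge_into("notify", "enriched_customers"),
]
https_as_sink = [
    change_stream_source("shop", "orders"),
    {"$https": {"connectionName": API, "path": "/webhook", "method": "POST"}},
]

# Scenario 3: "sync" is allowed only mid-pipeline; a Lambda sink must be "async".
# functionName values are hypothetical.
lambda_mid = [
    change_stream_source("shop", "orders"),
    {"$externalFunction": {"connectionName": LAMBDA, "functionName": "score-order",
                           "execution": "sync", "as": "score"}},
    merge_into("notify", "scored_orders"),
]
lambda_sink = [
    change_stream_source("shop", "orders"),
    {"$externalFunction": {"connectionName": LAMBDA, "functionName": "send-notification",
                           "execution": "async"}},
]

if __name__ == "__main__":
    # Print the processor JSON you would review before deploying.
    print(json.dumps(lambda_sink, indent=2))
```

The fix for the third scenario is the execution value on the terminal stage. The same $externalFunction call can stay "sync" when it sits between the source and a $merge.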
How it compares
Reference for Atlas stream connection rules—not a generic Drizzle/Neon provisioning skill or an MCP server catalog entry.
Common Questions / FAQ
Who is mongodb-atlas-stream-processing for?
Solo and indie builders shipping event-driven or analytics features on MongoDB Atlas who must wire change streams, Kafka, S3, HTTPS, or Lambda into ASP pipelines correctly.
When should I use mongodb-atlas-stream-processing?
During Build when designing integrations from Atlas to Kafka, S3, or HTTPS/Lambda, and during Operate when extending processors or fixing invalid source/sink stages on live streams.
Is mongodb-atlas-stream-processing safe to install?
Review the Security Audits panel on this Prism page and inspect the skill package before granting network or cloud credentials to your agent.
SKILL.md
# Connection Configuration Reference

**Official examples repo**: https://github.com/mongodb/ASP_example. Check its quickstarts, example processors, and Terraform examples; start with the quickstarts.

## Connection Capabilities: Source/Sink Reference

Know what each connection type can do before creating pipelines:

| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
|-----------------|---------------------|--------------------------|--------------|-------|
| **Cluster** | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| **Kafka** | ✅ Topic consumer | ✅ $emit to topics | ❌ | Source MUST include `topic` field |
| **Sample Stream** | ✅ Sample data | ❌ Not valid | ❌ | Testing/demo only |
| **S3** | ❌ Not valid | ✅ $emit to buckets | ❌ | Sink only; use `path`, `format`, `compression` |
| **Https** | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as the final sink stage |
| **AWSLambda** | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | **Sink:** `execution: "async"` required. **Mid-pipeline:** `execution: "sync"` or `"async"` |
| **AWS Kinesis** | ✅ Stream consumer | ✅ $emit to streams | ❌ | Similar to Kafka pattern |
| **SchemaRegistry** | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | **Metadata only**: used by Kafka connections for Avro schemas |

**Common connection usage mistakes to avoid:**

- ❌ Using HTTPS connections as `$source` → HTTPS is for enrichment or sink only
- ❌ Using `$externalFunction` as a sink with `execution: "sync"` → must use `execution: "async"` for the sink stage
- ❌ Forgetting that change streams exist → an Atlas Cluster is a powerful source, not just a sink
- ❌ Using `$merge` with Kafka → use `$emit` for Kafka sinks

**$externalFunction execution modes:**

- **Mid-pipeline:** can use `execution: "sync"` (blocks until Lambda returns) or `execution: "async"` (non-blocking)
- **Final sink stage:** MUST use `execution: "async"` only

## Connection Naming Best Practices

**CRITICAL**: Connection names should clearly indicate their actual targets, to avoid confusion and to prevent writing data to the wrong destination.

### Good Naming Patterns

**Match the actual target name:**

- Cluster connection to "ClusterRestoreTest" → name it `cluster-restore-test` or `ClusterRestoreTest`
- Cluster connection to "AtlasCluster" → name it `atlas-cluster` or `AtlasCluster`

**Use descriptive names with context:**

- `prod-kafka-orders` (indicates environment + service + purpose)
- `dev-atlas-main` (indicates environment + service + designation)
- `staging-s3-exports` (indicates environment + service + purpose)

### Bad Naming Patterns (AVOID)

❌ **Generic names that don't match targets:**

- Connection "atlascluster" pointing to "ClusterRestoreTest" ← CONFUSING!
- Connection "kafka" pointing to multiple different topics ← NOT SPECIFIC!

❌ **Reusing names across workspaces without context:**

- "myconnection" in workspace A and workspace B with different targets

❌ **Names that don't indicate connection type:**

- "connection1", "test", "temp" ← NO CONTEXT!
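The naming rules above can be checked mechanically once you know what each connection points at. This is a minimal sketch under several assumptions. `target` is the name of the thing a connection actually reaches (cluster name, topic, bucket, or URL host). Environments are prefixed `prod`, `staging`, or `dev`. The helper names are hypothetical, and the separator-insensitive matching is a heuristic, not an official rule.

```python
import re

# Assumed environment prefixes for "environment-service-purpose" names.
ENVIRONMENTS = {"prod", "staging", "dev"}


def normalize(name: str) -> str:
    """Lowercase and drop separators, so 'cluster-restore-test' equals 'ClusterRestoreTest'."""
    return re.sub(r"[^a-z0-9]", "", name.lower())


def matches_target(conn_name: str, target: str) -> bool:
    """Good pattern 1: the connection name contains the target's own name."""
    return normalize(target) in normalize(conn_name)


def has_context(conn_name: str) -> bool:
    """Good pattern 2: names like 'prod-kafka-orders' (environment + service + purpose)."""
    parts = conn_name.lower().split("-")
    return len(parts) >= 3 and parts[0] in ENVIRONMENTS and all(parts)


def naming_warnings(conn_name: str, target: str) -> list[str]:
    """Warn when a name follows neither good pattern for its inspected target."""
    if matches_target(conn_name, target) or has_context(conn_name):
        return []
    return [f"connection {conn_name!r} does not indicate its target {target!r}"]


def reused_names(workspaces: dict[str, dict[str, str]]) -> set[str]:
    """Bad pattern: the same connection name pointing at different targets across workspaces."""
    targets_by_name: dict[str, set[str]] = {}
    for connections in workspaces.values():  # workspace -> {connection name: target}
        for name, target in connections.items():
            targets_by_name.setdefault(name, set()).add(target)
    return {name for name, targets in targets_by_name.items() if len(targets) > 1}


if __name__ == "__main__":
    print(naming_warnings("cluster-restore-test", "ClusterRestoreTest"))  # []
    print(naming_warnings("atlascluster", "ClusterRestoreTest"))  # one warning
    print(reused_names({"A": {"myconnection": "ClusterA"}, "B": {"myconnection": "ClusterB"}}))  # {'myconnection'}
```

A warning here is a prompt to rename the connection or to confirm with the user. It is not proof of a misconfiguration, since a name like `prod-kafka-orders` passes on context alone.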
### Verification Workflow

**Before creating processors**, always inspect your connections to verify they point where you expect:

```
1. atlas-streams-discover → action: "list-connections"
2. atlas-streams-discover → action: "inspect-connection" for each
3. Verify connection name matches actual target (clusterName, bootstrapServers, url, etc.)
4. If mismatch exists, consider renaming or warn the user
```

See the "Pre-Deployment Connection Validation" section of [development-workflow.md](development-workflow.md) for the complete validation procedure.

## Important Notes

- HTTPS connections are for `$https` stages ONLY (mid-pipeline enrichment or the final sink); they are NOT valid as `$source` data sources
- Store API authentication in connection settings; never hardcode it in processor pipelines
- AWS connections (S3, Kinesis, Lambda) require I