Monitor Data with Change Streams

In this guide, you can learn how to use a change stream to monitor real-time changes to your database. A change stream is a MongoDB Server feature that allows your application to subscribe to data changes on a collection, database, or deployment.

A change stream outputs new change events, providing access to real-time data changes. You can open a change stream on a collection, database, or client object.

The examples in this guide use the following Course struct as a model for documents in the courses collection:

type Course struct {
	Title      string
	Enrollment int32
}

To run the examples in this guide, load these documents into the courses collection in the db database by using the following snippet:

coll := client.Database("db").Collection("courses")
docs := []interface{}{
	Course{Title: "World Fiction", Enrollment: 35},
	Course{Title: "Abstract Algebra", Enrollment: 60},
}
result, err := coll.InsertMany(context.TODO(), docs)
if err != nil {
	panic(err)
}
fmt.Printf("Inserted %d documents\n", len(result.InsertedIDs))

Tip

Nonexistent Databases and Collections

If the necessary database and collection don't exist when you perform a write operation, the server implicitly creates them.

Each document contains a description of a university course that includes the course title and maximum enrollment, corresponding to the title and enrollment fields in each document.

Note

Each example output shows truncated _data, clusterTime, and ObjectID values because these values are unique to each event and document.

You can watch for changes in MongoDB by using the Watch() method on the following objects:

  • Collection: Monitor changes to a specific collection

  • Database: Monitor changes to all collections in a database

  • Client: Monitor changes across all databases

For each object, the Watch() method opens a change stream to emit change event documents when they occur.

The Watch() method requires a context parameter and a pipeline parameter, in that order. To return all changes, pass in an empty Pipeline object.

The pipeline parameter is an aggregation pipeline, which consists of an array of aggregation stages. These stages filter and transform the change events.

The following example opens a change stream on the courses collection and prints the change stream events as they occur:

changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{})
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the change stream events
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}

If you modify the courses collection in a separate program or shell, this code prints your changes as they occur. Inserting a document with a title value of "Advanced Screenwriting" and an enrollment value of 20 results in the following change event:

map[_id:map[_data:...] clusterTime: {...}
documentKey:map[_id:ObjectID("...")] fullDocument:map[_id:ObjectID("...")
enrollment:20 title:Advanced Screenwriting] ns: map[coll:courses db:db]
operationType:insert]

To view a fully runnable example, see the Open a Change Stream Example: Full File section in this guide.

Use the pipeline parameter to modify the change stream output. This parameter lets you watch for only certain change events. Format the pipeline parameter as an array of documents, with each document representing an aggregation stage.

You can use the following pipeline stages in this parameter:

  • $addFields

  • $match

  • $project

  • $replaceRoot

  • $replaceWith

  • $redact

  • $set

  • $unset
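As a rough illustration of the list above, the following sketch checks a pipeline for unsupported stages before you pass it to Watch(). It runs without the driver: the stage type and the firstUnsupported helper are hypothetical stand-ins, not part of the driver API, and the allowed set is copied from this guide.

```go
package main

import "fmt"

// allowedStages lists the stages this guide names as usable in a
// change stream pipeline.
var allowedStages = map[string]bool{
	"$addFields": true, "$match": true, "$project": true, "$replaceRoot": true,
	"$replaceWith": true, "$redact": true, "$set": true, "$unset": true,
}

// stage is a hypothetical stand-in for one pipeline document:
// a single stage name mapped to its arguments.
type stage map[string]any

// firstUnsupported returns the first stage name that is not in
// allowedStages, or "" when every stage is supported.
func firstUnsupported(pipeline []stage) string {
	for _, s := range pipeline {
		for name := range s {
			if !allowedStages[name] {
				return name
			}
		}
	}
	return ""
}

func main() {
	ok := []stage{{"$match": map[string]any{"operationType": "delete"}}}
	bad := []stage{{"$match": map[string]any{}}, {"$group": map[string]any{}}}
	fmt.Printf("%q\n", firstUnsupported(ok))  // prints ""
	fmt.Printf("%q\n", firstUnsupported(bad)) // prints "$group"
}
```

A check like this only catches mistakes early on the client side. The server still validates the pipeline when the change stream opens.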

The following example opens a change stream on the db database but only watches for new delete operations:

db := client.Database("db")
pipeline := bson.D{{"$match", bson.D{{"operationType", "delete"}}}}
changeStream, err := db.Watch(context.TODO(), mongo.Pipeline{pipeline})
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())
// Iterates over the cursor to print the delete operation change events
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}

Note

The Watch() method was called on the db database, so the code outputs new delete operations on any collection within this database.
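Because a database-level change stream mixes events from every collection, the ns field of each event tells you where a change happened. The following sketch tallies events per collection. It runs without the driver: the event and namespace types are hypothetical, trimmed-down models of a change event, and the students collection is invented for illustration.

```go
package main

import "fmt"

// namespace mirrors the "ns" field of a change event.
type namespace struct {
	DB   string
	Coll string
}

// event is a hypothetical, trimmed-down model of a change event.
type event struct {
	OperationType string
	NS            namespace
}

// countByCollection tallies events by the collection they occurred in.
func countByCollection(events []event) map[string]int {
	counts := make(map[string]int)
	for _, e := range events {
		counts[e.NS.Coll]++
	}
	return counts
}

func main() {
	// "students" is a made-up second collection in the db database.
	events := []event{
		{"delete", namespace{"db", "courses"}},
		{"delete", namespace{"db", "students"}},
		{"delete", namespace{"db", "courses"}},
	}
	counts := countByCollection(events)
	fmt.Println(counts["courses"], counts["students"]) // prints 2 1
}
```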

Note

Example Setup

This example connects to an instance of MongoDB by using a connection URI. To learn more about connecting to your MongoDB instance, see the Create a MongoClient guide. This example also uses the restaurants collection in the sample_restaurants database included in the Atlas sample datasets. You can load them into your database on the free tier of MongoDB Atlas by following the Get Started with Atlas Guide.

The following example opens a change stream on the restaurants collection and prints inserted documents:

coll := client.Database("sample_restaurants").Collection("restaurants")
// Creates instructions to watch for insert operations
pipeline := mongo.Pipeline{bson.D{{"$match", bson.D{{"operationType", "insert"}}}}}
// Creates a change stream that receives change events
cs, err := coll.Watch(context.TODO(), pipeline)
if err != nil {
	panic(err)
}
defer cs.Close(context.TODO())
fmt.Println("Waiting For Change Events. Insert something in MongoDB!")
// Prints a message each time the change stream receives an event
for cs.Next(context.TODO()) {
	var event bson.M
	if err := cs.Decode(&event); err != nil {
		panic(err)
	}
	output, err := json.MarshalIndent(event["fullDocument"], "", " ")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", output)
}
if err := cs.Err(); err != nil {
	panic(err)
}

View a fully runnable example.

After you run the full example, run the Insert a Document full file example in a different shell. When you run the insert operation, you see output similar to the following:

// results truncated
{
  "_id": ..., "name": "8282", "cuisine": "Korean"
}

Important

When you finish working with this usage example, make sure to shut it down by closing your terminal.
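Closing the terminal works, but a loop that blocks on Next() can also end cleanly when its context is cancelled, which lets deferred Close() calls run. The following sketch shows that pattern with the standard library only. The source type is a hypothetical stand-in for a change stream, and the timeout exists only so the demo terminates on its own. It assumes, as with changeStream.Next(ctx), that the blocking call returns false once the context is done.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// source is a hypothetical stand-in for a change stream.
type source struct{ events chan string }

// next blocks until an event arrives or ctx is done.
func (s *source) next(ctx context.Context) (string, bool) {
	select {
	case <-ctx.Done():
		return "", false
	case ev := <-s.events:
		return ev, true
	}
}

// drain reads events until ctx is done and returns how many it saw.
func drain(ctx context.Context, s *source) int {
	n := 0
	for {
		if _, ok := s.next(ctx); !ok {
			return n
		}
		n++
	}
}

// drainMillis queues the given events, then drains them under a context
// that ends on Ctrl+C or after ms milliseconds, whichever comes first.
func drainMillis(ms int, events ...string) int {
	s := &source{events: make(chan string, len(events))}
	for _, ev := range events {
		s.events <- ev
	}
	// Ctrl+C cancels ctx; the timeout only ends the demo.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, time.Duration(ms)*time.Millisecond)
	defer cancel()
	return drain(ctx, s)
}

func main() {
	fmt.Println("events seen:", drainMillis(200, "insert", "update", "delete"))
}
```

In a real program, you would pass the context returned by signal.NotifyContext to Watch() and Next() in place of context.TODO() and drop the timeout.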

Use the options parameter to modify the behavior of the Watch() method.

You can specify the following options for the Watch() method:

  • ResumeAfter

  • StartAfter

  • FullDocument

  • FullDocumentBeforeChange

  • BatchSize

  • MaxAwaitTime

  • Collation

  • StartAtOperationTime

  • Comment

  • ShowExpandedEvents

  • Custom

  • CustomPipeline

For more information on these options, see the db.collection.watch() entry in the Server manual.

When you perform any CRUD operation on a collection, by default, the corresponding change event document contains only the delta of the fields modified by the operation. You can see the full document before and after a change, in addition to the delta, by specifying settings in the options parameter of the Watch() method.

If you want to see a document's post-image, the full version of the document after a change, set the FullDocument field of the options parameter to one of the following values:

  • UpdateLookup: The change event document includes a copy of the entire changed document.

  • WhenAvailable: The change event document includes a post-image of the modified document for change events if the post-image is available.

  • Required: The output is the same as for WhenAvailable, but the server raises an error if the post-image is not available.

If you want to see a document's pre-image, the full version of the document before a change, set the FullDocumentBeforeChange field of the options parameter to one of the following values:

  • WhenAvailable: The change event document includes a pre-image of the modified document for change events if the pre-image is available.

  • Required: The output is the same as for WhenAvailable, but the server raises an error if the pre-image is not available.

Important

To access document pre- and post-images, you must enable changeStreamPreAndPostImages for the collection. See the Change Streams section of the collMod Database Command guide in the MongoDB Server manual for instructions and more information.

Note

There is no pre-image for an inserted document and no post-image for a deleted document.
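The rule in the preceding note can be written as a small lookup, which may help when deciding whether a missing image is expected. This is a sketch that runs without the driver: the images helper is hypothetical, and its treatment of event types other than insert, update, replace, and delete (it reports no images) is an assumption.

```go
package main

import "fmt"

// images reports whether a change event with the given operation type
// can carry a pre-image and a post-image at all.
func images(operationType string) (pre, post bool) {
	switch operationType {
	case "insert":
		return false, true // no document existed before the insert
	case "delete":
		return true, false // no document exists after the delete
	case "update", "replace":
		return true, true
	default:
		// Assumption: other event types don't describe a single document.
		return false, false
	}
}

func main() {
	for _, op := range []string{"insert", "update", "delete"} {
		pre, post := images(op)
		fmt.Printf("%s: pre-image=%t post-image=%t\n", op, pre, post)
	}
}
```

Whether an image is actually present still depends on the collection's changeStreamPreAndPostImages setting and on the option values described above.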

The following example calls the Watch() method on the courses collection. It specifies a value for the FullDocument field of the options parameter to output a copy of the entire modified document, instead of only the changed fields:

opts := options.ChangeStream().SetFullDocument(options.UpdateLookup)
changeStream, err := coll.Watch(context.TODO(), mongo.Pipeline{}, opts)
if err != nil {
	panic(err)
}
defer changeStream.Close(context.TODO())
for changeStream.Next(context.TODO()) {
	fmt.Println(changeStream.Current)
}

Updating the enrollment value of the document with the title of "World Fiction" from 35 to 30 results in the following change event:

{"_id": {"_data": "..."},"operationType": "update","clusterTime":
{"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}

If you don't specify the FullDocument option, the same update operation doesn't output the "fullDocument" value in the change event document.
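To show how the fields of the update event above relate to each other, the following sketch parses that printed output with the standard library's encoding/json package, used here as a stand-in for the driver's own decoding so that the code runs without a MongoDB connection. The updateEvent and extInt types are hypothetical, and the sketch assumes every updated field is an int32 in Extended JSON form, which holds for this sample only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// extInt decodes an Extended JSON int32 such as {"$numberInt":"30"}.
type extInt int32

func (n *extInt) UnmarshalJSON(data []byte) error {
	var wrapper struct {
		Value string `json:"$numberInt"`
	}
	if err := json.Unmarshal(data, &wrapper); err != nil {
		return err
	}
	v, err := strconv.ParseInt(wrapper.Value, 10, 32)
	if err != nil {
		return err
	}
	*n = extInt(v)
	return nil
}

// updateEvent is a hypothetical model of the fields this sketch reads.
type updateEvent struct {
	OperationType string `json:"operationType"`
	FullDocument  struct {
		Title      string `json:"title"`
		Enrollment extInt `json:"enrollment"`
	} `json:"fullDocument"`
	UpdateDescription struct {
		UpdatedFields map[string]extInt `json:"updatedFields"`
		RemovedFields []string          `json:"removedFields"`
	} `json:"updateDescription"`
}

// sample is the event printed above, with truncated values left as "...".
const sample = `{"_id": {"_data": "..."},"operationType": "update","clusterTime":
{"$timestamp": {"t":"...","i":"..."}},"fullDocument": {"_id":
{"$oid":"..."},"title": "World Fiction","enrollment": {"$numberInt":"30"}},
"ns": {"db": "db","coll": "courses"},"documentKey": {"_id": {"$oid":"..."}},
"updateDescription": {"updatedFields": {"enrollment": {"$numberInt":"30"}},
"removedFields": [],"truncatedArrays": []}}`

// parseUpdate decodes one update event from its JSON text.
func parseUpdate(raw string) (updateEvent, error) {
	var ev updateEvent
	err := json.Unmarshal([]byte(raw), &ev)
	return ev, err
}

func main() {
	ev, err := parseUpdate(sample)
	if err != nil {
		panic(err)
	}
	// updatedFields holds the delta; fullDocument holds the post-image.
	fmt.Println(ev.OperationType, ev.FullDocument.Title,
		ev.UpdateDescription.UpdatedFields["enrollment"]) // prints update World Fiction 30
}
```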

For more information on change streams, see Change Streams in the Server manual.

To learn more about the Watch() method, see the following API documentation:
