$changeStream (aggregation)
Definition
$changeStreamReturns a Change Stream cursor on a collection, a database, or an entire cluster. Must be used as the first stage in an aggregation pipeline.
The
$changeStreamstage has the following syntax:{ $changeStream: { allChangesForCluster: <boolean>, fullDocument: <string>, fullDocumentBeforeChange: <string>, resumeAfter: <document> showExpandedEvents: <boolean>, startAfter: <document> startAtOperationTime: <timestamp> } } ParameterDescriptionallChangesForClusterOptional: Sets whether the change stream should include all changes in the cluster. May only be opened on the
admindatabase.fullDocumentOptional: Specifies whether change notifications include a copy of the full document when modified by
updateoperations.default: Change notifications do not include the full document forupdateoperations.required: Change notifications includes a copy of the modified document as it appeared immediately after the change. If the document cannot be found, the change stream throws an error.To use this option, you must first use the
collModcommand to enable thechangeStreamPreAndPostImagesoption.New in version 6.0.
updateLookup: Change notifications includes a copy of the document modified by the change. This document is the current majority-committed document ornullif it no longer exists.whenAvailable: Change notification includes a copy of the modified document as it appeared immediately after the change ornullif the document is unavailable.To use this option, you must first use the
collModcommand to enable thechangeStreamPreAndPostImagesoption.New in version 6.0.
In the case of partial updates, the change notification also provides a description of the change.
fullDocumentBeforeChangeInclude the full document from before the change. This field accepts the following values:
off: Disables inclusion of the document from before the change.whenAvailable: Includes document from before the change. The query does not fail if the unmodified document is not available.required: Includes document from before the change. The query fails if the unmodified document is not available.
resumeAfterOptional. Specifies a resume token as the logical starting point for the change stream. Cannot be used to resume the change stream after an
invalidateevent.resumeAfteris mutually exclusive withstartAfterandstartAtOperationTime.showExpandedEventsSpecifies whether to include additional change events, such as such as DDL and index operations.
New in version 6.0.
startAfterOptional. Specifies a resume token as the logical starting point for the change stream. Unlike
resumeAfter,startAftercan resume notifications after aninvalidateevent by creating a new change stream.startAfteris mutually exclusive withresumeAfterandstartAtOperationTime.startAtOperationTimeSpecifies a time as the logical starting point for the change stream. Cannot be used with
resumeAfterorstartAfterfields.
Stable API Support
Change streams are included in Stable API V1. However, the showExpandedEvents option is not included in Stable API V1.
Examples
To create a change stream cursor using the aggregation stage, run
the aggregate command.
var cur = db.names.aggregate( [ { $changeStream: {} } ] )
To open the cursor, run cur.
When the change stream detects a change, the next() method returns
a change event notification. For example, after running cur.next(),
MongoDB returns a document similar to the following:
{ "_id": { _data: "8262E2EE54000000022B022C0100296E5A100448E5E3DD01364019AE8FE8C6859527E046645F6964006462E2EE54C8756C0D5CF6F0720004" }, "operationType": "insert", "clusterTime": Timestamp({ t: 1659039316, i: 2 }), "wallTime": ISODate("2022-07-28T20:15:16.148Z"), "fullDocument": { "_id": ObjectId("62e2ee54c8756c0d5cf6f072"), "name": "Walker Percy" }, "ns": { "db": "test", "coll": "names" }, "documentKey": { _id: ObjectId("62e2ee54c8756c0d5cf6f072") } }
To use the MongoDB .NET/C# driver to add a $changeStream stage to an aggregation
pipeline, call the ChangeStream() method on a PipelineDefinition object.
The following example creates a pipeline stage that returns a change stream cursor:
var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream();
You can use a ChangeStreamStageOptions
object to customize the behavior of the change stream.
The following example performs the same $changeStream operation as the previous
example, but specifies the following options:
The
FullDocumentoption specifies that change notifications don't include a copy of the full document when modified by update operations.The
StartAtOperationTimeoption specifies the logical starting point for the change stream.
var changeStreamOptions = new ChangeStreamStageOptions() { FullDocument = ChangeStreamFullDocumentOption.Default, StartAtOperationTime = new BsonTimestamp(300), }; var pipeline = new EmptyPipelineDefinition<Movie>() .ChangeStream(changeStreamOptions);
Note
Prefer the Watch() Method
Where possible, use the IMongoCollection<TDocument>.Watch() method instead
of this aggregation stage. Use the ChangeStream() method
only if subsequent stages project away the resume token (_id) or if you don't
want the resulting cursor to automatically resume.
For more information on change stream notifications, see Change Events.