Event Streams
An event stream is an ordered sequence of events that can be iterated. Every stream is backed by a lightweight index file and implements the JavaScript Iterable protocol as well as the Node.js ReadableStream interface.
Writing Events (Commits)
Events are appended to a named stream with commit. The stream is created automatically the first time you write to it.
eventstore.commit('user-123', [
{ type: 'UserRegistered', email: 'alice@example.com' },
{ type: 'ProfileCompleted', name: 'Alice' }
], EventStore.ExpectedVersion.EmptyStream, (err) => {
if (err) throw err;
console.log('Committed!');
});
A single commit call is the unit of atomicity: either all events in the array are persisted or none of them are.
Optimistic Concurrency
When multiple sources produce events concurrently (e.g. multiple HTTP requests for the same aggregate), you need optimistic concurrency control to prevent conflicting writes.
The pattern is:
- Read the stream and note its current
version. - Build new events based on the current state.
- Commit with
expectedVersionset to the version noted in step 1.
If another writer has committed in the meantime the version will have advanced and an OptimisticConcurrencyError will be thrown. You then replay state and retry.
const model = new MyAggregateModel();
const stream = eventstore.getEventStream('order-42');
stream.forEach((event) => model.apply(event));
const expectedVersion = stream.version;
// ... later, handling an incoming command:
const newEvents = model.handle(command);
try {
eventstore.commit('order-42', newEvents, expectedVersion, () => {
// success
});
} catch (e) {
if (e instanceof EventStore.OptimisticConcurrencyError) {
// Replay and retry, or return a conflict error to the caller
}
}
expectedVersion values:
| Value | Meaning |
|---|---|
EventStore.ExpectedVersion.Any (-1) |
No check — always succeeds (default). |
EventStore.ExpectedVersion.EmptyStream (0) |
Stream must not exist yet. |
| Positive integer | Stream must be at exactly that version. |
Reading Streams
Basic Iteration
const stream = eventstore.getEventStream('user-123');
for (const event of stream) {
console.log(event.type);
}
// Or with forEach, which also gives you metadata:
stream.forEach((event, metadata, streamName) => {
console.log(event.type, metadata.streamVersion);
});
Raw Mode (NDJSON for Network Streaming)
Use raw mode when you want to stream events directly over HTTP/TCP without deserializing and re-serializing each document in userland.
const stream = eventstore.getEventStream(
'user-123',
1,
-1,
{ payload: { type: 'UserRegistered' } },
true
);
// Each chunk is one compact JSON document plus "\n"
stream.pipe(httpResponse);
Raw mode is available on all stream-reading APIs (getEventStream, getAllEvents, fromStreams, getEventStreamForCategory) and also on query().
Matcher semantics differ by mode:
| Mode | Function matcher | Object matcher |
|---|---|---|
raw=false (default) |
(payload, metadata) => boolean |
Matched against { stream, payload, metadata } |
raw=true |
(buffer) => boolean |
Byte-level matcher against compact JSON bytes |
Object Matcher Syntax
Object matchers use the same shape in both modes. In object mode they are evaluated against
{ stream, payload, metadata }. In raw mode the same matcher is compiled into byte-level checks.
Supported forms:
- Scalar equality
javascript
{ payload: { type: 'OrderPlaced' } }
- Nested object matching
javascript
{ metadata: { tenantId: 'acme' } }
- Array values with OR semantics
javascript
{ payload: { type: ['OrderPlaced', 'OrderCancelled'] } }
- Scalar comparison operators
javascript
{ payload: { amount: { $gte: 100, $lt: 1000 } } }
Supported operators are $gt, $gte, $lt, $lte, $eq, and $ne.
Multiple operators on the same field are combined with AND semantics.
$eq is equivalent to plain equality, so prefer the simpler form when possible:
{ payload: { type: 'OrderPlaced' } }
// equivalent to:
{ payload: { type: { $eq: 'OrderPlaced' } } }
Operator matching is intended for scalar values (string, number, boolean, null).
For arrays, objects, or custom raw encodings, use plain equality or a function matcher.
Important: raw object matchers require the default compact JSON serializer format. If you use a custom serializer, use a raw function matcher instead.
Revision Ranges
Retrieve only a slice of the stream by passing minRevision and maxRevision:
const all = eventstore.getEventStream('user-123', 1, -1); // all events
const first50 = eventstore.getEventStream('user-123', 1, 50); // events 1–50
const last10 = eventstore.getEventStream('user-123', -10, -1); // last 10 events
const reversed = eventstore.getEventStream('user-123', -1, -10); // last 10, newest first
Passing a negative maxRevision that is less than minRevision causes the stream to iterate in reverse order.
Fluent API
Since version 0.9 there is a more readable fluent builder:
// All events, newest first
eventstore.getEventStream('user-123').backwards();
// First 10 events
eventstore.getEventStream('user-123').first(10);
// equivalent:
eventstore.getEventStream('user-123').fromStart().forwards(10);
// Last 10 events (oldest first)
eventstore.getEventStream('user-123').last(10);
// Last 10 events, newest first
eventstore.getEventStream('user-123').last(10).backwards();
// Events from revision 16 onwards
eventstore.getEventStream('user-123').from(16).toEnd();
// Events from the start up to revision 10 (inclusive), newest first
eventstore.getEventStream('user-123').from(10).toStart();
// 10 events starting at revision 5
eventstore.getEventStream('user-123').from(5).forwards(10);
// Events between revisions 9 and 5 (reverse)
eventstore.getEventStream('user-123').from(9).until(5);
Note: The revision boundary is fixed when you call
getEventStream(). Events appended after that call will not appear in the iterator. If you need live updates, use Consumers.
Creating Additional Streams
You can create derived read streams that contain only a filtered subset of another stream, or even events from multiple streams combined.
// Only events whose type is 'FooHappened' or 'BarHappened'
const projectionStream = eventstore.createStream(
'my-projection-stream',
(event) => ['FooHappened', 'BarHappened'].includes(event.type)
);
for (const event of projectionStream) {
// ...
}
The matcher function is persisted in the index file so you do not need to re-specify it when reopening the store. See Security for how the matcher is protected against tampering.
Joining Streams
Iterate events from multiple streams in their global insertion order using fromStreams:
const joined = eventstore.fromStreams(
'transient-join', // A temporary name for this join — not persisted
['order-42', 'order-99']
);
for (const event of joined) {
// Events from both streams, ordered by when they were written globally
}
The result is not persisted and cannot be used with consumers. For frequently-needed joins, create a permanent derived stream with createStream instead.
Stream Categories
Flat category streams (category-id)
Name your streams as <category>-<identity> (e.g. user-123, user-456) to take advantage of category-level queries:
eventstore.commit('user-' + user.id, [new UserRegistered(user.id, user.email)]);
// ...later, iterate all user events across all user instances:
const allUsersStream = eventstore.getEventStreamForCategory('user');
for (const event of allUsersStream) {
// events from user-1, user-2, user-42, ... in global order
}
If you already created a dedicated stream for the category (e.g. via createStream), that stream is returned directly.
This layout stores every stream as a flat file in the data directory:
data/
eventstore.user-1
eventstore.user-2
eventstore.user-42
…
streams/
eventstore.stream-user-1.index
eventstore.stream-user-2.index
eventstore.stream-user-42.index
…
This works well for small-to-medium numbers of entity instances. When the number of streams grows into the hundreds of thousands or millions (think users on a large platform), the flat layout can degrade directory-listing performance, because most operating systems slow down significantly when a single directory contains very large numbers of files.
Hierarchical category streams (category/id)
For large-scale deployments, use a slash-separated stream name to organize streams into a directory hierarchy on disk:
eventstore.commit('user/' + user.id, [new UserRegistered(user.id, user.email)]);
This maps to:
data/
eventstore.user/
1
2
42
…
streams/
eventstore.stream-user/
1.index
2.index
42.index
…
Category queries work the same way — pass any prefix up to (but not including) the last separator. The query returns every stream whose name starts with that prefix followed by either - or /:
// All user streams, regardless of depth
const allUsersStream = eventstore.getEventStreamForCategory('user');
// Narrowed to a sub-category (streams starting with 'user/a3/')
const shardStream = eventstore.getEventStreamForCategory('user/a3');
// Narrowed further to a two-level sub-category (streams starting with 'user/a3/f7/')
const leafStream = eventstore.getEventStreamForCategory('user/a3/f7');
getEventStreamForCategory unions both layouts: it returns events from all matching streams in global insertion order, regardless of whether they use the dash or slash convention.
Hash-based sharding for very large entity populations
When millions of entity instances exist (e.g., users on a large platform), even a single subdirectory can become oversized. A two-level hash prefix distributes streams across thousands of small, balanced directories:
function streamName(entityType, id) {
// Two hex chars → 256 × 256 = 65 536 shards maximum
const hex = id.toString(16).padStart(8, '0');
const shard1 = hex.slice(0, 2); // e.g. "a3"
const shard2 = hex.slice(2, 4); // e.g. "f7"
return `${entityType}/${shard1}/${shard2}/${id}`;
}
// Stream for user 12345678 → "user/00/bc/614e" (actually padded hex of 12345678)
eventstore.commit(streamName('user', user.id), [
new UserRegistered(user.id, user.email)
]);
On disk this looks like:
data/
eventstore.user/
00/
00/
1
2
01/
257
…
a3/
f7/
12345678
…
To read a specific user's stream you compose the same name:
const userStream = eventstore.getEventStream(streamName('user', user.id));
Sub-category queries work at any depth — pass the shared prefix to scope the result set:
// All users whose ID hashes to shard a3/f7
const leafStream = eventstore.getEventStreamForCategory('user/a3/f7');
// All users in the a3 top-level shard
const shardStream = eventstore.getEventStreamForCategory('user/a3');
// All user events across every shard
const allUsersStream = eventstore.getEventStreamForCategory('user');
Choosing a hash depth
| Estimated entity count | Recommended depth | Max streams per leaf dir |
|---|---|---|
| Up to ~100 k | 1 level (type/XX/id) |
~390 |
| Up to ~25 M | 2 levels (type/XX/YY/id) |
~1 526 |
Two hex characters per level gives 256 shards per level. Adjust by using more or fewer hex characters (e.g. 3 chars = 4 096 shards/level) to match the expected entity population. Additional nesting levels are structurally supported but have not been benchmarked at very large entity counts.
UUID-keyed entities
For entities identified by UUID v4, the first characters already have high entropy and can serve directly as the shard prefix:
function userStream(uuid) {
// "a3f7c2d1-…" → "user/a3/f7/a3f7c2d1-…"
return `user/${uuid.slice(0, 2)}/${uuid.slice(2, 4)}/${uuid}`;
}
Event Metadata
Every event carries storage-level metadata alongside the event payload. Access it through forEach:
stream.forEach((event, metadata, streamName) => {
console.log(metadata);
// {
// commitId: '…', unique ID for the whole commit
// committedAt: 1700000000000, ms timestamp
// commitVersion: 1, sequence number within the commit (1-based)
// commitSize: 2, total events in the commit
// streamVersion: 5 version of this event within the stream
// }
});
You can also pass extra metadata through when committing, and it will be merged into this object:
eventstore.commit('my-stream', [{ type: 'Foo' }], EventStore.ExpectedVersion.Any, () => {}, {
causationId: 'cmd-123',
correlationId: 'req-456'
});