API Reference

This page lists all constructors and public methods of the three main classes.

Stability

Methods marked ✅ Stable are part of the documented public API and follow semantic versioning — they will not change in an incompatible way without a major version bump.

Methods without that mark are public but not yet stable: they exist and work, but their signatures or behaviour may change in a minor release.


EventStore

EventStore is the main entry point of the library.

import { EventStore } from 'event-storage';

EventStore extends Node's EventEmitter. After construction it emits a 'ready' event when it is safe to read and write.

Constructor

new EventStore([storeName], [config])

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| storeName | string | 'eventstore' | Prefix used for all storage files. |
| config.storageDirectory | string | './data' | Directory where data files are stored. |
| config.streamsDirectory | string | '{storageDirectory}/streams' | Directory for stream index files. |
| config.storageConfig | object | {} | Options forwarded to the underlying Storage backend. |
| config.readOnly | boolean | false | Mount the store in read-only mode. |
| config.streamMetadata | object\|function | | Metadata stored once per stream partition at creation time. Either a { streamName: metaObj } map or a function (streamName) => object. |

Methods

close() ✅ Stable

eventstore.close()

Close the event store and free all resources.


commit(streamName, events, [expectedVersion], [metadata], [callback]) ✅ Stable

eventstore.commit(streamName, events [, expectedVersion] [, metadata] [, callback])

Append one or more events to a stream.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| streamName | string | | Name of the target stream. |
| events | object\|Array<object> | | Event or array of events to commit. |
| expectedVersion | number | ExpectedVersion.Any | Optimistic concurrency version check. Use ExpectedVersion.Any (-1) to skip, ExpectedVersion.EmptyStream (0) for a new stream, or any positive integer for an exact version match. |
| metadata | object | {} | Additional metadata merged into every event's metadata envelope. |
| callback | function(commit) | | Called after all events have been persisted. |

Throws OptimisticConcurrencyError when the stream is not at expectedVersion.
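
The expectedVersion rules above can be pictured with a small stand-in. This is a minimal sketch of the documented rules, not the library's implementation: checkExpectedVersion and the local ConcurrencyError class are hypothetical names, and currentVersion is assumed to be the number of events already in the stream (0 for a stream that does not exist yet).

```javascript
// Hypothetical stand-ins mirroring the documented constants.
const ExpectedVersion = { Any: -1, EmptyStream: 0 };

// Local error type for this sketch only; the library exposes its own
// OptimisticConcurrencyError.
class ConcurrencyError extends Error {}

// Assumption: currentVersion is the number of events already in the stream.
function checkExpectedVersion(currentVersion, expectedVersion = ExpectedVersion.Any) {
  if (expectedVersion === ExpectedVersion.Any) {
    return; // no check requested
  }
  if (currentVersion !== expectedVersion) {
    throw new ConcurrencyError(
      `expected version ${expectedVersion}, stream is at ${currentVersion}`
    );
  }
}

checkExpectedVersion(3);                              // passes: Any skips the check
checkExpectedVersion(0, ExpectedVersion.EmptyStream); // passes: stream is new
checkExpectedVersion(3, 3);                           // passes: exact match
```

A caller that catches the error would typically re-read the stream, re-apply its decision logic, and retry the commit with the new version.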


getStreamVersion(streamName) ✅ Stable

eventstore.getStreamVersion(streamName) → number

Return the current version (event count) of a stream, or -1 if the stream does not exist.


getEventStream(streamName, [minRevision], [maxRevision]) ✅ Stable

eventstore.getEventStream(streamName [, minRevision [, maxRevision]]) → EventStream | false

Return an EventStream for the named stream, or false if no such stream exists. minRevision and maxRevision are 1-based and inclusive; negative values count from the end.
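
To make the revision arithmetic concrete, here is a minimal sketch that applies the same convention to a plain array. normalizeRevision and selectRange are hypothetical helpers, not library functions, and the sketch only covers forward ranges that lie inside the stream.

```javascript
// Assumption: revisions are 1-based; a negative revision -n refers to the
// n-th event counted from the end, so -1 is the last event.
function normalizeRevision(revision, length) {
  return revision < 0 ? length + revision + 1 : revision;
}

// Return the events in [minRevision, maxRevision], both bounds inclusive.
function selectRange(events, minRevision = 1, maxRevision = -1) {
  const from = normalizeRevision(minRevision, events.length);
  const until = normalizeRevision(maxRevision, events.length);
  return events.slice(from - 1, until);
}

const events = ['e1', 'e2', 'e3', 'e4', 'e5'];
console.log(selectRange(events, 2, 4)); // [ 'e2', 'e3', 'e4' ]
console.log(selectRange(events, -2));   // [ 'e4', 'e5' ]
```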


getAllEvents([minRevision], [maxRevision]) ✅ Stable

eventstore.getAllEvents([minRevision [, maxRevision]]) → EventStream

Return an EventStream covering every event in the store across all streams. Equivalent to getEventStream('_all', ...).


getEventStreamForCategory(categoryName, [minRevision], [maxRevision]) ✅ Stable

eventstore.getEventStreamForCategory(categoryName [, minRevision [, maxRevision]]) → EventStream

Return a joined EventStream for all streams whose names begin with categoryName + '-'. If a dedicated physical stream named categoryName already exists, that stream is returned directly.

Throws if no streams for the category exist.
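
The naming rule for categories can be illustrated on a plain list of stream names. streamsInCategory is a hypothetical helper written for this sketch; how the store actually enumerates its streams is not covered here.

```javascript
// Hypothetical helper: pick the stream names that belong to a category,
// i.e. names that start with categoryName followed by a dash.
function streamsInCategory(streamNames, categoryName) {
  const prefix = categoryName + '-';
  return streamNames.filter((name) => name.startsWith(prefix));
}

const names = ['user-1', 'user-2', 'users', 'order-7'];
console.log(streamsInCategory(names, 'user')); // [ 'user-1', 'user-2' ]
```

Note that 'users' is not part of the 'user' category, because the dash is part of the prefix.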


createEventStream(streamName, matcher) ✅ Stable

eventstore.createEventStream(streamName, matcher) → EventStream

Create a new named stream backed by an index that includes every event matching matcher. matcher can be a plain object (property equality) or a predicate (event) => boolean.

Returns an EventStream over the pre-existing matching events.

Throws if a stream with that name already exists, or if the store is read-only.
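
The two matcher forms can be modelled with a short helper. This is a sketch under stated assumptions rather than the library's matching code: matches is a hypothetical function, and the choice to compare with strict equality and to recurse into nested objects is an assumption that the text above does not confirm.

```javascript
// Hypothetical helper modelling the two matcher forms.
// Assumption: object matchers compare properties with strict equality and
// recurse into nested plain objects.
function matches(event, matcher) {
  if (typeof matcher === 'function') {
    return Boolean(matcher(event));
  }
  return Object.keys(matcher).every((key) => {
    const expected = matcher[key];
    const actual = event == null ? undefined : event[key];
    if (expected !== null && typeof expected === 'object') {
      return actual !== null && typeof actual === 'object' && matches(actual, expected);
    }
    return actual === expected;
  });
}

const event = { type: 'UserRegistered', payload: { plan: 'free' } };
console.log(matches(event, { type: 'UserRegistered' }));       // true
console.log(matches(event, { payload: { plan: 'paid' } }));    // false
console.log(matches(event, (e) => e.payload.plan === 'free')); // true
```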


deleteEventStream(streamName) ✅ Stable

eventstore.deleteEventStream(streamName)

Delete the index for the named stream. Does nothing if the stream does not exist. Existing events are not removed; the index will be rebuilt on the next write to that stream name.

Throws if the store is read-only.


closeEventStream(streamName) ✅ Stable

eventstore.closeEventStream(streamName)

Permanently seal a stream so that no new events are indexed into it. The stream remains readable; any attempt to write to it will throw. The closure is persisted by renaming the index file (e.g. stream-foo.index → stream-foo.closed.index).

Throws if the store is read-only, the stream does not exist, or the stream is already closed.


preCommit(hook) ✅ Stable

eventstore.preCommit(hook)

Register a hook called synchronously before each event is persisted. The hook receives (event, partitionMetadata) and may throw to abort the write. Equivalent to eventstore.on('preCommit', hook).

Throws if the store is read-only.
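
The abort-by-throwing contract of a pre-commit hook can be sketched with a stand-in write path. commitWithHooks and requireType are hypothetical names, the in-memory log replaces real storage, and the shape of the event passed to the hook (a top-level type property) is an assumption made for illustration.

```javascript
// Stand-in for the write path: run every hook, then persist.
// A hook that throws prevents the event from being appended.
function commitWithHooks(log, hooks, event, partitionMetadata) {
  for (const hook of hooks) {
    hook(event, partitionMetadata);
  }
  log.push(event);
}

// Example hook: reject events that carry no type.
function requireType(event) {
  if (typeof event.type !== 'string') {
    throw new Error('event.type is required');
  }
}

const log = [];
commitWithHooks(log, [requireType], { type: 'Created' }, {});
try {
  commitWithHooks(log, [requireType], { id: 1 }, {});
} catch (err) {
  console.log(err.message); // event.type is required
}
console.log(log.length);    // 1
```

With a real store, a function like requireType would be registered through eventstore.preCommit(requireType).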


preRead(hook) ✅ Stable

eventstore.preRead(hook)

Register a hook called synchronously before each event is read. The hook receives (position, partitionMetadata) and may throw to abort the read. Equivalent to eventstore.on('preRead', hook).


length ✅ Stable

eventstore.length → number

Total number of events in the store.


on(event, listener) / addListener(event, listener)

eventstore.on(event, listener) → this

Register an event listener. For 'preCommit' and 'preRead' events the listener is forwarded to the underlying storage. All other events are handled by the standard EventEmitter.


once(event, listener)

eventstore.once(event, listener) → this

Like on() but the listener is invoked at most once.


off(event, listener) / removeListener(event, listener)

eventstore.off(event, listener) → this

Remove a previously registered listener.


fromStreams(streamName, streamNames, [minRevision], [maxRevision])

eventstore.fromStreams(streamName, streamNames [, minRevision [, maxRevision]]) → EventStream

Create a virtual EventStream by joining the listed streams in sequence-number order.

| Parameter | Type | Description |
| --- | --- | --- |
| streamName | string | Transient name for the joined stream. |
| streamNames | Array<string> | Names of the streams to join. |

Throws if any named stream does not exist.


getConsumer(streamName, identifier, [initialState], [since])

eventstore.getConsumer(streamName, identifier [, initialState [, since]]) → Consumer

Return a durable Consumer that tracks its position across process restarts.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| streamName | string | | Stream to consume, or '_all' for all events. |
| identifier | string | | Unique consumer name (used for the state file). |
| initialState | object | {} | Initial consumer state. |
| since | number | 0 | Stream revision to start from. |

scanConsumers(callback)

eventstore.scanConsumers(callback)

Asynchronously scan all consumer state files and return their identifiers.

callback signature: (error, consumers: Array<string>).


Events emitted

| Event | Payload | Description |
| --- | --- | --- |
| 'ready' | | Emitted once the store is open and indexes are consistent. |
| 'commit' | commit | Emitted after each successful commit() call. |
| 'stream-created' | streamName | Emitted when a new stream index is created. |
| 'stream-available' | streamName | Emitted when an existing stream is discovered on disk. |
| 'stream-deleted' | streamName | Emitted after deleteEventStream(). |
| 'stream-closed' | streamName | Emitted after closeEventStream(). |
| 'unfinished-commit' | lastEvent | Emitted when a partial commit is detected on open. |

Static properties

| Property | Value | Description |
| --- | --- | --- |
| EventStore.ExpectedVersion.Any | -1 | Skip version check. |
| EventStore.ExpectedVersion.EmptyStream | 0 | Assert the stream is new. |
| EventStore.OptimisticConcurrencyError | Error | Thrown when an expected-version check fails. |
| EventStore.LOCK_THROW | constant | Throw if storage lock is held (default). |
| EventStore.LOCK_RECLAIM | constant | Forcefully reclaim storage lock on open. |

EventStream

EventStream is returned by EventStore.getEventStream() and related methods.

import { EventStream } from 'event-storage';

EventStream extends Node's stream.Readable (in objectMode).

Constructor

new EventStream(name, eventStore, [minRevision], [maxRevision])

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | | Stream name (must exist in eventStore). |
| eventStore | EventStore | | The owning EventStore. |
| minRevision | number | 1 | First event revision to include (1-based, inclusive). Negative values count from the end. |
| maxRevision | number | -1 | Last event revision to include (inclusive). -1 means the last event. |

In practice, EventStream instances are obtained from EventStore methods rather than constructed directly.

Methods

from(revision) ✅ Stable

stream.from(revision) → this

Set the starting revision (1-based, inclusive; negative counts from the end).


until(revision) ✅ Stable

stream.until(revision) → this

Set the ending revision (inclusive; negative counts from the end).


fromStart() ✅ Stable

stream.fromStart() → this

Reset the start position to the first event in the stream.


fromEnd() ✅ Stable

stream.fromEnd() → this

Set the start position to the last event in the stream.


toEnd() ✅ Stable

stream.toEnd() → this

Set the end position to the last event in the stream.


toStart() ✅ Stable

stream.toStart() → this

Set the end position to the first event in the stream.


first(amount) ✅ Stable

stream.first(amount) → this

Limit the stream to the first amount events in chronological order.


last(amount) ✅ Stable

stream.last(amount) → this

Limit the stream to the last amount events in chronological order.


forwards([amount]) ✅ Stable

stream.forwards([amount]) → this

Read the current range in forward (chronological) order. If amount is given, advance the end of the range by amount events from the current start.


backwards([amount]) ✅ Stable

stream.backwards([amount]) → this

Read the current range in backward (reverse-chronological) order. If amount is given, extend the range amount events backwards from the current start.


forEach(callback) ✅ Stable

stream.forEach(callback)

Iterate over every event, providing access to both the event payload and its storage metadata.

callback signature: (event, metadata, streamName).


previous(amount)

stream.previous(amount) → this

Move the end of the range amount events before the current start (used internally by last() / backwards()).


following(amount)

stream.following(amount) → this

Move the end of the range amount events after the current start (used internally by first() / forwards()).


reverse()

stream.reverse() → this

Swap minRevision and maxRevision, reversing the read direction.


events

stream.events → Array<object>

Return all events in the current range as an array (event payloads only). The result is cached after the first access; call reset() to clear the cache.


[Symbol.iterator]()

for (const event of stream) { … }

Make the stream iterable in a for…of loop, yielding event payloads.


reset()

stream.reset() → this

Reset the internal iterator and event cache so the stream can be iterated again from scratch.


next()

stream.next() → { payload, metadata, stream } | false

Return the next event object from the iterator, or false when the stream is exhausted.
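
Because next() signals exhaustion with false rather than undefined or an exception, a drain loop should compare against false explicitly. The sketch below uses a hypothetical stand-in (makeFakeStream) that follows the documented return contract; it is not the library's EventStream.

```javascript
// Stand-in with the documented next() contract: returns an event object,
// or false once exhausted.
function makeFakeStream(items) {
  let index = 0;
  return {
    next() {
      return index < items.length ? items[index++] : false;
    },
  };
}

const stream = makeFakeStream([
  { payload: { n: 1 }, metadata: {}, stream: 'demo' },
  { payload: { n: 2 }, metadata: {}, stream: 'demo' },
]);

// Drain loop: false is the sentinel that ends the iteration.
const payloads = [];
let item;
while ((item = stream.next()) !== false) {
  payloads.push(item.payload);
}
console.log(payloads); // [ { n: 1 }, { n: 2 } ]
```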


Storage

Storage is the low-level append-only document store used internally by EventStore. It can be used directly for advanced use cases.

import { Storage } from 'event-storage';

// Writable storage (default export)
const store = new Storage('mystore', { dataDirectory: './data' });
store.open();

// Read-only storage
const reader = new Storage.ReadOnly('mystore', { dataDirectory: './data' });
reader.open();

Storage (writable) extends Storage.ReadOnly which in turn extends EventEmitter.

Constructor — Storage (writable)

new Storage([storageName], [config])

Inherits all options from Storage.ReadOnly plus:

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| storageName | string | 'storage' | Base name for all storage files. |
| config.dataDirectory | string | '.' | Directory for data files. |
| config.indexDirectory | string | config.dataDirectory | Directory for index files. |
| config.indexFile | string | '{storageName}.index' | File name of the primary index. |
| config.serializer | object | JSON | Object with serialize(doc) and deserialize(data) methods. |
| config.readBufferSize | number | 4096 | Read buffer size in bytes. |
| config.writeBufferSize | number | 16384 | Write buffer size in bytes. |
| config.maxWriteBufferDocuments | number | 0 (unlimited) | Maximum number of buffered documents before an automatic flush. |
| config.syncOnFlush | boolean | false | Call fsync on flush for strict durability. |
| config.dirtyReads | boolean | true | Allow reading documents that are still in the write buffer. |
| config.partitioner | function(doc, seqNum) | single partition | Returns the partition name for a document. |
| config.indexOptions | object | {} | Options forwarded to every index on construction. |
| config.hmacSecret | string | '' | Secret used to verify stored matcher HMACs. |
| config.metadata | object\|function(partitionName) | {} | Metadata written to each partition header. |
| config.lock | LOCK_THROW\|LOCK_RECLAIM | LOCK_THROW | How to handle an existing lock file on open. |
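
As an illustration of the two function-valued options in the table above, the following sketch builds a config object with a custom serializer and partitioner. The option names come from the table; everything else is an assumption: that the serializer exchanges strings, and that documents carry a hypothetical stream property to partition by.

```javascript
// Serializer: assumed to map a document to a string and back.
const serializer = {
  serialize: (doc) => JSON.stringify(doc),
  deserialize: (data) => JSON.parse(data),
};

// Partitioner: returns a partition name for each document. Here documents
// are grouped by a hypothetical `stream` property, with a fallback name.
const partitioner = (doc, seqNum) =>
  (doc && doc.stream ? String(doc.stream) : 'default');

// Config object in the shape the constructor table describes.
const config = { dataDirectory: './data', serializer, partitioner };

console.log(partitioner({ stream: 'user-1' }, 1));                   // user-1
console.log(partitioner({}, 2));                                     // default
console.log(serializer.deserialize(serializer.serialize({ a: 1 }))); // { a: 1 }
```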

Constructor — Storage.ReadOnly

new Storage.ReadOnly([storageName], [config])

Accepts the same options as Storage (writable) except for write-specific fields (writeBufferSize, maxWriteBufferDocuments, syncOnFlush, dirtyReads, partitioner, lock). In addition to the read API it watches the data directory for new partitions and index files created by a concurrent writer.

Shared methods (readable and writable)

open() ✅ Stable

storage.open() → boolean

Open the storage and all loaded indexes. Emits 'opened'. The writable variant also acquires a lock; throws StorageLockedError if the storage is already locked by another process (unless config.lock = LOCK_RECLAIM).


close() ✅ Stable

storage.close()

Close the storage and release all resources (file handles, indexes, write buffers, file watcher). Emits 'closed'.


preRead(hook) ✅ Stable

storage.preRead(hook)

Register a synchronous hook called before every document read. The hook receives (position, partitionMetadata) and may throw to abort the read. Equivalent to storage.on('preRead', hook).


read(number, [index]) ✅ Stable

storage.read(number [, index]) → object | false

Read a single document by its 1-based position inside the given index (or the primary index when omitted). Returns false when the position is out of range.


readRange(from, [until], [index]) ✅ Stable

storage.readRange(from [, until [, index]]) → Generator<object>

Return a generator that yields documents in the range [from, until] (1-based, inclusive). Negative values count from the end. Pass index = false to bypass the primary index and iterate partitions directly in sequence-number order.

If from > until (after normalisation) the range is yielded in reverse order.
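
The direction rule can be sketched with a generator over an in-memory array. This readRange is a hypothetical stand-in, not the Storage method: it takes the documents as its first argument and assumes both bounds fall inside the array after normalisation.

```javascript
// Stand-in generator over an in-memory array of documents.
// Assumptions: positions are 1-based and inclusive, negative positions count
// from the end (-1 is the last document), and both bounds are within range.
function* readRange(docs, from, until = -1) {
  if (docs.length === 0) {
    return;
  }
  const norm = (n) => (n < 0 ? docs.length + n + 1 : n);
  const start = norm(from);
  const end = norm(until);
  // Walk upwards for a forward range, downwards when from > until.
  const step = start <= end ? 1 : -1;
  for (let pos = start; pos !== end + step; pos += step) {
    yield docs[pos - 1];
  }
}

const docs = ['d1', 'd2', 'd3', 'd4'];
console.log([...readRange(docs, 2, 3)]);  // [ 'd2', 'd3' ]
console.log([...readRange(docs, -1, 1)]); // [ 'd4', 'd3', 'd2', 'd1' ]
```

Being a generator, the range is produced lazily, so a consumer can stop early without touching the remaining documents.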


openReadonlyIndex(name) ✅ Stable

storage.openReadonlyIndex(name) → ReadableIndex

Open an existing index file that carries a status marker in its name (e.g. stream-foo.closed) without registering it in the secondary-index write path.

Throws if the index file does not exist.


openIndex(name, [matcher]) ✅ Stable

storage.openIndex(name [, matcher]) → ReadableIndex

Open an existing secondary index for reading. If matcher is provided its HMAC is validated against the value stored in the index metadata.

Throws if the index does not exist or if the HMAC validation fails.


length

storage.length → number

Number of documents in the primary index.


Writable-only methods

preCommit(hook) ✅ Stable

storage.preCommit(hook)

Register a synchronous hook called before each document is written. The hook receives (document, partitionMetadata) and may throw to abort the write. Equivalent to storage.on('preCommit', hook).


write(document, [callback]) ✅ Stable

storage.write(document [, callback]) → number

Append a document to storage and return its 1-based sequence number.

callback is invoked once the document (and its index entry) has been flushed to disk.


ensureIndex(name, [matcher]) ✅ Stable

storage.ensureIndex(name [, matcher]) → ReadableIndex

Return an existing secondary index by name, or create it if it does not exist. When creating, matcher is required and can be either a property-equality object or a predicate (document) => boolean.

Throws if the index doesn't exist and no matcher was given.


flush() ✅ Stable

storage.flush() → boolean

Flush all write buffers for all partitions and indexes to disk synchronously. Returns true if any data was actually written.


reindex([fromSequenceNumber]) ✅ Stable

storage.reindex([fromSequenceNumber])

Rebuild the primary index and all currently loaded secondary indexes by scanning partition files directly, starting from fromSequenceNumber (default 0 = full rebuild).


unlock()

storage.unlock()

Forcefully remove the lock file, regardless of which process created it. Only call this when you are certain no other process has the storage open for writing.


lock()

storage.lock() → boolean

Attempt to acquire the write lock. Returns false if this instance already holds the lock, and throws StorageLockedError if another process holds it. Called automatically by open().


truncate(after)

storage.truncate(after)

Truncate all partitions and indexes after the given 1-based sequence number. Negative values count from the end. Used internally for crash recovery; use with care.


checkTornWrites()

storage.checkTornWrites()

Scan every partition for torn writes (documents that extend beyond the end of their file), repair the affected partitions, and rebuild any lagging index entries. Called automatically by open() / unlock() when a previous writer did not close cleanly.


forEachDistinctPartitionOf(entries, iterationHandler)

storage.forEachDistinctPartitionOf(entries, iterationHandler)

Iterate the unique partitions referenced by an iterable list of index entries, invoking iterationHandler(entry) once per distinct partition.


ReadOnly-only methods

storageFilesFilter(filename)

storage.storageFilesFilter(filename) → boolean

Return true when filename belongs to this storage instance (used internally as the file-watcher filter callback).


Events emitted by Storage

| Event | Payload | Description |
| --- | --- | --- |
| 'opened' | | Emitted after open() completes. |
| 'closed' | | Emitted after close() completes. |
| 'ready' | | Emitted after open() on the writable storage. |
| 'wrote' | document, entry, indexPosition | Emitted after each document is indexed. |
| 'index-created' | name | Emitted when a new secondary index is created. |
| 'index-add' | name, length, document | Emitted when a document is added to a secondary index. |
| 'partition-created' | id | Emitted when a new partition file is opened. |
| 'truncate' | prevLength, newLength | Emitted (read-only storage) when the primary index is truncated by the writer. |