Operator descriptions
taken from here
most.js API
- Reading these docs
- API Notes
- Creating streams
- Handling errors
- Transforming streams
- Filtering streams
- Transducer support
- Slicing streams
- Looping
- Adapting fluent APIs
- Consuming streams
- Combining streams
- Combining higher order streams
- Awaiting promises
- awaitPromises, alias await
- Rate limiting streams
- Delaying streams
- Sharing streams
Notation
You'll see diagrams like the following:
These are timeline diagrams that try to give a simple, representative notion of how a stream behaves over time. Time proceeds from left to right, using letters and symbols to indicate certain things:
- "-": an instant in time where no event occurs
- letters (a, b, c, d, etc.): an event at an instant in time
- "|": end of stream
- "X": an error occurred at an instant in time
- ">": stream continues infinitely
  - Typically, ">" means you can assume that a stream will continue to repeat some common pattern infinitely
Examples
stream: a|
A stream that emits a and then ends immediately.
stream: a-b---|
A stream that emits a, then b, and some time later ends.
stream: a-b-X
A stream that emits a, then b, then fails.
stream: abc-def->
A stream that emits a, then b, then c, then nothing, then d, then e, then f, and then continues infinitely.
Most.js implements a subset of the draft ES Observable proposal:
- stream[Symbol.observable]() -> Observable returns a compatible observable with a subscribe method that other implementations can consume.
- most.from(observable) -> Stream coerces a compliant observable (one that provides [Symbol.observable]()) to a most.js stream.
- stream.forEach(f) -> Promise is fully compatible with the draft ES Observable forEach API.
- stream.subscribe(observer) -> Subscription subscribes to a most.js Stream using the draft ES Observable subscribe API.
This allows most.js to interoperate seamlessly with other implementations, such as RxJS 5, and Kefir.
Consuming most.js streams with other libraries
Consult the documentation of other libraries for specifics. Any functions and methods that accept draft ES Observables should accept most.js Streams seamlessly.
Consuming draft ES Observables with most.js
Use most.from to coerce any observable to a most.js stream:
You can use most.from in other creative ways as well:
A similar approach works with other higher order operations such as join and switch.
Or with merge, combine, etc. by coercing first.
Creating streams
most.just
Alias: most.of
most.just(x) -> Stream
most.of(x) -> Stream
Create a stream containing only x.
most.fromPromise
most.fromPromise(promise) -> Stream
Create a stream containing the outcome of a promise. If the promise fulfills, the stream will contain the promise's value. If the promise rejects, the stream will be in an error state with the promise's rejection reason as its error. See recoverWith for error recovery.
most.from
most.from(Iterable | Observable) -> Stream
Create a stream containing all items from an Iterable or Observable.
The observable must provide minimal draft ES observable compliance as per the es-observable draft: it must have a [Symbol.observable]() method that returns an object with a well-behaved .subscribe() method.
The iterable can be an Array, Array-like, or anything that supports the iterable protocol or iterator protocol, such as a generator. Providing a finite iterable, such as an Array, creates a finite stream. Providing an infinite iterable, such as an infinite generator, creates an infinite stream.
Note: from will fail fast by throwing a TypeError synchronously when passed a value that is not an Iterable, Iterator, or Observable. This indicates an invalid use of from, which should be fixed/prevented rather than handled at runtime.
most.periodic
most.periodic(period) -> Stream
most.periodic(period, x) -> Stream (deprecated)
Note: periodic's second argument (x) is deprecated. To create a periodic stream with a specific value, use constant(x, periodic(period)).
Create an infinite stream containing events that arrive every period milliseconds, and whose value is undefined.
most.empty
most.empty() -> Stream
Create an already-ended stream containing no events.
most.never
most.never() -> Stream
Create a stream that contains no events and never ends.
most.iterate
most.iterate(f, initial) -> Stream
Build an infinite stream by computing successive items iteratively. Conceptually, the stream will contain: [initial, f(initial), f(f(initial)), ...]
The iterating function may return a promise. This allows most.iterate to be used to build asynchronous streams of future values. For example:
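The sequence [initial, f(initial), f(f(initial)), ...] can be modeled in plain JavaScript with a generator. This is only a sketch of the synchronous case and does not use most.js; iterateModel and takeFirst are hypothetical helper names introduced for illustration.

```javascript
// Plain-JavaScript model of the sequence most.iterate describes.
// iterateModel and takeFirst are hypothetical helpers, not most.js API.
function* iterateModel(f, initial) {
  let x = initial;
  while (true) {
    yield x;   // emit the current item
    x = f(x);  // compute the next item from the previous one
  }
}

// Collect the first n items of a (possibly infinite) iterator.
function takeFirst(n, iterator) {
  const items = [];
  for (const x of iterator) {
    if (items.length >= n) break;
    items.push(x);
  }
  return items;
}

const powersOfTwo = takeFirst(5, iterateModel(x => x * 2, 1));
// powersOfTwo is [1, 2, 4, 8, 16]
```

Because the modeled sequence is infinite, it has to be bounded (here with takeFirst) before it can be collected.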
most.unfold
most.unfold(f, seed) -> Stream
Build a stream by computing successive items. Whereas reduce tears down a stream to a final value, unfold builds up a stream from a seed value.
The unfolding function accepts a seed value and must return a tuple: {value:*, seed:*, done:boolean}, or a promise for a tuple. Returning a promise allows most.unfold to be used to build asynchronous streams of future values.
- tuple.value will be emitted as an event.
- tuple.seed will be passed to the next invocation of the unfolding function.
- tuple.done can be used to stop unfolding. When tuple.done == true, unfolding will stop. Additionally, when tuple.done == true:
  - tuple.value (deprecated) will be used as the stream's end signal value. In future versions, tuple.value will be ignored when tuple.done is true
  - tuple.seed will be ignored
Note that if the unfolding function never returns a tuple with tuple.done == true, the stream will be infinite.
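The tuple contract above can be sketched as a synchronous loop over plain values. This is a model of the contract rather than most.js itself: unfoldModel is a hypothetical name, promises are not handled, and tuple.value is ignored once done is true (the behavior announced for future versions).

```javascript
// Synchronous model of the unfold contract: f(seed) -> { value, seed, done }.
// unfoldModel is a hypothetical helper, not part of most.js.
function unfoldModel(f, seed) {
  const events = [];
  let tuple = f(seed);
  while (!tuple.done) {
    events.push(tuple.value); // tuple.value is emitted as an event
    tuple = f(tuple.seed);    // tuple.seed feeds the next invocation
  }
  return events;              // value and seed are ignored once done is true
}

// Count down from 3, stopping when the seed reaches 0.
const countdown = unfoldModel(
  n => ({ value: n, seed: n - 1, done: n === 0 }),
  3
);
// countdown is [3, 2, 1]
```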
most.generate
most.generate(generator, ...args) -> Stream
Build a stream by running an asynchronous generator: a generator which yields promises.
When the generator yields a promise, the promise's fulfillment value will be added to the stream. If the promise rejects, an exception will be thrown in the generator. You can use try/catch to handle the exception.
most.fromEvent
most.fromEvent(eventType, source [, useCapture=false]) -> Stream
Create a stream containing events from the provided EventTarget, such as a DOM element, or EventEmitter. This provides a simple way to coerce existing event sources into streams.
When passing an EventTarget, you can provide useCapture as the 3rd parameter, and it will be passed through to addEventListener and removeEventListener. When not provided, useCapture defaults to false.
When the stream ends (for example, by using take, takeUntil, etc.), it will automatically be disconnected from the event source. For example, in the case of DOM events, the underlying DOM event listener will be removed automatically.
Notes on EventEmitter
- When the source event has more than one argument, all the arguments will be aggregated into an array in the resulting stream.
- EventEmitters and EventTargets, such as DOM nodes, behave differently in that EventEmitter allows events to be delivered in the same tick as a listener is added. When using EventEmitter, most.fromEvent will ensure asynchronous event delivery, thereby preventing hazards of "maybe sync, maybe async" (aka zalgo) event delivery.
startWith
stream.startWith(x) -> Stream
most.startWith(x, stream) -> Stream
Create a new stream containing x followed by all events in stream.
concat
stream1.concat(stream2) -> Stream
most.concat(stream1, stream2) -> Stream
Create a new stream containing all events in stream1 followed by all events in stream2.
Note that this effectively timeshifts events from stream2 past the end time of stream1. In contrast, other operations such as combine, merge, and chain preserve event arrival times, allowing events from the multiple combined streams to interleave.
Handling errors
recoverWith
Alias: flatMapError
stream.recoverWith(f) -> Stream
most.recoverWith(f, stream) -> Stream
Recover from a stream failure by calling a function to create a new stream.
When a stream fails with an error, the error will be passed to f. f must return a new stream to replace the error.
most.throwError
most.throwError(error) -> Stream
Create a stream in the error state. This can be useful for functions that need to return a stream, but need to signal an error.
Transforming streams
map
stream.map(f) -> Stream
most.map(f, stream) -> Stream
Create a new stream by applying f to each event of the input stream.
constant
stream.constant(x) -> Stream
most.constant(x, stream) -> Stream
Create a new stream by replacing each event of the input stream with x.
scan
stream.scan(f, initial) -> Stream
most.scan(f, initial, stream) -> Stream
Create a new stream containing incrementally accumulated results, starting with the provided initial value.
function f(accumulated, x) -> newAccumulated
Unlike reduce which produces a single, final result, scan emits incremental results. The resulting stream is of the same proportion as the original. For example, if the original contains 10 events, the resulting stream will contain 11 (the initial value, followed by 10 incremental events). If the original stream is infinite, the resulting stream will be infinite.
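The "10 events in, 11 events out" behavior can be illustrated with a plain-JavaScript model over an array. This is a sketch of the accumulation rule only, with no notion of time; scanModel is a hypothetical helper, not most.js API.

```javascript
// Array-based model of scan: the initial value is emitted first,
// followed by one accumulated result per input event.
// scanModel is a hypothetical helper, not part of most.js.
function scanModel(f, initial, events) {
  const results = [initial];
  let accumulated = initial;
  for (const x of events) {
    accumulated = f(accumulated, x); // f(accumulated, x) -> newAccumulated
    results.push(accumulated);
  }
  return results;
}

const runningTotals = scanModel((sum, x) => sum + x, 0, [1, 2, 3, 4]);
// runningTotals is [0, 1, 3, 6, 10]: 4 inputs produce 5 results
```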
chain
Alias: flatMap
stream.chain(f) -> Stream
most.chain(f, stream) -> Stream
Transform each event in stream into a stream, and then merge it into the resulting stream. Note that f must return a stream.
function f(x) -> Stream
Note the difference between concatMap and chain: concatMap concatenates, while chain merges.
continueWith
stream.continueWith(f) -> Stream
most.continueWith(f, stream) -> Stream
Replace the end signal with a new stream returned by f. Note that f must return a stream.
function f(x) -> Stream
concatMap
stream.concatMap(f) -> Stream
most.concatMap(f, stream) -> Stream
Transform each event in stream into a stream, and then concatenate it onto the end of the resulting stream. Note that f must return a stream.
The mapping function f is applied lazily. That is, f is called only once it is time to concatenate a new stream.
function f(x) -> Stream
Note the difference between concatMap and chain: concatMap concatenates, while chain merges.
ap
streamOfFunctions.ap(stream) -> Stream
most.ap(streamOfFunctions, stream) -> Stream
Apply the latest function in streamOfFunctions to the latest value in stream.
In effect, ap applies a time-varying function to a time-varying value.
timestamp
stream.timestamp() -> Stream
most.timestamp(stream) -> Stream
Materialize event timestamps, transforming Stream<X> into Stream<{ time:number, value:X }>.
tap
stream.tap(f) -> Stream
most.tap(f, stream) -> Stream
Perform a side-effect for each event in stream.
For each event in stream, f is called, but the value of its result is ignored. If f fails (i.e., throws), then the returned stream will also fail. The stream returned by tap will contain the same events as the original stream.
Filtering streams
filter
stream.filter(predicate) -> Stream
most.filter(predicate, stream) -> Stream
Create a stream containing only events for which predicate returns truthy.
skipRepeats
stream.skipRepeats() -> Stream
most.skipRepeats(stream) -> Stream
Create a new stream with adjacent repeated events removed.
Note that === is used to identify duplicate items. To use a different comparison, use skipRepeatsWith.
skipRepeatsWith
stream.skipRepeatsWith(equals) -> Stream
most.skipRepeatsWith(equals, stream) -> Stream
Create a new stream with adjacent repeated events removed, using the provided equals function.
The equals function should accept two values and return truthy if the two values are equal, or falsy if they are not equal.
function equals(a, b) -> boolean
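A custom equals function only affects adjacent events; a value may reappear later in the stream. The following array-based sketch illustrates that rule. skipRepeatsWithModel and caseInsensitive are hypothetical names for illustration, not most.js API.

```javascript
// Array-based model of skipRepeatsWith: drop an event when it equals
// the previously kept event according to `equals`.
// skipRepeatsWithModel is a hypothetical helper, not part of most.js.
function skipRepeatsWithModel(equals, events) {
  const kept = [];
  for (const x of events) {
    if (kept.length === 0 || !equals(kept[kept.length - 1], x)) {
      kept.push(x);
    }
  }
  return kept;
}

// Example comparison: treat strings as equal regardless of case.
const caseInsensitive = (a, b) => a.toLowerCase() === b.toLowerCase();

const deduped = skipRepeatsWithModel(caseInsensitive, ['a', 'A', 'b', 'B', 'b', 'a']);
// deduped is ['a', 'b', 'a']: the final 'a' is kept because it is not adjacent to the first
```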
Transducer support
transduce
stream.transduce(transducer) -> Stream
most.transduce(transducer, stream) -> Stream
Create a new stream by passing items through the provided transducer.
Transducers are composable transformations. They may map, filter, add items to, drop items from, or otherwise transform an event stream. The primary benefit of transducers is that they are composable and reusable across any data structures that support them (see note on performance below).
Most.js supports any transducer that implements the de facto JavaScript transducer protocol. For example, two popular transducer libraries are transducers-js and transducers.js.
Note on transducer performance: Transducers perform single-pass transformation. For many data structures, this can provide a significant performance improvement. However, most.js's builtin combinators currently outperform popular transducer libraries. The primary benefit of using transducers with most.js is reusability and portability.
Slicing streams
slice
stream.slice(start, end) -> Stream
most.slice(start, end, stream) -> Stream
Create a new stream containing only events where start <= index < end, where index is the ordinal index of an event in stream.
If stream contains fewer than start events, the returned stream will be empty.
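The index rule start <= index < end mirrors Array.prototype.slice for non-negative indices. A minimal array-based sketch, where sliceModel is a hypothetical helper and not most.js API:

```javascript
// Array-based model of slice: keep events whose ordinal index
// satisfies start <= index < end.
// sliceModel is a hypothetical helper, not part of most.js.
function sliceModel(start, end, events) {
  return events.filter((_, index) => start <= index && index < end);
}

const middle = sliceModel(1, 3, ['a', 'b', 'c', 'd']);
// middle is ['b', 'c']

const pastTheEnd = sliceModel(5, 7, ['a', 'b']);
// pastTheEnd is []: fewer than `start` events, so nothing is kept
```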
take
stream.take(n) -> Stream
most.take(n, stream) -> Stream
Create a new stream containing at most n events from stream.
If stream contains fewer than n events, the returned stream will be effectively equivalent to stream.
skip
stream.skip(n) -> Stream
most.skip(n, stream) -> Stream
Create a new stream that omits the first n events from stream.
If stream contains fewer than n events, the returned stream will be empty.
takeWhile
stream.takeWhile(predicate) -> Stream
most.takeWhile(predicate, stream) -> Stream
Create a new stream containing all events until predicate returns false.
skipWhile
stream.skipWhile(predicate) -> Stream
most.skipWhile(predicate, stream) -> Stream
Create a new stream containing all events after predicate returns false.
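takeWhile and skipWhile split a stream at the first event for which predicate returns false. The sketch below models this over arrays, assuming (as in most.js) that the event that fails the predicate belongs to the skipWhile side. takeWhileModel and skipWhileModel are hypothetical names, not most.js API.

```javascript
// Index of the first event for which predicate is falsy,
// or events.length if predicate holds for every event.
function firstFailure(predicate, events) {
  const index = events.findIndex(x => !predicate(x));
  return index === -1 ? events.length : index;
}

// Hypothetical array-based models, not part of most.js.
function takeWhileModel(predicate, events) {
  return events.slice(0, firstFailure(predicate, events));
}

function skipWhileModel(predicate, events) {
  return events.slice(firstFailure(predicate, events));
}

const lessThan3 = x => x < 3;
const taken = takeWhileModel(lessThan3, [1, 2, 3, 1]);
// taken is [1, 2]
const skipped = skipWhileModel(lessThan3, [1, 2, 3, 1]);
// skipped is [3, 1]: the later 1 is kept because the predicate is no longer consulted
```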
until
Alias: takeUntil
stream.until(endSignal) -> Stream
most.until(endSignal, stream) -> Stream
Create a new stream containing all events until endSignal emits an event.
If endSignal is empty or never emits an event, then the returned stream will be effectively equivalent to stream.
since
Alias: skipUntil
stream.since(startSignal) -> Stream
most.since(startSignal, stream) -> Stream
Create a new stream containing all events after startSignal emits its first event.
If startSignal is empty or never emits an event, then the returned stream will be effectively equivalent to never().
during
stream.during(timeWindow)
most.during(timeWindow, stream)
Create a new stream containing only events that occur during a dynamic time window.
This is similar to slice, but uses time signals rather than indices to limit the stream.
Looping
loop
stream.loop(stepper, seed) -> Stream
most.loop(stepper, seed, stream) -> Stream
Create a feedback loop that emits one value and feeds back another to be used in the next iteration.
It allows you to maintain and update a "state" (aka feedback, aka seed for the next iteration) while emitting a different value. In contrast, scan feeds back and emits the same value.
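The difference from scan can be sketched with a running average, where the state fed back (sum and count) differs from the value emitted (the average). This model assumes the stepper has the shape stepper(seed, x) -> { seed, value }; loopModel is a hypothetical helper and does not use most.js.

```javascript
// Array-based model of loop, assuming stepper(seed, x) -> { seed, value }.
// loopModel is a hypothetical helper, not part of most.js.
function loopModel(stepper, seed, events) {
  const emitted = [];
  let state = seed;
  for (const x of events) {
    const result = stepper(state, x);
    state = result.seed;        // fed back into the next iteration
    emitted.push(result.value); // emitted downstream
  }
  return emitted;
}

// Running average: feed back { sum, count }, emit sum / count.
const averages = loopModel(
  (state, x) => {
    const sum = state.sum + x;
    const count = state.count + 1;
    return { seed: { sum, count }, value: sum / count };
  },
  { sum: 0, count: 0 },
  [2, 4, 6]
);
// averages is [2, 3, 4]
```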
Adapting fluent APIs
thru
stream.thru(transform) -> Stream
transform(stream: Stream) -> Stream
Use a functional API in fluent style.
Functional APIs allow for the highest degree of modularity via external packages, such as @most/hold, without the risks of modifying prototypes.
If you prefer using fluent APIs, thru allows using those functional APIs in a fluent style. For example:
rather than mixing functional and fluent:
Multiple arguments
Multiple arguments should be handled via partial application of the function passed to thru, using bind or a currying or partial application utility from your favorite functional programming library.
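The mechanics of thru, including partial application for multi-argument functions, can be sketched with a tiny stand-in class. FluentModel, takeFirst, and mapItems are hypothetical names used only for illustration; the real stream type and operators come from most.js and external packages.

```javascript
// Minimal stand-in for a fluent stream type. Not most.js.
class FluentModel {
  constructor(items) {
    this.items = items;
  }
  // thru passes this object to `transform` and returns the result,
  // so functional-style operators can be used in a fluent chain.
  thru(transform) {
    return transform(this);
  }
}

// Functional-style operators taking the stream as their last argument.
function takeFirst(n, s) {
  return new FluentModel(s.items.slice(0, n));
}
function mapItems(f, s) {
  return new FluentModel(s.items.map(f));
}

const result = new FluentModel([1, 2, 3, 4])
  .thru(takeFirst.bind(null, 2))        // partial application via bind
  .thru(s => mapItems(x => x * 10, s)); // or via an arrow function

// result.items is [10, 20]
```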
Consuming streams
reduce
stream.reduce(f, initial) -> Promise
most.reduce(f, initial, stream) -> Promise
Reduce a stream, returning a promise for the ultimate result.
The returned promise will fulfill with the final reduced result, or will reject if a failure occurs while reducing the stream.
The reduce function (f above) has the same shape as the function passed to scan: function f(accumulated, x) -> newAccumulated
TODO: Example
observe
Alias: forEach
stream.observe(f) -> Promise
stream.forEach(f) -> Promise
most.observe(f, stream) -> Promise
most.forEach(f, stream) -> Promise
Start consuming events from stream, processing each with f. The returned promise will fulfill after all the events have been consumed, or will reject if the stream fails and the error is not handled.
The forEach alias is compatible with the draft ES Observable proposal forEach. Read more about Observable interop here.
drain
stream.drain() -> Promise
most.drain(stream) -> Promise
Start consuming events from stream. This can be useful in some cases where you don't want or need to process the terminal events, e.g. when all processing has been done via upstream side-effects. Most times, however, you'll use observe to consume and process terminal events.
The returned promise will fulfill after all the events have been consumed, or will reject if the stream fails and the error is not handled.
subscribe
stream.subscribe(Observer) -> Subscription
Draft ES Observable compatible subscribe. Start consuming events from stream by providing an Observer object.
Returns a Subscription object that can be used to unsubscribe from the stream of events.
Read more about draft ES Observable interop here.
observe/forEach or subscribe
Both forEach and subscribe are supported in the draft ES Observable proposal, and the following behave similarly:
However, there are also some important differences.
forEach
- returns a Promise, which can be transformed further using .then
- integrates easily into existing asynchronous code that uses promises
- encourages declarative programming using until, take, takeWhile, etc.
subscribe
- returns a Subscription
- allows imperative unsubscription in cases where declarative isn't possible
Combining streams
merge
stream1.merge(stream2) -> Stream
most.merge(stream1, stream2) -> Stream
Create a new stream containing events from stream1 and stream2.
Merging multiple streams creates a new stream containing all events from the input streams without affecting the arrival time of the events. You can think of the events from the input streams simply being interleaved into the new, merged stream. A merged stream ends when all of its input streams have ended.
In contrast to concat, merge preserves the arrival times of events. That is, it creates a new stream where events from stream1 and stream2 can interleave.
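The contrast between merge and concat can be made concrete with a small timeline model. In this sketch a stream is a plain object { events: [{ time, value }], end }, with times relative to when the stream starts; that representation, and the names mergeModel and concatModel, are assumptions for illustration and not most.js API.

```javascript
// merge: keep every event at its own arrival time; end when all inputs have ended.
function mergeModel(s1, s2) {
  const events = [...s1.events, ...s2.events].sort((a, b) => a.time - b.time);
  return { events, end: Math.max(s1.end, s2.end) };
}

// concat: s2 only starts once s1 has ended, so its events are
// timeshifted past the end time of s1.
function concatModel(s1, s2) {
  const shifted = s2.events.map(e => ({ time: e.time + s1.end, value: e.value }));
  return { events: [...s1.events, ...shifted], end: s1.end + s2.end };
}

const s1 = { events: [{ time: 0, value: 'a' }, { time: 2, value: 'b' }], end: 3 };
const s2 = { events: [{ time: 1, value: 'x' }, { time: 4, value: 'y' }], end: 5 };

const merged = mergeModel(s1, s2);
// merged values: a(0), x(1), b(2), y(4), ending at time 5

const concatenated = concatModel(s1, s2);
// concatenated values: a(0), b(2), x(4), y(7), ending at time 8
```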
mergeArray
most.mergeArray(arrayOfStreams) -> Stream
Array form of merge. Create a new Stream containing all events from all streams in arrayOfStreams.
See merge for more details.
combine
stream1.combine(f, stream2) -> Stream
most.combine(f, stream1, stream2) -> Stream
Create a new stream that emits the set of latest event values from all input streams whenever a new event arrives on any input stream.
Combining creates a new stream by applying a function to the most recent event from each stream whenever a new event arrives on any one stream. Combining must wait for at least one event to arrive on all input streams before it can produce any events.
A combined stream has the same proportion as the max of the proportions of its input streams. To put it in imperative terms: combine ends after all its inputs have ended.
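The "latest value from each input" rule, including the wait for every input to produce at least one event, can be sketched over timestamped arrays. combineModel is a hypothetical two-input helper, and the [{ time, value }] representation is an assumption for illustration; neither is most.js API.

```javascript
// Two-input model of combine over timestamped events.
function combineModel(f, events1, events2) {
  // Tag each event with its source, then process in arrival order.
  const tagged = [
    ...events1.map(e => ({ time: e.time, value: e.value, source: 0 })),
    ...events2.map(e => ({ time: e.time, value: e.value, source: 1 }))
  ].sort((a, b) => a.time - b.time);

  const latest = [undefined, undefined];
  const seen = [false, false];
  const out = [];
  for (const e of tagged) {
    latest[e.source] = e.value;
    seen[e.source] = true;
    // Nothing is emitted until both inputs have produced an event.
    if (seen[0] && seen[1]) {
      out.push({ time: e.time, value: f(latest[0], latest[1]) });
    }
  }
  return out;
}

const letters = [{ time: 0, value: 'a' }, { time: 2, value: 'b' }];
const numbers = [{ time: 1, value: 1 }, { time: 3, value: 2 }];

const combined = combineModel((l, n) => l + n, letters, numbers);
// combined values: 'a1' at time 1, 'b1' at time 2, 'b2' at time 3
```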
combineArray
most.combineArray(f, arrayOfStreams) -> Stream
Array form of combine. Create a new stream that emits the set of latest event values from all input streams whenever a new event arrives on any input stream.
See combine for more details.
sample
sampler.sample(f, ...streams) -> Stream
most.sample(f, sampler, ...streams) -> Stream
Create a new stream by combining sampled values from many input streams.
While combine produces a value whenever an event arrives on any of its inputs, sample produces a value only when an event arrives on the sampler.
sampleWith
values.sampleWith(sampler) -> Stream
most.sampleWith(sampler, values) -> Stream
When an event arrives on sampler, emit the latest event value from values. Effectively equivalent to sampler.sample(identity, values);
Sampling can "smooth" an erratic source, or can act as a dynamic throttle to speed or slow events from one stream using another.
zip
stream1.zip(f, stream2) -> Stream
most.zip(f, stream1, stream2) -> Stream
Create a new stream by applying a function to corresponding pairs of events from the input streams.
Zipping correlates by index corresponding events from two or more input streams. Note that zipping a "fast" stream and a "slow" stream will cause buffering. Events from the fast stream must be buffered in memory until an event at the corresponding index arrives on the slow stream.
A zipped stream ends when any one of its input streams ends.
A stream zipped with a stream created by most.periodic will emit events in intervals.
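Correlation by index, and ending as soon as any input ends, can be sketched over plain arrays. This model ignores timing and buffering; zipModel is a hypothetical helper, not most.js API.

```javascript
// Array-based model of zip for two inputs: pair events by index and
// stop at the length of the shorter input.
// zipModel is a hypothetical helper, not part of most.js.
function zipModel(f, events1, events2) {
  const length = Math.min(events1.length, events2.length);
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push(f(events1[i], events2[i]));
  }
  return out;
}

const zipped = zipModel((letter, n) => letter + n, ['a', 'b', 'c'], [1, 2]);
// zipped is ['a1', 'b2']: 'c' has no partner because the second input ended
```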
Combining higher-order streams
switchLatest
Alias: switch
stream.switchLatest() -> Stream
most.switchLatest(stream) -> Stream
Given a higher-order stream, return a new stream that adopts the behavior of (ie emits the events of) the most recent inner stream.
TODO: Example
join
stream.join() -> Stream
most.join(stream) -> Stream
Given a higher-order stream, return a new stream that merges all the inner streams as they arrive.
TODO: Example
mergeConcurrently
stream.mergeConcurrently(concurrency) -> Stream
most.mergeConcurrently(concurrency, stream) -> Stream
Given a higher-order stream, return a new stream that merges inner streams as they arrive up to the specified concurrency. Once concurrency number of streams are being merged, newly arriving streams will be merged after an existing one ends.
Note that u is only merged after t ends, due to the concurrency level of 2.
Note also that stream.mergeConcurrently(Infinity) is equivalent to stream.join().
To control concurrency, mergeConcurrently must maintain an internal queue of newly arrived streams. If new streams arrive faster than the concurrency level allows them to be merged, the internal queue will grow without bound.
Awaiting promises
awaitPromises
Deprecated alias: await
stream.awaitPromises() -> Stream
most.awaitPromises(stream) -> Stream
Given a stream of promises, ie Stream<Promise<X>>, return a new stream containing the fulfillment values, ie Stream<X>.
Event times may be delayed. However, event order is always preserved, regardless of promise fulfillment order.
To create a stream that merges promises in fulfillment order, use stream.chain(most.fromPromise). Note the difference:
If a promise rejects, the stream will be in an error state with the rejected promise's reason as its error. See recoverWith for error recovery. For example:
Rate limiting streams
debounce
stream.debounce(debounceTime) -> Stream
most.debounce(debounceTime, stream) -> Stream
Wait for a burst of events to subside and emit only the last event in the burst.
If the stream ends while there is a pending debounced event (e.g. via until, see example above), the pending event will be emitted just before the stream ends.
Debouncing can be extremely useful when dealing with bursts of similar events, for example, debouncing keypress events before initiating a remote search query in a browser application.
throttle
stream.throttle(throttlePeriod) -> Stream
most.throttle(throttlePeriod, stream) -> Stream
Limit the rate of events to at most one per throttlePeriod.
In contrast to debounce, throttle simply drops events that occur more often than throttlePeriod, whereas debounce waits for a "quiet period".
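The difference between the two can be sketched over a list of timestamped events. The [{ time, value }] representation and the names debounceModel and throttleModel are assumptions for illustration, not most.js API; the model returns only the surviving values and treats an event arriving exactly one period after the previous one as outside the burst.

```javascript
// debounce: keep an event only if no other event follows within `period`.
// The last event is always kept (it is emitted after the quiet period,
// or just before the stream ends).
function debounceModel(period, events) {
  return events
    .filter((e, i) => i === events.length - 1 || events[i + 1].time - e.time >= period)
    .map(e => e.value);
}

// throttle: keep an event, then drop everything until `period` has elapsed.
function throttleModel(period, events) {
  const kept = [];
  let nextAllowed = -Infinity;
  for (const e of events) {
    if (e.time >= nextAllowed) {
      nextAllowed = e.time + period;
      kept.push(e.value);
    }
  }
  return kept;
}

// Two bursts: a, b, c close together, then d, e close together.
const bursts = [
  { time: 0, value: 'a' },
  { time: 10, value: 'b' },
  { time: 20, value: 'c' },
  { time: 200, value: 'd' },
  { time: 210, value: 'e' }
];

const debounced = debounceModel(100, bursts);
// debounced is ['c', 'e']: the last event of each burst
const throttled = throttleModel(100, bursts);
// throttled is ['a', 'd']: the first event of each period
```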
Delaying streams
delay
stream.delay(delayTime) -> Stream
most.delay(delayTime, stream) -> Stream
Timeshift a stream by delayTime.
Delaying a stream timeshifts all the events by the same amount. Delaying doesn't change the time between events.
Sharing streams
multicast
stream.multicast() -> Stream
most.multicast(stream) -> Stream
Returns a stream equivalent to the original, but which can be shared more efficiently among multiple consumers.
Using multicast allows you to build up a stream of maps, filters, and other transformations, and then share it efficiently with multiple observers.