Class Documentation
Summary
Class | Description |
---|---|
KeyCount | used to count keys in a stream |
LastState | used to hold the last state of key values in a stream e.g. building KTables |
Max | used to grab the highest value of key values in a stream |
Min | used to grab the lowest value of key values in a stream |
Sum | used to sum up key values in a stream |
Window | used to build windows of key value states in a stream |
JSKafkaClient | wraps a Sinek kafka Consumer and/or Producer |
NativeKafkaClient | wraps a Sinek native kafka Consumer and/or Producer |
KStream | change-log representation of a stream |
KTable | table representation of a stream |
StreamDSL | Stream base class |
KeyCount
used to count keys in a stream
Kind: global class
LastState
used to hold the last state of key values in a stream e.g. building KTables
Kind: global class
Max
used to grab the highest value of key values in a stream
Kind: global class
Min
used to grab the lowest value of key values in a stream
Kind: global class
Sum
used to sum up key values in a stream
Kind: global class
Window
used to build windows of key value states in a stream
Kind: global class
JSKafkaClient
Kind: global class
- JSKafkaClient
  - new JSKafkaClient(topic, config)
  - .setProduceHandler(handler)
  - .getProduceHandler() ⇒ null | EventEmitter
  - .overwriteTopics(topics)
  - .start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
  - .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
  - .send(topic, message) ⇒ \*
  - .buffer(topic, identifier, payload, compressionType) ⇒ \*
  - .bufferFormat(topic, identifier, payload, version, compressionType) ⇒ \*
new JSKafkaClient(topic, config)
KafkaClient (EventEmitter) that wraps an internal instance of a Sinek kafka Consumer and/or Producer
Param |
---|
topic |
config |
jsKafkaClient.setProduceHandler(handler)
sets a handler for produce messages (emits whenever kafka messages are produced/delivered)
Kind: instance method of JSKafkaClient
Param | Type |
---|---|
handler | EventEmitter |
jsKafkaClient.getProduceHandler() ⇒ null | EventEmitter
returns the produce handler instance if present
Kind: instance method of JSKafkaClient
jsKafkaClient.overwriteTopics(topics)
overwrites the topics
Kind: instance method of JSKafkaClient
Param | Type |
---|---|
topics | Array.<string> |
jsKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
starts a new kafka consumer (using sinek's partition drainer); will await the kafka-producer-ready event if started with withProducer=true
Kind: instance method of JSKafkaClient
Param | Default |
---|---|
readyCallback | |
kafkaErrorCallback | |
withProducer | false |
withBackPressure | false |
jsKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
starts a new kafka producer using sinek's publisher; fires the kafka-producer-ready event; requires the topic's partition count during initialisation
Kind: instance method of JSKafkaClient
Param | Default |
---|---|
produceTopic | |
partitions | 1 |
readyCallback | |
kafkaErrorCallback | |
outputKafkaConfig | |
jsKafkaClient.send(topic, message) ⇒ *
produces one or more messages on a topic; if producerPartitionCount is > 1, the target partition will be randomized for the message(s)
Kind: instance method of JSKafkaClient
Param |
---|
topic |
message |
jsKafkaClient.buffer(topic, identifier, payload, compressionType) ⇒ *
buffers a keyed message to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated
Kind: instance method of JSKafkaClient
Param | Default |
---|---|
topic | |
identifier | |
payload | |
compressionType | 0 |
jsKafkaClient.bufferFormat(topic, identifier, payload, version, compressionType) ⇒ *
buffers a keyed message (in a base json format) to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated
Kind: instance method of JSKafkaClient
Param | Default |
---|---|
topic | |
identifier | |
payload | |
version | 1 |
compressionType | 0 |
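A minimal usage sketch based on the signatures above; the import path and the Sinek-style config shape are assumptions, not confirmed by this document:

```js
const { JSKafkaClient } = require("kafka-streams"); // hypothetical import path

// assumed Sinek-style config; the exact shape depends on your kafka setup
const config = { kafkaHost: "localhost:9092", groupId: "example-group" };
const client = new JSKafkaClient("input-topic", config);

client.start(
  () => console.log("consumer ready"), // readyCallback
  (error) => console.error(error),     // kafkaErrorCallback
  true,                                // withProducer
  false                                // withBackPressure
);

// produce a plain message, or buffer a keyed one (identifier null -> uuid.v4() is generated)
client.send("output-topic", "hello world");
client.buffer("output-topic", null, { an: "event" });
```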
NativeKafkaClient
Kind: global class
- NativeKafkaClient
  - new NativeKafkaClient(topic, config, batchOptions)
  - .setProduceHandler(handler)
  - .getProduceHandler() ⇒ null | EventEmitter
  - .overwriteTopics(topics)
  - .start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
  - .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
  - .send(topicName, message, partition, key, partitionKey, opaqueKey) ⇒ Promise.<void>
  - .buffer(topic, identifier, payload, _, partition, version, partitionKey) ⇒ Promise.<void>
  - .bufferFormat(topic, identifier, payload, version, _, partitionKey, partition) ⇒ Promise.<void>
new NativeKafkaClient(topic, config, batchOptions)
NativeKafkaClient (EventEmitter) that wraps an internal instance of a Sinek native kafka Consumer and/or Producer
Param | Description |
---|---|
topic | |
config | |
batchOptions | optional |
nativeKafkaClient.setProduceHandler(handler)
sets a handler for produce messages (emits whenever kafka messages are produced/delivered)
Kind: instance method of NativeKafkaClient
Param | Type |
---|---|
handler | EventEmitter |
nativeKafkaClient.getProduceHandler() ⇒ null | EventEmitter
returns the produce handler instance if present
Kind: instance method of NativeKafkaClient
nativeKafkaClient.overwriteTopics(topics)
overwrites the topics
Kind: instance method of NativeKafkaClient
Param | Type |
---|---|
topics | Array.<string> |
nativeKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
starts a new kafka consumer; will await the kafka-producer-ready event if started with withProducer=true
Kind: instance method of NativeKafkaClient
Param | Default |
---|---|
readyCallback | |
kafkaErrorCallback | |
withProducer | false |
withBackPressure | false |
nativeKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
starts a new kafka producer; fires the kafka-producer-ready event; requires the topic's partition count during initialisation
Kind: instance method of NativeKafkaClient
Param | Default |
---|---|
produceTopic | |
partitions | 1 |
readyCallback | |
kafkaErrorCallback | |
outputKafkaConfig | |
nativeKafkaClient.send(topicName, message, partition, key, partitionKey, opaqueKey) ⇒ Promise.<void>
produces one or more messages on a topic; if producerPartitionCount is > 1, the target partition will be randomized for the message(s)
Kind: instance method of NativeKafkaClient
Param | Default | Description |
---|---|---|
topicName | | |
message | | |
partition | | optional |
key | | optional |
partitionKey | | optional |
opaqueKey | | optional |
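A short sketch of the parameter order above; `client` stands for an already started NativeKafkaClient with a producer set up:

```js
client.send(
  "output-topic",               // topicName
  JSON.stringify({ hello: 1 }), // message
  0,                            // partition (optional)
  "message-key"                 // key (optional)
)
  .then(() => console.log("message delivered"))
  .catch((error) => console.error(error));
```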
nativeKafkaClient.buffer(topic, identifier, payload, _, partition, version, partitionKey) ⇒ Promise.<void>
buffers a keyed message to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated
Kind: instance method of NativeKafkaClient
Param | Default | Description |
---|---|---|
topic | | |
identifier | | |
payload | | |
_ | | optional |
partition | | optional |
version | | optional |
partitionKey | | optional |
nativeKafkaClient.bufferFormat(topic, identifier, payload, version, _, partitionKey, partition) ⇒ Promise.<void>
buffers a keyed message (in a base json format) to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated
Kind: instance method of NativeKafkaClient
Param | Default | Description |
---|---|---|
topic | | |
identifier | | |
payload | | |
version | 1 | optional |
_ | | optional |
partitionKey | | optional |
partition | | optional |
KStream
change-log representation of a stream
Kind: global class
- KStream
  - new KStream(topicName, storage, kafka, isClone)
  - .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
  - .innerJoin(stream, key, windowed, combine) ⇒ KStream
  - .outerJoin(stream)
  - .leftJoin(stream)
  - .merge(stream) ⇒ KStream
  - .fromMost() ⇒ KStream
  - .clone(cloneEvents, cloneDeep) ⇒ KStream
  - .branch(preds) ⇒ Array.<KStream>
  - .window(from, to, etl, encapsulated, collect) ⇒ Object
  - .close() ⇒ Promise.<boolean>
new KStream(topicName, storage, kafka, isClone)
creates a changelog representation of a stream; join operations of KStream instances are synchronous and return new instances immediately
Param | Type | Default |
---|---|---|
topicName | string | |
storage | KStorage | |
kafka | KafkaClient | |
isClone | boolean | false |
kStream.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
starts kafka consumption and prepares production of messages if necessary; when called with zero or just a single callback argument, this function will return a promise and use the callback for errors
Kind: instance method of KStream
Param | Type | Default | Description |
---|---|---|---|
kafkaReadyCallback | function or Object | | can also be an object (config) |
kafkaErrorCallback | function | | |
withBackPressure | boolean | false | |
outputKafkaConfig | Object | | |
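A sketch of the promise-based call style described above; the KafkaStreams factory is the library's usual entry point but not part of this class, so treat it as an assumption here:

```js
const { KafkaStreams } = require("kafka-streams");

const kafkaStreams = new KafkaStreams(config); // config: your kafka-streams config object
const stream = kafkaStreams.getKStream("input-topic");

// zero callback arguments -> returns a promise
stream.start()
  .then(() => console.log("stream started"))
  .catch((error) => console.error(error));
```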
kStream.innerJoin(stream, key, windowed, combine) ⇒ KStream
Emits an output when both input sources have records with the same key. s1$:{object} + s2$:{object} -> j$:{left: s1$object, right: s2$object}
Kind: instance method of KStream
Param | Type | Default |
---|---|---|
stream | StreamDSL | |
key | string | "key" |
windowed | boolean | false |
combine | function | |
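A sketch of a non-windowed inner join per the signature above; topic names and the "key" field are placeholders:

```js
const left = kafkaStreams.getKStream("left-topic");
const right = kafkaStreams.getKStream("right-topic");

// emits {left, right} whenever both sides have produced a record with the same key
const joined = left.innerJoin(right, "key");
joined.forEach((pair) => console.log(pair.left, pair.right));
```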
kStream.outerJoin(stream)
Emits an output for each record in either input source. If only one source contains a key, the other is null
Kind: instance method of KStream
Param | Type |
---|---|
stream | StreamDSL |
kStream.leftJoin(stream)
Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null
Kind: instance method of KStream
Param | Type |
---|---|
stream | StreamDSL |
kStream.merge(stream) ⇒ KStream
Emits an output for each record in any of the streams. Acts as a simple merge of both streams. Can be used with KStream or KTable instances; returns a NEW KStream instance.
Kind: instance method of KStream
Param | Type |
---|---|
stream | StreamDSL |
kStream.fromMost() ⇒ KStream
creates a new KStream instance from a given most.js stream; the consume topic will be empty and therefore no consumer will be built
Kind: instance method of KStream
Param | Type | Description |
---|---|---|
most.js | Object | stream |
kStream.clone(cloneEvents, cloneDeep) ⇒ KStream
as only joins and window operations return new stream instances, you might sometimes need a clone, which can be accomplished using this function
Kind: instance method of KStream
Param | Type | Default | Description |
---|---|---|---|
cloneEvents | boolean | false | if events in the stream should be cloned |
cloneDeep | boolean | false | if events in the stream should be cloned deeply |
kStream.branch(preds) ⇒ Array.<KStream>
Splits a stream into multiple branches by cloning it and filtering each clone with the corresponding predicate. For example, the predicates [(message) => message.key.startsWith("A"), (message) => message.key.startsWith("B"), (message) => true] yield [streamA, streamB, streamTrue]; see the sketch after the parameter table.
Kind: instance method of KStream
Param | Type |
---|---|
preds | Array.<function()> |
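The example from the description, spelled out; `stream` is any KStream whose messages carry a string key:

```js
// clones the stream once per predicate and filters each clone accordingly
const [streamA, streamB, streamTrue] = stream.branch([
  (message) => message.key.startsWith("A"),
  (message) => message.key.startsWith("B"),
  (message) => true, // catch-all branch
]);
```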
kStream.window(from, to, etl, encapsulated, collect) ⇒ Object
builds a windowed stream across all events of the current kstream; when the first event with an exceeding "to" is received (or the abort() callback is called), the window closes and emits its "collected" values to the returned kstream. from and to must be unix epoch timestamps in milliseconds (Date.now()). etl can be a function that returns the timestamp (event time) from within the message, e.g. m -> m.payload.createdAt; if etl is not given, the timestamp of receiving will be used (processing time) for each event. encapsulated refers to the result messages (defaults to true; they will be encapsulated in an object: {time, value})
Kind: instance method of KStream
Param | Type | Default | Description |
---|---|---|---|
from | number | ||
to | number | ||
etl | function | ||
encapsulated | boolean | true | if event should stay encapsulated {time, value} |
collect | boolean | true | if events should be collected first before publishing to result stream |
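A sketch of a one-minute window over processing time (no etl given); the field names on the returned object are assumptions based on the description above:

```js
const from = Date.now();
const to = from + 60 * 1000; // unix epoch timestamps in milliseconds

const windowed = stream.window(from, to);
// assumed fields: the collected result kstream and the abort() callback
windowed.stream.forEach((event) => console.log(event)); // {time, value} when encapsulated
```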
kStream.close() ⇒ Promise.<boolean>
closes the internal stream and all open kafka connections as well as KStorage connections
Kind: instance method of KStream
KTable
table representation of a stream
Kind: global class
- KTable
  - new KTable(topicName, keyMapETL, storage, kafka, isClone)
  - .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
  - .innerJoin(stream, key)
  - .outerJoin(stream)
  - .leftJoin(stream)
  - .writeToTableStream(message)
  - .consumeUntilMs(ms, finishedCallback) ⇒ KTable
  - .consumeUntilCount(count, finishedCallback) ⇒ KTable
  - .consumeUntilLatestOffset(finishedCallback)
  - .getTable() ⇒ Promise.<object>
  - .replay()
  - .merge(stream) ⇒ Promise.<KTable>
  - .clone() ⇒ Promise.<KTable>
  - .close() ⇒ Promise.<boolean>
new KTable(topicName, keyMapETL, storage, kafka, isClone)
creates a table representation of a stream; join operations of KTable instances are asynchronous and return promises; keyMapETL = v -> {key, value} (sync)
Param | Type | Default |
---|---|---|
topicName | string | |
keyMapETL | function | |
storage | KStorage | |
kafka | KafkaClient | |
isClone | boolean | false |
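A sketch of the keyMapETL contract (v -> {key, value}, sync); the KafkaStreams factory and the message fields are assumptions:

```js
const table = kafkaStreams.getKTable("user-topic", (message) => ({
  key: message.userId,   // hypothetical field
  value: message.status, // hypothetical field
}));
```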
kTable.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
starts kafka consumption and prepares production of messages if necessary; when called with zero or just a single callback argument, this function will return a promise and use the callback for errors
Kind: instance method of KTable
Param | Type | Default | Description |
---|---|---|---|
kafkaReadyCallback | function or Object | | can also be an object (config) |
kafkaErrorCallback | function | | |
withBackPressure | boolean | false | |
outputKafkaConfig | Object | | |
kTable.innerJoin(stream, key)
Emits an output when both input sources have records with the same key.
Kind: instance method of KTable
Param | Type | Default |
---|---|---|
stream | StreamDSL | |
key | string | "key" |
kTable.outerJoin(stream)
Emits an output for each record in either input source. If only one source contains a key, the other is null
Kind: instance method of KTable
Param | Type |
---|---|
stream | StreamDSL |
kTable.leftJoin(stream)
Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null
Kind: instance method of KTable
Param | Type |
---|---|
stream | StreamDSL |
kTable.writeToTableStream(message)
write message to the internal stream
Kind: instance method of KTable
Param | Type |
---|---|
message | any |
kTable.consumeUntilMs(ms, finishedCallback) ⇒ KTable
consume messages until ms have passed
Kind: instance method of KTable
Param | Type | Default |
---|---|---|
ms | number | 1000 |
finishedCallback | function | |
kTable.consumeUntilCount(count, finishedCallback) ⇒ KTable
consume messages until a certain count is reached
Kind: instance method of KTable
Param | Type | Default |
---|---|---|
count | number | 1000 |
finishedCallback | function | |
kTable.consumeUntilLatestOffset(finishedCallback)
consume messages until the latest offset of the topic is reached
Kind: instance method of KTable
Param | Type | Default |
---|---|---|
finishedCallback | function | |
kTable.getTable() ⇒ Promise.<object>
returns the state of the internal KStorage
Kind: instance method of KTable
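A sketch combining consumeUntilMs and getTable from above to read the materialized state:

```js
// consume for five seconds, then read the key/value state of the internal KStorage
table.consumeUntilMs(5000, () => {
  table.getTable().then((state) => console.log(state));
});
table.start();
```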
kTable.replay()
rewrites the content of the internal KStorage to the stream; every observer will receive the content as KV {key, value} objects
Kind: instance method of KTable
kTable.merge(stream) ⇒ Promise.<KTable>
Emits an output for each record in any of the streams. Acts as a simple merge of both streams. Can be used with KStream or KTable instances; returns a Promise with a NEW KTable instance.
Kind: instance method of KTable
Param | Type |
---|---|
stream | StreamDSL |
kTable.clone() ⇒ Promise.<KTable>
as only joins and window operations return new stream instances, you might sometimes need a clone, which can be accomplished using this function
Kind: instance method of KTable
kTable.close() ⇒ Promise.<boolean>
closes the internal stream and all open kafka connections as well as KStorage connections
Kind: instance method of KTable
StreamDSL
Stream base class
Kind: global class
- StreamDSL
  - new StreamDSL(topicName, storage, kafka, isClone)
  - streamDSL.start()
  - streamDSL.getStats() ⇒ object
  - streamDSL.getStorage() ⇒ KStorage
  - streamDSL.writeToStream(message)
  - streamDSL.getMost() ⇒ Object
  - streamDSL.getNewMostFrom(array) ⇒ Stream.<any>
  - streamDSL.replaceInternalObservable(newStream$)
  - streamDSL.setProduceHandler(handler)
  - streamDSL.createAndSetProduceHandler() ⇒ module:events.internal
  - streamDSL.setKafkaStreamsReference(reference)
  - streamDSL.from(topicName) ⇒ StreamDSL
  - streamDSL.awaitPromises(etl) ⇒ StreamDSL
  - streamDSL.map(etl) ⇒ StreamDSL
  - streamDSL.asyncMap(etl) ⇒ StreamDSL
  - streamDSL.concatMap(etl) ⇒ StreamDSL
  - streamDSL.forEach(eff) ⇒ \*
  - streamDSL.chainForEach(eff, callback) ⇒ StreamDSL
  - streamDSL.tap(eff)
  - streamDSL.filter(pred) ⇒ StreamDSL
  - streamDSL.skipRepeats() ⇒ StreamDSL
  - streamDSL.skipRepeatsWith(equals) ⇒ StreamDSL
  - streamDSL.skip(count) ⇒ StreamDSL
  - streamDSL.take(count) ⇒ StreamDSL
  - streamDSL.mapStringToArray(delimiter) ⇒ StreamDSL
  - streamDSL.mapArrayToKV(keyIndex, valueIndex) ⇒ StreamDSL
  - streamDSL.mapStringToKV(delimiter, keyIndex, valueIndex) ⇒ StreamDSL
  - streamDSL.mapJSONParse() ⇒ StreamDSL
  - streamDSL.mapStringify() ⇒ StreamDSL
  - streamDSL.mapBufferKeyToString() ⇒ StreamDSL
  - streamDSL.mapBufferValueToString() ⇒ StreamDSL
  - streamDSL.mapStringValueToJSONObject() ⇒ StreamDSL
  - streamDSL.mapJSONConvenience() ⇒ StreamDSL
  - streamDSL.wrapAsKafkaValue(topic) ⇒ StreamDSL
  - streamDSL.mapWrapKafkaValue() ⇒ StreamDSL
  - streamDSL.atThroughput(count, callback) ⇒ StreamDSL
  - streamDSL.mapToFormat(type, getId) ⇒ StreamDSL
  - streamDSL.mapFromFormat() ⇒ StreamDSL
  - streamDSL.timestamp(etl) ⇒ StreamDSL
  - streamDSL.constant(substitute) ⇒ StreamDSL
  - streamDSL.scan(eff, initial) ⇒ StreamDSL
  - streamDSL.slice(start, end) ⇒ StreamDSL
  - streamDSL.takeWhile(pred) ⇒ StreamDSL
  - streamDSL.skipWhile(pred) ⇒ StreamDSL
  - streamDSL.until(signal$) ⇒ StreamDSL
  - streamDSL.since(signal$) ⇒ StreamDSL
  - streamDSL.continueWith(f)
  - streamDSL.reduce(eff, initial) ⇒ \*
  - streamDSL.chainReduce(eff, initial, callback) ⇒ StreamDSL
  - streamDSL.drain() ⇒ \*
  - streamDSL.throttle(throttlePeriod) ⇒ StreamDSL
  - streamDSL.delay(delayTime) ⇒ StreamDSL
  - streamDSL.debounce(debounceTime) ⇒ StreamDSL
  - streamDSL.countByKey(key, countFieldName) ⇒ StreamDSL
  - streamDSL.sumByKey(key, fieldName, sumField) ⇒ StreamDSL
  - streamDSL.min(fieldName, minField) ⇒ StreamDSL
  - streamDSL.max(fieldName, maxField) ⇒ StreamDSL
  - streamDSL._merge(otherStream$)
  - streamDSL._zip(otherStream$, combine)
  - streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig) ⇒ Promise.<boolean>
new StreamDSL(topicName, storage, kafka, isClone)
Stream base class that wraps around a private most.js stream$ and interacts with storages/actions and a kafka-client instance.
Param | Type | Default | Description |
---|---|---|---|
topicName | string or Array.<string> | | can also be topics |
storage | KStorage | | |
kafka | KafkaClient | | |
isClone | boolean | false | |
streamDSL.start()
dummy, should be overwritten
Kind: instance method of StreamDSL
streamDSL.getStats() ⇒ object
returns a stats object with information about the internal kafka clients
Kind: instance method of StreamDSL
streamDSL.getStorage() ⇒ KStorage
returns the internal KStorage instance
Kind: instance method of StreamDSL
streamDSL.writeToStream(message)
can be used to manually write message/events to the internal stream$
Kind: instance method of StreamDSL
Param | Type |
---|---|
message | Object or Array.<Object> |
streamDSL.getMost() ⇒ Object
returns the internal most.js stream
Kind: instance method of StreamDSL
Returns: Object - most.js stream
streamDSL.getNewMostFrom(array) ⇒ Stream.<any>
returns a new most stream from the given array
Kind: instance method of StreamDSL
Param |
---|
array |
streamDSL.replaceInternalObservable(newStream$)
used during cloning or merges; resets the internal event emitter to the new stream and replaces the internal stream with the merged new stream
Kind: instance method of StreamDSL
Param |
---|
newStream$ |
streamDSL.setProduceHandler(handler)
sets a handler for produce messages (emits whenever kafka messages are produced/delivered) events: produced, delivered
Kind: instance method of StreamDSL
Param | Type |
---|---|
handler | module:events.internal |
streamDSL.createAndSetProduceHandler() ⇒ module:events.internal
creates (and returns) and sets a produce handler for this stream instance
Kind: instance method of StreamDSL
streamDSL.setKafkaStreamsReference(reference)
overwrites the internal kafkaStreams reference
Kind: instance method of StreamDSL
Param |
---|
reference |
streamDSL.from(topicName) ⇒ StreamDSL
adds more topic(s) to the consumer
Kind: instance method of StreamDSL
Param | Type |
---|---|
topicName | string or Array.<string> |
streamDSL.awaitPromises(etl) ⇒ StreamDSL
given a stream of promises, returns a stream containing the fulfillment values; etl = Promise -> v
Kind: instance method of StreamDSL
Param |
---|
etl |
streamDSL.map(etl) ⇒ StreamDSL
simple synchronous map function; etl = v -> v2
Kind: instance method of StreamDSL
Param |
---|
etl |
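For instance, a synchronous uppercase transform (etl = v -> v2); `stream` is any StreamDSL instance:

```js
stream.map((value) => String(value).toUpperCase());
```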
streamDSL.asyncMap(etl) ⇒ StreamDSL
map that expects etl to return a Promise; can be used to apply async maps to the stream; etl = v -> Promise
Kind: instance method of StreamDSL
Param |
---|
etl |
streamDSL.concatMap(etl) ⇒ StreamDSL
transforms each event in the stream into a stream via etl, and then concatenates it onto the end of the resulting stream; etl = v -> stream(v2)
Kind: instance method of StreamDSL
Param |
---|
etl |
streamDSL.forEach(eff) ⇒ \*
(do not use for side effects, except for a closing operation at the end of the stream) may not be used to chain; eff = v -> void
Kind: instance method of StreamDSL
Returns: \* - Promise
Param |
---|
eff |
streamDSL.chainForEach(eff, callback) ⇒ StreamDSL
runs forEach on a multicast stream; you probably would not want to use this in production
Kind: instance method of StreamDSL
Param | Default |
---|---|
eff | |
callback | |
streamDSL.tap(eff)
(alternative to forEach if in the middle of a stream operation chain) use this for side effects; errors in eff will break the stream
Kind: instance method of StreamDSL
Param |
---|
eff |
streamDSL.filter(pred) ⇒ StreamDSL
stream contains only events for which the predicate returns true; pred = v -> boolean
Kind: instance method of StreamDSL
Param |
---|
pred |
streamDSL.skipRepeats() ⇒ StreamDSL
will remove duplicate messages; be aware that this might take a lot of memory
Kind: instance method of StreamDSL
streamDSL.skipRepeatsWith(equals) ⇒ StreamDSL
skips repeats per your definition; equals = (a,b) -> boolean
Kind: instance method of StreamDSL
Param |
---|
equals |
streamDSL.skip(count) ⇒ StreamDSL
skips the given amount of messages
Kind: instance method of StreamDSL
Param |
---|
count |
streamDSL.take(count) ⇒ StreamDSL
takes the first messages until count is reached and omits the rest
Kind: instance method of StreamDSL
Param |
---|
count |
streamDSL.mapStringToArray(delimiter) ⇒ StreamDSL
easy string to array mapping; you can pass your own delimiter, default is a space; "bla blup" => ["bla", "blup"]
Kind: instance method of StreamDSL
Param | Default |
---|---|
delimiter | |
streamDSL.mapArrayToKV(keyIndex, valueIndex) ⇒ StreamDSL
easy array to key-value object mapping; you can pass your own indices, default is 0,1; ["bla", "blup"] => { key: "bla", value: "blup" }
Kind: instance method of StreamDSL
Param | Default |
---|---|
keyIndex | 0 |
valueIndex | 1 |
streamDSL.mapStringToKV(delimiter, keyIndex, valueIndex) ⇒ StreamDSL
easy string to key-value object mapping; you can pass your own delimiter and indices, defaults are " " and 0,1; "bla blup" => { key: "bla", value: "blup" }
Kind: instance method of StreamDSL
Param | Default |
---|---|
delimiter | |
keyIndex | 0 |
valueIndex | 1 |
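For instance, with the default-like arguments spelled out:

```js
// "user1 clicked" -> { key: "user1", value: "clicked" }
stream.mapStringToKV(" ", 0, 1);
```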
streamDSL.mapJSONParse() ⇒ StreamDSL
maps every stream event through JSON.parse if its type is an object (if parsing fails, the error object will be returned)
Kind: instance method of StreamDSL
streamDSL.mapStringify() ⇒ StreamDSL
maps every stream event through JSON.stringify if its type is object
Kind: instance method of StreamDSL
streamDSL.mapBufferKeyToString() ⇒ StreamDSL
maps an object type event with a Buffer key field to an object event with a string key field
Kind: instance method of StreamDSL
streamDSL.mapBufferValueToString() ⇒ StreamDSL
maps an object type event with a Buffer value field to an object event with a string value field
Kind: instance method of StreamDSL
streamDSL.mapStringValueToJSONObject() ⇒ StreamDSL
maps an object type event with a string value field to an object event with a (parsed) object value field
Kind: instance method of StreamDSL
streamDSL.mapJSONConvenience() ⇒ StreamDSL
takes a buffer kafka message and turns it into a json representation; buffer key -> string, buffer value -> string -> object
Kind: instance method of StreamDSL
streamDSL.wrapAsKafkaValue(topic) ⇒ StreamDSL
wraps an event value inside a kafka message object; the event value will be used as the value of the kafka message
Kind: instance method of StreamDSL
Param | Description |
---|---|
topic | optional |
streamDSL.mapWrapKafkaValue() ⇒ StreamDSL
maps every stream event's kafka message right to its payload value
Kind: instance method of StreamDSL
streamDSL.atThroughput(count, callback) ⇒ StreamDSL
taps to the stream, counts messages, and calls the callback once (when the message count is reached) with the current message at count
Kind: instance method of StreamDSL
Param | Type | Default |
---|---|---|
count | number | 1 |
callback | function | |
streamDSL.mapToFormat(type, getId) ⇒ StreamDSL
default kafka format stringify; {} -> {payload, time, type, id}; getId can be a function to read the id from the message, e.g. getId = message -> message.id
Kind: instance method of StreamDSL
Param | Default |
---|---|
type | unknown-publish |
getId | |
streamDSL.mapFromFormat() ⇒ StreamDSL
default kafka format parser; {value: "{ payload: {} }"} -> {}
Kind: instance method of StreamDSL
streamDSL.timestamp(etl) ⇒ StreamDSL
maps elements into {time, value} objects
Kind: instance method of StreamDSL
Param |
---|
etl |
streamDSL.constant(substitute) ⇒ StreamDSL
replaces every element with the substitute value
Kind: instance method of StreamDSL
Param |
---|
substitute |
streamDSL.scan(eff, initial) ⇒ StreamDSL
maps into incrementally accumulated results, starting with the provided initial value
Kind: instance method of StreamDSL
Param |
---|
eff |
initial |
streamDSL.slice(start, end) ⇒ StreamDSL
slices events from start to end index
Kind: instance method of StreamDSL
Param |
---|
start |
end |
streamDSL.takeWhile(pred) ⇒ StreamDSL
contains events until the predicate returns false; m -> !!m
Kind: instance method of StreamDSL
Param |
---|
pred |
streamDSL.skipWhile(pred) ⇒ StreamDSL
contains events after the predicate returns false
Kind: instance method of StreamDSL
Param |
---|
pred |
streamDSL.until(signal$) ⇒ StreamDSL
contains events until signal$ emits its first event; signal$ must be a most stream instance
Kind: instance method of StreamDSL
Param |
---|
signal$ |
streamDSL.since(signal$) ⇒ StreamDSL
contains all events after signal$ emits its first event; signal$ must be a most stream instance
Kind: instance method of StreamDSL
Param |
---|
signal$ |
streamDSL.continueWith(f)
Replace the end signal with a new stream returned by f. Note that f must return a (most.js) stream.
Kind: instance method of StreamDSL
Param | Description |
---|---|
f | function (must return a most stream) |
streamDSL.reduce(eff, initial) ⇒ \*
reduces the stream to a single result; returns a promise
Kind: instance method of StreamDSL
Returns: \* - Promise
Param |
---|
eff |
initial |
streamDSL.chainReduce(eff, initial, callback) ⇒ StreamDSL
runs reduce on a multicast stream; you probably would not want to use this in production
Kind: instance method of StreamDSL
Param |
---|
eff |
initial |
callback |
streamDSL.drain() ⇒ \*
drains the stream, equal to forEach without an iterator; returns a promise
Kind: instance method of StreamDSL
Returns: \* - Promise
streamDSL.throttle(throttlePeriod) ⇒ StreamDSL
limits the rate of events to at most one per throttlePeriod
Kind: instance method of StreamDSL
Param |
---|
throttlePeriod |
streamDSL.delay(delayTime) ⇒ StreamDSL
delays every event in the stream by the given time
Kind: instance method of StreamDSL
Param |
---|
delayTime |
streamDSL.debounce(debounceTime) ⇒ StreamDSL
waits for a burst of events and emits only the last event
Kind: instance method of StreamDSL
Param |
---|
debounceTime |
streamDSL.countByKey(key, countFieldName) ⇒ StreamDSL
maps into counts per key; requires events to have a present key/value field
Kind: instance method of StreamDSL
Param | Default |
---|---|
key | key |
countFieldName | count |
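A short word-count-style sketch chaining mapStringToKV from above:

```js
stream
  .mapStringToKV()             // "bla blup" -> { key: "bla", value: "blup" }
  .countByKey("key", "count"); // events become running counts per key
```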
streamDSL.sumByKey(key, fieldName, sumField) ⇒ StreamDSL
maps into sums per key; requires events to have a present key/value field
Kind: instance method of StreamDSL
Param | Default |
---|---|
key | key |
fieldName | value |
sumField | false |
streamDSL.min(fieldName, minField) ⇒ StreamDSL
collects the smallest value of the given field; will not alter the events in the stream; use .getStorage().getMin() to get the latest stored value
Kind: instance method of StreamDSL
Param | Default |
---|---|
fieldName | value |
minField | min |
streamDSL.max(fieldName, maxField) ⇒ StreamDSL
collects the greatest value of the given field; will not alter the events in the stream; use .getStorage().getMax() to get the latest stored value
Kind: instance method of StreamDSL
Param | Default |
---|---|
fieldName | value |
maxField | max |
streamDSL._merge(otherStream$)
merges this stream with another, resulting in a stream with all elements from both streams
Kind: instance method of StreamDSL
Param |
---|
otherStream$ |
streamDSL._zip(otherStream$, combine)
merges this stream with another stream by combining (zipping) every event from each stream into a single new event on the new stream; combine = (e1, e2) -> e1 + e2
Kind: instance method of StreamDSL
Param |
---|
otherStream$ |
combine |
streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig) ⇒ Promise.<boolean>
defines an output topic; when passed to KafkaStreams, this will trigger the stream$ result to be produced to the given topic name; if the instance is a clone, this function call will have to set up a kafka producer; returns a promise
Kind: instance method of StreamDSL
Param | Type | Default | Description |
---|---|---|---|
topic | string or Object | | optional (can also be an object, containing the same parameters as fields) |
outputPartitionsCount | number | 1 | optional |
produceType | string | "send" | optional |
version | number | 1 | optional |
compressionType | number | 0 | optional |
producerErrorCallback | function | | optional |
outputKafkaConfig | Object | | optional |
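A closing end-to-end sketch; the KafkaStreams factory and the topic names are assumptions:

```js
const { KafkaStreams } = require("kafka-streams");

const kafkaStreams = new KafkaStreams(config); // config: your kafka-streams config object
const stream = kafkaStreams.getKStream("input-topic");

stream
  .mapJSONConvenience()                  // buffer key/value -> string -> object
  .map((event) => JSON.stringify(event)) // produce plain string values
  .to("output-topic", 1, "send");        // 1 output partition, produceType "send"

stream.start();
```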