Class Documentation
Summary
| Class | Description |
|---|---|
| KeyCount | used to count keys in a stream |
| LastState | used to hold the last state of key values in a stream e.g. building KTables |
| Max | used to grab the highest value of key values in a stream |
| Min | used grab the lowest value of key values in a stream |
| Sum | used to sum up key values in a stream |
| Window | used to build windows of key value states in a stream |
| JSKafkaClient | |
| NativeKafkaClient | |
| KStream | change-log representation of a stream |
| KTable | table representation of a stream |
| StreamDSL | Stream base class |
KeyCount
used to count keys in a stream
Kind: global class
LastState
used to hold the last state of key values in a stream e.g. building KTables
Kind: global class
Max
used to grab the highest value of key values in a stream
Kind: global class
Min
used grab the lowest value of key values in a stream
Kind: global class
Sum
used to sum up key values in a stream
Kind: global class
Window
used to build windows of key value states in a stream
Kind: global class
JSKafkaClient
Kind: global class
- JSKafkaClient
- new JSKafkaClient(topic, config)
- .setProduceHandler(handler)
- .getProduceHandler() ⇒
null|EventEmitter - .overwriteTopics(topics)
- .start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
- .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
- .send(topic, message) ⇒
\* - .buffer(topic, identifier, payload, compressionType) ⇒
\* - .bufferFormat(topic, identifier, payload, version, compressionType) ⇒
\*
new JSKafkaClient(topic, config)
KafkaClient (EventEmitter) that wraps an internal instance of a Sinek kafka- Consumer and/or Producer
| Param |
|---|
| topic |
| config |
jsKafkaClient.setProduceHandler(handler)
sets a handler for produce messages (emits whenever kafka messages are produced/delivered)
Kind: instance method of JSKafkaClient
| Param | Type |
|---|---|
| handler | EventEmitter |
jsKafkaClient.getProduceHandler() ⇒ null | EventEmitter
returns the produce handler instance if present
Kind: instance method of JSKafkaClient
jsKafkaClient.overwriteTopics(topics)
overwrites the topic
Kind: instance method of JSKafkaClient
| Param | Type |
|---|---|
| topics | Array.<string> |
jsKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
starts a new kafka consumer (using sinek's partition drainer) will await a kafka-producer-ready-event if started withProducer=true
Kind: instance method of JSKafkaClient
| Param | Default |
|---|---|
| readyCallback | |
| kafkaErrorCallback | |
| withProducer | false |
| withBackPressure | false |
jsKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
starts a new kafka-producer using sinek's publisher will fire kafka-producer-ready-event requires a topic's partition count during initialisation
Kind: instance method of JSKafkaClient
| Param | Default |
|---|---|
| produceTopic | |
| partitions | 1 |
| readyCallback | |
| kafkaErrorCallback | |
| outputKafkaConfig |
jsKafkaClient.send(topic, message) ⇒ *
simply produces a message or multiple on a topic if producerPartitionCount is > 1 it will randomize the target partition for the message/s
Kind: instance method of JSKafkaClient
| Param |
|---|
| topic |
| message |
jsKafkaClient.buffer(topic, identifier, payload, compressionType) ⇒ *
buffers a keyed message to be send a keyed message needs an identifier, if none is provided an uuid.v4() will be generated
Kind: instance method of JSKafkaClient
| Param | Default |
|---|---|
| topic | |
| identifier | |
| payload | |
| compressionType | 0 |
jsKafkaClient.bufferFormat(topic, identifier, payload, version, compressionType) ⇒ *
buffers a keyed message in (a base json format) to be send a keyed message needs an identifier, if none is provided an uuid.4() will be generated
Kind: instance method of JSKafkaClient
| Param | Default |
|---|---|
| topic | |
| identifier | |
| payload | |
| version | 1 |
| compressionType | 0 |
NativeKafkaClient
Kind: global class
- NativeKafkaClient
- new NativeKafkaClient(topic, config, batchOptions)
- .setProduceHandler(handler)
- .getProduceHandler() ⇒
null|EventEmitter - .overwriteTopics(topics)
- .start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
- .setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
- .send(topicName, message, partition, key, partitionKey, opaqueKey) ⇒
Promise.<void> - .buffer(topic, identifier, payload, _, partition, version, partitionKey) ⇒
Promise.<void> - .bufferFormat(topic, identifier, payload, version, _, partitionKey, partition) ⇒
Promise.<void>
new NativeKafkaClient(topic, config, batchOptions)
NativeKafkaClient (EventEmitter) that wraps an internal instance of a Sinek native kafka- Consumer and/or Producer
| Param | Description |
|---|---|
| topic | |
| config | |
| batchOptions | optional |
nativeKafkaClient.setProduceHandler(handler)
sets a handler for produce messages (emits whenever kafka messages are produced/delivered)
Kind: instance method of NativeKafkaClient
| Param | Type |
|---|---|
| handler | EventEmitter |
nativeKafkaClient.getProduceHandler() ⇒ null | EventEmitter
returns the produce handler instance if present
Kind: instance method of NativeKafkaClient
nativeKafkaClient.overwriteTopics(topics)
overwrites the topic
Kind: instance method of NativeKafkaClient
| Param | Type |
|---|---|
| topics | Array.<string> |
nativeKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
starts a new kafka consumer will await a kafka-producer-ready-event if started withProducer=true
Kind: instance method of NativeKafkaClient
| Param | Default |
|---|---|
| readyCallback | |
| kafkaErrorCallback | |
| withProducer | false |
| withBackPressure | false |
nativeKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
starts a new kafka-producer will fire kafka-producer-ready-event requires a topic's partition count during initialisation
Kind: instance method of NativeKafkaClient
| Param | Default |
|---|---|
| produceTopic | |
| partitions | 1 |
| readyCallback | |
| kafkaErrorCallback | |
| outputKafkaConfig |
nativeKafkaClient.send(topicName, message, partition, key, partitionKey, opaqueKey) ⇒ Promise.<void>
simply produces a message or multiple on a topic if producerPartitionCount is > 1 it will randomize the target partition for the message/s
Kind: instance method of NativeKafkaClient
| Param | Default | Description |
|---|---|---|
| topicName | ||
| message | ||
| partition | optional | |
| key | optional | |
| partitionKey | optional | |
| opaqueKey | optional |
nativeKafkaClient.buffer(topic, identifier, payload, _, partition, version, partitionKey) ⇒ Promise.<void>
buffers a keyed message to be send a keyed message needs an identifier, if none is provided an uuid.v4() will be generated
Kind: instance method of NativeKafkaClient
| Param | Default | Description |
|---|---|---|
| topic | ||
| identifier | ||
| payload | ||
| _ | optional | |
| partition | optional | |
| version | optional | |
| partitionKey | optional |
nativeKafkaClient.bufferFormat(topic, identifier, payload, version, _, partitionKey, partition) ⇒ Promise.<void>
buffers a keyed message in (a base json format) to be send a keyed message needs an identifier, if none is provided an uuid.4() will be generated
Kind: instance method of NativeKafkaClient
| Param | Default | Description |
|---|---|---|
| topic | ||
| identifier | ||
| payload | ||
| version | 1 | optional |
| _ | optional | |
| partitionKey | optional | |
| partition | optional |
KStream
change-log representation of a stream
Kind: global class
- KStream
- new KStream(topicName, storage, kafka, isClone)
- .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
- .innerJoin(stream, key, windowed, combine) ⇒
KStream - .outerJoin(stream)
- .leftJoin(stream)
- .merge(stream) ⇒
KStream - .fromMost() ⇒
KStream - .clone(cloneEvents, cloneDeep) ⇒
KStream - .branch(preds) ⇒
Array.<KStream> - .window(from, to, etl, encapsulated, collect) ⇒
Object - .close() ⇒
Promise.<boolean>
new KStream(topicName, storage, kafka, isClone)
creates a changelog representation of a stream join operations of kstream instances are synchronous and return new instances immediately
| Param | Type | Default |
|---|---|---|
| topicName | string | |
| storage | KStorage | |
| kafka | KafkaClient | |
| isClone | boolean | false |
kStream.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
start kafka consumption prepare production of messages if necessary when called with zero or just a single callback argument this function will return a promise and use the callback for errors
Kind: instance method of KStream
| Param | Type | Default | Description |
|---|---|---|---|
| kafkaReadyCallback | function | Object | can also be an object (config) | |
| kafkaErrorCallback | function | ||
| withBackPressure | boolean | false | |
| outputKafkaConfig | Object |
kStream.innerJoin(stream, key, windowed, combine) ⇒ KStream
Emits an output when both input sources have records with the same key. s1$:{object} + s2$:{object} -> j$:{left: s1$object, right: s2$object}
Kind: instance method of KStream
| Param | Type | Default |
|---|---|---|
| stream | StreamDSL | |
| key | string | "key" |
| windowed | boolean | false |
| combine | function |
kStream.outerJoin(stream)
Emits an output for each record in either input source. If only one source contains a key, the other is null
Kind: instance method of KStream
| Param | Type |
|---|---|
| stream | StreamDSL |
kStream.leftJoin(stream)
Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null
Kind: instance method of KStream
| Param | Type |
|---|---|
| stream | StreamDSL |
kStream.merge(stream) ⇒ KStream
Emits an output for each record in any of the streams. Acts as simple merge of both streams. can be used with KStream or KTable instances returns a NEW KStream instance
Kind: instance method of KStream
| Param | Type |
|---|---|
| stream | StreamDSL |
kStream.fromMost() ⇒ KStream
creates a new KStream instance from a given most.js stream; the consume topic will be empty and therefore no consumer will be build
Kind: instance method of KStream
| Param | Type | Description |
|---|---|---|
| most.js | Object | stream |
kStream.clone(cloneEvents, cloneDeep) ⇒ KStream
as only joins and window operations return new stream instances you might need a clone sometimes, which can be accomplished using this function
Kind: instance method of KStream
| Param | Type | Default | Description |
|---|---|---|---|
| cloneEvents | boolean | false | if events in the stream should be cloned |
| cloneDeep | boolean | false | if events in the stream should be cloned deeply |
kStream.branch(preds) ⇒ Array.<KStream>
Splits a stream into multiple branches based on cloning and filtering it depending on the passed predicates. [ (message) => message.key.startsWith("A"), (message) => message.key.startsWith("B"),
(message) => true ]
[ streamA, streamB, streamTrue ]
Kind: instance method of KStream
| Param | Type |
|---|---|
| preds | Array.<function()> |
kStream.window(from, to, etl, encapsulated, collect) ⇒ Object
builds a window'ed stream across all events of the current kstream when the first event with an exceeding "to" is received (or the abort() callback is called) the window closes and emits its "collected" values to the returned kstream from and to must be unix epoch timestamps in milliseconds (Date.now()) etl can be a function that should return the timestamp (event time) of from within the message e.g. m -> m.payload.createdAt if etl is not given, a timestamp of receiving will be used (processing time) for each event encapsulated refers to the result messages (defaults to true, they will be encapsulated in an object: {time, value}
Kind: instance method of KStream
| Param | Type | Default | Description |
|---|---|---|---|
| from | number | ||
| to | number | ||
| etl | function | ||
| encapsulated | boolean | true | if event should stay encapsulated {time, value} |
| collect | boolean | true | if events should be collected first before publishing to result stream |
kStream.close() ⇒ Promise.<boolean>
closes the internal stream and all kafka open connections as well as KStorage connections
Kind: instance method of KStream
KTable
table representation of a stream
Kind: global class
- KTable
- new KTable(topicName, keyMapETL, storage, kafka, isClone)
- .start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
- .innerJoin(stream, key)
- .outerJoin(stream)
- .leftJoin(stream)
- .writeToTableStream(message)
- .consumeUntilMs(ms, finishedCallback) ⇒
KTable - .consumeUntilCount(count, finishedCallback) ⇒
KTable - .consumeUntilLatestOffset(finishedCallback)
- .getTable() ⇒
Promise.<object> - .replay()
- .merge(stream) ⇒
Promise.<KTable> - .clone() ⇒
Promise.<KTable> - .close() ⇒
Promise.<boolean>
new KTable(topicName, keyMapETL, storage, kafka, isClone)
creates a table representation of a stream join operations of ktable instances are asynchronous and return promises keyMapETL = v -> {key, value} (sync)
| Param | Type | Default |
|---|---|---|
| topicName | string | |
| keyMapETL | function | |
| storage | KStorage | |
| kafka | KafkaClient | |
| isClone | boolean | false |
kTable.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
start kafka consumption prepare production of messages if necessary when called with zero or just a single callback argument this function will return a promise and use the callback for errors
Kind: instance method of KTable
| Param | Type | Default | Description |
|---|---|---|---|
| kafkaReadyCallback | function | Object | can also be an object (config) | |
| kafkaErrorCallback | function | ||
| withBackPressure | boolean | false | |
| outputKafkaConfig | Object |
kTable.innerJoin(stream, key)
Emits an output when both input sources have records with the same key.
Kind: instance method of KTable
| Param | Type | Default |
|---|---|---|
| stream | StreamDSL | |
| key | string | "key" |
kTable.outerJoin(stream)
Emits an output for each record in either input source. If only one source contains a key, the other is null
Kind: instance method of KTable
| Param | Type |
|---|---|
| stream | StreamDSL |
kTable.leftJoin(stream)
Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null
Kind: instance method of KTable
| Param | Type |
|---|---|
| stream | StreamDSL |
kTable.writeToTableStream(message)
write message to the internal stream
Kind: instance method of KTable
| Param | Type |
|---|---|
| message | any |
kTable.consumeUntilMs(ms, finishedCallback) ⇒ KTable
consume messages until ms passed
Kind: instance method of KTable
| Param | Type | Default |
|---|---|---|
| ms | number | 1000 |
| finishedCallback | function |
kTable.consumeUntilCount(count, finishedCallback) ⇒ KTable
consume messages until a certain count is reached
Kind: instance method of KTable
| Param | Type | Default |
|---|---|---|
| count | number | 1000 |
| finishedCallback | function |
kTable.consumeUntilLatestOffset(finishedCallback)
consume messages until latest offset of topic
Kind: instance method of KTable
| Param | Type | Default |
|---|---|---|
| finishedCallback | function |
kTable.getTable() ⇒ Promise.<object>
returns the state of the internal KStorage
Kind: instance method of KTable
kTable.replay()
rewrites content of internal KStorage to the stream, every observer will receive the content as KV {key, value} object
Kind: instance method of KTable
kTable.merge(stream) ⇒ Promise.<KTable>
Emits an output for each record in any of the streams. Acts as simple merge of both streams. can be used with KStream or KTable instances returns a Promise with a NEW KTable instance
Kind: instance method of KTable
| Param | Type |
|---|---|
| stream | StreamDSL |
kTable.clone() ⇒ Promise.<KTable>
as only joins and window operations return new stream instances you might need a clone sometimes, which can be accomplished using this function
Kind: instance method of KTable
kTable.close() ⇒ Promise.<boolean>
closes the internal stream and all kafka open connections as well as KStorage connections
Kind: instance method of KTable
StreamDSL
Stream base class
Kind: global class
- Summary
- KeyCount
- LastState
- Max
- Min
- Sum
- Window
- JSKafkaClient
- new JSKafkaClient(topic, config)
- jsKafkaClient.setProduceHandler(handler)
- jsKafkaClient.getProduceHandler() ⇒
null|EventEmitter - jsKafkaClient.overwriteTopics(topics)
- jsKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
- jsKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
- jsKafkaClient.send(topic, message) ⇒
* - jsKafkaClient.buffer(topic, identifier, payload, compressionType) ⇒
* - jsKafkaClient.bufferFormat(topic, identifier, payload, version, compressionType) ⇒
*
- NativeKafkaClient
- new NativeKafkaClient(topic, config, batchOptions)
- nativeKafkaClient.setProduceHandler(handler)
- nativeKafkaClient.getProduceHandler() ⇒
null|EventEmitter - nativeKafkaClient.overwriteTopics(topics)
- nativeKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)
- nativeKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)
- nativeKafkaClient.send(topicName, message, partition, key, partitionKey, opaqueKey) ⇒
Promise.<void> - nativeKafkaClient.buffer(topic, identifier, payload, _, partition, version, partitionKey) ⇒
Promise.<void> - nativeKafkaClient.bufferFormat(topic, identifier, payload, version, _, partitionKey, partition) ⇒
Promise.<void>
- KStream
- new KStream(topicName, storage, kafka, isClone)
- kStream.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
- kStream.innerJoin(stream, key, windowed, combine) ⇒
KStream - kStream.outerJoin(stream)
- kStream.leftJoin(stream)
- kStream.merge(stream) ⇒
KStream - kStream.fromMost() ⇒
KStream - kStream.clone(cloneEvents, cloneDeep) ⇒
KStream - kStream.branch(preds) ⇒
Array.<KStream>
- [(message) => true ]](#message--true-)
- KTable
- new KTable(topicName, keyMapETL, storage, kafka, isClone)
- kTable.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)
- kTable.innerJoin(stream, key)
- kTable.outerJoin(stream)
- kTable.leftJoin(stream)
- kTable.writeToTableStream(message)
- kTable.consumeUntilMs(ms, finishedCallback) ⇒
KTable - kTable.consumeUntilCount(count, finishedCallback) ⇒
KTable - kTable.consumeUntilLatestOffset(finishedCallback)
- kTable.getTable() ⇒
Promise.<object> - kTable.replay()
- kTable.merge(stream) ⇒
Promise.<KTable> - kTable.clone() ⇒
Promise.<KTable> - kTable.close() ⇒
Promise.<boolean>
- StreamDSL
- new StreamDSL(topicName, storage, kafka, isClone)
- streamDSL.start()
- streamDSL.getStats() ⇒
object - streamDSL.getStorage() ⇒
KStorage - streamDSL.writeToStream(message)
- streamDSL.getMost() ⇒
Object - streamDSL.getNewMostFrom(array) ⇒
Stream.<any> - streamDSL.replaceInternalObservable(newStream$)
- streamDSL.setProduceHandler(handler)
- streamDSL.createAndSetProduceHandler() ⇒
module:events.internal - streamDSL.setKafkaStreamsReference(reference)
- streamDSL.from(topicName) ⇒
StreamDSL - streamDSL.awaitPromises(etl) ⇒
StreamDSL - streamDSL.map(etl) ⇒
StreamDSL - streamDSL.asyncMap(etl) ⇒
StreamDSL - streamDSL.concatMap(etl) ⇒
StreamDSL - streamDSL.forEach(eff) ⇒
\* - streamDSL.chainForEach(eff, callback) ⇒
StreamDSL - streamDSL.tap(eff)
- streamDSL.filter(pred) ⇒
StreamDSL - streamDSL.skipRepeats() ⇒
StreamDSL - streamDSL.skipRepeatsWith(equals) ⇒
StreamDSL - streamDSL.skip(count) ⇒
StreamDSL - streamDSL.take(count) ⇒
StreamDSL - streamDSL.mapStringToArray(delimiter) ⇒
StreamDSL - streamDSL.mapArrayToKV(keyIndex, valueIndex) ⇒
StreamDSL - streamDSL.mapStringToKV(delimiter, keyIndex, valueIndex) ⇒
StreamDSL - streamDSL.mapJSONParse() ⇒
StreamDSL - streamDSL.mapStringify() ⇒
StreamDSL - streamDSL.mapBufferKeyToString() ⇒
StreamDSL - streamDSL.mapBufferValueToString() ⇒
StreamDSL - streamDSL.mapStringValueToJSONObject() ⇒
StreamDSL - streamDSL.mapJSONConvenience() ⇒
StreamDSL - streamDSL.wrapAsKafkaValue(topic) ⇒
StreamDSL - streamDSL.mapWrapKafkaValue() ⇒
StreamDSL - streamDSL.atThroughput(count, callback) ⇒
StreamDSL - streamDSL.mapToFormat(type, getId) ⇒
StreamDSL - streamDSL.mapFromFormat() ⇒
StreamDSL - streamDSL.timestamp(etl) ⇒
StreamDSL - streamDSL.constant(substitute) ⇒
StreamDSL - streamDSL.scan(eff, initial) ⇒
StreamDSL - streamDSL.slice(start, end) ⇒
StreamDSL - streamDSL.takeWhile(pred) ⇒
StreamDSL - streamDSL.skipWhile(pred) ⇒
StreamDSL - streamDSL.until(signal$) ⇒
StreamDSL - streamDSL.since(signal$) ⇒
StreamDSL - streamDSL.continueWith(f)
- streamDSL.reduce(eff, initial) ⇒
\* - streamDSL.chainReduce(eff, initial, callback) ⇒
StreamDSL - streamDSL.drain() ⇒
\* - streamDSL.throttle(throttlePeriod) ⇒
StreamDSL - streamDSL.delay(delayTime) ⇒
StreamDSL - streamDSL.debounce(debounceTime) ⇒
StreamDSL - streamDSL.countByKey(key, countFieldName) ⇒
StreamDSL - streamDSL.sumByKey(key, fieldName, sumField) ⇒
StreamDSL - streamDSL.min(fieldName, minField) ⇒
StreamDSL - streamDSL.max(fieldName, maxField) ⇒
StreamDSL - streamDSL._merge(otherStream$)
- streamDSL._zip(otherStream$, combine)
- streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig) ⇒
Promise.<boolean>
new StreamDSL(topicName, storage, kafka, isClone)
Stream base class that wraps around a private most.js stream$ and interacts with storages/actions and a kafka-client instance.
| Param | Type | Default | Description |
|---|---|---|---|
| topicName | string | Array.<string> | can also be topics | |
| storage | KStorage | ||
| kafka | KafkaClient | ||
| isClone | boolean | false |
streamDSL.start()
dummy, should be overwritten
Kind: instance method of StreamDSL
streamDSL.getStats() ⇒ object
returns a stats object with information about the internal kafka clients
Kind: instance method of StreamDSL
streamDSL.getStorage() ⇒ KStorage
returns the internal KStorage instance
Kind: instance method of StreamDSL
streamDSL.writeToStream(message)
can be used to manually write message/events to the internal stream$
Kind: instance method of StreamDSL
| Param | Type |
|---|---|
| message | Object | Array.<Object> |
streamDSL.getMost() ⇒ Object
returns the internal most.js stream
Kind: instance method of StreamDSL
Returns: Object - most.js stream
streamDSL.getNewMostFrom(array) ⇒ Stream.<any>
returns a new most stream from the given array
Kind: instance method of StreamDSL
| Param |
|---|
| array |
streamDSL.replaceInternalObservable(newStream$)
used to clone or during merges resets the internal event emitter to the new stream and replaces the internal stream with the merged new stream
Kind: instance method of StreamDSL
| Param |
|---|
| newStream$ |
streamDSL.setProduceHandler(handler)
sets a handler for produce messages (emits whenever kafka messages are produced/delivered) events: produced, delivered
Kind: instance method of StreamDSL
| Param | Type |
|---|---|
| handler | module:events.internal |
streamDSL.createAndSetProduceHandler() ⇒ module:events.internal
creates (and returns) and sets a produce handler for this stream instance
Kind: instance method of StreamDSL
streamDSL.setKafkaStreamsReference(reference)
overwrites the internal kafkaStreams reference
Kind: instance method of StreamDSL
| Param |
|---|
| reference |
streamDSL.from(topicName) ⇒ StreamDSL
add more topic/s to the consumer
Kind: instance method of StreamDSL
| Param | Type |
|---|---|
| topicName | string | Array.<string> |
streamDSL.awaitPromises(etl) ⇒ StreamDSL
given a stream of promises, returns stream containing the fulfillment values etl = Promise -> v
Kind: instance method of StreamDSL
| Param |
|---|
| etl |
streamDSL.map(etl) ⇒ StreamDSL
simple synchronous map function etl = v -> v2
Kind: instance method of StreamDSL
| Param |
|---|
| etl |
streamDSL.asyncMap(etl) ⇒ StreamDSL
map that expects etl to return a Promise can be used to apply async maps to stream etl = v -> Promise
Kind: instance method of StreamDSL
| Param |
|---|
| etl |
streamDSL.concatMap(etl) ⇒ StreamDSL
transform each etl in stream into a stream, and then concatenate it onto the end of the resulting stream. etl = v -> stream(v2)
Kind: instance method of StreamDSL
| Param |
|---|
| etl |
streamDSL.forEach(eff) ⇒ \*
(do not use for side effects, except for a closing operation at the end of the stream) may not be used to chain eff = v -> void
Kind: instance method of StreamDSL
Returns: \* - Promise
| Param |
|---|
| eff |
streamDSL.chainForEach(eff, callback) ⇒ StreamDSL
runs forEach on a multicast stream you probably would not want to use this in production
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| eff | |
| callback |
streamDSL.tap(eff)
(alternative to forEach if in the middle of a stream operation chain) use this for side-effects errors in eff will break stream
Kind: instance method of StreamDSL
| Param |
|---|
| eff |
streamDSL.filter(pred) ⇒ StreamDSL
stream contains only events for which predicate returns true pred = v -> boolean
Kind: instance method of StreamDSL
| Param |
|---|
| pred |
streamDSL.skipRepeats() ⇒ StreamDSL
will remove duplicate messages be aware that this might take a lot of memory
Kind: instance method of StreamDSL
streamDSL.skipRepeatsWith(equals) ⇒ StreamDSL
skips repeats per your definition equals = (a,b) -> boolean
Kind: instance method of StreamDSL
| Param |
|---|
| equals |
streamDSL.skip(count) ⇒ StreamDSL
skips the amount of messages
Kind: instance method of StreamDSL
| Param |
|---|
| count |
streamDSL.take(count) ⇒ StreamDSL
takes the first messages until count and omits the rest
Kind: instance method of StreamDSL
| Param |
|---|
| count |
streamDSL.mapStringToArray(delimiter) ⇒ StreamDSL
easy string to array mapping you can pass your delimiter default is space "bla blup" => ["bla", "blup"]
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| delimiter | |
streamDSL.mapArrayToKV(keyIndex, valueIndex) ⇒ StreamDSL
easy array to key-value object mapping you can pass your own indices default is 0,1 ["bla", "blup"] => { key: "bla", value: "blup" }
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| keyIndex | 0 |
| valueIndex | 1 |
streamDSL.mapStringToKV(delimiter, keyIndex, valueIndex) ⇒ StreamDSL
easy string to key-value object mapping you can pass your own delimiter and indices default is " " and 0,1 "bla blup" => { key: "bla", value: "blup" }
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| delimiter | |
| keyIndex | 0 |
| valueIndex | 1 |
streamDSL.mapJSONParse() ⇒ StreamDSL
maps every stream event through JSON.parse if its type is an object (if parsing fails, the error object will be returned)
Kind: instance method of StreamDSL
streamDSL.mapStringify() ⇒ StreamDSL
maps every stream event through JSON.stringify if its type is object
Kind: instance method of StreamDSL
streamDSL.mapBufferKeyToString() ⇒ StreamDSL
maps an object type event with a Buffer key field to an object event with a string key field
Kind: instance method of StreamDSL
streamDSL.mapBufferValueToString() ⇒ StreamDSL
maps an object type event with a Buffer value field to an object event with a string value field
Kind: instance method of StreamDSL
streamDSL.mapStringValueToJSONObject() ⇒ StreamDSL
maps an object type event with a string value field to an object event with (parsed) object value field
Kind: instance method of StreamDSL
streamDSL.mapJSONConvenience() ⇒ StreamDSL
takes a buffer kafka message and turns it into a json representation buffer key -> string buffer value -> string -> object
Kind: instance method of StreamDSL
streamDSL.wrapAsKafkaValue(topic) ⇒ StreamDSL
wraps an event value inside a kafka message object the event value will be used as value of the kafka message
Kind: instance method of StreamDSL
| Param | Description |
|---|---|
| topic | optional |
streamDSL.mapWrapKafkaValue() ⇒ StreamDSL
maps every stream event's kafka message right to its payload value
Kind: instance method of StreamDSL
streamDSL.atThroughput(count, callback) ⇒ StreamDSL
taps to the stream counts messages and returns callback once (when message count is reached) with the current message at count
Kind: instance method of StreamDSL
| Param | Type | Default |
|---|---|---|
| count | number | 1 |
| callback | function |
streamDSL.mapToFormat(type, getId) ⇒ StreamDSL
- default kafka format stringify {} -> {payload, time, type, id} getId can be a function to read the id from the message e.g. getId = message -> message.id
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| type | unknown-publish |
| getId |
streamDSL.mapFromFormat() ⇒ StreamDSL
default kafka format parser {value: "{ payload: {} }" -> {}
Kind: instance method of StreamDSL
streamDSL.timestamp(etl) ⇒ StreamDSL
maps elements into {time, value} objects
Kind: instance method of StreamDSL
| Param |
|---|
| etl |
streamDSL.constant(substitute) ⇒ StreamDSL
replace every element with the substitute value
Kind: instance method of StreamDSL
| Param |
|---|
| substitute |
streamDSL.scan(eff, initial) ⇒ StreamDSL
mapping to incrementally accumulated results, starting with the provided initial value.
Kind: instance method of StreamDSL
| Param |
|---|
| eff |
| initial |
streamDSL.slice(start, end) ⇒ StreamDSL
slicing events from start ot end of index
Kind: instance method of StreamDSL
| Param |
|---|
| start |
| end |
streamDSL.takeWhile(pred) ⇒ StreamDSL
contain events until predicate returns false m -> !!m
Kind: instance method of StreamDSL
| Param |
|---|
| pred |
streamDSL.skipWhile(pred) ⇒ StreamDSL
contain events after predicate returns false
Kind: instance method of StreamDSL
| Param |
|---|
| pred |
streamDSL.until(signal$) ⇒ StreamDSL
contain events until signal$ emits first event signal$ must be a most stream instance
Kind: instance method of StreamDSL
| Param |
|---|
| signal$ |
streamDSL.since(signal$) ⇒ StreamDSL
contain all events after signal$ emits first event signal$ must be a most stream instance
Kind: instance method of StreamDSL
| Param |
|---|
| signal$ |
streamDSL.continueWith(f)
Replace the end signal with a new stream returned by f. Note that f must return a (most.js) stream.
Kind: instance method of StreamDSL
| Param | Description |
|---|---|
| f | function (must return a most stream) |
streamDSL.reduce(eff, initial) ⇒ \*
reduce a stream to a single result will return a promise
Kind: instance method of StreamDSL
Returns: \* - Promise
| Param |
|---|
| eff |
| initial |
streamDSL.chainReduce(eff, initial, callback) ⇒ StreamDSL
runs reduce on a multicast stream you probably would not want to use this in production
Kind: instance method of StreamDSL
| Param |
|---|
| eff |
| initial |
| callback |
streamDSL.drain() ⇒ \*
drains the stream, equally to forEach without iterator, returns a promise
Kind: instance method of StreamDSL
Returns: \* - Promise
streamDSL.throttle(throttlePeriod) ⇒ StreamDSL
limits rate events at most one per throttlePeriod throttlePeriod = index count omit
Kind: instance method of StreamDSL
| Param |
|---|
| throttlePeriod |
streamDSL.delay(delayTime) ⇒ StreamDSL
delays every event in stream by given time
Kind: instance method of StreamDSL
| Param |
|---|
| delayTime |
streamDSL.debounce(debounceTime) ⇒ StreamDSL
wait for a burst of events and emit only the last event
Kind: instance method of StreamDSL
| Param |
|---|
| debounceTime |
streamDSL.countByKey(key, countFieldName) ⇒ StreamDSL
maps into counts per key requires events to have a present key/value field
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| key | key |
| countFieldName | count |
streamDSL.sumByKey(key, fieldName, sumField) ⇒ StreamDSL
maps into sums per key requires events to have a present key/value field
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| key | key |
| fieldName | value |
| sumField | false |
streamDSL.min(fieldName, minField) ⇒ StreamDSL
collects the smallest value of the given field, will not alter the events in the stream use .getStorage().getMin() to get the latest value which is stored
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| fieldName | value |
| minField | min |
streamDSL.max(fieldName, maxField) ⇒ StreamDSL
collects the greatest value of the given field, will not alter the events in the stream use .getStorage().getMax() to get the latest value which is stored
Kind: instance method of StreamDSL
| Param | Default |
|---|---|
| fieldName | value |
| maxField | max |
streamDSL._merge(otherStream$)
merge this stream with another, resulting a stream with all elements from both streams
Kind: instance method of StreamDSL
| Param |
|---|
| otherStream$ |
streamDSL._zip(otherStream$, combine)
merge this stream with another stream by combining (zipping) every event from each stream to a single new event on the new stream combine = (e1, e2) -> e1 + e2
Kind: instance method of StreamDSL
| Param |
|---|
| otherStream$ |
| combine |
streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig) ⇒ Promise.<boolean>
define an output topic when passed to KafkaStreams this will trigger the stream$ result to be produced to the given topic name if the instance is a clone, this function call will have to setup a kafka producer returns a promise
Kind: instance method of StreamDSL
| Param | Type | Default | Description |
|---|---|---|---|
| topic | string | Object | optional (can also be an object, containing the same parameters as fields) | |
| outputPartitionsCount | number | 1 | optional |
| produceType | string | "send" | optional |
| version | number | 1 | optional |
| compressionType | number | 0 | optional |
| producerErrorCallback | function | optional | |
| outputKafkaConfig | Object | optional |