Class Documentation

Summary

| Class | Description |
| --- | --- |
| KeyCount | used to count keys in a stream |
| LastState | used to hold the last state of key values in a stream, e.g. when building KTables |
| Max | used to grab the highest value of key values in a stream |
| Min | used to grab the lowest value of key values in a stream |
| Sum | used to sum up key values in a stream |
| Window | used to build windows of key value states in a stream |
| JSKafkaClient |  |
| NativeKafkaClient |  |
| KStream | change-log representation of a stream |
| KTable | table representation of a stream |
| StreamDSL | Stream base class |

KeyCount

used to count keys in a stream

Kind: global class

LastState

used to hold the last state of key values in a stream e.g. building KTables

Kind: global class

Max

used to grab the highest value of key values in a stream

Kind: global class

Min

used to grab the lowest value of key values in a stream

Kind: global class

Sum

used to sum up key values in a stream

Kind: global class

Window

used to build windows of key value states in a stream

Kind: global class

JSKafkaClient

Kind: global class

new JSKafkaClient(topic, config)

KafkaClient (EventEmitter) that wraps an internal instance of a Sinek kafka Consumer and/or Producer

| Param |
| --- |
| topic |
| config |

jsKafkaClient.setProduceHandler(handler)

sets a handler for produce messages (emits whenever kafka messages are produced/delivered)

Kind: instance method of JSKafkaClient

| Param | Type |
| --- | --- |
| handler | EventEmitter |

jsKafkaClient.getProduceHandler() ⇒ null | EventEmitter

returns the produce handler instance if present

Kind: instance method of JSKafkaClient

jsKafkaClient.overwriteTopics(topics)

overwrites the topics of the consumer

Kind: instance method of JSKafkaClient

| Param | Type |
| --- | --- |
| topics | Array.<string> |

jsKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)

starts a new kafka consumer (using sinek's partition drainer); will await a kafka-producer-ready event if started with withProducer=true

Kind: instance method of JSKafkaClient

| Param | Default |
| --- | --- |
| readyCallback |  |
| kafkaErrorCallback |  |
| withProducer | false |
| withBackPressure | false |

jsKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)

starts a new kafka producer using sinek's publisher; will fire the kafka-producer-ready event; requires the topic's partition count during initialisation

Kind: instance method of JSKafkaClient

| Param | Default |
| --- | --- |
| produceTopic |  |
| partitions | 1 |
| readyCallback |  |
| kafkaErrorCallback |  |
| outputKafkaConfig |  |

jsKafkaClient.send(topic, message) ⇒ *

produces one or multiple messages on a topic; if producerPartitionCount is > 1, the target partition for the message(s) will be randomized

Kind: instance method of JSKafkaClient

| Param |
| --- |
| topic |
| message |

jsKafkaClient.buffer(topic, identifier, payload, compressionType) ⇒ *

buffers a keyed message to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated

Kind: instance method of JSKafkaClient

| Param | Default |
| --- | --- |
| topic |  |
| identifier |  |
| payload |  |
| compressionType | 0 |

jsKafkaClient.bufferFormat(topic, identifier, payload, version, compressionType) ⇒ *

buffers a keyed message (in a base json format) to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated

Kind: instance method of JSKafkaClient

| Param | Default |
| --- | --- |
| topic |  |
| identifier |  |
| payload |  |
| version | 1 |
| compressionType | 0 |
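
A minimal usage sketch for the client above; the shape of `config` (a Sinek kafka config object) and the topic names are assumptions:

```js
// sketch: produce through a JSKafkaClient once the producer is ready
const client = new JSKafkaClient("my-input-topic", config); // config shape assumed

client.setupProducer("my-output-topic", 1, () => {
    // the kafka-producer-ready event has fired, messages can be produced now
    client.send("my-output-topic", JSON.stringify({ hello: "world" }));
    // or buffer a keyed message; passing no identifier is assumed
    // to trigger the documented uuid.v4() fallback
    client.bufferFormat("my-output-topic", null, { hello: "world" }, 1, 0);
}, (error) => console.error(error));
```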

NativeKafkaClient

Kind: global class

new NativeKafkaClient(topic, config, batchOptions)

NativeKafkaClient (EventEmitter) that wraps an internal instance of a Sinek native kafka Consumer and/or Producer

| Param | Description |
| --- | --- |
| topic |  |
| config |  |
| batchOptions | optional |

nativeKafkaClient.setProduceHandler(handler)

sets a handler for produce messages (emits whenever kafka messages are produced/delivered)

Kind: instance method of NativeKafkaClient

| Param | Type |
| --- | --- |
| handler | EventEmitter |

nativeKafkaClient.getProduceHandler() ⇒ null | EventEmitter

returns the produce handler instance if present

Kind: instance method of NativeKafkaClient

nativeKafkaClient.overwriteTopics(topics)

overwrites the topics of the consumer

Kind: instance method of NativeKafkaClient

| Param | Type |
| --- | --- |
| topics | Array.<string> |

nativeKafkaClient.start(readyCallback, kafkaErrorCallback, withProducer, withBackPressure)

starts a new kafka consumer; will await a kafka-producer-ready event if started with withProducer=true

Kind: instance method of NativeKafkaClient

| Param | Default |
| --- | --- |
| readyCallback |  |
| kafkaErrorCallback |  |
| withProducer | false |
| withBackPressure | false |

nativeKafkaClient.setupProducer(produceTopic, partitions, readyCallback, kafkaErrorCallback, outputKafkaConfig)

starts a new kafka producer; will fire the kafka-producer-ready event; requires the topic's partition count during initialisation

Kind: instance method of NativeKafkaClient

| Param | Default |
| --- | --- |
| produceTopic |  |
| partitions | 1 |
| readyCallback |  |
| kafkaErrorCallback |  |
| outputKafkaConfig |  |

nativeKafkaClient.send(topicName, message, partition, key, partitionKey, opaqueKey) ⇒ Promise.<void>

produces one or multiple messages on a topic; if producerPartitionCount is > 1, the target partition for the message(s) will be randomized

Kind: instance method of NativeKafkaClient

| Param | Default | Description |
| --- | --- | --- |
| topicName |  |  |
| message |  |  |
| partition |  | optional |
| key |  | optional |
| partitionKey |  | optional |
| opaqueKey |  | optional |

nativeKafkaClient.buffer(topic, identifier, payload, _, partition, version, partitionKey) ⇒ Promise.<void>

buffers a keyed message to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated

Kind: instance method of NativeKafkaClient

| Param | Default | Description |
| --- | --- | --- |
| topic |  |  |
| identifier |  |  |
| payload |  |  |
| _ |  | optional |
| partition |  | optional |
| version |  | optional |
| partitionKey |  | optional |

nativeKafkaClient.bufferFormat(topic, identifier, payload, version, _, partitionKey, partition) ⇒ Promise.<void>

buffers a keyed message (in a base json format) to be sent; a keyed message needs an identifier, and if none is provided a uuid.v4() will be generated

Kind: instance method of NativeKafkaClient

| Param | Default | Description |
| --- | --- | --- |
| topic |  |  |
| identifier |  |  |
| payload |  |  |
| version | 1 | optional |
| _ |  | optional |
| partitionKey |  | optional |
| partition |  | optional |
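
The native client follows the same flow; a sketch, again with assumed config/batchOptions shapes and topic names:

```js
// sketch: NativeKafkaClient with an explicit partition and key on send
const client = new NativeKafkaClient("my-input-topic", config, batchOptions);

client.setupProducer("my-output-topic", 2, () => {
    // send returns a Promise; partition and key are optional
    client.send("my-output-topic", JSON.stringify({ hello: "native" }), 0, "my-key")
        .then(() => console.log("message delivered"))
        .catch((error) => console.error(error));
}, (error) => console.error(error));
```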

KStream

change-log representation of a stream

Kind: global class

new KStream(topicName, storage, kafka, isClone)

creates a changelog representation of a stream; join operations of KStream instances are synchronous and return new instances immediately

| Param | Type | Default |
| --- | --- | --- |
| topicName | string |  |
| storage | KStorage |  |
| kafka | KafkaClient |  |
| isClone | boolean | false |

kStream.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)

starts kafka consumption and prepares production of messages if necessary; when called with zero arguments or just a single callback argument, this function will return a promise and use the callback for errors

Kind: instance method of KStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| kafkaReadyCallback | function \| Object |  | can also be an object (config) |
| kafkaErrorCallback | function |  |  |
| withBackPressure | boolean | false |  |
| outputKafkaConfig | Object |  |  |
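
For example, assuming `kafkaStreams` is a configured KafkaStreams factory instance from this library:

```js
// sketch: create a KStream and start consumption
const stream = kafkaStreams.getKStream("my-input-topic");

// with a single callback argument, start() returns a promise
// and the callback is only used for kafka errors
stream.start((error) => console.error(error))
    .then(() => console.log("stream is consuming"));
```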

kStream.innerJoin(stream, key, windowed, combine) ⇒ KStream

Emits an output when both input sources have records with the same key. s1$:{object} + s2$:{object} -> j$:{left: s1$object, right: s2$object}

Kind: instance method of KStream

| Param | Type | Default |
| --- | --- | --- |
| stream | StreamDSL |  |
| key | string | "key" |
| windowed | boolean | false |
| combine | function |  |
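
A sketch of a non-windowed inner join; the exact shape of the combine callback is an assumption here:

```js
// sketch: join two streams on their "key" field
const left = kafkaStreams.getKStream("left-topic");
const right = kafkaStreams.getKStream("right-topic");

// combine is assumed to receive both join sides and return the joined event
const joined = left.innerJoin(right, "key", false,
    (l, r) => ({ left: l, right: r }));
```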

kStream.outerJoin(stream)

Emits an output for each record in either input source. If only one source contains a key, the other is null

Kind: instance method of KStream

ParamType
streamStreamDSL

kStream.leftJoin(stream)

Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null

Kind: instance method of KStream

ParamType
streamStreamDSL

kStream.merge(stream) ⇒ KStream

Emits an output for each record in any of the streams. Acts as a simple merge of both streams; can be used with KStream or KTable instances and returns a NEW KStream instance

Kind: instance method of KStream

| Param | Type |
| --- | --- |
| stream | StreamDSL |

kStream.fromMost() ⇒ KStream

creates a new KStream instance from a given most.js stream; the consume topic will be empty and therefore no consumer will be built

Kind: instance method of KStream

| Param | Type | Description |
| --- | --- | --- |
| most.js | Object | stream |

kStream.clone(cloneEvents, cloneDeep) ⇒ KStream

as only joins and window operations return new stream instances, you might sometimes need a clone, which can be accomplished using this function

Kind: instance method of KStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| cloneEvents | boolean | false | if events in the stream should be cloned |
| cloneDeep | boolean | false | if events in the stream should be cloned deeply |

kStream.branch(preds) ⇒ Array.<KStream>

Splits a stream into multiple branches by cloning it and filtering each clone with one of the passed predicates, e.g.

[ (message) => message.key.startsWith("A"), (message) => message.key.startsWith("B"), (message) => true ]

results in

[ streamA, streamB, streamTrue ]

Kind: instance method of KStream

| Param | Type |
| --- | --- |
| preds | Array.<function()> |
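
Taking the predicates from the description above, given any KStream instance `stream`:

```js
// sketch: split one stream into three filtered clones
const [streamA, streamB, streamTrue] = stream.branch([
    (message) => message.key.startsWith("A"),
    (message) => message.key.startsWith("B"),
    (message) => true // catch-all branch
]);
```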

kStream.window(from, to, etl, encapsulated, collect) ⇒ Object

builds a windowed stream across all events of the current kstream; when the first event with an exceeding "to" is received (or the abort() callback is called), the window closes and emits its "collected" values to the returned kstream. from and to must be unix epoch timestamps in milliseconds (Date.now()). etl can be a function that returns the timestamp (event time) from within the message, e.g. m -> m.payload.createdAt; if etl is not given, the timestamp of receiving (processing time) will be used for each event. encapsulated refers to the result messages (defaults to true, meaning they will be encapsulated in an object: {time, value})

Kind: instance method of KStream

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| from | number |  |  |
| to | number |  |  |
| etl | function |  |  |
| encapsulated | boolean | true | if event should stay encapsulated {time, value} |
| collect | boolean | true | if events should be collected first before publishing to result stream |
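
A sketch of a one-minute processing-time window; the exact shape of the returned Object is an assumption (documented above only as exposing the result kstream and the abort() callback):

```js
// sketch: collect events for one minute of processing time (no etl given)
const from = Date.now();
const to = from + 60 * 1000;

// assumed return shape: the result kstream plus the abort() callback
const { stream: window$, abort } = stream.window(from, to);

// consume the collected window values once the window closes
window$.forEach((events) => console.log("window closed:", events));
```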

kStream.close() ⇒ Promise.<boolean>

closes the internal stream and all kafka open connections as well as KStorage connections

Kind: instance method of KStream

KTable

table representation of a stream

Kind: global class

new KTable(topicName, keyMapETL, storage, kafka, isClone)

creates a table representation of a stream; join operations of KTable instances are asynchronous and return promises. keyMapETL = v -> {key, value} (sync)

| Param | Type | Default |
| --- | --- | --- |
| topicName | string |  |
| keyMapETL | function |  |
| storage | KStorage |  |
| kafka | KafkaClient |  |
| isClone | boolean | false |

kTable.start(kafkaReadyCallback, kafkaErrorCallback, withBackPressure, outputKafkaConfig)

starts kafka consumption and prepares production of messages if necessary; when called with zero arguments or just a single callback argument, this function will return a promise and use the callback for errors

Kind: instance method of KTable

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| kafkaReadyCallback | function \| Object |  | can also be an object (config) |
| kafkaErrorCallback | function |  |  |
| withBackPressure | boolean | false |  |
| outputKafkaConfig | Object |  |  |

kTable.innerJoin(stream, key)

Emits an output when both input sources have records with the same key.

Kind: instance method of KTable

| Param | Type | Default |
| --- | --- | --- |
| stream | StreamDSL |  |
| key | string | "key" |

kTable.outerJoin(stream)

Emits an output for each record in either input source. If only one source contains a key, the other is null

Kind: instance method of KTable

| Param | Type |
| --- | --- |
| stream | StreamDSL |

kTable.leftJoin(stream)

Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null

Kind: instance method of KTable

| Param | Type |
| --- | --- |
| stream | StreamDSL |

kTable.writeToTableStream(message)

write message to the internal stream

Kind: instance method of KTable

| Param | Type |
| --- | --- |
| message | any |

kTable.consumeUntilMs(ms, finishedCallback) ⇒ KTable

consume messages until ms passed

Kind: instance method of KTable

| Param | Type | Default |
| --- | --- | --- |
| ms | number | 1000 |
| finishedCallback | function |  |

kTable.consumeUntilCount(count, finishedCallback) ⇒ KTable

consume messages until a certain count is reached

Kind: instance method of KTable

| Param | Type | Default |
| --- | --- | --- |
| count | number | 1000 |
| finishedCallback | function |  |

kTable.consumeUntilLatestOffset(finishedCallback)

consume messages until latest offset of topic

Kind: instance method of KTable

| Param | Type | Default |
| --- | --- | --- |
| finishedCallback | function |  |

kTable.getTable() ⇒ Promise.<object>

returns the state of the internal KStorage

Kind: instance method of KTable
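
Putting the KTable pieces together; `kafkaStreams.getKTable` is assumed to be the factory counterpart of getKStream:

```js
// sketch: build a KTable and read its state after 1000 messages
// keyMapETL turns each message into a {key, value} pair (sync)
const table = kafkaStreams.getKTable("user-topic",
    (message) => ({ key: message.key, value: message.value }));

table.consumeUntilCount(1000, () => {
    // state of the internal KStorage
    table.getTable().then((state) => console.log(state));
});

table.start();
```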

kTable.replay()

rewrites the content of the internal KStorage to the stream; every observer will receive the content as KV {key, value} objects

Kind: instance method of KTable

kTable.merge(stream) ⇒ Promise.<KTable>

Emits an output for each record in any of the streams. Acts as a simple merge of both streams; can be used with KStream or KTable instances and returns a Promise with a NEW KTable instance

Kind: instance method of KTable

| Param | Type |
| --- | --- |
| stream | StreamDSL |

kTable.clone() ⇒ Promise.<KTable>

as only joins and window operations return new stream instances, you might sometimes need a clone, which can be accomplished using this function

Kind: instance method of KTable

kTable.close() ⇒ Promise.<boolean>

closes the internal stream and all kafka open connections as well as KStorage connections

Kind: instance method of KTable

StreamDSL

Stream base class

Kind: global class

new StreamDSL(topicName, storage, kafka, isClone)

Stream base class that wraps around a private most.js stream$ and interacts with storages/actions and a kafka-client instance.

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| topicName | string \| Array.<string> |  | can also be topics |
| storage | KStorage |  |  |
| kafka | KafkaClient |  |  |
| isClone | boolean | false |  |

streamDSL.start()

dummy, should be overwritten

Kind: instance method of StreamDSL

streamDSL.getStats() ⇒ object

returns a stats object with information about the internal kafka clients

Kind: instance method of StreamDSL

streamDSL.getStorage() ⇒ KStorage

returns the internal KStorage instance

Kind: instance method of StreamDSL

streamDSL.writeToStream(message)

can be used to manually write messages/events to the internal stream$

Kind: instance method of StreamDSL

| Param | Type |
| --- | --- |
| message | Object \| Array.<Object> |

streamDSL.getMost() ⇒ Object

returns the internal most.js stream

Kind: instance method of StreamDSL
Returns: Object - most.js stream

streamDSL.getNewMostFrom(array) ⇒ Stream.<any>

returns a new most stream from the given array

Kind: instance method of StreamDSL

| Param |
| --- |
| array |

streamDSL.replaceInternalObservable(newStream$)

used during cloning or merges; resets the internal event emitter to the new stream and replaces the internal stream with the new merged stream

Kind: instance method of StreamDSL

| Param |
| --- |
| newStream$ |

streamDSL.setProduceHandler(handler)

sets a handler for produce messages (emits whenever kafka messages are produced/delivered); events: produced, delivered

Kind: instance method of StreamDSL

| Param | Type |
| --- | --- |
| handler | module:events.internal |

streamDSL.createAndSetProduceHandler() ⇒ module:events.internal

creates, sets and returns a produce handler for this stream instance

Kind: instance method of StreamDSL

streamDSL.setKafkaStreamsReference(reference)

overwrites the internal kafkaStreams reference

Kind: instance method of StreamDSL

| Param |
| --- |
| reference |

streamDSL.from(topicName) ⇒ StreamDSL

adds one or more topics to the consumer

Kind: instance method of StreamDSL

| Param | Type |
| --- | --- |
| topicName | string \| Array.<string> |

streamDSL.awaitPromises(etl) ⇒ StreamDSL

given a stream of promises, returns a stream containing the fulfillment values. etl = Promise -> v

Kind: instance method of StreamDSL

| Param |
| --- |
| etl |

streamDSL.map(etl) ⇒ StreamDSL

simple synchronous map function. etl = v -> v2

Kind: instance method of StreamDSL

| Param |
| --- |
| etl |

streamDSL.asyncMap(etl) ⇒ StreamDSL

map that expects etl to return a Promise; can be used to apply async maps to the stream. etl = v -> Promise

Kind: instance method of StreamDSL

| Param |
| --- |
| etl |
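
For example (enrichFromDb is a hypothetical async lookup returning a Promise):

```js
// sketch: chain a synchronous and an asynchronous mapping stage
stream
    .map((message) => message.value)            // etl = v -> v2 (sync)
    .asyncMap((value) => enrichFromDb(value))   // etl = v -> Promise
    .tap((enriched) => console.log(enriched));  // side effect, keeps the chain intact
```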

streamDSL.concatMap(etl) ⇒ StreamDSL

transforms each event in the stream into a stream, and then concatenates it onto the end of the resulting stream. etl = v -> stream(v2)

Kind: instance method of StreamDSL

| Param |
| --- |
| etl |

streamDSL.forEach(eff) ⇒ \*

(do not use for side effects, except for a closing operation at the end of the stream) may not be used to chain. eff = v -> void

Kind: instance method of StreamDSL
Returns: \* - Promise

| Param |
| --- |
| eff |

streamDSL.chainForEach(eff, callback) ⇒ StreamDSL

runs forEach on a multicast stream; you probably would not want to use this in production

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| eff |  |
| callback |  |

streamDSL.tap(eff)

use this for side effects (alternative to forEach if in the middle of a stream operation chain); errors in eff will break the stream

Kind: instance method of StreamDSL

| Param |
| --- |
| eff |

streamDSL.filter(pred) ⇒ StreamDSL

stream contains only events for which the predicate returns true. pred = v -> boolean

Kind: instance method of StreamDSL

| Param |
| --- |
| pred |

streamDSL.skipRepeats() ⇒ StreamDSL

will remove duplicate messages; be aware that this might take a lot of memory

Kind: instance method of StreamDSL

streamDSL.skipRepeatsWith(equals) ⇒ StreamDSL

skips repeats per your definition. equals = (a,b) -> boolean

Kind: instance method of StreamDSL

| Param |
| --- |
| equals |

streamDSL.skip(count) ⇒ StreamDSL

skips the given amount of messages

Kind: instance method of StreamDSL

| Param |
| --- |
| count |

streamDSL.take(count) ⇒ StreamDSL

takes the first count messages and omits the rest

Kind: instance method of StreamDSL

| Param |
| --- |
| count |

streamDSL.mapStringToArray(delimiter) ⇒ StreamDSL

easy string to array mapping; you can pass your own delimiter, default is space: "bla blup" => ["bla", "blup"]

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| delimiter |  |

streamDSL.mapArrayToKV(keyIndex, valueIndex) ⇒ StreamDSL

easy array to key-value object mapping; you can pass your own indices, default is 0,1: ["bla", "blup"] => { key: "bla", value: "blup" }

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| keyIndex | 0 |
| valueIndex | 1 |

streamDSL.mapStringToKV(delimiter, keyIndex, valueIndex) ⇒ StreamDSL

easy string to key-value object mapping; you can pass your own delimiter and indices, default is " " and 0,1: "bla blup" => { key: "bla", value: "blup" }

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| delimiter |  |
| keyIndex | 0 |
| valueIndex | 1 |
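
These mappers combine naturally into a small pipeline, e.g.:

```js
// sketch: raw "key value" strings become {key, value} objects
stream
    .mapStringToKV(" ", 0, 1)       // "bla blup" => { key: "bla", value: "blup" }
    .filter((kv) => kv.key !== "")  // drop events without a usable key
    .tap((kv) => console.log(kv));
```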

streamDSL.mapJSONParse() ⇒ StreamDSL

maps every stream event through JSON.parse if its type is an object (if parsing fails, the error object will be returned)

Kind: instance method of StreamDSL

streamDSL.mapStringify() ⇒ StreamDSL

maps every stream event through JSON.stringify if its type is object

Kind: instance method of StreamDSL

streamDSL.mapBufferKeyToString() ⇒ StreamDSL

maps an object type event with a Buffer key field to an object event with a string key field

Kind: instance method of StreamDSL

streamDSL.mapBufferValueToString() ⇒ StreamDSL

maps an object type event with a Buffer value field to an object event with a string value field

Kind: instance method of StreamDSL

streamDSL.mapStringValueToJSONObject() ⇒ StreamDSL

maps an object type event with a string value field to an object event with (parsed) object value field

Kind: instance method of StreamDSL

streamDSL.mapJSONConvenience() ⇒ StreamDSL

takes a buffer kafka message and turns it into a json representation: buffer key -> string, buffer value -> string -> object

Kind: instance method of StreamDSL

streamDSL.wrapAsKafkaValue(topic) ⇒ StreamDSL

wraps an event value inside a kafka message object; the event value will be used as the value of the kafka message

Kind: instance method of StreamDSL

| Param | Description |
| --- | --- |
| topic | optional |

streamDSL.mapWrapKafkaValue() ⇒ StreamDSL

maps every stream event's kafka message right to its payload value

Kind: instance method of StreamDSL

streamDSL.atThroughput(count, callback) ⇒ StreamDSL

taps into the stream, counts messages and calls the callback once (when the message count is reached) with the current message at count

Kind: instance method of StreamDSL

| Param | Type | Default |
| --- | --- | --- |
| count | number | 1 |
| callback | function |  |

streamDSL.mapToFormat(type, getId) ⇒ StreamDSL

default kafka format stringify: {} -> {payload, time, type, id}; getId can be a function to read the id from the message, e.g. getId = message -> message.id

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| type | unknown-publish |
| getId |  |

streamDSL.mapFromFormat() ⇒ StreamDSL

default kafka format parser: {value: "{ payload: {} }"} -> {}

Kind: instance method of StreamDSL

streamDSL.timestamp(etl) ⇒ StreamDSL

maps elements into {time, value} objects

Kind: instance method of StreamDSL

| Param |
| --- |
| etl |

streamDSL.constant(substitute) ⇒ StreamDSL

replaces every element with the substitute value

Kind: instance method of StreamDSL

| Param |
| --- |
| substitute |

streamDSL.scan(eff, initial) ⇒ StreamDSL

mapping to incrementally accumulated results, starting with the provided initial value.

Kind: instance method of StreamDSL

| Param |
| --- |
| eff |
| initial |

streamDSL.slice(start, end) ⇒ StreamDSL

slices events from start to end index

Kind: instance method of StreamDSL

| Param |
| --- |
| start |
| end |

streamDSL.takeWhile(pred) ⇒ StreamDSL

contains events until the predicate returns false. pred = m -> !!m

Kind: instance method of StreamDSL

| Param |
| --- |
| pred |

streamDSL.skipWhile(pred) ⇒ StreamDSL

contains events after the predicate returns false

Kind: instance method of StreamDSL

| Param |
| --- |
| pred |

streamDSL.until(signal$) ⇒ StreamDSL

contains events until signal$ emits its first event; signal$ must be a most stream instance

Kind: instance method of StreamDSL

| Param |
| --- |
| signal$ |

streamDSL.since(signal$) ⇒ StreamDSL

contains all events after signal$ emits its first event; signal$ must be a most stream instance

Kind: instance method of StreamDSL

| Param |
| --- |
| signal$ |

streamDSL.continueWith(f)

Replace the end signal with a new stream returned by f. Note that f must return a (most.js) stream.

Kind: instance method of StreamDSL

| Param | Description |
| --- | --- |
| f | function (must return a most stream) |

streamDSL.reduce(eff, initial) ⇒ \*

reduces a stream to a single result; will return a promise

Kind: instance method of StreamDSL
Returns: \* - Promise

| Param |
| --- |
| eff |
| initial |

streamDSL.chainReduce(eff, initial, callback) ⇒ StreamDSL

runs reduce on a multicast stream; you probably would not want to use this in production

Kind: instance method of StreamDSL

| Param |
| --- |
| eff |
| initial |
| callback |

streamDSL.drain() ⇒ \*

drains the stream, equal to forEach without an iterator; returns a promise

Kind: instance method of StreamDSL
Returns: \* - Promise

streamDSL.throttle(throttlePeriod) ⇒ StreamDSL

limits the rate of events to at most one per throttlePeriod (in milliseconds)

Kind: instance method of StreamDSL

| Param |
| --- |
| throttlePeriod |

streamDSL.delay(delayTime) ⇒ StreamDSL

delays every event in stream by given time

Kind: instance method of StreamDSL

| Param |
| --- |
| delayTime |

streamDSL.debounce(debounceTime) ⇒ StreamDSL

waits for a burst of events and emits only the last event

Kind: instance method of StreamDSL

| Param |
| --- |
| debounceTime |

streamDSL.countByKey(key, countFieldName) ⇒ StreamDSL

maps into counts per key; requires events to have a present key/value field

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| key | key |
| countFieldName | count |

streamDSL.sumByKey(key, fieldName, sumField) ⇒ StreamDSL

maps into sums per key; requires events to have a present key/value field

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| key | key |
| fieldName | value |
| sumField | false |
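
For example, on streams of {key, value} events (otherStream is just a second stream instance for illustration):

```js
// sketch: running counts per key; events must expose the "key" field
stream.countByKey("key", "count");

// sketch: running sums per key over the "value" field
otherStream.sumByKey("key", "value");
```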

streamDSL.min(fieldName, minField) ⇒ StreamDSL

collects the smallest value of the given field; will not alter the events in the stream. use .getStorage().getMin() to get the latest stored value

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| fieldName | value |
| minField | min |

streamDSL.max(fieldName, maxField) ⇒ StreamDSL

collects the greatest value of the given field; will not alter the events in the stream. use .getStorage().getMax() to get the latest stored value

Kind: instance method of StreamDSL

| Param | Default |
| --- | --- |
| fieldName | value |
| maxField | max |

streamDSL._merge(otherStream$)

merges this stream with another, resulting in a stream with all elements from both streams

Kind: instance method of StreamDSL

| Param |
| --- |
| otherStream$ |

streamDSL._zip(otherStream$, combine)

merges this stream with another stream by combining (zipping) every event from each stream into a single new event on the new stream. combine = (e1, e2) -> e1 + e2

Kind: instance method of StreamDSL

| Param |
| --- |
| otherStream$ |
| combine |

streamDSL.to(topic, outputPartitionsCount, produceType, version, compressionType, producerErrorCallback, outputKafkaConfig) ⇒ Promise.<boolean>

defines an output topic; when passed to KafkaStreams, this will trigger the stream$ result to be produced to the given topic name. if the instance is a clone, this function call will have to set up a kafka producer. returns a promise

Kind: instance method of StreamDSL

| Param | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string \| Object |  | optional (can also be an object, containing the same parameters as fields) |
| outputPartitionsCount | number | 1 | optional |
| produceType | string | "send" | optional |
| version | number | 1 | optional |
| compressionType | number | 0 | optional |
| producerErrorCallback | function |  | optional |
| outputKafkaConfig | Object |  | optional |
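
A closing end-to-end sketch, again assuming a configured KafkaStreams factory instance; wiring the output before starting consumption is the assumed order here:

```js
// sketch: consume, transform and produce to an output topic
const stream = kafkaStreams.getKStream("my-input-topic");

stream
    .mapJSONConvenience()               // buffer message -> parsed JSON
    .filter((event) => !!event.value)   // drop empty events
    .to("my-output-topic", 1, "send")   // returns a promise
    .then(() => stream.start());        // start consumption once the output is wired
```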