...

class proficloud.timeseries.proficloudio.MetricsStreamIO(connector, endpoint_id, metrics, intervalMs, bufferTime, **kwargs)

Bases: streamz.core.Stream

This class creates a “streamz” stream from a ProficloudIOMetrics connector (or one of its subclasses); a construction sketch follows the parameter list below.

Parameters:
  • connector (ProficloudIOMetrics (or subclass)) – An initialized connector.

  • metrics (list(str), str) – A list of metric names (or a single metric name) to query.

  • intervalMs (int) – Polling interval in milliseconds

  • bufferTime (dict) – The buffer time is the current date and time minus the specified value and unit. Possible unit values are “milliseconds”, “seconds”, “minutes”, “hours”, “days”, “weeks”, “months”, and “years”. For example, if the start time is 5 minutes, the query will return all matching data points for the last 5 minutes. Example value: { “value”: “10”, “unit”: “minutes” }

  • convertTimestamp (boolean) – Convert the timestamp to datetime (Default: False)
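
A minimal construction sketch, assuming an already authenticated ProficloudIOMetrics connector; the credentials, endpoint ID, and metric name are placeholders:

Code Block
languagepython
>>> from proficloud.timeseries.proficloudio import ProficloudIOMetrics, MetricsStreamIO
>>> connector = ProficloudIOMetrics()
>>> connector.authenticate("user@example.com", "secret")       # placeholder credentials
>>> stream = MetricsStreamIO(connector,
...                          endpoint_id="0000-0000",          # hypothetical endpoint ID
...                          metrics=["temperature"],          # hypothetical metric name
...                          intervalMs=1000,
...                          bufferTime={"value": "10", "unit": "minutes"})
>>> stream.sink(print)   # print each polled result
>>> stream.start()       # begin polling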

accumulate(func, start='--no-default--', returns_state=False, **kwargs)

Accumulate results with previous state

This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element, and it should return a new accumulated state:

  • state = func(previous_state, new_value) (returns_state=False)

  • state, result = func(previous_state, new_value) (returns_state=True)

where the new state is passed to the next invocation. The state or result is emitted downstream for the two cases.

func: callable

start: object

Initial value, passed as the value of previous_state on the first invocation. Defaults to the first submitted element

returns_state: boolean

If true then func should return both the state and the value to emit. If false then both values are the same, and func returns one value

kwargs:

Keyword arguments to pass to func

A running total, producing triangular numbers

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
1
3
6
10

A count of the number of events (including the current one)

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
...     source.emit(0)
1
2
3
4
5

Like the builtin “enumerate”.

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
...                   start=(0, 0), returns_state=True
...                   ).sink(print)
>>> for i in range(3):
...     source.emit(0)
(0, 0)
(1, 0)
(2, 0)
buffer(n, **kwargs)

Allow results to pile up at this point in the stream

This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.

changeInterval(intervalMs)
collect(cache=None, metadata_cache=None, **kwargs)

Hold elements in a cache and emit them as a collection when flushed.

Code Block
languagepython
>>> source1 = Stream()
>>> source2 = Stream()
>>> collector = collect(source1)
>>> collector.sink(print)
>>> source2.sink(collector.flush)
>>> source1.emit(1)
>>> source1.emit(2)
>>> source2.emit('anything')  # flushes collector
...
[1, 2]
combine_latest(**kwargs)

Combine multiple streams together to a stream of tuples

This will emit a new tuple of all of the most recent elements seen from any stream.

emit_on : stream or list of streams or None

only emit upon update of the streams listed. If None, emit on update from any stream

zip
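
A short illustration of the emission behaviour, as a sketch with two plain streamz streams (a tuple is emitted only once every upstream has produced at least one element):

Code Block
languagepython
>>> from streamz import Stream
>>> s1 = Stream()
>>> s2 = Stream()
>>> s1.combine_latest(s2).sink(print)
>>> s1.emit(1)     # nothing emitted yet, s2 has no value
>>> s2.emit(10)
(1, 10)
>>> s1.emit(2)
(2, 10)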

property concat
connect(downstream)

Connect this stream to a downstream element.

downstream: Stream

The downstream stream to connect to
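
A small sketch of wiring two independently created streamz streams together:

Code Block
languagepython
>>> from streamz import Stream
>>> source = Stream()
>>> other = Stream()
>>> other.sink(print)
>>> source.connect(other)   # elements emitted into source now also flow through other
>>> source.emit(42)
42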

delay(interval, **kwargs)

Add a time delay to results

destroy(streams=None)

Disconnect this stream from any upstream sources

disconnect(downstream)

Disconnect this stream from a downstream element.

downstream: Stream

The downstream stream to disconnect from

emit(x, asynchronous=False, metadata=None)

Push data into the stream at this point

This is typically done only at source Streams but can theoretically be done at any point

x: any

an element of data

asynchronous:

emit asynchronously

metadata: list[dict], optional

Various types of metadata associated with the data element in x.

ref: RefCounter A reference counter used to check when data is done

static filenames(path, poll_interval=0.1, start=False, **kwargs)

Stream over filenames in a directory

path: string

Directory path or globstring over which to search for files

poll_interval: Number

Seconds between checking path

start: bool (False)

Whether to start running immediately; otherwise call stream.start() explicitly.

Code Block
languagepython
>>> source = Stream.filenames('path/to/dir')  
>>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)  
filter(predicate, *args, **kwargs)

Only pass through elements that satisfy the predicate

predicate : function

The predicate. Should return True or False, where True means that the predicate is satisfied.

args :

The arguments to pass to the predicate.

kwargs:

Keyword arguments to pass to predicate

Code Block
languagepython
>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
flatten(upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Flatten streams of lists or iterables into a stream of elements

Code Block
languagepython
>>> source = Stream()
>>> source.flatten().sink(print)
>>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
...     source.emit(x)
1
2
3
4
5
6
7

partition

frequencies(**kwargs)

Count occurrences of elements
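
A brief sketch of the running counts this emits (the updated dictionary of counts is emitted for each incoming element):

Code Block
languagepython
>>> from streamz import Stream
>>> source = Stream()
>>> source.frequencies().sink(print)
>>> for x in ['a', 'b', 'a']:
...     source.emit(x)
{'a': 1}
{'a': 1, 'b': 1}
{'a': 2, 'b': 1}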

static from_http_server(port, path='/.*', start=False, server_kwargs=None, **kwargs)

Listen for HTTP POSTs on given port

Each connection will emit one event, containing the body data of the request

port : int

The port to listen on

path : str

Specific path to listen on. Can be regex, but content is not used.

start : bool

Whether to immediately startup the server. Usually you want to connect downstream nodes first, and then call .start().

server_kwargs : dict or None

If given, set of further parameters to pass on to HTTPServer

Code Block
languagepython
>>> source = Source.from_http_server(4567)  
static from_iterable(iterable, **kwargs)

Emits items from an iterable.

iterable: iterable

An iterable to emit messages from.

Code Block
languagepython
>>> source = Stream.from_iterable(range(3))
>>> L = source.sink_to_list()
>>> source.start()
>>> L
[0, 1, 2]
static from_kafka(topics, consumer_params, poll_interval=0.1, start=False, **kwargs)

Accepts messages from Kafka

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

topics: list of str

Labels of Kafka topics to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers, Connection string(s) (host:port) by which to reach Kafka; group.id, Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

start: bool (False)

Whether to start polling upon instantiation

Code Block
languagepython
>>> source = Stream.from_kafka(['mytopic'],
...           {'bootstrap.servers': 'localhost:9092',
...            'group.id': 'streamz'})  
static from_kafka_batched(topic, consumer_params, poll_interval='1s', npartitions=None, refresh_partitions=False, start=False, dask=False, max_batch_size=10000, keys=False, engine=None, **kwargs)

Get messages and keys (optional) from Kafka in batches

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

This source will emit lists of messages for each partition of a single given topic per time interval, if there is new data. If using dask, one future will be produced per partition per time-step, if there is data.

Checkpointing is achieved through the use of reference counting. A reference counter is emitted downstream for each batch of data. A callback is triggered when the reference count reaches zero and the offsets are committed back to Kafka. Upon the start of this function, the previously committed offsets will be fetched from Kafka and reading will begin from there. This will guarantee at-least-once semantics.

topic: str

Kafka topic to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: Connection string(s) (host:port) by which to reach Kafka; group.id: Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

npartitions: int (None)

Number of partitions in the topic. If None, streamz will poll Kafka to get the number of partitions.

refresh_partitions: bool (False)

Useful if the user expects to increase the number of topic partitions on the fly, maybe to handle spikes in load. Streamz polls Kafka in every batch to determine the current number of partitions. If partitions have been added, streamz will automatically start reading data from the new partitions as well. If set to False, streamz will not accommodate adding partitions on the fly. It is recommended to restart the stream after decreasing the number of partitions.

start: bool (False)

Whether to start polling upon instantiation

max_batch_size: int

The maximum number of messages per partition to be consumed per batch

keys: bool (False)

Whether to extract keys along with the messages. If True, this will yield each message as a dict: {‘key’: msg.key(), ‘value’: msg.value()}

engine: str (None)

If engine is set to “cudf”, streamz reads data (messages must be JSON) from Kafka in an accelerated manner directly into cuDF (GPU) dataframes. This is done using the RAPIDS custreamz library.

Please refer to RAPIDS cudf API here: https://docs.rapids.ai/api/cudf/stable/

This is done using the custreamz.kafka module in cudf. custreamz.kafka has the exact same API as Confluent Kafka, so it serves as a drop-in replacement with minimal duplication of code. But under the hood, it reads messages from librdkafka and directly uploads them to the GPU as a cuDF dataframe instead of gathering all the messages back from C++ into Python. This essentially avoids the GIL issue described in the Confluent Kafka consumer: https://github.com/confluentinc/confluent-kafka-python/issues/597, and hence enables reading from Kafka in a faster fashion with fewer processes. This accelerated reader also adheres to the checkpointing mechanism in streamz.

Folks interested in trying out custreamz would benefit from this accelerated Kafka reader. If one does not want to use GPUs, they can use streamz as is, with the default engine=None.

To use this option, one must install custreamz (use the appropriate CUDA version recipe & Python version) using a command like the one below, which will install all GPU dependencies and streamz itself:

conda install -c rapidsai-nightly -c nvidia -c conda-forge -c defaults custreamz=0.15 python=3.7 cudatoolkit=10.2

More information at: https://rapids.ai/start.html

If ‘auto.offset.reset’: ‘latest’ is set in the consumer configs, the stream starts reading messages from the latest offset. Otherwise, if it is set to ‘earliest’, it will read from the start offset.

Important Kafka configurations: by default, a stream will start reading from the latest offsets available. Please set ‘auto.offset.reset’: ‘earliest’ in the consumer configs if the stream needs to start processing from the earliest offsets.

Code Block
languagepython
>>> source = Stream.from_kafka_batched('mytopic',
...           {'bootstrap.servers': 'localhost:9092',
...              'group.id': 'streamz'}, npartitions=4)  
static from_periodic(callback, poll_interval=0.1, **kwargs)

Generate data from a function on given period

cf streamz.dataframe.PeriodicDataFrame

callback: callable

Function to call on each iteration. Takes no arguments.

poll_interval: float

Time to sleep between calls (s)

static from_process(cmd, open_kwargs=None, with_stderr=False, start=False, with_end=True, **kwargs)

Messages from a running external process

This doesn’t work on Windows

cmd : list of str or str

Command to run: program name, followed by arguments

open_kwargs : dict

To pass on to the process open function, see subprocess.Popen.

with_stderr : bool

Whether to include the process STDERR in the stream

start : bool

Whether to immediately startup the process. Usually you want to connect downstream nodes first, and then call .start().

Code Block
languagepython
>>> source = Source.from_process(['ping', 'localhost'])  
static from_tcp(port, delimiter=b'\n', start=False, server_kwargs=None, **kwargs)

Creates events by reading from a socket using tornado TCPServer

The stream of incoming bytes is split on a given delimiter, and the parts become the emitted events.

port : int

The port to open and listen on. It only gets opened when the source is started, and closed upon stop()

delimiter : bytes

The incoming data will be split on this value. The resulting events will still have the delimiter at the end.

start : bool

Whether to immediately initiate the source. You probably want to set up downstream nodes first.

server_kwargs : dict or None

If given, additional arguments to pass to TCPServer

Code Block
languagepython
>>> source = Source.from_tcp(4567)  
static from_textfile(f, poll_interval=0.1, delimiter='\n', start=False, from_end=False, **kwargs)

Stream data from a text file

f: file or string

Source of the data. If string, will be opened.

poll_interval: Number

Interval to poll file for new data in seconds

delimiter: str

Character(s) to use to split the data into parts

start: bool

Whether to start running immediately; otherwise call stream.start() explicitly.

from_end: bool

Whether to begin streaming from the end of the file (i.e., only emit lines appended after the stream starts).

Code Block
languagepython
>>> source = Stream.from_textfile('myfile.json')  
>>> source.map(json.loads).pluck('value').sum().sink(print)  
>>> source.start()  

Returns:

Stream

gather()

This is a no-op for core streamz

This allows gather to be used in both dask and core streams

header = None

The header for DataFrame creation with the streamz package

latest(**kwargs)

Drop held-up data and emit the latest result

This allows you to skip intermediate elements in the stream if there is some back pressure causing a slowdown. Use this when you only care about the latest elements, and are willing to lose older data.

This passes through values without modification otherwise.

Code Block
languagepython
>>> source.map(f).latest().map(g)  
map(func, *args, **kwargs)

Apply a function to every element in the stream

func: callable

args:

The arguments to pass to the function.

kwargs:

Keyword arguments to pass to func

Code Block
languagepython
>>> source = Stream()
>>> source.map(lambda x: 2*x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
6
8
partition(n, timeout=None, key=None, **kwargs)

Partition stream into tuples of equal size

n: int

Maximum partition size

timeout: int or float, optional

Number of seconds after which a partition will be emitted, even if its size is less than n. If None (default), a partition will be emitted only when its size reaches n.

key: hashable or callable, optional

Emit items with the same key together as a separate partition. If key is callable, partition will be identified by key(x), otherwise by x[key]. Defaults to None.

Code Block
languagepython
>>> source = Stream()
>>> source.partition(3).sink(print)
>>> for i in range(10):
...     source.emit(i)
(0, 1, 2)
(3, 4, 5)
(6, 7, 8)
Code Block
languagepython
>>> source = Stream()
>>> source.partition(2, key=lambda x: x % 2).sink(print)
>>> for i in range(4):
...     source.emit(i)
(0, 2)
(1, 3)
Code Block
languagepython
>>> from time import sleep
>>> source = Stream()
>>> source.partition(5, timeout=1).sink(print)
>>> for i in range(3):
...     source.emit(i)
>>> sleep(1)
(0, 1, 2)
partition_unique(n: int, key: collections.abc.Hashable = <function identity>, keep: str = 'first', **kwargs)

Partition stream elements into groups of equal size with unique keys only.

n: int

Number of (unique) elements to pass through as a group.

key: Union[Hashable, Callable[[Any], Hashable]]

Callable that accepts a stream element and returns a unique, hashable representation of the incoming data (key(x)), or a hashable that gets the corresponding value of a stream element (x[key]). For example, key=lambda x: x["a"] would allow only elements with unique "a" values to pass through.

Info

By default, we simply use the element object itself as the key, so that object must be hashable. If that’s not the case, a non-default key must be provided.

keep: str

Which element to keep in the case that a unique key is already found in the group. If “first”, keep element from the first occurrence of a given key; if “last”, keep element from the most recent occurrence. Note that relative ordering of elements is preserved in the data passed through, and not ordering of keys.

kwargs

Code Block
languagepython
>>> source = Stream()
>>> stream = source.partition_unique(n=3, keep="first").sink(print)
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
>>> for ele in eles:
...     source.emit(ele)
(1, 2, 3)
(1, 3, 2)
Code Block
languagepython
>>> source = Stream()
>>> stream = source.partition_unique(n=3, keep="last").sink(print)
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
>>> for ele in eles:
...     source.emit(ele)
(2, 1, 3)
(1, 3, 2)
Code Block
languagepython
>>> source = Stream()
>>> stream = source.partition_unique(n=3, key=lambda x: len(x), keep="last").sink(print)
>>> eles = ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"]
>>> for ele in eles:
...     source.emit(ele)
('fo', 'f', 'foo')
('f', 'foo', 'fo')
pluck(pick, **kwargs)

Select elements from elements in the stream.

pluck : object, list

The element(s) to pick from the incoming element in the stream. If an instance of list, will pick multiple elements.

Code Block
languagepython
>>> source = Stream()
>>> source.pluck([0, 3]).sink(print)
>>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
...     source.emit(x)
(1, 4)
(4, 7)
(8, 11)
Code Block
languagepython
>>> source = Stream()
>>> source.pluck('name').sink(print)
>>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
...     source.emit(x)
'Alice'
'Bob'
slice(start=None, end=None, step=None, **kwargs)

Get only some events in a stream by position. Works like list[] syntax.

start : int

First event to use. If None, start from the beginning

end : int

Last event to use (non-inclusive). If None, continue without stopping. Does not support negative indexing.

step : int

Pass on every Nth event. If None, pass every one.

Code Block
languagepython
>>> source = Stream()
>>> source.slice(2, 6, 2).sink(print)
>>> for i in range(5):
...     source.emit(i)
2
4
sliding_window(n, return_partial=True, **kwargs)

Produce overlapping tuples of size n

return_partial : bool

If True, yield tuples as soon as any events come in, each tuple being smaller or equal to the window size. If False, only start yielding tuples once a full window has accrued.

Code Block
languagepython
>>> source = Stream()
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
...     source.emit(i)
(0, 1, 2)
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
starmap(func, *args, **kwargs)

Apply a function to every element in the stream, splayed out

See itertools.starmap

func: callable

args:

The arguments to pass to the function.

kwargs:

Keyword arguments to pass to func

Code Block
languagepython
>>> source = Stream()
>>> source.starmap(lambda a, b: a + b).sink(print)
>>> for i in range(5):
...     source.emit((i, i))
0
2
4
6
8
poll_metrics()

Polling co-routine. This retrieves metrics from the connector.

rate_limit(interval, **kwargs)

Limit the flow of data

This stops two elements of streaming through in an interval shorter than the provided value.

interval: float

Time in seconds
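
A minimal sketch; the elements pass through unchanged, but downstream sees them spaced at least interval seconds apart:

Code Block
languagepython
>>> from streamz import Stream
>>> source = Stream()
>>> source.rate_limit(0.5).sink(print)   # at most one element every 0.5 s downstream
>>> for i in range(3):
...     source.emit(i)
0
1
2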

classmethod register_api(modifier=<function identity>, attribute_name=None)

Add callable to Stream API

This allows you to register a new method onto this class. You can use it as a decorator.:

Code Block
languagepython
linenumbersfalse
>>> @Stream.register_api()
... class foo(Stream):
...     ...

>>> Stream().foo(...)  # this works now

It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).

By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class as in the following case where we construct a staticmethod.

Code Block
languagepython
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
Code Block
languagepython
>>> Stream.foo(...)  # Foo operates as a static method

You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as.

Code Block
languagepython
>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...

>>> Stream().bar(...)  # foo was actually attached as bar

start()

Start the stream.

stop()

Stop the stream.

str_list = ['func', 'predicate', 'n', 'interval']

timed_window(interval, **kwargs)

Emit a tuple of collected results every interval

Every interval seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.

to_batch(**kwargs)

Convert a stream of lists to a Batch

All elements of the stream are assumed to be lists or tuples

Code Block
languagepython
>>> source = Stream()
>>> batches = source.to_batch()
>>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list()
>>> source.emit([{'name': 'Alice', 'value': 1},
...              {'name': 'Bob', 'value': 2},
...              {'name': 'Charlie', 'value': 3}])
>>> source.emit([{'name': 'Alice', 'value': 4},
...              {'name': 'Bob', 'value': 5},
...              {'name': 'Charlie', 'value': 6}])
to_dataframe(example)

Convert a stream of Pandas dataframes to a DataFrame

Code Block
languagepython
>>> source = Stream()
>>> sdf = source.to_dataframe()
>>> L = sdf.groupby(sdf.x).y.mean().stream.sink_to_list()
>>> source.emit(pd.DataFrame(...))  
>>> source.emit(pd.DataFrame(...))  
>>> source.emit(pd.DataFrame(...))  
to_kafka(topic, producer_config, **kwargs)

Writes data in the stream to Kafka

This stream accepts a string or bytes object. Call flush to ensure all messages are pushed. Responses from Kafka are pushed downstream.

topic : string

The topic which to write

producer_config : dict
Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: Connection string (host:port) to Kafka

classmethod register_plugin_entry_point(entry_point, modifier=<function identity>)
remove(predicate)

Only pass through elements for which the predicate returns False
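
For example, this is the complement of filter with the same predicate:

Code Block
languagepython
>>> from streamz import Stream
>>> source = Stream()
>>> source.remove(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
1
3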

property scan
scatter(**kwargs)

Convert local stream to Dask Stream

All elements flowing through the input will be scattered out to the cluster

sink(func, *args, **kwargs)

Apply a function on every element

func: callable

A function that will be applied on every element.

args:

Positional arguments that will be passed to func after the incoming element.

kwargs:

Stream-specific arguments will be passed to Stream.__init__, the rest of them will be passed to func.

Code Block
languagepython
>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]

map Stream.sink_to_list

sink_to_list()

Append all elements of a stream to a list as they come in

Code Block
languagepython
>>> source = Stream()
>>> L = source.map(lambda x: 10 * x).sink_to_list()
>>> for i in range(5):
...     source.emit(i)
>>> L
[0, 10, 20, 30, 40]
union(**kwargs)

Combine multiple streams into one

Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.

Stream.zip Stream.combine_latest
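
A short sketch with two upstream streams feeding one output:

Code Block
languagepython
>>> from streamz import Stream
>>> s1 = Stream()
>>> s2 = Stream()
>>> s1.union(s2).sink(print)
>>> s1.emit(1)
1
>>> s2.emit(2)
2
>>> s1.emit(3)
3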

sink_to_textfile(file, end='\n', mode='a', **kwargs)

Write elements to a plain text file, one element per line.

Type of elements must be str.

file: str or file-like

File to write the elements to. str is treated as a file name to open. If file-like, descriptor must be open in text mode. Note that the file descriptor will be closed when this sink is destroyed.

end: str, optional

This value will be written to the file after each element. Defaults to newline character.

mode: str, optional

If file is str, file will be opened in this mode. Defaults to "a" (append mode).

Code Block
languagepython
>>> source = Stream()
>>> source.map(str).sink_to_textfile("test.txt")
>>> source.emit(0)
>>> source.emit(1)
>>> print(open("test.txt", "r").read())
0
1
unique(maxsize=None, key=<function identity>, hashable=True, **kwargs)

Avoid sending through repeated elements

This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the maxsize= parameter. For example setting maxsize=1 avoids sending through elements when one is repeated right after the other.

maxsize: int or None, optional

number of stored unique values to check against

key : function, optional

Function which returns a representation of the incoming data. For example key=lambda x: x['a'] could be used to allow only pieces of data with unique 'a' values to pass through.

hashable : bool, optional

If True then data is assumed to be hashable, else it is not. This is used for determining how to cache the history, if hashable then either dicts or LRU caches are used, otherwise a deque is used. Defaults to True.

Code Block
languagepython
>>> source = Stream()
>>> source.unique(maxsize=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
...     source.emit(x)
1
2
1
3
update(x, who=None, metadata=None)
property upstream
visualize(filename='mystream.png', **kwargs)

Render the computation of this object’s task graph using graphviz.

Requires graphviz and networkx to be installed.

filename : str, optional

The name of the file to write to disk.

kwargs:

Graph attributes to pass to graphviz like rankdir="LR"

zip(**kwargs)

Combine streams together into a stream of tuples

We emit a new tuple once all streams have produced a new tuple.

combine_latest zip_latest
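
A short sketch; tuples are emitted only when every upstream has contributed a new element, and earlier elements are buffered until then:

Code Block
languagepython
>>> from streamz import Stream
>>> s1 = Stream()
>>> s2 = Stream()
>>> s1.zip(s2).sink(print)
>>> s1.emit(1)
>>> s1.emit(2)
>>> s2.emit(10)
(1, 10)
>>> s2.emit(20)
(2, 20)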

zip_latest(*upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

The stream which this is called from is lossless. All elements from the lossless stream are emitted regardless of when they came in. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to combine_latest with the emit_on flag.

Stream.combine_latest Stream.zip

class proficloud.timeseries.proficloudio.ProficloudIOMetrics(staging=False)

Bases: object

DEBUGRESPONSE = False

Debug http-responses. When set to true, the attribute DEBUGRESPONSECONTENT then contains the last raw response. Does not work when calling API in parallel using the same instance!

DEBUGRESPONSECONTENT = None

Contains the last response if DEBUGRESPONSE is set to True.

DEBUGTIME = False

Debug request response times (print)

authenticate(username, password)
static convert_response(response, uuid, fillNaMethod=None, dropTrailingNa=True, convertTimestamp=False)

Used existing function from package as base. Description follows.

static dateparse(datestring)
queryMetrics(uuid, metrics, start_time=None, end_time=None, createDf=True, fillNaMethod=None, dropTrailingNa=True, orderDesc=False)

Query metrics and return the data as pandas DataFrame.

Parameters:
  • metrics (list(str), str) – A list of metric names (or a single metric name) to query.

  • start_time (int, datetime, str or None) – Timestamp (ms based), or datetime object (or datetime as string); not used when None. (Default: None)

  • end_time (int, datetime, str or None) – Timestamp (ms based), or datetime object (or datetime as string). This must be later in time than the start time. If not specified, the end time is assumed to be the current date and time. (Default: None)

  • orderDesc (boolean) – Orders returned data points based on timestamp. Descending order when True, or ascending when False (default) Only in effect when returning DataFrame.

  • createDF (boolean) – Convert response into a convenient Pandas Dataframe. (Default=True)

  • fillNaMethod (str) – {‘backfill’, ‘bfill’, ‘pad’, ‘ffill’, None}, default None. Method to use for filling holes in the reindexed Series. pad / ffill: propagate last valid observation forward to next valid. backfill / bfill: use NEXT valid observation to fill gap.

  • dropTrailingNa (boolean) – Drop trailing NaN values when set to true. Especially useful when no end param is specified (filters time delay when querying multiple metrics). Default: True

Return type:

pandas.DataFrame or dict

Returns:

Returns pandas.DataFrame
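
A usage sketch, assuming an authenticated connector; the endpoint UUID and metric names are placeholders, and the datetime strings follow the format used by proficloudutil.datetimeToTimestampMs:

Code Block
languagepython
>>> from proficloud.timeseries.proficloudio import ProficloudIOMetrics
>>> connector = ProficloudIOMetrics()
>>> connector.authenticate("user@example.com", "secret")       # placeholder credentials
>>> df = connector.queryMetrics("0000-0000",                   # hypothetical endpoint UUID
...                             ["temperature", "pressure"],   # hypothetical metric names
...                             start_time="2021-01-01 00:00:00.0",
...                             end_time="2021-01-02 00:00:00.0")
>>> df.head()   # pandas DataFrame with one column per metric (assumed layout)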

...

proficloud.timeseries.proficloudio.get_time_series(app_name, time_series_names, from_date, to_date, endpoint_id, sort='ASC')

Returns time series data points within the specified time range ordered by timestamp and grouped by endpoints.

Parameters:
  • app_name – Application name

  • time_series_names – One or more time series names.

  • from_date – Start date to retrieve data points.

  • to_date – End date to retrieve data points.

  • endpoint_id – One or more endpoint IDs. If not specified, data is returned for all available endpoints.

  • sort – Sorting order by timestamp. (one of ‘ASC’, ‘DESC’ - default: ‘ASC’)

Returns:

json with time-series data
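
A usage sketch; the application name, time series name, endpoint ID, and date format below are placeholders/assumptions:

Code Block
languagepython
>>> from proficloud.timeseries.proficloudio import get_time_series
>>> data = get_time_series("my-app",                 # hypothetical application name
...                        ["temperature"],          # hypothetical time series name(s)
...                        "2021-01-01T00:00:00Z",   # assumed from_date format
...                        "2021-01-02T00:00:00Z",   # assumed to_date format
...                        "0000-0000",              # hypothetical endpoint ID
...                        sort="ASC")
>>> # data is the deserialized JSON with data points grouped by endpoint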

...

proficloud.timeseries.proficloudutil module

...

proficloud.timeseries.proficloudutil.datetimeToTimestampMs(dt, datetimeformat='%Y-%m-%d %H:%M:%S.%f')

Convert a given string or datetime object to a millisecond based timestamp. No conversion is performed when a timestamp (int) is given as an input. The format string can be set using the datetimeformat attribute.

Parameters:

dt (datetime, str or int) – The datetime or datetime string

Returns:

Millisecond based timestamp. None when conversion not successful.

Return type:

int
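
For example (the exact integer depends on how the implementation handles timezones, so it is not shown here):

Code Block
languagepython
>>> from proficloud.timeseries.proficloudutil import datetimeToTimestampMs
>>> ts = datetimeToTimestampMs("2021-01-01 12:00:00.000")   # string in the default format
>>> isinstance(ts, int)
True
>>> datetimeToTimestampMs(1609502400000)                    # a timestamp (int) is passed through unchanged
1609502400000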

...

proficloud.timeseries.proficloudutil.getTimeOffsetToUtc()

Get the offset of local time to UTC time at pool.ntp.org.

Returns:

Offset in seconds

Return type:

float
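
For example (requires network access to pool.ntp.org):

Code Block
languagepython
>>> from proficloud.timeseries.proficloudutil import getTimeOffsetToUtc
>>> offset = getTimeOffsetToUtc()   # offset of local time to UTC, in seconds (float)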

...

proficloud.timeseries.proficloudutil.pandasDfFromQueryResponse(query, fillNaMethod=None, dropTrailingNa=True, convertTimestamp=False, orderDesc=False)

Convert a kairosdb query response to pandas DataFrame. Timestamp contains timestamp in milliseconds.

Parameters:
  • query (list) – The result from the query method (list of deserialised json responses from kairosdb (one for each metric))

  • fillNaMethod (str) – Method to fill in missing values: ‘backfill’, ‘bfill’, ‘pad’, ‘ffill’, None, (default=None)

  • dropTrailingNa (boolean) – Drop trailing NaN values when set to true. Especially useful when no end param is specified (filters time delay when querying multiple metrics). Default: True

  • convertTimestamp (boolean) – Convert the timestamp to datetime (Default: False)

  • orderDesc (boolean) – Orders returned data points based on timestamp. Descending order when True, or ascending when False (default)

Return type:

pandas.DataFrame

Returns:

Pandas DataFrame.

...

proficloud.timeseries.proficloudutil.timestampMsToDatetime(ts)

Takes a timestamp and returns a datetime object. The timestamp is millisecond based and can be in float (default datetime convention) or an int.

Parameters:

ts (float, int) – The timestamp

Returns:

datetime

Return type:

datetime.datetime
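
For example (whether the result is local time or UTC depends on the implementation, so the exact value is not asserted):

Code Block
languagepython
>>> from proficloud.timeseries.proficloudutil import timestampMsToDatetime
>>> dt = timestampMsToDatetime(1609502400000)   # millisecond-based timestamp (int or float)
>>> type(dt)
<class 'datetime.datetime'>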

...

proficloud.timeseries.stream module

class proficloud.timeseries.stream.MetricsStream(connector, metrics, intervalMs, bufferTime, convertTimestamp=False, convertDf=True, **kwargs)

Bases: streamz.core.Stream

This class creates a “streamz”-stream from a KairosConnector (or one of its child classes such as ProficloudMetrics).

Parameters:
  • connector (KairosConnector (or subclass)) – An initialized connector.

  • metrics (list(str), str) – A list of metric names (or a single metric name) to query.

  • intervalMs (int) – Polling interval in milliseconds

  • bufferTime (dict) – The buffer time is the current date and time minus the specified value and unit. Possible unit values are “milliseconds”, “seconds”, “minutes”, “hours”, “days”, “weeks”, “months”, and “years”. For example, if the start time is 5 minutes, the query will return all matching data points for the last 5 minutes. Example value: { “value”: “10”, “unit”: “minutes” }

  • convertTimestamp (boolean) – Convert the timestamp to datetime (Default: False)

accumulate(func, start='--no-default--', returns_state=False, **kwargs)

Accumulate results with previous state

This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element, and it should return a new accumulated state:

  • state = func(previous_state, new_value) (returns_state=False)

  • state, result = func(previous_state, new_value) (returns_state=True)

where the new state is passed to the next invocation. The state or result is emitted downstream for the two cases.

func: callable

start: object

Initial value, passed as the value of previous_state on the first invocation. Defaults to the first submitted element

returns_state: boolean

If true then func should return both the state and the value to emit. If false then both values are the same, and func returns one value

kwargs:

Keyword arguments to pass to func

A running total, producing triangular numbers

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
1
3
6
10

A count of the number of events (including the current one)

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
...     source.emit(0)
1
2
3
4
5
Like the builtin “enumerate”.
Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
...                   start=(0, 0), returns_state=True
...                   ).sink(print)
>>> for i in range(3):
...     source.emit(0)
(0, 0)
(1, 0)
(2, 0)
slice(start=None, end=None, step=None, **kwargs)

Get only some events in a stream by position. Works like list[] syntax.

start : int

First event to use. If None, start from the beginning

end : int

Last event to use (non-inclusive). If None, continue without stopping. Does not support negative indexing.

step : int

Pass on every Nth event. If None, pass every one.

Code Block
languagepython
>>> source = Stream()
>>> source.slice(2, 6, 2).sink(print)
>>> for i in range(5):
...     source.emit(i)
2
4
sliding_window(n, return_partial=True, **kwargs)

Produce overlapping tuples of size n

return_partial : bool

If True, yield tuples as soon as any events come in, each tuple being smaller or equal to the window size. If False, only start yielding tuples once a full window has accrued.

Code Block
languagepython
>>> source = Stream()
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
...     source.emit(i)
(0, 1, 2)
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
starmap(func, *args, **kwargs)

Apply a function to every element in the stream, splayed out

See itertools.starmap

func: callable

args:

The arguments to pass to the function.

kwargs:

Keyword arguments to pass to func

Code Block
languagepython
>>> source = Stream()
>>> source.starmap(lambda a, b: a + b).sink(print)
>>> for i in range(5):
...     source.emit((i, i))
0
2
4
6
8
start()

Start the stream.

stop()

Stop the stream.

str_list = ['func', 'predicate', 'n', 'interval']
timed_window(interval, **kwargs)

Emit a tuple of collected results every interval

Every interval seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.

timed_window_unique(interval: Union[int, str], key: collections.abc.Hashable = <function identity>, keep: str = 'first', **kwargs)

Emit a group of elements with unique keys every interval seconds.

interval: Union[int, str]

Number of seconds over which to group elements, or a pandas-style duration string that can be converted into seconds.

key: Union[Hashable, Callable[[Any], Hashable]]

Callable that accepts a stream element and returns a unique, hashable representation of the incoming data (key(x)), or a hashable that gets the corresponding value of a stream element (x[key]). For example, both key=lambda x: x["a"] and key="a" would allow only elements with unique "a" values to pass through.

Info

By default, we simply use the element object itself as the key, so that object must be hashable. If that’s not the case, a non-default key must be provided.

keep: str

Which element to keep in the case that a unique key is already found in the group. If “first”, keep element from the first occurrence of a given key; if “last”, keep element from the most recent occurrence. Note that relative ordering of elements is preserved in the data passed through, and not ordering of keys.

Code Block
languagepython
>>> source = Stream()

Get unique hashable elements in a window, keeping just the first occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, keep="first").sink(print)
>>> for ele in [1, 2, 3, 3, 2, 1]:
...     source.emit(ele)
()
(1, 2, 3)
()

Get unique hashable elements in a window, keeping just the last occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, keep="last").sink(print)
>>> for ele in [1, 2, 3, 3, 2, 1]:
...     source.emit(ele)
()
(3, 2, 1)
()

Get unique elements in a window by (string) length, keeping just the first occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, key=len, keep="first")
>>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
...     source.emit(ele)
()
('f', 'fo', 'foo')
()

Get unique elements in a window by (string) length, keeping just the last occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, key=len, keep="last")
>>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
...     source.emit(ele)
()
('b', 'ba', 'bar')
()

to_batch(**kwargs)

Convert a stream of lists to a Batch

All elements of the stream are assumed to be lists or tuples

Code Block
languagepython
>>> source = Stream()
>>> batches = source.to_batch()
>>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list()
>>> source.emit([{'name': 'Alice', 'value': 1},
...              {'name': 'Bob', 'value': 2},
...              {'name': 'Charlie', 'value': 3}])
>>> source.emit([{'name': 'Alice', 'value': 4},
...              {'name': 'Bob', 'value': 5},
...              {'name': 'Charlie', 'value': 6}])
to_dataframe(example)

Convert a stream of Pandas dataframes to a DataFrame

Code Block
languagepython
>>> source = Stream()
>>> sdf = source.to_dataframe()
>>> L = sdf.groupby(sdf.x).y.mean().stream.sink_to_list()
>>> source.emit(pd.DataFrame(...))
>>> source.emit(pd.DataFrame(...))
>>> source.emit(pd.DataFrame(...))
to_kafka(topic, producer_config, **kwargs)

Writes data in the stream to Kafka

This stream accepts a string or bytes object. Call flush to ensure all messages are pushed. Responses from Kafka are pushed downstream.

topic : string

The topic which to write

producer_config : dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: Connection string (host:port) to Kafka

Code Block
languagepython
>>> from streamz import Stream
>>> ARGS = {'bootstrap.servers': 'localhost:9092'}
>>> source = Stream()
>>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
<to_kafka>
>>> for i in range(10):
...     source.emit(i)
>>> kafka.flush()
buffer(n, **kwargs)

Allow results to pile up at this point in the stream

This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.

changeInterval(intervalMs)

collect(cache=None, metadata_cache=None, **kwargs)

Hold elements in a cache and emit them as a collection when flushed.

Code Block
languagepython
>>> source1 = Stream()
>>> source2 = Stream()
>>> collector = collect(source1)
>>> collector.sink(print)
>>> source2.sink(collector.flush)
>>> source1.emit(1)
>>> source1.emit(2)
>>> source2.emit('anything')  # flushes collector
...
[1, 2]
combine_latest(**kwargs)

Combine multiple streams together to a stream of tuples

This will emit a new tuple of all of the most recent elements seen from any stream.

emit_on : stream or list of streams or None

only emit upon update of the streams listed. If None, emit on update from any stream

zip

property concat
connect(downstream)

Connect this stream to a downstream element.

downstream: Stream

The downstream stream to connect to

convertDf = None

The header for DataFrame creation with the streamz package

delay(interval, **kwargs)

Add a time delay to results

destroy(streams=None)

Disconnect this stream from any upstream sources

disconnect(downstream)

Disconnect this stream from a downstream element.

downstream: Stream

The downstream stream to disconnect from

emit(x, asynchronous=False, metadata=None)

Push data into the stream at this point

This is typically done only at source Streams but can theoretically be done at any point

x: any

an element of data

asynchronous:

emit asynchronously

metadata: list[dict], optional

Various types of metadata associated with the data element in x.

ref: RefCounter A reference counter used to check when data is done

static filenames(path, poll_interval=0.1, start=False, **kwargs)

Stream over filenames in a directory

path: string

Directory path or globstring over which to search for files

poll_interval: Number

Seconds between checking path

start: bool (False)

Whether to start running immediately; otherwise call stream.start() explicitly.

Code Block
languagepython
>>> source = Stream.filenames('path/to/dir')  
>>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)  
filter(predicate, *args, **kwargs)

Only pass through elements that satisfy the predicate

predicate : function

The predicate. Should return True or False, where True means that the predicate is satisfied.

args :

The arguments to pass to the predicate.

kwargs:

Keyword arguments to pass to predicate

Code Block
languagepython
>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
flatten(upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Flatten streams of lists or iterables into a stream of elements

Code Block
languagepython
>>> source = Stream()
>>> source.flatten().sink(print)
>>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
...     source.emit(x)
1
2
3
4
5
6
7

partition

frequencies(**kwargs)

Count occurrences of elements

static from_http_server(port, path='/.*', start=False, server_kwargs=None)

Listen for HTTP POSTs on given port

Each connection will emit one event, containing the body data of the request

port : int

The port to listen on

path : str

Specific path to listen on. Can be regex, but content is not used.

start : bool

Whether to immediately startup the server. Usually you want to connect downstream nodes first, and then call .start().

server_kwargs : dict or None

If given, set of further parameters to pass on to HTTPServer

Code Block
languagepython
>>> source = Source.from_http_server(4567)  
static from_kafka(topics, consumer_params, poll_interval=0.1, start=False, **kwargs)

Accepts messages from Kafka

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

topics: list of str

Labels of Kafka topics to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers, Connection string(s) (host:port) by which to reach Kafka; group.id, Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

start: bool (False)

Whether to start polling upon instantiation

Code Block
languagepython
>>> source = Stream.from_kafka(['mytopic'],
...           {'bootstrap.servers': 'localhost:9092',
...            'group.id': 'streamz'})  
static from_kafka_batched(topic, consumer_params, poll_interval='1s', npartitions=1, start=False, dask=False, max_batch_size=10000, keys=False, engine=None, **kwargs)

Get messages and keys (optional) from Kafka in batches

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

This source will emit lists of messages for each partition of a single given topic per time interval, if there is new data. If using dask, one future will be produced per partition per time-step, if there is data.

Checkpointing is achieved through the use of reference counting. A reference counter is emitted downstream for each batch of data. A callback is triggered when the reference count reaches zero and the offsets are committed back to Kafka. Upon the start of this function, the previously committed offsets will be fetched from Kafka and reading will begin from there. This will guarantee at-least-once semantics.

topic: str

Kafka topic to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: Connection string(s) (host:port) by which to reach Kafka group.id: Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

npartitions: int

Number of partitions in the topic

start: bool (False)

Whether to start polling upon instantiation

max_batch_size: int

The maximum number of messages per partition to be consumed per batch

keys: bool (False)

Whether to extract keys along with the messages. If True, this will yield each message as a dict: {‘key’:msg.key(), ‘value’:msg.value()}

engine: str (None)

If engine is set to “cudf”, streamz reads data (messages must be JSON) from Kafka in an accelerated manner directly into cuDF (GPU) dataframes.

Please refer to RAPIDS cudf API here: https://docs.rapids.ai/api/cudf/stable/

This is done using the custreamz.kafka module in cudf. custreamz.kafka has the exact same API as Confluent Kafka, so it serves as a drop-in replacement with minimal duplication of code. But under the hood, it reads messages from librdkafka and directly uploads them to the GPU as a cuDF dataframe instead of gathering all the messages back from C++ into Python. This essentially avoids the GIL issue described in the Confluent Kafka consumer: https://github.com/confluentinc/confluent-kafka-python/issues/597, and hence enables reading from Kafka in a faster fashion with fewer processes. This accelerated reader also adheres to the checkpointing mechanism in streamz.

Folks interested in trying out custreamz would benefit from this accelerated Kafka reader. If one does not want to use GPUs, they can use streamz as is, with the default engine=None.

To use this option, one must install custreamz (use the appropriate CUDA version recipe) using the following command, which will install all GPU dependencies, including streamz: conda env create --name custreamz --file custreamz_dev_cuda10.1.yml. The file custreamz_dev_cuda10.1.yml can be downloaded from: https://github.com/jdye64/cudf/blob/kratos/conda/environments/custreamz_dev_cuda10.1.yml Instead of creating a new conda environment, one can alternatively install all the packages mentioned in the .yml file. But creating a new conda environment with the .yml script installs everything cleanly. Also, this is temporary (as mentioned below) until custreamz.kafka code gets merged into cudf/custreamz, after which it will be a single-line nightly package install.

The accelerated Kafka datasource will soon be officially merged into RAPIDS custreamz. Then, custreamz can simply be installed using: https://anaconda.org/rapidsai-nightly/custreamz, instead of the command above.

Please refer to RAPIDS custreamz.kafka API here: https://github.com/jdye64/cudf/blob/kratos/python/custreamz/custreamz/kafka.py

If ‘auto.offset.reset’: ‘latest’ is set in the consumer configs, the stream starts reading messages from the latest offset. Else, if it’s set to ‘earliest’, it will read from the start offset.

union(**kwargs)

Combine multiple streams into one

Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.

Stream.zip Stream.combine_latest

unique(maxsize=None, key=<function identity>, hashable=True, **kwargs)

Avoid sending through repeated elements

This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the maxsize= parameter. For example setting maxsize=1 avoids sending through elements when one is repeated right after the other.

maxsize: int or None, optional

number of stored unique values to check against

key : function, optional

Function which returns a representation of the incoming data. For example key=lambda x: x['a'] could be used to allow only pieces of data with unique 'a' values to pass through.

hashable : bool, optional

If True then data is assumed to be hashable, else it is not. This is used for determining how to cache the history, if hashable then either dicts or LRU caches are used, otherwise a deque is used. Defaults to True.

Code Block
languagepython
>>> source = Stream()
>>> source.unique(maxsize=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
...     source.emit(x)
1
2
1
3
update(x, who=None, metadata=None)
property upstream
Anchor
proficloud.timeseries.proficloudio.MetricsStreamIO.upstream
proficloud.timeseries.proficloudio.MetricsStreamIO.upstream
visualize(filename='mystream.png', **kwargs)

Render the computation of this object’s task graph using graphviz.

Requires graphviz and networkx to be installed.

filename : str, optional

The name of the file to write to disk.

kwargs:

Graph attributes to pass to graphviz like rankdir="LR"

zip(**kwargs)

Combine streams together into a stream of tuples

We emit a new tuple once all streams have produced a new element.

combine_latest zip_latest

zip_latest(*upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

The stream which this is called from is lossless. All elements from the lossless stream are emitted regardless of when they came in. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to combine_latest with the emit_on flag.

Stream.combine_latest Stream.zip

...

class proficloud.timeseries.proficloudio.ProficloudIOMetrics
Anchor
proficloud.timeseries.proficloudio.ProficloudIOMetrics
proficloud.timeseries.proficloudio.ProficloudIOMetrics
(staging=False)

Bases: object

DEBUGRESPONSE = False

Debug http-responses. When set to True, the attribute DEBUGRESPONSECONTENT contains the last raw response. Does not work when calling the API in parallel using the same instance!

DEBUGRESPONSECONTENT = None

Contains the last response if DEBUGRESPONSE is set to True.

DEBUGTIME = False

Debug request response times (print)
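A rough usage sketch for these debug switches follows; the query call it refers to is a placeholder, and only the attributes and methods documented in this module are assumed:

Code Block
languagepython
>>> connector = ProficloudIOMetrics(staging=False)
>>> connector.DEBUGRESPONSE = True    # keep the last raw response on the instance
>>> connector.DEBUGTIME = True        # print request/response times
>>> # ... run a query (e.g. connector.queryMetrics(...)), then inspect the raw response:
>>> print(connector.DEBUGRESPONSECONTENT)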

authenticate(username, password)
static convert_response
Anchor
proficloud.timeseries.proficloudio.ProficloudIOMetrics.convert_response
proficloud.timeseries.proficloudio.ProficloudIOMetrics.convert_response
(response, uuid, fillNaMethod=None, dropTrailingNa=True, convertTimestamp=False, mergeTime=True)

Based on an existing function from this package (see pandasDfFromQueryResponse for the shared parameters); a detailed description is to follow.

static dateparse
Anchor
proficloud.timeseries.proficloudio.ProficloudIOMetrics.dateparse
proficloud.timeseries.proficloudio.ProficloudIOMetrics.dateparse
(datestring)
queryMetrics(uuid, metrics, start_time=None, end_time=None, createDf=True, fillNaMethod=None, dropTrailingNa=True, orderDesc=False, mergeTime=False)

Query metrics and return the data as pandas DataFrame.

Parameters:
  • metrics (list(str), str) – A list of metric names (or a single metric name) to query.

  • start_time (int, datetime, str or None) – Timestamp (ms based), datetime object, or datetime as string; not used when None. (Default: None)

  • end_time (int, datetime, str or None) – Timestamp (ms based), datetime object, or datetime as string. This must be later in time than the start time. If not specified, the end time is assumed to be the current date and time. (Default: None)

  • orderDesc (boolean) – Orders returned data points based on timestamp. Descending order when True, or ascending when False (default). Only in effect when returning a DataFrame.

  • createDf (boolean) – Convert the response into a convenient pandas DataFrame. (Default: True)

  • fillNaMethod (str) – {‘backfill’, ‘bfill’, ‘pad’, ‘ffill’, None}, default None. Method to use for filling holes in the reindexed Series. pad / ffill: propagate the last valid observation forward to the next valid one; backfill / bfill: use the NEXT valid observation to fill the gap.

  • dropTrailingNa (boolean) – Drop trailing NaN values when set to True. Especially useful when the end_time parameter is not specified (filters the time delay that occurs when querying multiple metrics). Default: True

Return type:

pandas.DataFrame or dict

Returns:

Returns pandas.DataFrame
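A minimal usage sketch; the endpoint UUID, credentials and metric names below are placeholders, not values from this documentation:

Code Block
languagepython
>>> from proficloud.timeseries.proficloudio import ProficloudIOMetrics
>>> connector = ProficloudIOMetrics(staging=False)
>>> connector.authenticate("user@example.com", "secret")        # placeholder credentials
>>> df = connector.queryMetrics(
...     uuid="00000000-0000-0000-0000-000000000000",             # placeholder endpoint UUID
...     metrics=["temperature", "pressure"],                     # placeholder metric names
...     start_time="2021-01-01 00:00:00.000",
...     createDf=True)
>>> df.head()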

...

proficloud.timeseries.proficloudio.get_time_series(app_name, time_series_names, from_date, to_date, endpoint_id, sort='ASC')

Returns time series data points within the specified time range ordered by timestamp and grouped by endpoints.

Parameters:
  • app_name – Application name

  • time_series_names – One or more time series names.

  • from_date – Start date to retrieve data points.

  • to_date – End date to retrieve data points.

  • endpoint_id – One or more endpoint IDs. If not specified, data is returned for all available endpoints.

  • sort – Sorting order by timestamp. (one of ‘ASC’, ‘DESC’ - default: ‘ASC’)

Returns:

JSON with time-series data
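For illustration only; the application name, series names, endpoint ID and date format below are assumptions:

Code Block
languagepython
>>> from proficloud.timeseries.proficloudio import get_time_series
>>> data = get_time_series(app_name="my-app",                   # placeholder application name
...                        time_series_names=["temperature"],   # placeholder series name
...                        from_date="2021-01-01T00:00:00Z",    # assumed date format
...                        to_date="2021-01-02T00:00:00Z",
...                        endpoint_id="00000000-0000-0000-0000-000000000000",
...                        sort="ASC")
>>> type(data)   # JSON-like structure with time-series data points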

...

proficloud.timeseries.proficloudutil module

...

proficloud.timeseries.proficloudutil.datetimeToTimestampMs(dt, datetimeformat='%Y-%m-%d %H:%M:%S.%f')

Convert a given string or datetime object to a millisecond-based timestamp. No conversion is performed when a timestamp (int) is given as input. The format string can be set using the datetimeformat parameter.

Parameters:

dt (datetime, str or int) – The datetime or datetime string

Returns:

Millisecond-based timestamp, or None when the conversion is not successful.

Return type:

int
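A short sketch of the accepted input types (outputs are omitted because the exact millisecond value depends on how the conversion treats time zones):

Code Block
languagepython
>>> from datetime import datetime
>>> from proficloud.timeseries.proficloudutil import datetimeToTimestampMs
>>> datetimeToTimestampMs(datetime(2021, 1, 1, 12, 0, 0))            # datetime object
>>> datetimeToTimestampMs("2021-01-01 12:00:00.000")                 # string in the default format
>>> datetimeToTimestampMs("01.01.2021 12:00:00",
...                       datetimeformat="%d.%m.%Y %H:%M:%S")        # custom format string
>>> datetimeToTimestampMs(1609502400000)                             # int timestamp: returned unchanged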

...

proficloud.timeseries.proficloudutil.getTimeOffsetToUtc()

Get the offset of local time to UTC time using pool.ntp.org.

Returns:

Offset in seconds

Return type:

float

...

proficloud.timeseries.proficloudutil.pandasDfFromQueryResponse(query, fillNaMethod=None, dropTrailingNa=True, convertTimestamp=False, orderDesc=False)

Convert a kairosdb query response to a pandas DataFrame. The timestamp column contains millisecond-based timestamps.

Parameters:
  • query (list) – The result from the query method (list of deserialised json responses from kairosdb (one for each metric))

  • fillNaMethod (str) – Method to fill in missing values: ‘backfill’, ‘bfill’, ‘pad’, ‘ffill’, None, (default=None)

  • dropTrailingNa (boolean) – Drop trailing NaN values when set to True. Especially useful when no end time was specified in the query (filters the time delay that occurs when querying multiple metrics). Default: True

  • convertTimestamp (boolean) – Convert the timestamp to datetime (Default: False)

  • orderDesc (boolean) – Orders returned data points based on timestamp. Descending order when True, or ascending when False (default)

Return type:

pandas.DataFrame

Returns:

Pandas DataFrame.

...

proficloud.timeseries.proficloudutil.timestampMsToDatetime(ts)

Takes a timestamp and returns a datetime object. The timestamp is millisecond-based and can be a float (default datetime convention) or an int.

Parameters:

ts (float, int) – The timestamp

Returns:

datetime

Return type:

datetime.datetime
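A minimal sketch:

Code Block
languagepython
>>> from proficloud.timeseries.proficloudutil import timestampMsToDatetime
>>> dt = timestampMsToDatetime(1609502400000)   # millisecond-based timestamp (int or float)
>>> dt.year
2021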

...

proficloud.timeseries.stream module

...

class proficloud.timeseries.stream.MetricsStream
Anchor
proficloud.timeseries.stream.MetricsStream
proficloud.timeseries.stream.MetricsStream
(connector, metrics, intervalMs, bufferTime, convertTimestamp=False, convertDf=True, **kwargs)

Bases: streamz.core.Stream

This class creates a “streamz”-stream from a KairosConnector (or one of its child classes such as ProficloudMetrics).

Parameters:
  • connector (KairosConnector (or subclass)) – An initialized connector.

  • metrics (list(str), str) – A list of metric names (or a single metric name) to query.

  • intervalMs (int) – Polling interval in milliseconds

  • bufferTime (dict) – The buffer time is the current date and time minus the specified value and unit. Possible unit values are “milliseconds”, “seconds”, “minutes”, “hours”, “days”, “weeks”, “months”, and “years”. For example, if the start time is 5 minutes, the query will return all matching data points for the last 5 minutes. Example value: { “value”: “10”, “unit”: “minutes” }

  • convertTimestamp (boolean) – Convert the timestamp to datetime (Default: False)
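A minimal sketch of wiring such a stream to a sink. It assumes connector is an already initialized ProficloudMetrics (or other KairosConnector subclass) and "temperature" is a placeholder metric name:

Code Block
languagepython
>>> from proficloud.timeseries.stream import MetricsStream
>>> stream = MetricsStream(connector,                             # an initialized connector (see above)
...                        metrics=["temperature"],               # placeholder metric name
...                        intervalMs=5000,                       # poll every 5 seconds
...                        bufferTime={"value": "10", "unit": "minutes"})
>>> stream.sink(print)    # print every polled result
>>> stream.start()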

accumulate(func, start='--no-default--', returns_state=False, **kwargs)

Accumulate results with previous state

This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element and it should return a new accumulated state, - state = func(previous_state, new_value) (returns_state=False) - state, result = func(previous_state, new_value) (returns_state=True)

where the new_state is passed to the next invocation. The state or result is emitted downstream for the two cases.

func: callable start: object

Initial value, passed as the value of previous_state on the first invocation. Defaults to the first submitted element

returns_state: boolean

If true then func should return both the state and the value to emit If false then both values are the same, and func returns one value

kwargs:

Keyword arguments to pass to func

A running total, producing triangular numbers

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
1
3
6
10

A count of number of events (including the current one)

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
...     source.emit(0)
1
2
3
4
5

Like the builtin “enumerate”.

Code Block
languagepython
>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
...                   start=(0, 0), returns_state=True
...                   ).sink(print)
>>> for i in range(3):
...     source.emit(0)
(0, 0)
(1, 0)
(2, 0)
buffer(n, **kwargs)

Allow results to pile up at this point in the stream

This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.

changeInterval(intervalMs)
collect(cache=None, metadata_cache=None, **kwargs)

Hold elements in a cache and emit them as a collection when flushed.

Code Block
languagepython
>>> source1 = Stream()
>>> source2 = Stream()
>>> collector = collect(source1)
>>> collector.sink(print)
>>> source2.sink(collector.flush)
>>> source1.emit(1)
>>> source1.emit(2)
>>> source2.emit('anything')  # flushes collector
...
[1, 2]
combine_latest(**kwargs)

Combine multiple streams together to a stream of tuples

This will emit a new tuple of all of the most recent elements seen from any stream.

emit_on : stream or list of streams or None

only emit upon update of the streams listed. If None, emit on update from any stream

zip
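A short sketch of the default behaviour (a tuple is emitted on every update once each upstream has produced at least one element):

Code Block
languagepython
>>> a = Stream()
>>> b = Stream()
>>> a.combine_latest(b).sink(print)
>>> a.emit(1)        # nothing emitted yet, b has no value
>>> b.emit('x')
(1, 'x')
>>> a.emit(2)
(2, 'x')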

property concat
Anchor
proficloud.timeseries.stream.MetricsStream.concat
proficloud.timeseries.stream.MetricsStream.concat
connect(downstream)

Connect this stream to a downstream element.

downstream: Stream

The downstream stream to connect to
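For illustration, a sketch that connects a source to a separately constructed node so that emitted elements flow through it:

Code Block
languagepython
>>> source = Stream()
>>> node = Stream()
>>> node.sink(print)
>>> source.connect(node)   # node is now a downstream of source
>>> source.emit(123)
123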

convertDf = None

The header for DataFrame creation with the streamz package

delay(interval, **kwargs)

Add a time delay to results

destroy(streams=None)

Disconnect this stream from any upstream sources

disconnect(downstream)

Disconnect this stream from a downstream element.

downstream: Stream

The downstream stream to disconnect from

emit(x, asynchronous=False, metadata=None)

Push data into the stream at this point

This is typically done only at source Streams but can theoretically be done at any point

x: any

an element of data

asynchronous:

emit asynchronously

metadata: list[dict], optional

Various types of metadata associated with the data element in x.

ref: RefCounter A reference counter used to check when data is done

static filenames
Anchor
proficloud.timeseries.stream.MetricsStream.filenames
proficloud.timeseries.stream.MetricsStream.filenames
(path, poll_interval=0.1, **kwargs)

Stream over filenames in a directory

path: string

Directory path or globstring over which to search for files

poll_interval: Number

Seconds between checking path

start: bool (False)

Whether to start running immediately; otherwise call stream.start() explicitly.

Code Block
languagepython
>>> source = Stream.filenames('path/to/dir')  
>>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)  
filter(predicate, *args, **kwargs)

Only pass through elements that satisfy the predicate

predicate : function

The predicate. Should return True or False, where True means that the predicate is satisfied.

args :

The arguments to pass to the predicate.

kwargs:

Keyword arguments to pass to predicate

Code Block
languagepython
>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
flatten(upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)

Flatten streams of lists or iterables into a stream of elements

Code Block
languagepython
>>> source = Stream()
>>> source.flatten().sink(print)
>>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
...     source.emit(x)
1
2
3
4
5
6
7

partition

frequencies(**kwargs)

Count occurrences of elements
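A minimal sketch; each emit passes the updated frequency dictionary downstream:

Code Block
languagepython
>>> source = Stream()
>>> source.frequencies().sink(print)
>>> for x in ['a', 'b', 'a']:
...     source.emit(x)
{'a': 1}
{'a': 1, 'b': 1}
{'a': 2, 'b': 1}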

static from_http_server
Anchor
proficloud.timeseries.stream.MetricsStream.from_http_server
proficloud.timeseries.stream.MetricsStream.from_http_server
(port, path='/.*', server_kwargs=None, **kwargs)

Listen for HTTP POSTs on given port

Each connection will emit one event, containing the body data of the request

port : int

The port to listen on

path : str

Specific path to listen on. Can be regex, but content is not used.

start : bool

Whether to immediately startup the server. Usually you want to connect downstream nodes first, and then call .start().

server_kwargs : dict or None

If given, set of further parameters to pass on to HTTPServer

Code Block
languagepython
>>> source = Source.from_http_server(4567)  
static from_iterable
Anchor
proficloud.timeseries.stream.MetricsStream.from_iterable
proficloud.timeseries.stream.MetricsStream.from_iterable
(iterable, **kwargs)

Emits items from an iterable.

iterable: iterable

An iterable to emit messages from.

Code Block
languagepython
>>> source = Stream.from_iterable(range(3))
>>> L = source.sink_to_list()
>>> source.start()
>>> L
[0, 1, 2]
static from_kafka
Anchor
proficloud.timeseries.stream.MetricsStream.from_kafka
proficloud.timeseries.stream.MetricsStream.from_kafka
(topics, consumer_params, poll_interval=0.1, **kwargs)

Accepts messages from Kafka

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

topics: list of str

Labels of Kafka topics to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers, Connection string(s) (host:port) by which to reach Kafka; group.id, Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

start: bool (False)

Whether to start polling upon instantiation

Code Block
languagepython
>>> source = Stream.from_kafka(['mytopic'],
...           {'bootstrap.servers': 'localhost:9092',
...            'group.id': 'streamz'})  
static from_kafka_batched
Anchor
proficloud.timeseries.stream.MetricsStream.from_kafka_batched
proficloud.timeseries.stream.MetricsStream.from_kafka_batched
(topic, consumer_params, poll_interval='1s', npartitions=None, refresh_partitions=False, start=False, dask=False, max_batch_size=10000, keys=False, engine=None, **kwargs)

Get messages and keys (optional) from Kafka in batches

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

This source will emit lists of messages for each partition of a single given topic per time interval, if there is new data. If using dask, one future will be produced per partition per time-step, if there is data.

Checkpointing is achieved through the use of reference counting. A reference counter is emitted downstream for each batch of data. A callback is triggered when the reference count reaches zero and the offsets are committed back to Kafka. Upon the start of this function, the previously committed offsets will be fetched from Kafka and reading will begin from there. This will guarantee at-least-once semantics.

topic: str

Kafka topic to consume from

consumer_params: dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: Connection string(s) (host:port) by which to reach Kafka group.id: Identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.

poll_interval: number

Seconds that elapse between polling Kafka for new messages

npartitions: int (None)

Number of partitions in the topic. If None, streamz will poll Kafka to get the number of partitions.

refresh_partitions: bool (False)

Useful if the user expects to increase the number of topic partitions on the fly, maybe to handle spikes in load. Streamz polls Kafka in every batch to determine the current number of partitions. If partitions have been added, streamz will automatically start reading data from the new partitions as well. If set to False, streamz will not accommodate adding partitions on the fly. It is recommended to restart the stream after decreasing the number of partitions.

start: bool (False)

Whether to start polling upon instantiation

max_batch_size: int

The maximum number of messages per partition to be consumed per batch

keys: bool (False)

Whether to extract keys along with the messages. If True, this will yield each message as a dict: {‘key’: msg.key(), ‘value’: msg.value()}

engine: str (None)

If engine is set to “cudf”, streamz reads data (messages must be JSON) from Kafka in an accelerated manner directly into cuDF (GPU) dataframes. This is done using the RAPIDS custreamz library.

Please refer to RAPIDS cudf API here: https://docs.rapids.ai/api/cudf/stable/

Folks interested in trying out custreamz would benefit from this accelerated Kafka reader. If one does not want to use GPUs, they can use streamz as is, with the default engine=None.

To use this option, one must install custreamz (use the appropriate CUDA version recipe & Python version) using a command like the one below, which will install all GPU dependencies and streamz itself:

conda install -c rapidsai-nightly -c nvidia -c conda-forge -c defaults custreamz=0.15 python=3.7 cudatoolkit=10.2

More information at: https://rapids.ai/start.html

Important Kafka Configurations: By default, a stream will start reading from the latest offsets available. Please set ‘auto.offset.reset’: ‘earliest’ in the consumer configs, if the stream needs to start processing from the earliest offsets.

Code Block
languagepython
>>> source = Stream.from_kafka_batched('mytopic',
...           {'bootstrap.servers': 'localhost:9092',
...            'group.id': 'streamz'})  
static from_periodic
Anchor
proficloud.timeseries.stream.MetricsStream.from_periodic
proficloud.timeseries.stream.MetricsStream.from_periodic
(callback, poll_interval=0.1, **kwargs)

Generate data from a function on given period

cf streamz.dataframe.PeriodicDataFrame

callback: callable

Function to call on each iteration. Takes no arguments.

poll_interval: float

Time to sleep between calls (s)
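A minimal sketch, assuming the return value of the zero-argument callback is what gets emitted on each poll:

Code Block
languagepython
>>> import time
>>> source = Stream.from_periodic(time.time, poll_interval=1.0)  
>>> source.sink(print)  
>>> source.start()  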

static from_process
Anchor
proficloud.timeseries.stream.MetricsStream.from_process
proficloud.timeseries.stream.MetricsStream.from_process
(cmd, open_kwargs=None, with_stderr=False, with_end=True, **kwargs)

Messages from a running external process

This doesn’t work on Windows

cmd : list of str or str

Command to run: program name, followed by arguments

open_kwargs : dict

To pass on to the process open function, see subprocess.Popen.

with_stderr : bool

Whether to include the process STDERR in the stream

start : bool

Whether to immediately startup the process. Usually you want to connect downstream nodes first, and then call .start().

Code Block
languagepython
>>> source = Source.from_process(['ping', 'localhost'])  
static from_tcp
Anchor
proficloud.timeseries.stream.MetricsStream.from_tcp
proficloud.timeseries.stream.MetricsStream.from_tcp
(port, delimiter=b'\n', server_kwargs=None, **kwargs)

Creates events by reading from a socket using tornado TCPServer

The stream of incoming bytes is split on a given delimiter, and the parts become the emitted events.

port : int

The port to open and listen on. It only gets opened when the source is started, and closed upon stop()

delimiter : bytes

The incoming data will be split on this value. The resulting events will still have the delimiter at the end.

start : bool

Whether to immediately initiate the source. You probably want to set up downstream nodes first.

server_kwargs : dict or None

If given, additional arguments to pass to TCPServer

Code Block
languagepython
>>> source = Source.from_tcp(4567)  
static from_textfile
Anchor
proficloud.timeseries.stream.MetricsStream.from_textfile
proficloud.timeseries.stream.MetricsStream.from_textfile
(f, poll_interval=0.1, delimiter='\n', from_end=False, **kwargs)

Stream data from a text file

f: file or string

Source of the data. If string, will be opened.

poll_interval: Number

Interval to poll file for new data in seconds

delimiter: str

Character(s) to use to split the data into parts

start: bool

Whether to start running immediately; otherwise call stream.start() explicitly.

from_end: bool

Whether to begin streaming from the end of the file (i.e., only emit lines appended after the stream starts).

Code Block
languagepython
>>> source = Stream.from_textfile('myfile.json')  
>>> source.map(json.loads).pluck('value').sum().sink(print)  
>>> source.start()  

Stream

gather()

This is a no-op for core streamz

This allows gather to be used in both dask and core streams

latest(**kwargs)

Drop held-up data and emit the latest result

This allows you to skip intermediate elements in the stream if there is some back pressure causing a slowdown. Use this when you only care about the latest elements, and are willing to lose older data.

This passes through values without modification otherwise.

Code Block
languagepython
>>> source.map(f).latest().map(g)  
map(func, *args, **kwargs)

Apply a function to every element in the stream

func: callable args :

The arguments to pass to the function.

kwargs:

Keyword arguments to pass to func

Code Block
languagepython
>>> source = Stream()
>>> source.map(lambda x: 2*x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
6
8
partition(n, timeout=None, key=None, **kwargs)

Partition stream into tuples of equal size

n: int

Maximum partition size

timeout: int or float, optional

Number of seconds after which a partition will be emitted, even if its size is less than n. If None (default), a partition will be emitted only when its size reaches n.

key: hashable or callable, optional

Emit items with the same key together as a separate partition. If key is callable, partition will be identified by key(x), otherwise by x[key]. Defaults to None.

Code Block
languagepython
>>> source = Stream()
>>> source.partition(3).sink(print)
>>> for i in range(10):
...     source.emit(i)
(0, 1, 2)
(3, 4, 5)
(6, 7, 8)
Code Block
languagepython
>>> source = Stream()
>>> source.partition(2, key=lambda x: x % 2).sink(print)
>>> for i in range(4):
...     source.emit(i)
(0, 2)
(1, 3)
Code Block
languagepython
>>> from time import sleep
>>> source = Stream()
>>> source.partition(5, timeout=1).sink(print)
>>> for i in range(3):
...     source.emit(i)
>>> sleep(1)
(0, 1, 2)
partition_unique(n: int, key: collections.abc.Hashable = <function identity>, keep: str = 'first', **kwargs)

Partition stream elements into groups of equal size with unique keys only.

n: int

Number of (unique) elements to pass through as a group.

key: Union[Hashable, Callable[[Any], Hashable]]

Callable that accepts a stream element and returns a unique, hashable representation of the incoming data (key(x)), or a hashable that gets the corresponding value of a stream element (x[key]). For example, key=lambda x: x["a"] would allow only elements with unique "a" values to pass through.

Info

By default, we simply use the element object itself as the key, so that object must be hashable. If that’s not the case, a non-default key must be provided.

keep: str

Which element to keep in the case that a unique key is already found in the group. If “first”, keep element from the first occurrence of a given key; if “last”, keep element from the most recent occurrence. Note that relative ordering of elements is preserved in the data passed through, and not ordering of keys.

kwargs

Code Block
languagepython
>>> source = Stream()
>>> stream = source.partition_unique(n=3, keep="first").sink(print)
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
>>> for ele in eles:
...     source.emit(ele)
(1, 2, 3)
(1, 3, 2)
Code Block
languagepython
>>> source = Stream()
>>> stream = source.partition_unique(n=3, keep="last").sink(print)
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
>>> for ele in eles:
...     source.emit(ele)
(2, 1, 3)
(1, 3, 2)
Code Block
languagepython
>>> source = Stream()
>>> stream = source.partition_unique(n=3, key=lambda x: len(x), keep="last").sink(print)
>>> eles = ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"]
>>> for ele in eles:
...     source.emit(ele)
('fo', 'f', 'foo')
('f', 'foo', 'fo')
pluck(pick, **kwargs)

Select elements from elements in the stream.

pluck : object, list

The element(s) to pick from the incoming element in the stream If an instance of list, will pick multiple elements.

Code Block
languagepython
>>> source = Stream()
>>> source.pluck([0, 3]).sink(print)
>>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
...     source.emit(x)
(1, 4)
(4, 7)
(8, 11)
Code Block
languagepython
>>> source = Stream()
>>> source.pluck('name').sink(print)
>>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
...     source.emit(x)
'Alice'
'Bob'
poll_metrics()

Polling co-routine. This retrieves metrics from the connector.

rate_limit(interval, **kwargs)

Limit the flow of data

This stops two elements of streaming through in an interval shorter than the provided value.

interval: float

Time in seconds

classmethod register_api
Anchor
proficloud.timeseries.stream.MetricsStream.register_api
proficloud.timeseries.stream.MetricsStream.register_api
(modifier=<function identity>, attribute_name=None)

Add callable to Stream API

This allows you to register a new method onto this class. You can use it as a decorator.:

Code Block
languagepython
linenumbersfalse
>>> @Stream.register_api()
... class foo(Stream):
...     ...

>>> Stream().foo(...)  # this works now

It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).

By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class as in the following case where we construct a staticmethod.

Code Block
languagepython
>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
Code Block
languagepython
>>> Stream.foo(...)  # Foo operates as a static method

You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as.

Code Block
languagepython
>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...

>>> Stream().bar(...)  # foo was actually attached as bar

classmethod register_plugin_entry_point
Anchor
proficloud.timeseries.stream.MetricsStream.register_plugin_entry_point
proficloud.timeseries.stream.MetricsStream.register_plugin_entry_point
(entry_point, modifier=<function identity>)
remove(predicate)

Only pass through elements for which the predicate returns False

property scan
Anchor
proficloud.timeseries.stream.MetricsStream.scan
proficloud.timeseries.stream.MetricsStream.scan
scatter(**kwargs)

Convert local stream to Dask Stream

All elements flowing through the input will be scattered out to the cluster

sink(func, *args, **kwargs)

Apply a function on every element in the stream

func: callable

A function that will be applied on every element.

args:

Positional arguments that will be passed to func after the incoming element.

kwargs:

Stream-specific arguments will be passed to Stream.__init__, the rest of them will be passed to func.

Code Block
languagepython
>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]

map Stream.sink_to_list

sink_to_list()

Append all elements of a stream to a list as they come in

Code Block
languagepython
>>> source = Stream()
>>> L = source.map(lambda x: 10 * x).sink_to_list()
>>> for i in range(5):
...     source.emit(i)
>>> L
[0, 10, 20, 30, 40]
sink_to_textfile(file, end='\n', mode='a', **kwargs)

Write elements to a plain text file, one element per line.

Type of elements must be str.

file: str or file-like

File to write the elements to. str is treated as a file name to open. If file-like, descriptor must be open in text mode. Note that the file descriptor will be closed when this sink is destroyed.

end: str, optional

This value will be written to the file after each element. Defaults to newline character.

mode: str, optional

If file is str, file will be opened in this mode. Defaults to "a" (append mode).

Code Block
languagepython
>>> source = Stream()
>>> source.map(str).sink_to_textfile("test.txt")
>>> source.emit(0)
>>> source.emit(1)
>>> print(open("test.txt", "r").read())
0
1
slice(start=None, end=None, step=None, **kwargs)

Get only some events in a stream by position. Works like list[] syntax.

start : int

First event to use. If None, start from the beginning

end : int

Last event to use (non-inclusive). If None, continue without stopping. Does not support negative indexing.

step : int

Pass on every Nth event. If None, pass every one.

Code Block
languagepython
>>> source = Stream()
>>> source.slice(2, 6, 2).sink(print)
>>> for i in range(5):
...     source.emit(0)
2
4
sliding_window(n, return_partial=True, **kwargs)

Produce overlapping tuples of size n

return_partial : bool

If True, yield tuples as soon as any events come in, each tuple being smaller or equal to the window size. If False, only start yielding tuples once a full window has accrued.

Code Block
languagepython
>>> source = Stream()
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
...     source.emit(i)
(0, 1, 2)
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
starmap(func, *args, **kwargs)

Apply a function to every element in the stream, splayed out

See itertools.starmap

func: callable args :

The arguments to pass to the function.

kwargs:

Keyword arguments to pass to func

Code Block
languagepython
>>> source = Stream()
>>> source.starmap(lambda a, b: a + b).sink(print)
>>> for i in range(5):
...     source.emit((i, i))
0
2
4
6
8

start()

Start the stream.

stop()

Stop the stream.

str_list = ['func', 'predicate', 'n', 'interval']
timed_window(interval, **kwargs)

Emit a tuple of collected results every interval

Every interval seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.

timed_window_unique(interval: Union[int, str], key: collections.abc.Hashable = <function identity>, keep: str = 'first', **kwargs)

Emit a group of elements with unique keys every interval seconds.

interval: Union[int, str]

Number of seconds over which to group elements, or a pandas-style duration string that can be converted into seconds.

key: Union[Hashable, Callable[[Any], Hashable]]

Callable that accepts a stream element and returns a unique, hashable representation of the incoming data (key(x)), or a hashable that gets the corresponding value of a stream element (x[key]). For example, both key=lambda x: x["a"] and key="a" would allow only elements with unique "a" values to pass through.

Info

By default, we simply use the element object itself as the key, so that object must be hashable. If that’s not the case, a non-default key must be provided.

keep: str

Which element to keep in the case that a unique key is already found in the group. If “first”, keep element from the first occurrence of a given key; if “last”, keep element from the most recent occurrence. Note that relative ordering of elements is preserved in the data passed through, and not ordering of keys.

Get unique hashable elements in a window, keeping just the first occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, keep="first").sink(print)
>>> for ele in [1, 2, 3, 3, 2, 1]:
...     source.emit(ele)
()
(1, 2, 3)
()

Get unique hashable elements in a window, keeping just the last occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, keep="last").sink(print)
>>> for ele in [1, 2, 3, 3, 2, 1]:
...     source.emit(ele)
()
(3, 2, 1)
()

Get unique elements in a window by (string) length, keeping just the first occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, key=len, keep="first")
>>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
...     source.emit(ele)
()
('f', 'fo', 'foo')
()

Get unique elements in a window by (string) length, keeping just the last occurrence:

Code Block
languagepython
>>> stream = source.timed_window_unique(interval=1.0, key=len, keep="last")
>>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
...     source.emit(ele)
()
('b', 'ba', 'bar')
()

to_batch(**kwargs)

Convert a stream of lists to a Batch

All elements of the stream are assumed to be lists or tuples

Code Block
languagepython
>>> source = Stream()
>>> batches = source.to_batch()
>>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list()
>>> source.emit([{'name': 'Alice', 'value': 1},
...              {'name': 'Bob', 'value': 2},
...              {'name': 'Charlie', 'value': 3}])
>>> source.emit([{'name': 'Alice', 'value': 4},
...              {'name': 'Bob', 'value': 5},
...              {'name': 'Charlie', 'value': 6}])
to_dataframe(example)

Convert a stream of Pandas dataframes to a DataFrame

Code Block
languagepython
>>> source = Stream()
>>> sdf = source.to_dataframe()
>>> L = sdf.groupby(sdf.x).y.mean().stream.sink_to_list()
>>> source.emit(pd.DataFrame(...))  
>>> source.emit(pd.DataFrame(...))  
>>> source.emit(pd.DataFrame(...))  
to_kafka(topic, producer_config, **kwargs)

Writes data in the stream to Kafka

This stream accepts a string or bytes object. Call flush to ensure all messages are pushed. Responses from Kafka are pushed downstream.

topic : string

The topic which to write

producer_config : dict

Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: Connection string (host:port) to Kafka

Code Block
languagepython
>>> from streamz import Stream
>>> ARGS = {'bootstrap.servers': 'localhost:9092'}
>>> source = Stream()
>>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
<to_kafka>
>>> for i in range(10):
...     source.emit(i)
>>> kafka.flush()
union(**kwargs)

Combine multiple streams into one

Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.

Stream.zip Stream.combine_latest
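A short sketch; elements pass straight through from either upstream:

Code Block
languagepython
>>> a = Stream()
>>> b = Stream()
>>> a.union(b).sink(print)
>>> a.emit(1)
1
>>> b.emit(2)
2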

unique(maxsize=None, key=<function identity>, hashable=True, **kwargs)

Avoid sending through repeated elements

This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the maxsize= parameter. For example setting maxsize=1 avoids sending through elements when one is repeated right after the other.

maxsize: int or None, optional

number of stored unique values to check against

key : function, optional

Function which returns a representation of the incoming data. For example key=lambda x: x['a'] could be used to allow only pieces of data with unique 'a' values to pass through.

hashable : bool, optional

If True then data is assumed to be hashable, else it is not. This is used for determining how to cache the history, if hashable then either dicts or LRU caches are used, otherwise a deque is used. Defaults to True.

Code Block
languagepython
>>> source = Stream()
>>> source.unique(maxsize=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
...     source.emit(x)
1
2
1
3
update(x, who=None, metadata=None)
property upstream
Anchor
proficloud.timeseries.stream.MetricsStream.upstream
proficloud.timeseries.stream.MetricsStream.upstream
visualize(filename='mystream.png', **kwargs)

Render the computation of this object’s task graph using graphviz.

Requires graphviz and networkx to be installed.

filename : str, optional

The name of the file to write to disk.

kwargs:

Graph attributes to pass to graphviz like rankdir="LR"

zip(**kwargs)

Combine streams together into a stream of tuples

We emit a new tuple once all streams have produced a new element.

combine_latest zip_latest
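A short sketch; the tuple is emitted once every zipped stream has produced an element:

Code Block
languagepython
>>> a = Stream()
>>> b = Stream()
>>> a.zip(b).sink(print)
>>> a.emit(1)        # buffered until b produces an element
>>> b.emit('x')
(1, 'x')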

zip_latest(*upstreams, **kwargs)

Combine multiple streams together to a stream of tuples

The stream which this is called from is lossless. All elements from the lossless stream are emitted regardless of when they came in. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to combine_latest with the emit_on flag.

Stream.combine_latest Stream.zip

...