proficloud.tsd_io package
Submodules
proficloud.tsd_io.tsd module
class proficloud.tsd_io.tsd.MetricsStreamIO(connector, uuid, metrics, intervalMs, bufferTime, **kwargs)
Bases: streamz.core.Stream

This class creates a "streamz" stream from a ProficloudV3Connector (or one of its child classes, such as ProficloudMetrics).
Parameters:
- connector (ProficloudIOMetrics (or subclass)) – An initialized connector.
- metrics (list(str), str) – A list of metric names (or a single metric name) to query.
- intervalMs (int) – Polling interval in milliseconds.
- bufferTime (dict) – The buffer time is the current date and time minus the specified value and unit. Possible unit values are "milliseconds", "seconds", "minutes", "hours", "days", "weeks", "months", and "years". For example, if the start time is 5 minutes, the query will return all matching data points for the last 5 minutes. Example value: {"value": "10", "unit": "minutes"}
- convertTimestamp (boolean) – Convert the timestamp to datetime (Default: False)
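A minimal usage sketch based on the parameters above. The credentials, device UUID, and metric names are placeholders, and whether the stream must be started explicitly is not shown in this documentation:

>>> from proficloud.tsd_io.tsd import MetricsStreamIO, ProficloudIOMetrics
>>> conn = ProficloudIOMetrics()                      # hypothetical setup for illustration
>>> conn.authenticate("user@example.com", "secret")   # placeholder credentials
>>> stream = MetricsStreamIO(conn, "device-uuid", ["temperature"],
...                          intervalMs=1000,
...                          bufferTime={"value": "10", "unit": "minutes"})
>>> stream.sink(print)   # print each polled result as it arrives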
accumulate(func, start='--no-default--', returns_state=False, **kwargs)
Accumulate results with previous state

This performs running or cumulative reductions, applying the function to the previous total and the new element. The function should take two arguments, the previous accumulated state and the next element, and it should return a new accumulated state:
- state = func(previous_state, new_value) (returns_state=False)
- state, result = func(previous_state, new_value) (returns_state=True)
where the new state is passed to the next invocation. The state or result is emitted downstream for the two cases.

Parameters:
- func: callable
- start: object – Initial value, passed as the value of previous_state on the first invocation. Defaults to the first submitted element.
- returns_state: boolean – If True then func should return both the state and the value to emit. If False then both values are the same, and func returns one value.
- kwargs – Keyword arguments to pass to func.
A running total, producing triangular numbers
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
1
3
6
10
A count of number of events (including the current one)
>>> source = Stream()
>>> source.accumulate(lambda acc, x: acc + 1, start=0).sink(print)
>>> for _ in range(5):
...     source.emit(0)
1
2
3
4
5
Like the builtin "enumerate".

>>> source = Stream()
>>> source.accumulate(lambda acc, x: ((acc[0] + 1, x), (acc[0], x)),
...                   start=(0, 0), returns_state=True
...                   ).sink(print)
>>> for i in range(3):
...     source.emit(0)
(0, 0)
(1, 0)
(2, 0)
buffer(n, **kwargs)
Allow results to pile up at this point in the stream

This allows results to buffer in place at various points in the stream. This can help to smooth flow through the system when backpressure is applied.
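A short sketch of where a buffer is typically placed, in front of a slow stage; slow_save here is a hypothetical stand-in for any expensive consumer:

>>> import time
>>> from streamz import Stream
>>> def slow_save(x):            # hypothetical slow consumer
...     time.sleep(0.5)
...     print(x)
>>> source = Stream()
>>> source.buffer(100).sink(slow_save)   # up to 100 elements may queue up here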
changeInterval(intervalMs)
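The method is not described further here; judging from the constructor's intervalMs parameter, it presumably updates the polling interval (in milliseconds) of a running stream, e.g.:

>>> stream.changeInterval(500)   # assumed: poll every 500 ms from now on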
collect(cache=None, metadata_cache=None, **kwargs)
Hold elements in a cache and emit them as a collection when flushed.

>>> source1 = Stream()
>>> source2 = Stream()
>>> collector = collect(source1)
>>> collector.sink(print)
>>> source2.sink(collector.flush)
>>> source1.emit(1)
>>> source1.emit(2)
>>> source2.emit('anything')  # flushes collector
...
[1, 2]
combine_latest(**kwargs)
Combine multiple streams together to a stream of tuples

This will emit a new tuple of all of the most recent elements seen from any stream.

Parameters:
- emit_on : stream or list of streams or None – Only emit upon update of the streams listed. If None, emit on update from any stream.

See also: zip
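A small sketch (using two locally constructed streams) of combining latest values, emitting only when the first stream updates:

>>> from streamz import Stream
>>> prices = Stream()
>>> limits = Stream()
>>> prices.combine_latest(limits, emit_on=prices).sink(print)
>>> limits.emit(10)    # no output yet, prices has not produced a value
>>> prices.emit(7)     # -> (7, 10)
>>> prices.emit(12)    # -> (12, 10)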
property concat
connect(downstream)
Connect this stream to a downstream element.

Parameters:
- downstream: Stream – The downstream stream to connect to.
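A brief sketch of wiring two independently constructed nodes together with connect (equivalent to building the pipeline with chained calls):

>>> from streamz import Stream
>>> source = Stream()
>>> printer = Stream()
>>> printer.sink(print)
>>> source.connect(printer)   # elements emitted on source now reach printer
>>> source.emit(42)
42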
delay(interval, **kwargs)
Add a time delay to results
destroy(streams=None)
Disconnect this stream from any upstream sources
disconnect(downstream)
Disconnect this stream from a downstream element.

Parameters:
- downstream: Stream – The downstream stream to disconnect from.
emit(x, asynchronous=False, metadata=None)
Push data into the stream at this point

This is typically done only at source Streams but can theoretically be done at any point.

Parameters:
- x: any – An element of data.
- asynchronous – Emit asynchronously.
- metadata: list[dict], optional – Various types of metadata associated with the data element in x. ref: RefCounter. A reference counter used to check when data is done.
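A minimal sketch of pushing data into a source node by hand:

>>> from streamz import Stream
>>> source = Stream()
>>> source.map(lambda x: x * 2).sink(print)
>>> source.emit(3)   # pushed at the source, flows downstream
6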
static filenames(path, poll_interval=0.1, **kwargs)
Stream over filenames in a directory

Parameters:
- path: string – Directory path or globstring over which to search for files.
- poll_interval: Number – Seconds between checking path.
- start: bool (False) – Whether to start running immediately; otherwise call stream.start() explicitly.

>>> source = Stream.filenames('path/to/dir')
>>> source = Stream.filenames('path/to/*.csv', poll_interval=0.500)
filter(predicate, *args, **kwargs)
Only pass through elements that satisfy the predicate

Parameters:
- predicate : function – The predicate. Should return True or False, where True means that the predicate is satisfied.
- args – The arguments to pass to the predicate.
- kwargs – Keyword arguments to pass to predicate.

>>> source = Stream()
>>> source.filter(lambda x: x % 2 == 0).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
flatten(upstreams=None, stream_name=None, loop=None, asynchronous=None, ensure_io_loop=False)
Flatten streams of lists or iterables into a stream of elements

>>> source = Stream()
>>> source.flatten().sink(print)
>>> for x in [[1, 2, 3], [4, 5], [6, 7, 7]]:
...     source.emit(x)
1
2
3
4
5
6
7
7

See also: partition
frequencies(**kwargs)
Count occurrences of elements
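A small sketch: frequencies emits a running dictionary of counts as elements arrive.

>>> from streamz import Stream
>>> source = Stream()
>>> source.frequencies().sink(print)
>>> for x in ['a', 'b', 'a']:
...     source.emit(x)
{'a': 1}
{'a': 1, 'b': 1}
{'a': 2, 'b': 1}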
static from_http_server(port, path='/.*', server_kwargs=None, **kwargs)
Listen for HTTP POSTs on given port

Each connection will emit one event, containing the body data of the request.

Parameters:
- port : int – The port to listen on.
- path : str – Specific path to listen on. Can be regex, but content is not used.
- start : bool – Whether to immediately start up the server. Usually you want to connect downstream nodes first, and then call .start().
- server_kwargs : dict or None – If given, set of further parameters to pass on to HTTPServer.

>>> source = Source.from_http_server(4567)
static from_iterable(iterable, **kwargs)
Emits items from an iterable.

Parameters:
- iterable: iterable – An iterable to emit messages from.

>>> source = Stream.from_iterable(range(3))
>>> L = source.sink_to_list()
>>> source.start()
>>> L
[0, 1, 2]
static from_kafka(topics, consumer_params, poll_interval=0.1, **kwargs)
Accepts messages from Kafka

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

Parameters:
- topics: list of str – Labels of Kafka topics to consume from.
- consumer_params: dict – Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers, connection string(s) (host:port) by which to reach Kafka; group.id, identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.
- poll_interval: number – Seconds that elapse between polling Kafka for new messages.
- start: bool (False) – Whether to start polling upon instantiation.

>>> source = Stream.from_kafka(['mytopic'],
...     {'bootstrap.servers': 'localhost:9092',
...      'group.id': 'streamz'})
static from_kafka_batched(topic, consumer_params, poll_interval='1s', npartitions=None, refresh_partitions=False, start=False, dask=False, max_batch_size=10000, keys=False, engine=None, **kwargs)
Get messages and keys (optional) from Kafka in batches

Uses the confluent-kafka library, https://docs.confluent.io/current/clients/confluent-kafka-python/

This source will emit lists of messages for each partition of a single given topic per time interval, if there is new data. If using dask, one future will be produced per partition per time-step, if there is data.

Checkpointing is achieved through the use of reference counting. A reference counter is emitted downstream for each batch of data. A callback is triggered when the reference count reaches zero and the offsets are committed back to Kafka. Upon the start of this function, the previously committed offsets will be fetched from Kafka and reading will begin from there. This will guarantee at-least-once semantics.
Parameters:
- topic: str – Kafka topic to consume from.
- consumer_params: dict – Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: connection string(s) (host:port) by which to reach Kafka; group.id: identity of the consumer. If multiple sources share the same group, each message will be passed to only one of them.
- poll_interval: number – Seconds that elapse between polling Kafka for new messages.
- npartitions: int (None) – Number of partitions in the topic. If None, streamz will poll Kafka to get the number of partitions.
- refresh_partitions: bool (False) – Useful if the user expects to increase the number of topic partitions on the fly, maybe to handle spikes in load. Streamz polls Kafka in every batch to determine the current number of partitions. If partitions have been added, streamz will automatically start reading data from the new partitions as well. If set to False, streamz will not accommodate adding partitions on the fly. It is recommended to restart the stream after decreasing the number of partitions.
- start: bool (False) – Whether to start polling upon instantiation.
- max_batch_size: int – The maximum number of messages per partition to be consumed per batch.
- keys: bool (False) – Whether to extract keys along with the messages. If True, this will yield each message as a dict: {'key': msg.key(), 'value': msg.value()}
- engine: str (None) – If engine is set to "cudf", streamz reads data (messages must be JSON) from Kafka in an accelerated manner directly into cuDF (GPU) dataframes. This is done using the RAPIDS custreamz library. Please refer to the RAPIDS cudf API here: https://docs.rapids.ai/api/cudf/stable/ Folks interested in trying out custreamz would benefit from this accelerated Kafka reader. If one does not want to use GPUs, they can use streamz as is, with the default engine=None. To use this option, one must install custreamz (use the appropriate CUDA version recipe and Python version) using a command like the one below, which will install all GPU dependencies and streamz itself: conda install -c rapidsai-nightly -c nvidia -c conda-forge -c defaults custreamz=0.15 python=3.7 cudatoolkit=10.2 More information at: https://rapids.ai/start.html

Important Kafka Configurations: By default, a stream will start reading from the latest offsets available. Please set 'auto.offset.reset': 'earliest' in the consumer configs, if the stream needs to start processing from the earliest offsets.

>>> source = Stream.from_kafka_batched('mytopic',
...     {'bootstrap.servers': 'localhost:9092',
...      'group.id': 'streamz'})
static from_periodic(callback, poll_interval=0.1, **kwargs)
Generate data from a function on given period

cf. streamz.dataframe.PeriodicDataFrame

Parameters:
- callback: callable – Function to call on each iteration. Takes no arguments.
- poll_interval: float – Time to sleep between calls (s).
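A short sketch of a periodic source driven by a zero-argument callback; the random payload is illustrative only:

>>> import random
>>> from streamz import Stream
>>> source = Stream.from_periodic(lambda: random.random(), poll_interval=1.0)
>>> source.sink(print)
>>> source.start()   # begin emitting roughly one value per second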
static from_process(cmd, open_kwargs=None, with_stderr=False, with_end=True, **kwargs)
Messages from a running external process

This doesn't work on Windows.

Parameters:
- cmd : list of str or str – Command to run: program name, followed by arguments.
- open_kwargs : dict – To pass on to the process open function, see subprocess.Popen.
- with_stderr : bool – Whether to include the process STDERR in the stream.
- start : bool – Whether to immediately start up the process. Usually you want to connect downstream nodes first, and then call .start().

>>> source = Source.from_process(['ping', 'localhost'])
static from_tcp(port, delimiter=b'\n', server_kwargs=None, **kwargs)
Creates events by reading from a socket using tornado TCPServer

The stream of incoming bytes is split on a given delimiter, and the parts become the emitted events.

Parameters:
- port : int – The port to open and listen on. It only gets opened when the source is started, and closed upon stop().
- delimiter : bytes – The incoming data will be split on this value. The resulting events will still have the delimiter at the end.
- start : bool – Whether to immediately initiate the source. You probably want to set up downstream nodes first.
- server_kwargs : dict or None – If given, additional arguments to pass to TCPServer.

>>> source = Source.from_tcp(4567)
static from_textfile(f, poll_interval=0.1, delimiter='\n', from_end=False, **kwargs)
Stream data from a text file

Parameters:
- f: file or string – Source of the data. If string, will be opened.
- poll_interval: Number – Interval to poll file for new data in seconds.
- delimiter: str – Character(s) to use to split the data into parts.
- start: bool – Whether to start running immediately; otherwise call stream.start() explicitly.
- from_end: bool – Whether to begin streaming from the end of the file (i.e., only emit lines appended after the stream starts).

>>> source = Stream.from_textfile('myfile.json')
>>> source.map(json.loads).pluck('value').sum().sink(print)
>>> source.start()

Returns: Stream
gather()
This is a no-op for core streamz

This allows gather to be used in both dask and core streams.
header = None
The header for DataFrame creation with the streamz package
latest(**kwargs)
Drop held-up data and emit the latest result

This allows you to skip intermediate elements in the stream if there is some backpressure causing a slowdown. Use this when you only care about the latest elements, and are willing to lose older data.

This passes through values without modification otherwise.

>>> source.map(f).latest().map(g)
map(func, *args, **kwargs)
Apply a function to every element in the stream

Parameters:
- func: callable
- args – The arguments to pass to the function.
- kwargs – Keyword arguments to pass to func.

>>> source = Stream()
>>> source.map(lambda x: 2*x).sink(print)
>>> for i in range(5):
...     source.emit(i)
0
2
4
6
8
partition(n, timeout=None, key=None, **kwargs)
Partition stream into tuples of equal size

Parameters:
- n: int – Maximum partition size.
- timeout: int or float, optional – Number of seconds after which a partition will be emitted, even if its size is less than n. If None (default), a partition will be emitted only when its size reaches n.
- key: hashable or callable, optional – Emit items with the same key together as a separate partition. If key is callable, partition will be identified by key(x), otherwise by x[key]. Defaults to None.

>>> source = Stream()
>>> source.partition(3).sink(print)
>>> for i in range(10):
...     source.emit(i)
(0, 1, 2)
(3, 4, 5)
(6, 7, 8)

>>> source = Stream()
>>> source.partition(2, key=lambda x: x % 2).sink(print)
>>> for i in range(4):
...     source.emit(i)
(0, 2)
(1, 3)

>>> from time import sleep
>>> source = Stream()
>>> source.partition(5, timeout=1).sink(print)
>>> for i in range(3):
...     source.emit(i)
>>> sleep(1)
(0, 1, 2)
partition_unique(n: int, key: collections.abc.Hashable = <function identity>, keep: str = 'first', **kwargs)
Partition stream elements into groups of equal size with unique keys only.

Parameters:
- n: int – Number of (unique) elements to pass through as a group.
- key: Union[Hashable, Callable[[Any], Hashable]] – Callable that accepts a stream element and returns a unique, hashable representation of the incoming data (key(x)), or a hashable that gets the corresponding value of a stream element (x[key]). For example, key=lambda x: x["a"] would allow only elements with unique "a" values to pass through. By default, we simply use the element object itself as the key, so that object must be hashable. If that's not the case, a non-default key must be provided.
- keep: str – Which element to keep in the case that a unique key is already found in the group. If "first", keep element from the first occurrence of a given key; if "last", keep element from the most recent occurrence. Note that relative ordering of elements is preserved in the data passed through, and not ordering of keys.
- kwargs

>>> source = Stream()
>>> stream = source.partition_unique(n=3, keep="first").sink(print)
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
>>> for ele in eles:
...     source.emit(ele)
(1, 2, 3)
(1, 3, 2)

>>> source = Stream()
>>> stream = source.partition_unique(n=3, keep="last").sink(print)
>>> eles = [1, 2, 1, 3, 1, 3, 3, 2]
>>> for ele in eles:
...     source.emit(ele)
(2, 1, 3)
(1, 3, 2)

>>> source = Stream()
>>> stream = source.partition_unique(n=3, key=lambda x: len(x), keep="last").sink(print)
>>> eles = ["f", "fo", "f", "foo", "f", "foo", "foo", "fo"]
>>> for ele in eles:
...     source.emit(ele)
('fo', 'f', 'foo')
('f', 'foo', 'fo')
pluck(pick, **kwargs)
Select elements from elements in the stream.

Parameters:
- pluck : object, list – The element(s) to pick from the incoming element in the stream. If an instance of list, will pick multiple elements.

>>> source = Stream()
>>> source.pluck([0, 3]).sink(print)
>>> for x in [[1, 2, 3, 4], [4, 5, 6, 7], [8, 9, 10, 11]]:
...     source.emit(x)
(1, 4)
(4, 7)
(8, 11)

>>> source = Stream()
>>> source.pluck('name').sink(print)
>>> for x in [{'name': 'Alice', 'x': 123}, {'name': 'Bob', 'x': 456}]:
...     source.emit(x)
'Alice'
'Bob'
poll_metrics()
Polling co-routine. This retrieves metrics from the connector.
rate_limit(interval, **kwargs)
Limit the flow of data

This stops two elements from streaming through in an interval shorter than the provided value.

Parameters:
- interval: float – Time in seconds.
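A short sketch of how the node is placed in a pipeline; even if elements are emitted as fast as possible, downstream nodes see at most one element per interval:

>>> from streamz import Stream
>>> source = Stream()
>>> source.rate_limit(0.5).sink(print)   # downstream sees at most one element every 0.5 s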
classmethod register_api(modifier=<function identity>, attribute_name=None)
Add callable to Stream API

This allows you to register a new method onto this class. You can use it as a decorator:

>>> @Stream.register_api()
... class foo(Stream):
...     ...
>>> Stream().foo(...)  # this works now

It attaches the callable as a normal attribute to the class object. In doing so it respects inheritance (all subclasses of Stream will also get the foo attribute).

By default callables are assumed to be instance methods. If you like you can include modifiers to apply before attaching to the class as in the following case where we construct a staticmethod.

>>> @Stream.register_api(staticmethod)
... class foo(Stream):
...     ...
>>> Stream.foo(...)  # foo operates as a static method

You can also provide an optional attribute_name argument to control the name of the attribute your callable will be attached as.

>>> @Stream.register_api(attribute_name="bar")
... class foo(Stream):
...     ...
>>> Stream().bar(...)  # foo was actually attached as bar
classmethod register_plugin_entry_point(entry_point, modifier=<function identity>)
remove(predicate)
Only pass through elements for which the predicate returns False
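A small sketch, the inverse of filter:

>>> from streamz import Stream
>>> source = Stream()
>>> source.remove(lambda x: x % 2 == 0).sink(print)   # drop even numbers
>>> for i in range(5):
...     source.emit(i)
1
3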
property scan
scatter(**kwargs)
Convert local stream to Dask Stream

All elements flowing through the input will be scattered out to the cluster.
sink(func, *args, **kwargs)
Apply a function on every element

Parameters:
- func: callable – A function that will be applied on every element.
- args – Positional arguments that will be passed to func after the incoming element.
- kwargs – Stream-specific arguments will be passed to Stream.__init__, the rest of them will be passed to func.

>>> source = Stream()
>>> L = list()
>>> source.sink(L.append)
>>> source.sink(print)
>>> source.sink(print)
>>> source.emit(123)
123
123
>>> L
[123]

See also: map, Stream.sink_to_list
sink_to_list()
Append all elements of a stream to a list as they come in

>>> source = Stream()
>>> L = source.map(lambda x: 10 * x).sink_to_list()
>>> for i in range(5):
...     source.emit(i)
>>> L
[0, 10, 20, 30, 40]
sink_to_textfile(file, end='\n', mode='a', **kwargs)
Write elements to a plain text file, one element per line.

Type of elements must be str.

Parameters:
- file: str or file-like – File to write the elements to. str is treated as a file name to open. If file-like, descriptor must be open in text mode. Note that the file descriptor will be closed when this sink is destroyed.
- end: str, optional – This value will be written to the file after each element. Defaults to newline character.
- mode: str, optional – If file is str, file will be opened in this mode. Defaults to "a" (append mode).

>>> source = Stream()
>>> source.map(str).sink_to_textfile("test.txt")
>>> source.emit(0)
>>> source.emit(1)
>>> print(open("test.txt", "r").read())
0
1
slice(start=None, end=None, step=None, **kwargs)
Get only some events in a stream by position. Works like list[] syntax.

Parameters:
- start : int – First event to use. If None, start from the beginning.
- end : int – Last event to use (non-inclusive). If None, continue without stopping. Does not support negative indexing.
- step : int – Pass on every Nth event. If None, pass every one.

>>> source = Stream()
>>> source.slice(2, 6, 2).sink(print)
>>> for i in range(5):
...     source.emit(0)
2
4
sliding_window(n, return_partial=True, **kwargs)
Produce overlapping tuples of size n

Parameters:
- return_partial : bool – If True, yield tuples as soon as any events come in, each tuple being smaller or equal to the window size. If False, only start yielding tuples once a full window has accrued.

>>> source = Stream()
>>> source.sliding_window(3, return_partial=False).sink(print)
>>> for i in range(8):
...     source.emit(i)
(0, 1, 2)
(1, 2, 3)
(2, 3, 4)
(3, 4, 5)
(4, 5, 6)
(5, 6, 7)
starmap(func, *args, **kwargs)
Apply a function to every element in the stream, splayed out

See itertools.starmap

Parameters:
- func: callable
- args – The arguments to pass to the function.
- kwargs – Keyword arguments to pass to func.

>>> source = Stream()
>>> source.starmap(lambda a, b: a + b).sink(print)
>>> for i in range(5):
...     source.emit((i, i))
0
2
4
6
8
start()
Start the stream.
stop()
Stop the stream.
str_list = ['func', 'predicate', 'n', 'interval']
timed_window(interval, **kwargs)
Emit a tuple of collected results every interval

Every interval seconds this emits a tuple of all of the results seen so far. This can help to batch data coming off of a high-volume stream.
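A brief sketch of batching a fast source into one tuple per second (the node needs a running event loop, e.g. an asynchronous stream or a started source):

>>> from streamz import Stream
>>> source = Stream(asynchronous=True)
>>> source.timed_window(1.0).sink(print)   # one tuple of collected elements per second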
timed_window_unique(interval: Union[int, str], key: collections.abc.Hashable = <function identity>, keep: str = 'first', **kwargs)
Emit a group of elements with unique keys every interval seconds.

Parameters:
- interval: Union[int, str] – Number of seconds over which to group elements, or a pandas-style duration string that can be converted into seconds.
- key: Union[Hashable, Callable[[Any], Hashable]] – Callable that accepts a stream element and returns a unique, hashable representation of the incoming data (key(x)), or a hashable that gets the corresponding value of a stream element (x[key]). For example, both key=lambda x: x["a"] and key="a" would allow only elements with unique "a" values to pass through. By default, we simply use the element object itself as the key, so that object must be hashable. If that's not the case, a non-default key must be provided.
- keep: str – Which element to keep in the case that a unique key is already found in the group. If "first", keep element from the first occurrence of a given key; if "last", keep element from the most recent occurrence. Note that relative ordering of elements is preserved in the data passed through, and not ordering of keys.
>>> source = Stream()

Get unique hashable elements in a window, keeping just the first occurrence:
>>> stream = source.timed_window_unique(interval=1.0, keep="first").sink(print)
>>> for ele in [1, 2, 3, 3, 2, 1]:
...     source.emit(ele)
()
(1, 2, 3)
()

Get unique hashable elements in a window, keeping just the last occurrence:
>>> stream = source.timed_window_unique(interval=1.0, keep="last").sink(print)
>>> for ele in [1, 2, 3, 3, 2, 1]:
...     source.emit(ele)
()
(3, 2, 1)
()

Get unique elements in a window by (string) length, keeping just the first occurrence:
>>> stream = source.timed_window_unique(interval=1.0, key=len, keep="first")
>>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
...     source.emit(ele)
()
('f', 'fo', 'foo')
()

Get unique elements in a window by (string) length, keeping just the last occurrence:
>>> stream = source.timed_window_unique(interval=1.0, key=len, keep="last")
>>> for ele in ["f", "b", "fo", "ba", "foo", "bar"]:
...     source.emit(ele)
()
('b', 'ba', 'bar')
()
to_batch(**kwargs)
Convert a stream of lists to a Batch

All elements of the stream are assumed to be lists or tuples.

>>> source = Stream()
>>> batches = source.to_batch()
>>> L = batches.pluck('value').map(inc).sum().stream.sink_to_list()
>>> source.emit([{'name': 'Alice', 'value': 1},
...              {'name': 'Bob', 'value': 2},
...              {'name': 'Charlie', 'value': 3}])
>>> source.emit([{'name': 'Alice', 'value': 4},
...              {'name': 'Bob', 'value': 5},
...              {'name': 'Charlie', 'value': 6}])
to_dataframe(example)
Convert a stream of Pandas dataframes to a DataFrame

>>> source = Stream()
>>> sdf = source.to_dataframe()
>>> L = sdf.groupby(sdf.x).y.mean().stream.sink_to_list()
>>> source.emit(pd.DataFrame(...))
>>> source.emit(pd.DataFrame(...))
>>> source.emit(pd.DataFrame(...))
to_kafka(topic, producer_config, **kwargs)
Writes data in the stream to Kafka

This stream accepts a string or bytes object. Call flush to ensure all messages are pushed. Responses from Kafka are pushed downstream.

Parameters:
- topic : string – The topic to write to.
- producer_config : dict – Settings to set up the stream, see https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration and https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md Examples: bootstrap.servers: connection string (host:port) to Kafka.

>>> from streamz import Stream
>>> ARGS = {'bootstrap.servers': 'localhost:9092'}
>>> source = Stream()
>>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
<to_kafka>
>>> for i in range(10):
...     source.emit(i)
>>> kafka.flush()
union(**kwargs)
Combine multiple streams into one

Every element from any of the upstream streams will immediately flow into the output stream. They will not be combined with elements from other streams.

See also: Stream.zip, Stream.combine_latest
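A quick sketch of merging two locally constructed streams into one output:

>>> from streamz import Stream
>>> a = Stream()
>>> b = Stream()
>>> a.union(b).sink(print)
>>> a.emit(1)   # -> 1
>>> b.emit(2)   # -> 2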
unique(maxsize=None, key=<function identity>, hashable=True, **kwargs)
Avoid sending through repeated elements

This deduplicates a stream so that only new elements pass through. You can control how much of a history is stored with the maxsize= parameter. For example setting maxsize=1 avoids sending through elements when one is repeated right after the other.

Parameters:
- maxsize: int or None, optional – Number of stored unique values to check against.
- key : function, optional – Function which returns a representation of the incoming data. For example key=lambda x: x['a'] could be used to allow only pieces of data with unique 'a' values to pass through.
- hashable : bool, optional – If True then data is assumed to be hashable, else it is not. This is used for determining how to cache the history, if hashable then either dicts or LRU caches are used, otherwise a deque is used. Defaults to True.

>>> source = Stream()
>>> source.unique(maxsize=1).sink(print)
>>> for x in [1, 1, 2, 2, 2, 1, 3]:
...     source.emit(x)
1
2
1
3
update(x, who=None, metadata=None)
property upstream
visualize(filename='mystream.png', **kwargs)
Render the computation of this object's task graph using graphviz.

Requires graphviz and networkx to be installed.

Parameters:
- filename : str, optional – The name of the file to write to disk.
- kwargs – Graph attributes to pass to graphviz like rankdir="LR"
zip(**kwargs)
Combine streams together into a stream of tuples

We emit a new tuple once all streams have produced a new tuple.

See also: combine_latest, zip_latest
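A minimal sketch of pairing up elements from two locally constructed streams:

>>> from streamz import Stream
>>> a = Stream()
>>> b = Stream()
>>> a.zip(b).sink(print)
>>> a.emit(1)       # waits for a partner from b
>>> b.emit('x')     # -> (1, 'x')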
zip_latest(*upstreams, **kwargs)
Combine multiple streams together to a stream of tuples

The stream which this is called from is lossless. All elements from the lossless stream are emitted regardless of when they came in. This will emit a new tuple consisting of an element from the lossless stream paired with the latest elements from the other streams. Elements are only emitted when an element on the lossless stream is received, similar to combine_latest with the emit_on flag.

See also: Stream.combine_latest, Stream.zip
class proficloud.tsd_io.tsd.ProficloudIOMetrics(host='proficloud.io', debug=False)
Bases: object

authenticate(username, password)

static dateparse(datestring)
queryMetrics(uuid, metrics, start_time=None, end_time=None, dropNa=True, chunkSize=20000, groupIntervalMs=200)
Query metrics and return the data as pandas DataFrame.

Parameters:
- metrics (list(str), str) – A list of metric names (or a single metric name) to query.
- start_time (int, datetime, str or None) – Timestamp (ms based), or datetime object (or datetime as string); not used when None. (Default: None)
- end_time (int, datetime, str or None) – Timestamp (ms based), or datetime object (or datetime as string). This must be later in time than the start time. If not specified, the end time is assumed to be the current date and time. (Default: None)
- dropNa (bool) – Drop NA when true. Default: True

Return type: pandas.DataFrame
Returns: pandas.DataFrame
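A minimal usage sketch based on the documented signature; the credentials, device UUID, and metric names below are placeholders:

>>> from proficloud.tsd_io.tsd import ProficloudIOMetrics
>>> conn = ProficloudIOMetrics()                        # uses host='proficloud.io'
>>> conn.authenticate("user@example.com", "secret")     # placeholder credentials
>>> df = conn.queryMetrics("device-uuid", ["temperature", "pressure"],
...                        start_time="2021-01-01 00:00:00")
>>> df.head()   # pandas.DataFrame with the queried metrics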