Collections#
Tools for working with collection types.
- class ktz.collections.Incrementer(*args, fn=None, **kwargs)#
Automatically assign unique ids.
This is basically a defaultdict using a state which remembers the latest assigned id and assigns its increment when queried for a missing item. It can be frozen to error out on unknown keys. You can overwrite the built-in incrementer by providing your own iterable upon instantiation using the fn kwarg.
- Parameters:
- dictdict
Base dictionary
- fnIterable
Custom iterable to use instead of count()
- Raises:
- NameError
Thrown if the dict is frozen and an unknown key is accessed
- KeyError
Thrown for invalid explicit setting of values
- StopIteration
Thrown for depleted custom iterators given to __init__
Examples
>>> from ktz.collections import Incrementer >>> # using a custom fn to control the assigned ids >>> from itertools import count >>> ids = Incrementer(fn=count(10)) >>> ids[4] 10 >>> ids[10] 11 >>> ids {4: 10, 10: 11} >>> ids.freeze() >>> ids[3] NameError: Key '3' not present and incrementer is frozen.
Methods
clear()copy()freeze()Freeze the incrementer.
fromkeys(iterable[, value])Create a new dictionary with keys from iterable and values set to value.
get(key[, default])Return the value for key if key is in the dictionary, else default.
items()keys()pop(key[, default])If the key is not found, return the default if given; otherwise, raise a KeyError.
popitem(/)Remove and return a (key, value) pair as a 2-tuple.
setdefault(key[, default])Insert key with a value of default if key is not in the dictionary.
unfreeze()Unfreeze the incrementer.
update([E, ]**F)If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]
values()- freeze()#
Freeze the incrementer.
It is no longer possible to automatically create new keys.
Examples
>>> from ktz.collections import Incrementer >>> ids = Incrementer() >>> ids[1] 0 >>> ids.freeze() >>> ids[1] 0 >>> ids[2] NameError: Key '2' not present and incrementer is frozen.
- unfreeze()#
Unfreeze the incrementer.
Allows the creation of new keys again
Examples
>>> from ktz.collections import Incrementer >>> ids = Incrementer() >>> ids.freeze() >>> ids[2] NameError: Key '2' not present and incrementer is frozen. >>> ids.unfreeze() >>> ids[2] 0
- ktz.collections.buckets(col, key=None, mapper=None)#
Sort data into buckets.
Takes a collection and sorts the data into buckets based on a provided function. The resulting buckets can then optionally be mapped (e.g. to be reduced).
- Parameters:
- colCollection[A] | Collection[tuple[B, C]]
Collection to be partitioned
- keyCallable[[Index, A], tuple[B, C]] | None
Optional function that returns (key, value) tuples
- mapperCallable[[tuple[C]], D] | None
Optional function that takes a bucket and maps it
- Returns:
- Mapping[B, list[C]] | Mapping[B, D]
A dictionary which maps bucket identifieres to their data
Examples
>>> from ktz.collections import buckets >>> data = [1, 2, 3, 4, 5] >>> buckets(col=data, key=lambda i, x: (x % 2 ==0 , x)) {False: [1, 3, 5], True: [2, 4]} >>> buckets(col=data, key=lambda i, x: (x % 2 ==0 , x), mapper=sum) {False: 9, True: 6}
- ktz.collections.dconv(dic, *convert)#
Convert a dictionary deeply.
A pipeline of converter functions may be provided which transform the values of the given mapping. It always returns a deep copy of the mapping as a dictionary. The converter functions are applied in the given order.
- Parameters:
- dicdict
Mapping to be copied and transformed
- *convertCallable[[A, B], C]
Converter functions
Examples
>>> from ktz.collections import dconv >>> dconv(dict(a=1, d=dict(b=2, c=3)), lambda v: v + 2) {'a': 3, 'd': {'b': 4, 'c': 5}} >>> dconv(dict(a=1, d=dict(b=2, c=3)), lambda v, k: True if k == 'b' else False) {'a': False, 'd': {'b': True, 'c': False}}
- ktz.collections.dflat(dic, sep='.', only=None)#
Flatten a deep dictionary with string keys.
Takes a deeply nested dictionary and flattens it by concatenating its keys using the provided separator. For example a dictionary d[‘foo’][‘bar’] = 3 becomes d[‘foo.bar’] = 3. Keys are transformed to strings either by __str__ or __repr__ if __str__ is not defined.
- Parameters:
- dicMapping[str, XXXX]
The dictionary to be flattened
- sepstr
Separator to concatenate the keys with
- onlyint | None
Stops flattening after the provided depth
Examples
>>> from ktz.collections import dflat >>> dic = dict(foo=dict(bar=dict(a=1,b=2),c=3),d=4) >>> dflat(dic) {'foo.bar.a': 1, 'foo.bar.b': 2, 'foo.c': 3, 'd': 4} >>> dflat(dic, sep=' ') {'foo bar a': 1, 'foo bar b': 2, 'foo c': 3, 'd': 4} >>> dflat(dic, only=2) {'foo.bar': {'a': 1, 'b': 2}, 'foo.c': 3, 'd': 4}
- ktz.collections.dmerge(*ds)#
Deeply merge dictionaries.
A new deep copy is created from the keys and values from the provided mappings. Values of the the next mapping overwrite the former unless they are set to None.
- Parameters:
- dsMapping
Deep mappings to be merged
Examples
>>> from ktz.collections import dmerge >>> d1 = dict(foo=dict(a=1, b=2), bar=3) >>> d2 = dict(foo=dict(a=3, c=4), xyz=5) >>> dmerge(d1, d2) {'foo': {'a': 3, 'b': 2, 'c': 4}, 'bar': 3, 'xyz': 5}
- ktz.collections.drslv(dic, chain, sep='.', default=<class 'KeyError'>, collapse=None, dtype=None)#
Resolve string trails in deep dictionaries.
For example, with sep=”.” and collapse=0 chain=foo.bar.baz retrieves dic[‘foo’][‘bar’][‘baz’]. Setting collapse=1 returns dic[‘foo’][‘bar’] = {‘baz’: …}
It is also possible to use wildcards in the querz string to skip unknown but unambiguous (i.e. single-key dict) entries.
- Parameters:
- dicMapping
Data to be looked up
- chainstr
Query string
- sepstr
How the chain needs to be split
- collapseint | None
Return an n-level deep dict instead
- defaultAny
For missing keys; defaults to raising a KeyError
- dtypeAny | None
If not None: checks value(s) with isinstance
Examples
>>> from ktz.collections import drslv >>> dic = dict(foo=dict(bar=dict(a=1,b=2),c=3),d=4) >>> drslv(dic, 'foo.bar.a') 1 >>> drslv(dic, 'foo bar a', sep=' ') 1 >>> drslv(dic, 'foo.bar.a', collapse=1) {'a': 1, 'b': 2} >>> drslv(dic, 'not.there') Traceback (most recent call last): Input In [15] in <cell line: 1> drslv(dic, 'not.there') File ~/Complex/scm/ktz/ktz/collections.py:267 in drslv raise err File ~/Complex/scm/ktz/ktz/collections.py:264 in drslv dic = dic[key] KeyError: 'not'
>>> drslv(dic, 'not.there', default=None) >>> drslv(dic, 'not.there', default=1) 1 >>> drslv(dic, 'foo.*.a') # only works for single-element dicts 1
- ktz.collections.lflat(col, depth=-1)#
Flattens a tuple or list.
Consumes the given sequence and flattens it up to n levels deep or completely.
- Parameters:
- colNested
Nested list or tuple
- depthint
Maximum depth to flatten
- Returns:
- Generator[A, None, None]
Generator with flattened collection
Examples
>>> from ktz.collections import flat >>> flat([[1], [[2]]], depth=2) <generator object flat at 0x7f2886aeccf0> >>> list(flat([[1], [[2]]], depth=2)) [1, [2]] >>> list(flat([["foo"], [["bar"]]])) ["foo", "bar"]
- ktz.collections.ryaml(*configs, **overwrites)#
Load and join configurations from yaml and kwargs.
First, all provided configuration files are loaded and joined together. Afterwards, all provided kwargs overwrite the joined configuration dict.
- Parameters:
- *configsPath | str
Config files to be read and merged
- **overwritesAny
To overwrite loaded values
- ktz.collections.unbucket(buckets)#
Flattens a bucket dictionary.
Partitioned data is joined back up together and presented as tuples in a flat list.
- Parameters:
- bucketsMapping[A, list[B]]
Bucket dictionary
- Returns:
- list[tuple[A, B]]
Flattened collection
Examples
>>> from ktz.collections import buckets >>> from ktz.collections import unbucket >>> data = [1, 2, 3, 4, 5] >>> kcol.buckets(col=data, key=lambda i, x: (x % 2 ==0 , x)) {False: [1, 3, 5], True: [2, 4]} >>> parts = kcol.buckets(col=data, key=lambda i, x: (x % 2 ==0 , x)) >>> unbucket(parts) [(False, 1), (False, 3), (False, 5), (True, 2), (True, 4)]