Collections#

Tools for working with collection types.

class ktz.collections.Incrementer(*args, fn=None, **kwargs)#

Automatically assign unique ids.

This is basically a defaultdict using a state which remembers the latest assigned id and assigns its increment when queried for a missing item. It can be frozen to error out on unknown keys. You can overwrite the built-in incrementer by providing your own iterable upon instantiation using the fn kwarg.

Parameters:
dictdict

Base dictionary

fnIterable

Custom iterable to use instead of count()

Raises:
NameError

Thrown if the dict is frozen and an unknown key is accessed

KeyError

Thrown for invalid explicit setting of values

StopIteration

Thrown for depleted custom iterators given to __init__

Examples

>>> from ktz.collections import Incrementer
>>> # using a custom fn to control the assigned ids
>>> from itertools import count
>>> ids = Incrementer(fn=count(10))
>>> ids[4]
10
>>> ids[10]
11
>>> ids
{4: 10, 10: 11}
>>> ids.freeze()
>>> ids[3]
NameError: Key '3' not present and incrementer is frozen.

Methods

clear()

copy()

freeze()

Freeze the incrementer.

fromkeys(iterable[, value])

Create a new dictionary with keys from iterable and values set to value.

get(key[, default])

Return the value for key if key is in the dictionary, else default.

items()

keys()

pop(key[, default])

If the key is not found, return the default if given; otherwise, raise a KeyError.

popitem(/)

Remove and return a (key, value) pair as a 2-tuple.

setdefault(key[, default])

Insert key with a value of default if key is not in the dictionary.

unfreeze()

Unfreeze the incrementer.

update([E, ]**F)

If E is present and has a .keys() method, then does: for k in E: D[k] = E[k] If E is present and lacks a .keys() method, then does: for k, v in E: D[k] = v In either case, this is followed by: for k in F: D[k] = F[k]

values()

freeze()#

Freeze the incrementer.

It is no longer possible to automatically create new keys.

Examples

>>> from ktz.collections import Incrementer
>>> ids = Incrementer()
>>> ids[1]
0
>>> ids.freeze()
>>> ids[1]
0
>>> ids[2]
NameError: Key '2' not present and incrementer is frozen.
unfreeze()#

Unfreeze the incrementer.

Allows the creation of new keys again

Examples

>>> from ktz.collections import Incrementer
>>> ids = Incrementer()
>>> ids.freeze()
>>> ids[2]
NameError: Key '2' not present and incrementer is frozen.
>>> ids.unfreeze()
>>> ids[2]
0
ktz.collections.buckets(col, key=None, mapper=None)#

Sort data into buckets.

Takes a collection and sorts the data into buckets based on a provided function. The resulting buckets can then optionally be mapped (e.g. to be reduced).

Parameters:
colCollection[A] | Collection[tuple[B, C]]

Collection to be partitioned

keyCallable[[Index, A], tuple[B, C]] | None

Optional function that returns (key, value) tuples

mapperCallable[[tuple[C]], D] | None

Optional function that takes a bucket and maps it

Returns:
Mapping[B, list[C]] | Mapping[B, D]

A dictionary which maps bucket identifieres to their data

Examples

>>> from ktz.collections import buckets
>>> data = [1, 2, 3, 4, 5]
>>> buckets(col=data, key=lambda i, x: (x % 2 ==0 , x))
{False: [1, 3, 5], True: [2, 4]}
>>> buckets(col=data, key=lambda i, x: (x % 2 ==0 , x), mapper=sum)
{False: 9, True: 6}
ktz.collections.dconv(dic, *convert)#

Convert a dictionary deeply.

A pipeline of converter functions may be provided which transform the values of the given mapping. It always returns a deep copy of the mapping as a dictionary. The converter functions are applied in the given order.

Parameters:
dicdict

Mapping to be copied and transformed

*convertCallable[[A, B], C]

Converter functions

Examples

>>> from ktz.collections import dconv
>>> dconv(dict(a=1, d=dict(b=2, c=3)), lambda v: v + 2)
{'a': 3, 'd': {'b': 4, 'c': 5}}
>>> dconv(dict(a=1, d=dict(b=2, c=3)), lambda v, k: True if k == 'b' else False)
{'a': False, 'd': {'b': True, 'c': False}}
ktz.collections.dflat(dic, sep='.', only=None)#

Flatten a deep dictionary with string keys.

Takes a deeply nested dictionary and flattens it by concatenating its keys using the provided separator. For example a dictionary d[‘foo’][‘bar’] = 3 becomes d[‘foo.bar’] = 3. Keys are transformed to strings either by __str__ or __repr__ if __str__ is not defined.

Parameters:
dicMapping[str, XXXX]

The dictionary to be flattened

sepstr

Separator to concatenate the keys with

onlyint | None

Stops flattening after the provided depth

Examples

>>> from ktz.collections import dflat
>>> dic = dict(foo=dict(bar=dict(a=1,b=2),c=3),d=4)
>>> dflat(dic)
{'foo.bar.a': 1, 'foo.bar.b': 2, 'foo.c': 3, 'd': 4}
>>> dflat(dic, sep=' ')
{'foo bar a': 1, 'foo bar b': 2, 'foo c': 3, 'd': 4}
>>> dflat(dic, only=2)
{'foo.bar': {'a': 1, 'b': 2}, 'foo.c': 3, 'd': 4}
ktz.collections.dmerge(*ds)#

Deeply merge dictionaries.

A new deep copy is created from the keys and values from the provided mappings. Values of the the next mapping overwrite the former unless they are set to None.

Parameters:
dsMapping

Deep mappings to be merged

Examples

>>> from ktz.collections import dmerge
>>> d1 = dict(foo=dict(a=1, b=2), bar=3)
>>> d2 = dict(foo=dict(a=3, c=4), xyz=5)
>>> dmerge(d1, d2)
{'foo': {'a': 3, 'b': 2, 'c': 4}, 'bar': 3, 'xyz': 5}
ktz.collections.drslv(dic, chain, sep='.', default=<class 'KeyError'>, collapse=None, dtype=None)#

Resolve string trails in deep dictionaries.

For example, with sep=”.” and collapse=0 chain=foo.bar.baz retrieves dic[‘foo’][‘bar’][‘baz’]. Setting collapse=1 returns dic[‘foo’][‘bar’] = {‘baz’: …}

It is also possible to use wildcards in the querz string to skip unknown but unambiguous (i.e. single-key dict) entries.

Parameters:
dicMapping

Data to be looked up

chainstr

Query string

sepstr

How the chain needs to be split

collapseint | None

Return an n-level deep dict instead

defaultAny

For missing keys; defaults to raising a KeyError

dtypeAny | None

If not None: checks value(s) with isinstance

Examples

>>> from ktz.collections import drslv
>>> dic = dict(foo=dict(bar=dict(a=1,b=2),c=3),d=4)
>>> drslv(dic, 'foo.bar.a')
1
>>> drslv(dic, 'foo bar a', sep=' ')
1
>>> drslv(dic, 'foo.bar.a', collapse=1)
{'a': 1, 'b': 2}
>>> drslv(dic, 'not.there')
Traceback (most recent call last):
  Input In [15] in <cell line: 1>
    drslv(dic, 'not.there')
  File ~/Complex/scm/ktz/ktz/collections.py:267 in drslv
    raise err
  File ~/Complex/scm/ktz/ktz/collections.py:264 in drslv
    dic = dic[key]
KeyError: 'not'
>>> drslv(dic, 'not.there', default=None)
>>> drslv(dic, 'not.there', default=1)
1
>>> drslv(dic, 'foo.*.a')  # only works for single-element dicts
1
ktz.collections.lflat(col, depth=-1)#

Flattens a tuple or list.

Consumes the given sequence and flattens it up to n levels deep or completely.

Parameters:
colNested

Nested list or tuple

depthint

Maximum depth to flatten

Returns:
Generator[A, None, None]

Generator with flattened collection

Examples

>>> from ktz.collections import flat
>>> flat([[1], [[2]]], depth=2)
<generator object flat at 0x7f2886aeccf0>
>>> list(flat([[1], [[2]]], depth=2))
[1, [2]]
>>> list(flat([["foo"], [["bar"]]]))
["foo", "bar"]
ktz.collections.ryaml(*configs, **overwrites)#

Load and join configurations from yaml and kwargs.

First, all provided configuration files are loaded and joined together. Afterwards, all provided kwargs overwrite the joined configuration dict.

Parameters:
*configsPath | str

Config files to be read and merged

**overwritesAny

To overwrite loaded values

ktz.collections.unbucket(buckets)#

Flattens a bucket dictionary.

Partitioned data is joined back up together and presented as tuples in a flat list.

Parameters:
bucketsMapping[A, list[B]]

Bucket dictionary

Returns:
list[tuple[A, B]]

Flattened collection

Examples

>>> from ktz.collections import buckets
>>> from ktz.collections import unbucket
>>> data = [1, 2, 3, 4, 5]
>>> kcol.buckets(col=data, key=lambda i, x: (x % 2 ==0 , x))
{False: [1, 3, 5], True: [2, 4]}
>>> parts = kcol.buckets(col=data, key=lambda i, x: (x % 2 ==0 , x))
>>> unbucket(parts)
[(False, 1), (False, 3), (False, 5), (True, 2), (True, 4)]