Source code for ymp.yaml

import io
import logging
import os
from collections.abc import (
    ItemsView, KeysView, Mapping, MappingView, Sequence, ValuesView
)

import ruamel.yaml
from ruamel.yaml import RoundTripRepresenter, YAML
from ruamel.yaml.comments import CommentedMap


log = logging.getLogger(__name__)  # pylint: disable=invalid-name


[docs]class MixedTypeError(Exception): """Mixed types in proxy collection"""
[docs]class LayeredConfError(Exception): """Error in LayeredConf"""
[docs]class LayeredConfWriteError(LayeredConfError): """Can't write"""
[docs]class LayeredConfAccessError(LayeredConfError, KeyError, IndexError): """Can't access"""
[docs]class AttrItemAccessMixin(object): """Mixin class mapping dot to bracket access Added to classes implementing __getitem__, __setitem__ and __delitem__, this mixin will allow acessing items using dot notation. I.e. "object.xyz" is translated to "object[xyz]". """ def __getattr__(self, key): try: if key[0] == "_": return self.__getattribute__(key) else: return self[key] except (IndexError, KeyError) as e: raise AttributeError(e) def __setattr__(self, key, value): try: if key[0] == "_": object.__setattr__(self, key, value) else: self[key] = value except (IndexError, KeyError) as e: raise AttributeError(e) def __delattr__(self, key): raise NotImplementedError()
[docs]class MultiProxy(object): """Base class for layered container structure""" def __init__(self, maps, parent=None, key=None): self._maps = list(maps) self._parent = parent self._key = key
[docs] def make_map_proxy(self, key, items): return MultiMapProxy(items, parent=self, key=key)
[docs] def make_seq_proxy(self, key, items): return MultiSeqProxy(items, parent=self, key=key)
[docs] def get_files(self): return [fn for fn, layer in self._maps]
[docs] def get_linenos(self): return [layer._yaml_line_col.line for fn, layer in self._maps]
[docs] def to_yaml(self, show_source=False): buf = io.StringIO() if show_source: for fn, layer in self._maps: buf.write(f"--- # from '{fn}' # ---\n") rt_yaml.dump(layer, buf) else: rt_yaml.dump(self, buf) return buf.getvalue()
def __str__(self): return self.to_yaml() def __repr__(self): return f"{self.__class__.__name__}({self._maps!r})"
[docs] def add_layer(self, name, container): self._maps.insert(0, ((name, container)))
[docs] def remove_layer(self, name): map_name = self._maps[0][0] if map_name == name: self._maps.pop(0) else: raise LayeredConfError(f"in remove_layer: {map_name} != {name}")
[docs]class MultiMapProxyMappingView(MappingView): """MappingView for MultiMapProxy""" def __init__(self, mapping): self._mapping = mapping def __len__(self): return len(self._mapping) def __repr__(self): return '{0.__class__.__name__}({0._mapping!r})'.format(self) def __radd__(self, other): if isinstance(other, Sequence): return other + list(self) raise TypeError()
[docs]class MultiMapProxyItemsView(MultiMapProxyMappingView, ItemsView): """ItemsView for MultiMapProxy""" def __contains__(self, key): return (isinstance(key, tuple) and len(key) == 2 and key[0] in self._mapping and self._mapping[key[0]] == key[1]) def __iter__(self): for key in self._mapping: yield key, self._mapping[key]
[docs]class MultiMapProxyKeysView(MultiMapProxyMappingView, KeysView): """KeysView for MultiMapProxy""" def __contains__(self, key): return key in self._mapping def __iter__(self): yield from iter(self._mapping)
[docs]class MultiMapProxyValuesView(MultiMapProxyMappingView, ValuesView): """ValuesView for MultiMapProxy""" def __contains__(self, key): return any(self._mapping[k] == key for k in self._mapping) def __iter__(self): for key in self._mapping: yield self._mapping[key]
[docs]class MultiMapProxy(Mapping, MultiProxy, AttrItemAccessMixin): """Mapping Proxy for layered containers""" def __contains__(self, key): return any(key in m for _, m in self._maps) def __len__(self): return len(set(k for _, m in self._maps for k in m)) def __getitem__(self, key): items = [(fn, m[key]) for fn, m in self._maps if key in m] if not items: raise KeyError(f"key '{key}' not found in any map") typs = set(type(m[1]) for m in items if m[1]) if len(typs) > 1: raise MixedTypeError( f"while trying to obtain '{key}' from {items!r}," f"types differ: {typs}" ) if isinstance(items[0][1], Mapping): return self.make_map_proxy(key, items) if isinstance(items[0][1], str): return items[0][1] if isinstance(items[0][1], Sequence): return self.make_seq_proxy(key, items) return items[0][1] def __setitem__(self, key, value): # we want to set to the top layer, so get that first mp = self keys = [] while mp._parent: keys.append(mp._key) mp = mp._parent # now walk back down for k in reversed(keys): if k not in mp._maps[0][1]: mp._maps[0][1][k] = CommentedMap() mp = mp[k] # and set the value, potentially on a different object than self mp._maps[0][1][key] = value def __delitem__(self, key): raise NotImplementedError() def __iter__(self): for key in set(k for _, m in self._maps for k in m): yield key
[docs] def get(self, value, default=None): try: return self[value] except KeyError: return default
[docs] def items(self): return MultiMapProxyItemsView(self)
[docs] def keys(self): return MultiMapProxyKeysView(self)
[docs] def values(self): return MultiMapProxyValuesView(self)
[docs]class MultiSeqProxy(Sequence, MultiProxy, AttrItemAccessMixin): """Sequence Proxy for layered containers""" def __contains__(self, value): return any(value in m for _, m in self._maps) def __iter__(self): for _, m in self._maps: for item in m: yield item def __len__(self): return sum(len(m) for _, m in self._maps) def __repr__(self): return f"{self.__class__.__name__}({self._maps})" def __str__(self): return "+".join(f"{m}" for _, m in self._maps) def __getitem__(self, index): if isinstance(index, slice): raise NotImplementedError() if isinstance(index, str): try: index = int(index) except ValueError: raise KeyError() for _, m in self._maps: if index >= len(m): index -= len(m) else: return m[index] def __radd__(self, other): return self.__add__(other) def __add__(self, other): return other + list(self) def __setitem__(self, key, value): raise NotImplementedError() def __delitem__(self, key): raise NotImplementedError() def __missing__(self, key): raise NotImplementedError() def __iadd__(self, item): # broken?! self.extend(item)
[docs] def extend(self, item): m = self._maps[0][1] m.extend(item)
[docs]class LayeredConfProxy(MultiMapProxy): """Layered configuration""" def __str__(self): try: return self.to_yaml() except ruamel.yaml.serializer.SerializerError: return self.__repr__() def __enter__(self): self.add_layer("dynamic", {}) return self def __exit__(self, *args): self.remove_layer("dynamic")
[docs] def save(self, outstream=None, layer=0): outfile = None if outstream: rt_yaml.dump(self._maps[layer][1], outstream) else: outfile = self._maps[layer][0] with open(outfile+".tmp", "w") as outstream: rt_yaml.dump(self._maps[layer][1], outstream) os.rename(outfile, outfile+".bkup") os.rename(outfile+".tmp", outfile)
RoundTripRepresenter.add_representer(LayeredConfProxy, RoundTripRepresenter.represent_dict) RoundTripRepresenter.add_representer(MultiMapProxy, RoundTripRepresenter.represent_dict) RoundTripRepresenter.add_representer(MultiSeqProxy, RoundTripRepresenter.represent_list) rt_yaml = YAML(typ="rt")
[docs]def load(files): """Load configuration files Creates a `LayeredConfProxy` configuration object from a set of YAML files. """ layers = [] for fn in reversed(files): with open(fn, "r") as f: yaml = rt_yaml.load(f) if not isinstance(yaml, Mapping): raise LayeredConfError( f"Malformed config file '{fn}'." ) layers.append((fn, yaml)) return LayeredConfProxy(layers)