import io
import logging
import os
from collections.abc import (
ItemsView, KeysView, Mapping, MappingView, Sequence, ValuesView
)
from typing import Union, List, Optional
import ruamel.yaml # type: ignore
from ruamel.yaml import RoundTripRepresenter, YAML, yaml_object, RoundTripConstructor # type: ignore
from ruamel.yaml.comments import CommentedMap # type: ignore
from ymp.exceptions import YmpConfigError
from ymp.common import AttrDict
log = logging.getLogger(__name__) # pylint: disable=invalid-name
[docs]class LayeredConfError(YmpConfigError):
"""Error in LayeredConf"""
def __init__(self, obj: object, msg: str, key: Optional[object]=None, stack = None):
super().__init__(obj, msg, key)
if stack:
self.stack = stack
[docs] def get_fileline(self):
if self.obj:
if hasattr(self.obj, "get_fileline"):
return self.obj.get_fileline(self.key)
if isinstance(self.obj, Sequence) and len(self.obj) == 2:
if hasattr(self.obj[1], "_yaml_line_col"):
return self.obj[0], self.obj[1]._yaml_line_col.line
else:
return self.obj
return None, None
[docs]class Entry:
def __init__(self, filename, yaml, index):
self.filename = filename
self.lineno = yaml._yaml_line_col.data[index][0] + 1
[docs]class MixedTypeError(LayeredConfError):
"""Mixed types in proxy collection"""
[docs]class LayeredConfWriteError(LayeredConfError):
"""Can't write"""
[docs]class LayeredConfAccessError(LayeredConfError, KeyError, IndexError):
"""Can't access"""
[docs]class AttrItemAccessMixin(object):
"""Mixin class mapping dot to bracket access
Added to classes implementing __getitem__, __setitem__ and
__delitem__, this mixin will allow acessing items using dot
notation. I.e. "object.xyz" is translated to "object[xyz]".
"""
def __getattr__(self, key):
try:
if key[0] == "_":
return self.__getattribute__(key)
else:
return self[key]
except (IndexError, KeyError) as exc:
raise AttributeError() from exc
def __setattr__(self, key, value):
try:
if key[0] == "_":
object.__setattr__(self, key, value)
else:
self[key] = value
except (IndexError, KeyError) as exc:
raise AttributeError() from exc
def __delattr__(self, key):
raise NotImplementedError()
[docs]class MultiProxy(object):
"""Base class for layered container structure"""
def __init__(self, maps, root=None, parent=None, key=None):
self._maps = list(maps)
self._parent = parent
self._key = key
self._root = root
def _make_proxy(self, key, items):
item = items[0][1]
if isinstance(item, Mapping):
return MultiMapProxy(items, parent=self, key=key)
if isinstance(item, str):
return item
if isinstance(item, Sequence):
return MultiSeqProxy(items, parent=self, key=key)
return item
def _finditem(self, key):
raise NotImplementedError()
def __getitem__(self, key):
return self._make_proxy(key, self._finditem(key))
[docs] def get_files(self):
return [fn for fn, layer in self._maps]
[docs] def get_linenos(self):
return [layer._yaml_line_col.line
for fn, layer in self._maps]
[docs] def get_fileline(self, key = None):
if key:
for fname, layer in self._maps:
if key in layer:
return fname, layer._yaml_line_col.data[key][0] + 1
return ";".join(self.get_files()), next(iter(self.get_linenos()), None)
[docs] def to_yaml(self, show_source=False):
buf = io.StringIO()
if show_source:
for fn, layer in self._maps:
buf.write(f"--- # from '{fn}' # ---\n")
rt_yaml.dump(layer, buf)
else:
rt_yaml.dump(self, buf)
return buf.getvalue()
def __str__(self):
return self.to_yaml()
def __repr__(self):
return f"{self.__class__.__name__}({self._maps!r})"
[docs] def add_layer(self, name, container):
self._maps.insert(0, ((name, container)))
[docs] def remove_layer(self, name):
map_name = self._maps[0][0]
if map_name == name:
self._maps.pop(0)
else:
raise LayeredConfError(self, f"in remove_layer: {map_name} != {name}")
def _get_root(self):
node = self
while node._parent:
node = node._parent
return node
[docs] def get_path(self, key=None, absolute=False):
items = self._finditem(key)
value = self._make_proxy(key, items)
if isinstance(value, MultiProxy):
return value.get_paths(absolute)
if value is None:
return None
fname = items[0][0]
rootpath = self._get_root()._root
if isinstance(value, WorkdirTag):
path = str(value)
basepath = rootpath
else:
path = os.path.expanduser(value)
basepath = os.path.dirname(fname)
if os.path.isabs(path):
return path
filepath = os.path.join(basepath, path)
if absolute:
return os.path.normpath(filepath)
return os.path.relpath(filepath, rootpath)
[docs]class MultiMapProxyMappingView(MappingView):
"""MappingView for MultiMapProxy"""
def __init__(self, mapping):
self._mapping = mapping
def __len__(self):
return len(self._mapping)
def __repr__(self):
return '{0.__class__.__name__}({0._mapping!r})'.format(self)
def __radd__(self, other):
if isinstance(other, Sequence):
return other + list(self)
raise TypeError()
[docs]class MultiMapProxyItemsView(MultiMapProxyMappingView, ItemsView):
"""ItemsView for MultiMapProxy"""
def __contains__(self, key):
return (isinstance(key, tuple)
and len(key) == 2
and key[0] in self._mapping
and self._mapping[key[0]] == key[1])
def __iter__(self):
for key in self._mapping:
yield key, self._mapping[key]
[docs]class MultiMapProxyKeysView(MultiMapProxyMappingView, KeysView):
"""KeysView for MultiMapProxy"""
def __contains__(self, key):
return key in self._mapping
def __iter__(self):
yield from iter(self._mapping)
[docs]class MultiMapProxyValuesView(MultiMapProxyMappingView, ValuesView):
"""ValuesView for MultiMapProxy"""
def __contains__(self, key):
return any(self._mapping[k] == key for k in self._mapping)
def __iter__(self):
for key in self._mapping:
yield self._mapping[key]
[docs]class MultiMapProxy(MultiProxy, AttrItemAccessMixin, Mapping):
"""Mapping Proxy for layered containers"""
def __contains__(self, key):
return any(key in m for _, m in self._maps)
def __len__(self):
return len(set(k for _, m in self._maps for k in m))
def _finditem(self, key):
items = [(fn, m[key]) for fn, m in self._maps if key in m]
if not items:
raise KeyError(f"key '{key}' not found in any map")
typs = set(type(m[1]) for m in items if m[1])
if len(typs) > 1:
stack = [Entry(fn, m, key) for fn, m in self._maps if key in m]
raise MixedTypeError(
self,
f"Mixed data types for key '{key}'s in present in files",
key = key,
stack=stack
)
return items
def __setitem__(self, key, value):
# we want to set to the top layer, so get that first
mp = self
keys = []
while mp._parent:
keys.append(mp._key)
mp = mp._parent
# now walk back down
for k in reversed(keys):
if k not in mp._maps[0][1]:
mp._maps[0][1][k] = CommentedMap()
mp = mp[k]
# and set the value, potentially on a different object than self
mp._maps[0][1][key] = value
def __delitem__(self, key):
raise NotImplementedError()
def __iter__(self):
for key in dict.fromkeys(k for _, m in self._maps for k in m):
yield key
[docs] def get(self, value, default=None):
try:
return self[value]
except KeyError:
return default
[docs] def items(self):
return MultiMapProxyItemsView(self)
[docs] def keys(self):
return MultiMapProxyKeysView(self)
[docs] def values(self):
return MultiMapProxyValuesView(self)
[docs] def get_paths(self, absolute=False):
return AttrDict(
(key, self.get_path(key, absolute) )
for key in self.keys()
if self.get(key) is not None
)
[docs]class MultiSeqProxy(MultiProxy, AttrItemAccessMixin, Sequence):
"""Sequence Proxy for layered containers"""
def __contains__(self, value):
return any(value in m for _, m in self._maps)
def __iter__(self):
index = 0
for fn, smap in self._maps:
for item in smap:
yield self._make_proxy(index, [(fn, item)])
index += 1
def __len__(self):
return sum(len(m) for _, m in self._maps)
def __repr__(self):
return f"{self.__class__.__name__}({self._maps})"
def __str__(self):
return "+".join(f"{m}" for _, m in self._maps)
def _finditem(self, index):
if isinstance(index, slice):
raise NotImplementedError()
if isinstance(index, str):
try:
index = int(index)
except ValueError as exc:
raise KeyError() from exc
for fn, smap in self._maps:
if index >= len(smap):
index -= len(smap)
else:
return [(fn, smap[index])]
else:
raise IndexError()
def __radd__(self, other):
return self.__add__(other)
def __add__(self, other):
return other + list(self)
def __setitem__(self, key, value):
raise NotImplementedError()
def __delitem__(self, key):
raise NotImplementedError()
def __missing__(self, key):
raise NotImplementedError()
def __iadd__(self, item): # broken?!
self.extend(item)
[docs] def extend(self, item):
smap = self._maps[0][1]
smap.extend(item)
[docs] def get_paths(self, absolute=False):
return [self.get_path(i, absolute) for i in range(len(self))]
[docs]class LayeredConfProxy(MultiMapProxy):
"""Layered configuration"""
def __str__(self):
try:
return self.to_yaml()
except ruamel.yaml.serializer.SerializerError:
return self.__repr__()
def __enter__(self):
self.add_layer("dynamic", {})
return self
def __exit__(self, *args):
self.remove_layer("dynamic")
[docs] def save(self, outstream=None, layer=0):
outfile = None
if outstream:
rt_yaml.dump(self._maps[layer][1], outstream)
else:
outfile = self._maps[layer][0]
with open(outfile+".tmp", "w") as outstream:
rt_yaml.dump(self._maps[layer][1], outstream)
os.rename(outfile, outfile+".bkup")
os.rename(outfile+".tmp", outfile)
RoundTripRepresenter.add_representer(LayeredConfProxy,
RoundTripRepresenter.represent_dict)
RoundTripRepresenter.add_representer(MultiMapProxy,
RoundTripRepresenter.represent_dict)
RoundTripRepresenter.add_representer(MultiSeqProxy,
RoundTripRepresenter.represent_list)
rt_yaml = YAML(typ="rt")
[docs]@yaml_object(rt_yaml)
class WorkdirTag:
yaml_tag = u"!workdir"
def __init__(self, path) -> None:
self.path = path
def __repr__(self):
return f"!workdir {self.path}"
def __str__(self):
return self.path
[docs] @classmethod
def from_yaml(cls, _constructor, node):
return cls(node.value)
[docs] @classmethod
def to_yaml(cls, representer, instance):
return representer.represent_scalar(
cls.yaml_tag, instance.path
)
[docs]def resolve_installed_package(fname, stack):
basename = os.path.basename(fname)
if basename.startswith("<") and basename.endswith(">"):
package = basename[1:-1]
from pkg_resources import iter_entry_points
for entry in iter_entry_points("ymp.pipelines"):
if entry.name == package:
func = entry.load()
real_include = func()
return real_include
raise LayeredConfError((fname, None), f"Extension package '{package}' not found", stack=stack)
return fname
[docs]def load(files, root=None):
"""Load configuration files
Creates a `LayeredConfProxy` configuration object from a set of
YAML files.
Files listed later will override parts of earlier included files
"""
def load_one(fname, stack):
fname = resolve_installed_package(fname, stack)
if any(fname == entry.filename for entry in stack):
raise LayeredConfError((fname, None), "Recursion in includes", stack=stack)
try:
with open(fname, "r") as fdes:
yaml = rt_yaml.load(fdes)
except IOError as exc:
raise LayeredConfError((fname, None), "Failed to read file", stack=stack) from exc
if not isinstance(yaml, Mapping):
raise LayeredConfError((fname, 1), "Config must have mapping as toplevel", stack=stack)
layers = [(fname, yaml)]
includes = yaml.get("include", [])
if not includes:
return layers
basedir = os.path.dirname(fname)
if isinstance(includes, str):
path = os.path.join(basedir, includes)
stack.append(Entry(fname, yaml, "include"))
layers.extend(load_one(path, stack))
stack.pop()
return layers
if not isinstance(includes, Sequence):
raise LayeredConfError((fname, includes), 'Statement "include" must be a list', stack=stack)
for num, include in enumerate(reversed(includes)):
path = os.path.join(basedir, include)
stack.append(Entry(fname, includes, num))
layers.extend(load_one(path, stack))
stack.pop()
return layers
if not root:
root = os.path.dirname(files[0])
layers = []
for fname in reversed(files):
layers.extend(load_one(str(fname), []))
return LayeredConfProxy(layers, root=root)