Source code for bsb.config.parsers

import abc
import functools
import os

from ..exceptions import FileReferenceError, PluginError
from ._parse_types import file_imp, file_ref, parsed_dict, recurse_handlers


class ConfigurationParser(abc.ABC):
    """
    Abstract interface of a configuration parser.

    Concrete parsers must implement both :meth:`parse`, to turn file content into a
    configuration tree, and :meth:`generate`, to turn a tree back into a string.
    """

    @abc.abstractmethod
    def parse(self, content, path=None):  # pragma: nocover
        """
        Parse the content of a configuration file.

        :param content: content of the file to parse, as a str or a dict
        :param path: path to the file that holds the configuration.
        :return: the configuration tree and its metadata, both as dictionaries
        """

    @abc.abstractmethod
    def generate(self, tree, pretty=False):  # pragma: nocover
        """
        Serialize a configuration tree (dictionary) to a string.

        :param dict tree: configuration tree
        :param bool pretty: when True, the output string is indented
        :return: string representation of the configuration tree
        :rtype: str
        """
class ParsesReferences:
    """
    Mixin to decorate parse function of ConfigurationParser.

    Allows for imports and references inside configuration files: every subclass has
    its ``parse`` method wrapped so that, after the actual parsing, the resulting
    tree is traversed and each ``$ref`` and ``$import`` key is resolved.

    The wrapped ``parse`` sets the following attributes on the parser instance:

    * ``root``: the parsed tree, wrapped with ``parsed_dict``.
    * ``path``: the given path, or the current working directory if none was given.
    * ``is_doc``: truthy when a path was given and it is not a directory.
    * ``references``: the reference/import objects found during traversal.
    * ``documents``: maps a document path (``None`` for the tree itself) to the set
      of absolute reference locations requested from it.
    * ``resolved_documents``: maps a document path to a
      ``{location: referenced node}`` dictionary.
    """

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        parse = cls.parse
        # A subclass that merely inherits an already decorated `parse` must not be
        # decorated a second time, or every tree would be traversed and resolved
        # twice.
        if getattr(parse, "_resolves_references", False):
            return

        def parse_with_references(self, content, path=None):
            """
            Traverse the parsed tree and resolve any `$ref` and `$import`
            """
            content, meta = parse(self, content, path)
            content = parsed_dict(content)
            self.root = content
            self.path = path or os.getcwd()
            self.is_doc = path and not os.path.isdir(path)
            self.references = []
            self.documents = {}
            self._traverse(content, content.items())
            self.resolved_documents = {}
            self._resolve_documents()
            self._resolve_references()
            return content, meta

        parse_with_references._resolves_references = True
        cls.parse = parse_with_references

    def _traverse(self, node, iter):
        """
        Walk over the ``(key, value)`` pairs of ``iter``, which belong to ``node``.

        Import keys and reference keys are stored for later resolution, and every
        value whose type has a recurse handler is replaced in ``node`` by its
        wrapped ``parsed_*`` counterpart and then traversed in turn.
        """
        for key, value in iter:
            if self._is_import(key):
                self._store_import(node)
            elif type(value) in recurse_handlers:
                # The recurse handlers wrap the dicts and lists and return
                # appropriate iterators for them. The child iterator gets its own
                # name so that the iterable being looped over is never rebound.
                value, children = recurse_handlers[type(value)](value, node)
                # Set some metadata on the wrapped recursable objects.
                value._key = key
                value._parent = node
                # Overwrite the reference to the original object with a reference
                # to the wrapped object.
                node[key] = value
                # Recurse a level deeper
                self._traverse(value, children)
            elif self._is_reference(key):
                self._store_reference(node, value)

    def _is_reference(self, key):
        """Return whether ``key`` is the reference key ``$ref``."""
        return key == "$ref"

    def _is_import(self, key):
        """Return whether ``key`` is the import key ``$import``."""
        return key == "$import"

    def _get_ref_document(self, ref, base=None):
        """
        Determine which document a reference string points into.

        :param str ref: reference of the form ``<document>#<location>``.
        :param base: file or directory that relative documents are resolved
          against. Defaults to ``self.path``.
        :return: ``None`` when the reference has no document part (it points into
          the current tree), otherwise the path of the document. Relative documents
          are made absolute, absolute documents are returned unchanged.
        :raises OSError: if the directory of the base file does not exist.
        """
        doc = ref.split("#")[0] if "#" in ref else ""
        if not doc:
            return None
        if not os.path.isabs(doc):
            # Without an explicit base, the reference should be relative to the
            # current configuration file to avoid recurrence issues.
            base = base or self.path
            if not os.path.isdir(base):
                # `base` is a file: resolve against the directory that holds it. A
                # bare filename has an empty dirname, which denotes the current
                # directory.
                base = os.path.dirname(base) or os.curdir
                if not os.path.exists(base):
                    raise OSError(f"Can't find reference directory '{base}'")
            doc = os.path.abspath(os.path.join(base, doc))
        return doc

    @staticmethod
    def _get_absolute_ref(node, ref):
        """
        Return the normalized, ``/``-separated location that ``ref`` points to.

        Locations that do not start with ``/`` are taken relative to
        ``node.location()``.
        """
        ref = ref.split("#")[-1]
        path = ref if ref.startswith("/") else os.path.join(node.location(), ref)
        return os.path.normpath(path).replace(os.path.sep, "/")

    def _store_reference(self, node, ref):
        """
        Analyze the reference and create a ref object from the given data.

        Registers the referenced location under its document in ``self.documents``
        and appends a ``file_ref`` to ``self.references``.
        """
        doc = self._get_ref_document(ref, self.path)
        ref = self._get_absolute_ref(node, ref)
        self.documents.setdefault(doc, set()).add(ref)
        self.references.append(file_ref(node, doc, ref))

    def _store_import(self, node):
        """
        Analyze the import node and create a ref object from the given data.

        Registers the imported location under its document in ``self.documents``
        and appends a ``file_imp`` to ``self.references``.

        :raises RuntimeError: if the import node has no ``values`` entry.
        """
        imp = node["$import"]
        ref = imp["ref"]
        doc = self._get_ref_document(ref, self.path)
        ref = self._get_absolute_ref(node, ref)
        self.documents.setdefault(doc, set()).add(ref)
        if "values" not in imp:
            e = RuntimeError(f"Import node {node} is missing a 'values' list.")
            e._bsbparser_show_user = True
            raise e
        self.references.append(file_imp(node, doc, ref, imp["values"]))

    def _resolve_documents(self):
        """
        Parse every stored document and fetch the content of each reference node.

        The document ``None`` stands for the tree that is being parsed; any other
        document is read from disk and parsed with the available parsers.

        :raises FileReferenceError: if a referenced location can't be fetched. For
          external documents the document path is appended to the message.
        """
        for file, refs in self.documents.items():
            if file is None:
                content = self.root
            else:
                # Imported here rather than at module level.
                # NOTE(review): presumably to avoid a circular import - confirm.
                from . import _try_parsers

                parser_classes = get_configuration_parser_classes()
                ext = file.split(".")[-1]
                with open(file) as f:
                    content = f.read()
                _, content, _ = _try_parsers(content, parser_classes, ext, path=file)
            try:
                self.resolved_documents[file] = self._resolve_document(content, refs)
            except FileReferenceError as jre:
                if not file:
                    raise
                raise FileReferenceError(str(jre) + f" in document '{file}'") from None

    def _resolve_document(self, content, refs):
        """Return a ``{ref: node}`` dictionary for all ``refs`` inside ``content``."""
        return {ref: self._fetch_reference(content, ref) for ref in refs}

    def _fetch_reference(self, content, ref):
        """
        Follow the ``/``-separated location ``ref`` down into ``content``.

        :return: the dictionary found at the location.
        :raises FileReferenceError: if a part of the location does not exist, or if
          a node along the way is not a dictionary.
        """
        parts = [p for p in ref.split("/")[1:] if p]
        n = content
        loc = ""
        for part in parts:
            loc += "/" + part
            try:
                n = n[part]
            except KeyError:
                raise FileReferenceError(
                    f"'{loc}' in File reference '{ref}' does not exist"
                ) from None
            if not isinstance(n, dict):
                raise FileReferenceError(
                    "File references can only point to dictionaries. '{}' is a {}".format(
                        f"{loc}' in '{ref}" if loc != ref else ref,
                        type(n).__name__,
                    )
                )
        return n

    def _resolve_references(self):
        """Resolve each stored reference against the node fetched for it."""
        for ref in self.references:
            target = self.resolved_documents[ref.doc][ref.ref]
            ref.resolve(self, target)
@functools.cache
def get_configuration_parser_classes():
    """
    Discover the plugins registered in the ``config.parsers`` category.

    The result is cached by ``functools.cache``, so the discovery runs at most once
    per process.

    :return: whatever ``discover("config.parsers")`` returns
      (presumably a mapping of parser names to parser classes - verify against
      ``plugins.discover``).
    """
    # Imported at call time rather than at module level.
    from ..plugins import discover

    parser_classes = discover("config.parsers")
    return parser_classes
def get_configuration_parser(parser, **kwargs):
    """
    Instantiate the configuration parser registered under the given name.

    Configuration parsers can parse configuration strings into configuration trees,
    or serialize trees into strings. Configuration trees can be cast into
    Configuration objects.

    :param parser: name of the parser plugin to instantiate.
    :param kwargs: keyword arguments passed on to the parser class.
    :return: a new instance of the requested parser.
    :raises PluginError: if no parser is registered under that name.
    """
    registry = get_configuration_parser_classes()
    if parser in registry:
        parser_cls = registry[parser]
        return parser_cls(**kwargs)
    raise PluginError(f"Configuration parser '{parser}' not found")
# Names exported by `from bsb.config.parsers import *`.
__all__ = [
    "ConfigurationParser",
    "ParsesReferences",
    "get_configuration_parser",
    "get_configuration_parser_classes",
]