"""Minimal YAML shim for ontology validation. Supports the subset needed by memory/ontology/schema.yaml: - mappings - nested mappings - lists - flow-style lists: [a, b, c] - quoted and unquoted scalars - booleans, null, ints, floats This is intentionally tiny and dependency-free. """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Iterable class YAMLError(ValueError): pass def safe_load(stream: Any) -> Any: if hasattr(stream, "read"): text = stream.read() else: text = str(stream) return _parse(text) def safe_dump(data: Any, stream: Any = None, sort_keys: bool = False) -> str: text = _dump(data, 0, sort_keys=sort_keys) if stream is not None: stream.write(text) return "" return text def _parse(text: str) -> Any: lines = text.splitlines() cleaned: list[tuple[int, str]] = [] for raw in lines: stripped = raw.rstrip() if not stripped: continue if stripped.lstrip().startswith("#"): continue indent = len(stripped) - len(stripped.lstrip(" ")) cleaned.append((indent, stripped.lstrip(" "))) if not cleaned: return None value, idx = _parse_block(cleaned, 0, cleaned[0][0]) if idx != len(cleaned): raise YAMLError("Trailing YAML content could not be parsed") return value def _parse_block(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[Any, int]: # Decide whether the block is a list or a mapping based on first line. if idx >= len(items): return None, idx cur_indent, cur_line = items[idx] if cur_indent < indent: return None, idx if cur_line.startswith("-"): result = [] while idx < len(items): line_indent, line = items[idx] if line_indent < indent: break if line_indent != indent or not line.startswith("-"): break item_text = line[1:].lstrip() idx += 1 if item_text: # Inline scalar or inline mapping fragment. if ":" in item_text and not _is_quoted(item_text): key, rest = item_text.split(":", 1) key = key.strip() rest = rest.lstrip() item: Any = {key: _parse_scalar(rest) if rest else None} if not rest and idx < len(items) and items[idx][0] > indent: nested, idx = _parse_block(items, idx, items[idx][0]) if isinstance(nested, dict): item[key] = nested if item[key] is None else item[key] if isinstance(nested, dict) and item[key] is None: item[key] = nested else: item[key] = nested result.append(item) else: result.append(_parse_scalar(item_text)) else: if idx < len(items) and items[idx][0] > indent: nested, idx = _parse_block(items, idx, items[idx][0]) result.append(nested) else: result.append(None) return result, idx result: dict[str, Any] = {} while idx < len(items): line_indent, line = items[idx] if line_indent < indent: break if line_indent != indent: # Nested content belongs to the previous key. break if line.startswith("-"): break if ":" not in line: raise YAMLError(f"Invalid YAML line: {line!r}") key, rest = line.split(":", 1) key = key.strip() rest = rest.lstrip() idx += 1 if rest: result[key] = _parse_scalar(rest) else: if idx < len(items) and items[idx][0] > indent: nested, idx = _parse_block(items, idx, items[idx][0]) result[key] = nested else: result[key] = None return result, idx def _parse_scalar(text: str) -> Any: text = text.strip() if not text: return None if _is_quoted(text): return _unquote(text) if text.startswith("[") and text.endswith("]"): inner = text[1:-1].strip() if not inner: return [] parts = _split_flow_list(inner) return [_parse_scalar(part) for part in parts] lower = text.lower() if lower in {"null", "none", "~"}: return None if lower == "true": return True if lower == "false": return False try: if text.startswith("0") and text != "0" and not text.startswith("0."): # preserve as string (e.g. IDs or codes with leading zeroes) raise ValueError return int(text) except ValueError: pass try: return float(text) except ValueError: pass return text def _is_quoted(text: str) -> bool: return (len(text) >= 2 and text[0] == text[-1] and text[0] in {'"', "'"}) def _unquote(text: str) -> str: if text[0] == '"': return bytes(text[1:-1], "utf-8").decode("unicode_escape") return text[1:-1].replace("''", "'") def _split_flow_list(text: str) -> list[str]: parts: list[str] = [] current = [] in_quote: str | None = None escape = False for ch in text: if escape: current.append(ch) escape = False continue if in_quote: current.append(ch) if ch == "\\" and in_quote == '"': escape = True elif ch == in_quote: in_quote = None continue if ch in {'"', "'"}: current.append(ch) in_quote = ch continue if ch == ',': parts.append(''.join(current).strip()) current = [] continue current.append(ch) if current: parts.append(''.join(current).strip()) return parts def _dump(data: Any, indent: int, sort_keys: bool = False) -> str: pad = " " * indent if isinstance(data, dict): items = data.items() if sort_keys: items = sorted(items) lines = [] for key, value in items: if isinstance(value, (dict, list)): lines.append(f"{pad}{key}:\n{_dump(value, indent + 2, sort_keys=sort_keys)}") else: lines.append(f"{pad}{key}: {_dump_scalar(value)}") return "\n".join(lines) if isinstance(data, list): lines = [] for value in data: if isinstance(value, (dict, list)): nested = _dump(value, indent + 2, sort_keys=sort_keys) if nested: first, *rest = nested.splitlines() lines.append(f"{pad}- {first.lstrip()}") lines.extend(rest) else: lines.append(f"{pad}-") else: lines.append(f"{pad}- {_dump_scalar(value)}") return "\n".join(lines) return f"{pad}{_dump_scalar(data)}" def _dump_scalar(value: Any) -> str: if value is None: return "null" if value is True: return "true" if value is False: return "false" if isinstance(value, (int, float)): return str(value) text = str(value) if not text: return '""' if any(ch in text for ch in [":", "#", "\n", "\r", "\t"]) or text.strip() != text or text.startswith(("-", "?", "@", "&", "*", "!", "{", "}", "[", "]", ",", "#", "|", ">", "'", '"')): return json_quote(text) return text def json_quote(text: str) -> str: import json return json.dumps(text, ensure_ascii=False)