"""Minimal YAML shim for ontology validation.

Supports the subset needed by memory/ontology/schema.yaml:
- mappings
- nested mappings
- lists (including lists written at the same indent as their parent key)
- list items that hold multi-key mappings or nested lists
- flow-style lists: [a, b, c] (nesting allowed) and the empty mapping {}
- quoted and unquoted scalars
- booleans, null, ints, floats
- full-line and inline ``# comments``

This is intentionally tiny and dependency-free.
"""
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Iterable


# Leading characters that force a string to be quoted when dumped.
_SPECIAL_PREFIXES = (
    "-", "?", "@", "&", "*", "!", "{", "}", "[", "]", ",", "#", "|", ">", "'", '"',
)
# A quote character only opens a quoted scalar when it starts a token, i.e.
# at the start of the line or right after one of these characters.
_QUOTE_OPENERS = " \t[,{"


class YAMLError(ValueError):
    """Raised when the input is outside the supported YAML subset."""


def safe_load(stream: Any) -> Any:
    """Parse YAML text (or a readable stream) into Python objects.

    Args:
        stream: A ``str``, ``bytes``/``bytearray`` (decoded as UTF-8), an
            object with a ``read()`` method, or anything else, which is
            converted with ``str()``.

    Returns:
        The parsed document, or ``None`` for an empty document.

    Raises:
        YAMLError: If the text cannot be parsed.
    """
    text = stream.read() if hasattr(stream, "read") else stream
    if isinstance(text, (bytes, bytearray)):
        # str(b"...") would yield the repr "b'...'", not the content.
        text = bytes(text).decode("utf-8-sig")
    return _parse(str(text))


def safe_dump(data: Any, stream: Any = None, sort_keys: bool = False) -> str:
    """Serialize *data* to block-style YAML.

    Args:
        data: Nested dicts, lists and scalars.
        stream: Optional object with ``write()``; when given, the text is
            written to it and an empty string is returned.
        sort_keys: Emit mapping keys in sorted order.

    Returns:
        The YAML text, or ``""`` when *stream* is given.
    """
    text = _dump(data, 0, sort_keys=sort_keys)
    if stream is not None:
        stream.write(text)
        return ""
    return text


def _parse(text: str) -> Any:
    """Turn raw text into ``(indent, content)`` lines and parse them."""
    cleaned: list[tuple[int, str]] = []
    for raw in text.splitlines():
        content = _strip_comment(raw).rstrip()
        if not content:
            continue  # blank line or comment-only line
        body = content.lstrip(" ")
        if not cleaned and body == "---":
            continue  # optional document-start marker
        cleaned.append((len(content) - len(body), body))
    if not cleaned:
        return None

    value, idx = _parse_block(cleaned, 0, cleaned[0][0])
    if idx != len(cleaned):
        raise YAMLError("Trailing YAML content could not be parsed")
    return value


def _strip_comment(line: str) -> str:
    """Return *line* without a trailing ``# comment``.

    A ``#`` starts a comment only at the start of the line or after
    whitespace, and never inside a quoted scalar.
    """
    quote: str | None = None
    i = 0
    while i < len(line):
        ch = line[i]
        if quote:
            if quote == '"' and ch == "\\":
                i += 2  # skip the escaped character
                continue
            if ch == quote:
                if quote == "'" and line[i + 1 : i + 2] == "'":
                    i += 2  # '' is an escaped apostrophe
                    continue
                quote = None
        elif ch in "\"'" and (i == 0 or line[i - 1] in _QUOTE_OPENERS):
            quote = ch
        elif ch == "#" and (i == 0 or line[i - 1] in " \t"):
            return line[:i]
        i += 1
    return line


def _is_list_item(line: str) -> bool:
    """Return True if *line* is a block-sequence entry (``-`` or ``- x``)."""
    return line == "-" or line.startswith(("- ", "-\t"))


def _closing_quote(text: str) -> int:
    """Return the index of the quote closing ``text[0]``, or -1 if none."""
    quote = text[0]
    i = 1
    while i < len(text):
        ch = text[i]
        if quote == '"' and ch == "\\":
            i += 2
            continue
        if ch == quote:
            if quote == "'" and text[i + 1 : i + 2] == "'":
                i += 2
                continue
            return i
        i += 1
    return -1


def _split_key(line: str, strict: bool) -> tuple[str, str] | None:
    """Split ``key: value`` into ``(key, value_text)``.

    Args:
        line: Line content without leading indentation.
        strict: When True the colon must be followed by whitespace or the
            end of the line (as YAML requires), so ``http://x`` is not a
            key. When False a line such as ``key:value`` is still accepted
            by splitting on the first colon.

    Returns:
        The pair, or ``None`` if the line is not a mapping entry.
    """
    if line[0] in "\"'":
        end = _closing_quote(line)
        if end < 0:
            return None
        tail = line[end + 1 :].lstrip()
        if tail.startswith(":") and (
            not strict or len(tail) == 1 or tail[1] in " \t"
        ):
            return _unquote(line[: end + 1]), tail[1:].strip()
        return None
    if line[0] in "[{":
        return None  # flow collection, not a key
    for i, ch in enumerate(line):
        if ch == ":" and (i + 1 == len(line) or line[i + 1] in " \t"):
            return line[:i].strip(), line[i + 1 :].strip()
    if not strict and ":" in line:
        key, _, rest = line.partition(":")
        return key.strip(), rest.strip()
    return None


def _parse_block(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[Any, int]:
    """Parse the block starting at ``items[idx]``.

    The first line decides whether the block is a list, a mapping or a
    single scalar.

    Returns:
        ``(value, next_idx)`` where ``next_idx`` is the first unconsumed line.
    """
    if idx >= len(items):
        return None, idx

    cur_indent, cur_line = items[idx]
    if cur_indent < indent:
        return None, idx

    if _is_list_item(cur_line):
        return _parse_sequence(items, idx, indent)
    if _split_key(cur_line, strict=False) is not None:
        return _parse_mapping(items, idx, indent)
    return _parse_scalar(cur_line), idx + 1


def _parse_sequence(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[list, int]:
    """Parse consecutive ``- item`` lines that sit at exactly *indent*."""
    result: list[Any] = []
    while idx < len(items):
        line_indent, line = items[idx]
        if line_indent != indent or not _is_list_item(line):
            break
        item_text = line[1:].lstrip()
        if not item_text:
            # Bare "-": the value, if any, is the more-indented block below.
            idx += 1
            if idx < len(items) and items[idx][0] > indent:
                nested, idx = _parse_block(items, idx, items[idx][0])
                result.append(nested)
            else:
                result.append(None)
        elif _is_list_item(item_text) or _split_key(item_text, strict=True) is not None:
            # "- key: v" / "- - x": the text after the dash opens a nested
            # block whose indent is the column where that text starts.
            # Rewrite the working line in place (the list is private to
            # _parse) so continuation lines at that column join the block.
            child_indent = indent + len(line) - len(item_text)
            items[idx] = (child_indent, item_text)
            nested, idx = _parse_block(items, idx, child_indent)
            result.append(nested)
        else:
            result.append(_parse_scalar(item_text))
            idx += 1
    return result, idx


def _parse_mapping(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[dict, int]:
    """Parse consecutive ``key: value`` lines that sit at exactly *indent*.

    Raises:
        YAMLError: If a line at this indent is not a mapping entry.
    """
    result: dict[str, Any] = {}
    while idx < len(items):
        line_indent, line = items[idx]
        if line_indent != indent or _is_list_item(line):
            break
        pair = _split_key(line, strict=False)
        if pair is None:
            raise YAMLError(f"Invalid YAML line: {line!r}")
        key, rest = pair
        idx += 1
        if rest:
            result[key] = _parse_scalar(rest)
        elif idx < len(items) and items[idx][0] > indent:
            result[key], idx = _parse_block(items, idx, items[idx][0])
        elif idx < len(items) and items[idx][0] == indent and _is_list_item(items[idx][1]):
            # YAML allows a sequence at the same indent as its parent key.
            result[key], idx = _parse_sequence(items, idx, indent)
        else:
            result[key] = None
    return result, idx


def _parse_scalar(text: str) -> Any:
    """Convert scalar text to None, bool, int, float, list, dict or str."""
    text = text.strip()
    if not text:
        return None
    if _is_quoted(text):
        return _unquote(text)
    if text.startswith("[") and text.endswith("]"):
        inner = text[1:-1].strip()
        if not inner:
            return []
        return [_parse_scalar(part) for part in _split_flow_list(inner)]
    if text == "{}":
        return {}
    lower = text.lower()
    if lower in {"null", "none", "~"}:
        return None
    if lower == "true":
        return True
    if lower == "false":
        return False
    if len(text) > 1 and text[0] == "0" and text[1].isdigit():
        # Preserve IDs/codes with leading zeroes as strings; falling
        # through to float() would silently turn "007" into 7.0.
        return text
    for convert in (int, float):
        try:
            return convert(text)
        except ValueError:
            pass
    return text


def _is_quoted(text: str) -> bool:
    """Return True if *text* is exactly one quoted scalar."""
    return (
        len(text) >= 2
        and text[0] in {'"', "'"}
        and _closing_quote(text) == len(text) - 1
    )


def _unquote(text: str) -> str:
    """Strip the surrounding quotes and resolve escapes.

    Raises:
        YAMLError: If a double-quoted scalar contains a malformed escape.
    """
    inner = text[1:-1]
    if text[0] != '"':
        return inner.replace("''", "'")
    try:
        # latin-1 + backslashreplace keeps every character intact through
        # unicode_escape; encoding as UTF-8 would mangle non-ASCII text.
        return inner.encode("latin-1", "backslashreplace").decode("unicode_escape")
    except UnicodeDecodeError as err:
        raise YAMLError(f"Invalid escape sequence in {text!r}") from err


def _split_flow_list(text: str) -> list[str]:
    """Split the inside of ``[...]`` on top-level commas.

    Commas inside quoted scalars or nested brackets do not split.
    """
    parts: list[str] = []
    current: list[str] = []
    in_quote: str | None = None
    depth = 0
    i = 0
    while i < len(text):
        ch = text[i]
        step = 1
        if in_quote:
            if ch == "\\" and in_quote == '"':
                step = 2  # keep the escaped character with its backslash
            elif ch == in_quote:
                if in_quote == "'" and text[i + 1 : i + 2] == "'":
                    step = 2  # '' is an escaped apostrophe
                else:
                    in_quote = None
        elif ch in "\"'" and not "".join(current).strip():
            in_quote = ch  # quotes only count at the start of a token
        elif ch in "[{":
            depth += 1
        elif ch in "]}":
            depth = max(depth - 1, 0)
        elif ch == "," and depth == 0:
            parts.append("".join(current).strip())
            current = []
            i += 1
            continue
        current.append(text[i : i + step])
        i += step
    if current:
        parts.append("".join(current).strip())
    return parts


def _is_block(value: Any) -> bool:
    """Return True for a non-empty dict or list (dumped in block style)."""
    return isinstance(value, (dict, list)) and bool(value)


def _dump_inline(value: Any) -> str:
    """Render a scalar or an empty container on a single line."""
    if isinstance(value, dict):
        return "{}"
    if isinstance(value, list):
        return "[]"
    return _dump_scalar(value)


def _dump_key(key: Any) -> str:
    """Render a mapping key, quoting strings only when syntax requires it."""
    if isinstance(key, str):
        return json_quote(key) if _needs_quotes(key) else key
    return str(key)


def _dump(data: Any, indent: int, sort_keys: bool = False) -> str:
    """Render *data* as block-style YAML indented by *indent* spaces."""
    pad = " " * indent
    if isinstance(data, dict) and data:
        entries: Any = data.items()
        if sort_keys:
            try:
                entries = sorted(entries, key=lambda kv: kv[0])
            except TypeError:
                # Keys of mixed types are not orderable; use their text form.
                entries = sorted(entries, key=lambda kv: str(kv[0]))
        lines = []
        for key, value in entries:
            label = _dump_key(key)
            if _is_block(value):
                nested = _dump(value, indent + 2, sort_keys=sort_keys)
                lines.append(f"{pad}{label}:\n{nested}")
            else:
                lines.append(f"{pad}{label}: {_dump_inline(value)}")
        return "\n".join(lines)
    if isinstance(data, list) and data:
        lines = []
        for value in data:
            if _is_block(value):
                nested = _dump(value, indent + 2, sort_keys=sort_keys)
                # The first nested line shares the line with the dash.
                first, *rest = nested.splitlines()
                lines.append(f"{pad}- {first.lstrip()}")
                lines.extend(rest)
            else:
                lines.append(f"{pad}- {_dump_inline(value)}")
        return "\n".join(lines)
    return f"{pad}{_dump_inline(data)}"


def _needs_quotes(text: str) -> bool:
    """Return True if *text* cannot be written as a plain scalar."""
    return (
        not text
        or any(ch in text for ch in (":", "#", "\n", "\r", "\t"))
        or text.strip() != text
        or text.startswith(_SPECIAL_PREFIXES)
    )


def _dump_scalar(value: Any) -> str:
    """Render a scalar, quoting strings that would not load back as-is."""
    if value is None:
        return "null"
    if value is True:
        return "true"
    if value is False:
        return "false"
    if isinstance(value, (int, float)):
        return str(value)
    text = str(value)
    # Besides syntax characters, quote strings such as "true" or "123"
    # that would otherwise be read back as a bool, number or null.
    if _needs_quotes(text) or not isinstance(_parse_scalar(text), str):
        return json_quote(text)
    return text


def json_quote(text: str) -> str:
    """Return *text* as a double-quoted string with JSON escapes."""
    import json

    return json.dumps(text, ensure_ascii=False)