auto-sync: 2026-04-18 09:30:01

This commit is contained in:
Stream
2026-04-18 09:30:08 +03:00
parent 9168966cd7
commit de29247a38

View File

@@ -0,0 +1,254 @@
"""Minimal YAML shim for ontology validation.
Supports the subset needed by memory/ontology/schema.yaml:
- mappings
- nested mappings
- lists
- flow-style lists: [a, b, c]
- quoted and unquoted scalars
- booleans, null, ints, floats
This is intentionally tiny and dependency-free.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Iterable
class YAMLError(ValueError):
    """Error raised by this shim when YAML text cannot be parsed."""
def safe_load(stream: Any) -> Any:
    """Parse YAML from a string, a bytes object, or a readable stream.

    Args:
        stream: An object with a ``read()`` method (its result is parsed),
            or the YAML content itself as ``str``/``bytes``/``bytearray``.
            Any other object is converted with ``str()`` before parsing.

    Returns:
        Whatever ``_parse`` returns for the resulting text.
    """
    source = stream.read() if hasattr(stream, "read") else stream
    if isinstance(source, (bytes, bytearray)):
        # str(b"...") would yield the "b'...'" repr rather than the content,
        # so binary input is decoded explicitly; "utf-8-sig" also drops a
        # leading BOM when one is present.
        text = bytes(source).decode("utf-8-sig")
    else:
        text = str(source)
    return _parse(text)
def safe_dump(data: Any, stream: Any = None, sort_keys: bool = False) -> str:
    """Render ``data`` as YAML text.

    When ``stream`` is ``None`` the rendered text is returned. Otherwise the
    text is passed to ``stream.write`` and an empty string is returned.
    ``sort_keys`` is forwarded unchanged to ``_dump``.
    """
    rendered = _dump(data, 0, sort_keys=sort_keys)
    if stream is None:
        return rendered
    stream.write(rendered)
    return ""
def _parse(text: str) -> Any:
    """Parse a YAML document given as text.

    Blank lines and lines whose first non-whitespace character is ``#`` are
    dropped. Every remaining line becomes an ``(indent, content)`` pair, where
    ``indent`` counts leading spaces only and ``content`` has trailing
    whitespace and those leading spaces removed.

    Returns ``None`` when no content lines remain.

    Raises:
        YAMLError: if the block parser stops before consuming every line.
    """
    trimmed = (raw.rstrip() for raw in text.splitlines())
    cleaned: list[tuple[int, str]] = [
        (len(line) - len(line.lstrip(" ")), line.lstrip(" "))
        for line in trimmed
        if line and not line.lstrip().startswith("#")
    ]
    if not cleaned:
        return None

    root_indent = cleaned[0][0]
    value, consumed = _parse_block(cleaned, 0, root_indent)
    if consumed != len(cleaned):
        raise YAMLError("Trailing YAML content could not be parsed")
    return value
def _parse_block(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[Any, int]:
    """Parse the block that starts at ``items[idx]``.

    Args:
        items: ``(indent, text)`` pairs, one per content line.
        idx: Index of the first line of the block.
        indent: Indentation that every line of this block must have.

    Returns:
        ``(value, next_idx)`` where ``next_idx`` is the index of the first
        line that was not consumed. The first line decides the block kind: a
        leading ``-`` produces a list, anything else a mapping. ``(None, idx)``
        is returned when there is no line left or the first line is indented
        less than ``indent``.

    Raises:
        YAMLError: if a mapping line contains no ``:``.
    """
    if idx >= len(items):
        return None, idx
    first_indent, first_line = items[idx]
    if first_indent < indent:
        return None, idx
    if first_line.startswith("-"):
        return _parse_sequence(items, idx, indent)
    return _parse_mapping(items, idx, indent)


def _parse_nested_value(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[Any, int]:
    """Parse the deeper-indented block at ``idx``, or return ``(None, idx)`` if there is none."""
    if idx < len(items) and items[idx][0] > indent:
        return _parse_block(items, idx, items[idx][0])
    return None, idx


def _parse_sequence(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[list, int]:
    """Parse consecutive ``-`` lines at exactly ``indent`` into a list."""
    result: list = []
    while idx < len(items):
        line_indent, line = items[idx]
        if line_indent != indent or not line.startswith("-"):
            break
        item_text = line[1:].lstrip()
        idx += 1

        if not item_text:
            # Bare dash: the value is the following deeper block, else null.
            nested, idx = _parse_nested_value(items, idx, indent)
            result.append(nested)
            continue

        # NOTE: any colon in an unquoted item makes it an inline mapping,
        # even without a following space; this tolerant rule is kept as-is.
        if ":" not in item_text or _is_quoted(item_text):
            result.append(_parse_scalar(item_text))
            continue

        key, rest = item_text.split(":", 1)
        key = key.strip()
        rest = rest.lstrip()
        item: Any = {key: _parse_scalar(rest) if rest else None}
        if not rest:
            item[key], idx = _parse_nested_value(items, idx, indent)

        # Further keys of the same item sit in the column where the first key
        # started ("- a: 1" followed by "  b: 2"). Without this step such
        # lines were left unconsumed and the document failed to parse.
        key_column = indent + (len(line) - len(item_text))
        if (
            idx < len(items)
            and items[idx][0] == key_column
            and not items[idx][1].startswith("-")
        ):
            more, idx = _parse_mapping(items, idx, key_column)
            item.update(more)
        result.append(item)
    return result, idx


def _parse_mapping(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[dict, int]:
    """Parse consecutive ``key: value`` lines at exactly ``indent`` into a dict."""
    result: dict[str, Any] = {}
    while idx < len(items):
        line_indent, line = items[idx]
        # Stop on a different indentation (it belongs to another block) or on
        # a list marker.
        if line_indent != indent or line.startswith("-"):
            break
        if ":" not in line:
            raise YAMLError(f"Invalid YAML line: {line!r}")
        key, rest = line.split(":", 1)
        key = key.strip()
        rest = rest.lstrip()
        idx += 1
        if rest:
            result[key] = _parse_scalar(rest)
        else:
            result[key], idx = _parse_nested_value(items, idx, indent)
    return result, idx
def _parse_scalar(text: str) -> Any:
    """Convert one scalar (or flow-style collection) token to a Python value.

    Resolution order after stripping surrounding whitespace:

    * empty text -> ``None``
    * quoted text -> the unquoted string
    * ``[...]`` -> list whose elements are parsed recursively
    * ``{}`` -> empty dict (non-empty flow mappings are not supported and
      stay plain strings)
    * ``null`` / ``none`` / ``~`` (any case) -> ``None``
    * ``true`` / ``false`` (any case) -> ``bool``
    * a leading zero followed by another digit -> the text itself
    * otherwise ``int``, then ``float``, then the text itself
    """
    text = text.strip()
    if not text:
        return None
    if _is_quoted(text):
        return _unquote(text)
    if text.startswith("[") and text.endswith("]"):
        inner = text[1:-1].strip()
        if not inner:
            return []
        return [_parse_scalar(part) for part in _split_flow_list(inner)]
    if text.startswith("{") and text.endswith("}") and not text[1:-1].strip():
        return {}

    lower = text.lower()
    if lower in {"null", "none", "~"}:
        return None
    if lower == "true":
        return True
    if lower == "false":
        return False

    zero_prefixed = text.startswith("0") and text != "0" and not text.startswith("0.")
    if zero_prefixed and text[1] in "0123456789":
        # IDs or codes such as "007" must stay strings. Merely skipping int()
        # is not enough, because float("007") would still succeed.
        return text
    if not zero_prefixed:
        try:
            return int(text)
        except ValueError:
            pass
    try:
        return float(text)
    except ValueError:
        return text
def _is_quoted(text: str) -> bool:
return (len(text) >= 2 and text[0] == text[-1] and text[0] in {'"', "'"})
def _unquote(text: str) -> str:
if text[0] == '"':
return bytes(text[1:-1], "utf-8").decode("unicode_escape")
return text[1:-1].replace("''", "'")
def _split_flow_list(text: str) -> list[str]:
parts: list[str] = []
current = []
in_quote: str | None = None
escape = False
for ch in text:
if escape:
current.append(ch)
escape = False
continue
if in_quote:
current.append(ch)
if ch == "\\" and in_quote == '"':
escape = True
elif ch == in_quote:
in_quote = None
continue
if ch in {'"', "'"}:
current.append(ch)
in_quote = ch
continue
if ch == ',':
parts.append(''.join(current).strip())
current = []
continue
current.append(ch)
if current:
parts.append(''.join(current).strip())
return parts
def _dump(data: Any, indent: int, sort_keys: bool = False) -> str:
    """Render ``data`` as block-style YAML indented by ``indent`` spaces.

    Dicts become ``key: value`` lines and lists become ``- value`` lines;
    nested non-empty containers are rendered two spaces deeper. Anything else
    is rendered through ``_dump_scalar``. The result carries no trailing
    newline, and an empty top-level dict or list renders as an empty string.

    Args:
        data: Value to render.
        indent: Number of spaces placed before each line of this level.
        sort_keys: When true, dict entries are emitted in sorted key order.
    """
    pad = " " * indent

    def empty_marker(value: Any) -> str:
        # A bare "key:" or "-" reads back as null, so empty containers are
        # written in flow style to keep their type.
        return "{}" if isinstance(value, dict) else "[]"

    if isinstance(data, dict):
        entries = data.items()
        if sort_keys:
            # Order by key only, so values never take part in comparisons.
            entries = sorted(entries, key=lambda entry: entry[0])
        lines = []
        for key, value in entries:
            if not isinstance(value, (dict, list)):
                lines.append(f"{pad}{key}: {_dump_scalar(value)}")
            elif value:
                lines.append(f"{pad}{key}:\n{_dump(value, indent + 2, sort_keys=sort_keys)}")
            else:
                lines.append(f"{pad}{key}: {empty_marker(value)}")
        return "\n".join(lines)

    if isinstance(data, list):
        lines = []
        for value in data:
            if not isinstance(value, (dict, list)):
                lines.append(f"{pad}- {_dump_scalar(value)}")
            elif value:
                nested = _dump(value, indent + 2, sort_keys=sort_keys)
                # Split on "\n" only: that is the separator used when joining,
                # while splitlines() would also break on other characters.
                first, *rest = nested.split("\n")
                # The first nested line moves up onto the dash line.
                lines.append(f"{pad}- {first.lstrip()}")
                lines.extend(rest)
            else:
                lines.append(f"{pad}- {empty_marker(value)}")
        return "\n".join(lines)

    return f"{pad}{_dump_scalar(data)}"
def _dump_scalar(value: Any) -> str:
    """Render one scalar value as YAML text.

    ``None`` becomes ``null``, booleans become ``true`` / ``false``, and ints
    and floats use ``str()``. Any other value is converted with ``str()`` and
    quoted through ``json_quote`` when emitting it bare would be ambiguous:
    it is empty, contains ``:``, ``#`` or a newline/tab/carriage return, has
    surrounding whitespace, starts with a YAML indicator character, or would
    read back as null, a boolean or a number.
    """
    if value is None:
        return "null"
    if value is True:
        return "true"
    if value is False:
        return "false"
    if isinstance(value, (int, float)):
        return str(value)

    text = str(value)
    if not text:
        return '""'
    needs_quotes = (
        any(ch in text for ch in [":", "#", "\n", "\r", "\t"])
        or text.strip() != text
        or text.startswith(("-", "?", "@", "&", "*", "!", "{", "}", "[", "]", ",", "#", "|", ">", "'", '"'))
        or _reads_as_non_string(text)
    )
    return json_quote(text) if needs_quotes else text


def _reads_as_non_string(text: str) -> bool:
    """Return True when bare ``text`` would load as null, a boolean or a number."""
    if text.lower() in {"null", "none", "~", "true", "false"}:
        return True
    try:
        float(text)
    except ValueError:
        return False
    return True
def json_quote(text: str) -> str:
    """Return ``text`` as a double-quoted JSON string, leaving non-ASCII characters unescaped."""
    from json import dumps

    return dumps(text, ensure_ascii=False)