255 lines
7.7 KiB
Python
255 lines
7.7 KiB
Python
"""Minimal YAML shim for ontology validation.
|
|
|
|
Supports the subset needed by memory/ontology/schema.yaml:
|
|
- mappings
|
|
- nested mappings
|
|
- lists
|
|
- flow-style lists: [a, b, c]
|
|
- quoted and unquoted scalars
|
|
- booleans, null, ints, floats
|
|
|
|
This is intentionally tiny and dependency-free.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any, Iterable
|
|
|
|
|
|
class YAMLError(ValueError):
    """Error raised when YAML text cannot be handled by this shim.

    Subclasses ``ValueError`` so callers catching ``ValueError`` also catch
    parse failures.
    """
|
|
|
|
|
|
def safe_load(stream: Any) -> Any:
    """Parse YAML from a string, a bytes object, or a readable stream.

    Args:
        stream: A ``str``, a ``bytes``/``bytearray`` object, or any object
            exposing ``read()`` that returns one of those.  Any other value
            is converted with ``str()`` before parsing.

    Returns:
        Whatever ``_parse`` produces for the text (``None`` for a document
        with no content lines).

    Raises:
        YAMLError: Propagated from the parser for unsupported content.
        UnicodeDecodeError: If binary input is not valid UTF-8.
    """
    payload = stream.read() if hasattr(stream, "read") else stream
    if isinstance(payload, (bytes, bytearray)):
        # Binary handles and raw bytes must be decoded rather than passed to
        # str(): str(b"a: 1") is the literal "b'a: 1'", not the document.
        # "utf-8-sig" also drops a leading byte-order mark when present.
        text = bytes(payload).decode("utf-8-sig")
    else:
        text = str(payload)
    return _parse(text)
|
|
|
|
|
|
def safe_dump(data: Any, stream: Any = None, sort_keys: bool = False) -> str:
    """Serialize *data* to YAML text.

    Args:
        data: The value to serialize.
        stream: Optional object with a ``write()`` method.  When given, the
            text is written to it instead of being returned.
        sort_keys: Forwarded to the serializer to emit mapping keys sorted.

    Returns:
        The YAML text, or an empty string when *stream* received the output.
    """
    rendered = _dump(data, 0, sort_keys=sort_keys)
    if stream is None:
        return rendered
    stream.write(rendered)
    return ""
|
|
|
|
|
|
def _parse(text: str) -> Any:
|
|
lines = text.splitlines()
|
|
cleaned: list[tuple[int, str]] = []
|
|
for raw in lines:
|
|
stripped = raw.rstrip()
|
|
if not stripped:
|
|
continue
|
|
if stripped.lstrip().startswith("#"):
|
|
continue
|
|
indent = len(stripped) - len(stripped.lstrip(" "))
|
|
cleaned.append((indent, stripped.lstrip(" ")))
|
|
if not cleaned:
|
|
return None
|
|
|
|
value, idx = _parse_block(cleaned, 0, cleaned[0][0])
|
|
if idx != len(cleaned):
|
|
raise YAMLError("Trailing YAML content could not be parsed")
|
|
return value
|
|
|
|
|
|
def _parse_block(items: list[tuple[int, str]], idx: int, indent: int) -> tuple[Any, int]:
|
|
# Decide whether the block is a list or a mapping based on first line.
|
|
if idx >= len(items):
|
|
return None, idx
|
|
|
|
cur_indent, cur_line = items[idx]
|
|
if cur_indent < indent:
|
|
return None, idx
|
|
|
|
if cur_line.startswith("-"):
|
|
result = []
|
|
while idx < len(items):
|
|
line_indent, line = items[idx]
|
|
if line_indent < indent:
|
|
break
|
|
if line_indent != indent or not line.startswith("-"):
|
|
break
|
|
item_text = line[1:].lstrip()
|
|
idx += 1
|
|
if item_text:
|
|
# Inline scalar or inline mapping fragment.
|
|
if ":" in item_text and not _is_quoted(item_text):
|
|
key, rest = item_text.split(":", 1)
|
|
key = key.strip()
|
|
rest = rest.lstrip()
|
|
item: Any = {key: _parse_scalar(rest) if rest else None}
|
|
if not rest and idx < len(items) and items[idx][0] > indent:
|
|
nested, idx = _parse_block(items, idx, items[idx][0])
|
|
if isinstance(nested, dict):
|
|
item[key] = nested if item[key] is None else item[key]
|
|
if isinstance(nested, dict) and item[key] is None:
|
|
item[key] = nested
|
|
else:
|
|
item[key] = nested
|
|
result.append(item)
|
|
else:
|
|
result.append(_parse_scalar(item_text))
|
|
else:
|
|
if idx < len(items) and items[idx][0] > indent:
|
|
nested, idx = _parse_block(items, idx, items[idx][0])
|
|
result.append(nested)
|
|
else:
|
|
result.append(None)
|
|
return result, idx
|
|
|
|
result: dict[str, Any] = {}
|
|
while idx < len(items):
|
|
line_indent, line = items[idx]
|
|
if line_indent < indent:
|
|
break
|
|
if line_indent != indent:
|
|
# Nested content belongs to the previous key.
|
|
break
|
|
if line.startswith("-"):
|
|
break
|
|
if ":" not in line:
|
|
raise YAMLError(f"Invalid YAML line: {line!r}")
|
|
key, rest = line.split(":", 1)
|
|
key = key.strip()
|
|
rest = rest.lstrip()
|
|
idx += 1
|
|
if rest:
|
|
result[key] = _parse_scalar(rest)
|
|
else:
|
|
if idx < len(items) and items[idx][0] > indent:
|
|
nested, idx = _parse_block(items, idx, items[idx][0])
|
|
result[key] = nested
|
|
else:
|
|
result[key] = None
|
|
return result, idx
|
|
|
|
|
|
def _parse_scalar(text: str) -> Any:
|
|
text = text.strip()
|
|
if not text:
|
|
return None
|
|
if _is_quoted(text):
|
|
return _unquote(text)
|
|
if text.startswith("[") and text.endswith("]"):
|
|
inner = text[1:-1].strip()
|
|
if not inner:
|
|
return []
|
|
parts = _split_flow_list(inner)
|
|
return [_parse_scalar(part) for part in parts]
|
|
lower = text.lower()
|
|
if lower in {"null", "none", "~"}:
|
|
return None
|
|
if lower == "true":
|
|
return True
|
|
if lower == "false":
|
|
return False
|
|
try:
|
|
if text.startswith("0") and text != "0" and not text.startswith("0."):
|
|
# preserve as string (e.g. IDs or codes with leading zeroes)
|
|
raise ValueError
|
|
return int(text)
|
|
except ValueError:
|
|
pass
|
|
try:
|
|
return float(text)
|
|
except ValueError:
|
|
pass
|
|
return text
|
|
|
|
|
|
def _is_quoted(text: str) -> bool:
|
|
return (len(text) >= 2 and text[0] == text[-1] and text[0] in {'"', "'"})
|
|
|
|
|
|
def _unquote(text: str) -> str:
|
|
if text[0] == '"':
|
|
return bytes(text[1:-1], "utf-8").decode("unicode_escape")
|
|
return text[1:-1].replace("''", "'")
|
|
|
|
|
|
def _split_flow_list(text: str) -> list[str]:
|
|
parts: list[str] = []
|
|
current = []
|
|
in_quote: str | None = None
|
|
escape = False
|
|
for ch in text:
|
|
if escape:
|
|
current.append(ch)
|
|
escape = False
|
|
continue
|
|
if in_quote:
|
|
current.append(ch)
|
|
if ch == "\\" and in_quote == '"':
|
|
escape = True
|
|
elif ch == in_quote:
|
|
in_quote = None
|
|
continue
|
|
if ch in {'"', "'"}:
|
|
current.append(ch)
|
|
in_quote = ch
|
|
continue
|
|
if ch == ',':
|
|
parts.append(''.join(current).strip())
|
|
current = []
|
|
continue
|
|
current.append(ch)
|
|
if current:
|
|
parts.append(''.join(current).strip())
|
|
return parts
|
|
|
|
|
|
def _dump(data: Any, indent: int, sort_keys: bool = False) -> str:
|
|
pad = " " * indent
|
|
if isinstance(data, dict):
|
|
items = data.items()
|
|
if sort_keys:
|
|
items = sorted(items)
|
|
lines = []
|
|
for key, value in items:
|
|
if isinstance(value, (dict, list)):
|
|
lines.append(f"{pad}{key}:\n{_dump(value, indent + 2, sort_keys=sort_keys)}")
|
|
else:
|
|
lines.append(f"{pad}{key}: {_dump_scalar(value)}")
|
|
return "\n".join(lines)
|
|
if isinstance(data, list):
|
|
lines = []
|
|
for value in data:
|
|
if isinstance(value, (dict, list)):
|
|
nested = _dump(value, indent + 2, sort_keys=sort_keys)
|
|
if nested:
|
|
first, *rest = nested.splitlines()
|
|
lines.append(f"{pad}- {first.lstrip()}")
|
|
lines.extend(rest)
|
|
else:
|
|
lines.append(f"{pad}-")
|
|
else:
|
|
lines.append(f"{pad}- {_dump_scalar(value)}")
|
|
return "\n".join(lines)
|
|
return f"{pad}{_dump_scalar(data)}"
|
|
|
|
|
|
def _dump_scalar(value: Any) -> str:
|
|
if value is None:
|
|
return "null"
|
|
if value is True:
|
|
return "true"
|
|
if value is False:
|
|
return "false"
|
|
if isinstance(value, (int, float)):
|
|
return str(value)
|
|
text = str(value)
|
|
if not text:
|
|
return '""'
|
|
if any(ch in text for ch in [":", "#", "\n", "\r", "\t"]) or text.strip() != text or text.startswith(("-", "?", "@", "&", "*", "!", "{", "}", "[", "]", ",", "#", "|", ">", "'", '"')):
|
|
return json_quote(text)
|
|
return text
|
|
|
|
|
|
def json_quote(text: str) -> str:
    """Return *text* as a double-quoted JSON string literal.

    Non-ASCII characters are kept as-is rather than ``\\uXXXX``-escaped.
    """
    from json import dumps

    return dumps(text, ensure_ascii=False)
|