Files
wiki/tasks/internet-orders/relay_server.py
2026-04-12 21:55:33 +03:00

95 lines
2.4 KiB
Python

#!/usr/bin/env python3
"""
vprok.ru relay server
Accepts tasks from Стрим AI and serves them to Windows Playwright client.
"""
from flask import Flask, request, jsonify
from datetime import datetime
import uuid
app = Flask(__name__)
API_KEY = "vprok2024secret"
# In-memory storage
task_store = {
"current": None, # current task dict
"last_result": None
}
def require_api_key(f):
from functools import wraps
@wraps(f)
def decorated(*args, **kwargs):
key = request.headers.get("X-Api-Key")
if key != API_KEY:
return jsonify({"error": "Unauthorized"}), 401
return f(*args, **kwargs)
return decorated
@app.route("/task", methods=["POST"])
@require_api_key
def post_task():
data = request.get_json(force=True, silent=True)
if not data or "items" not in data:
return jsonify({"error": "Invalid payload. Expected {'items': [...]}"}), 400
task_id = str(uuid.uuid4())[:8]
task_store["current"] = {
"id": task_id,
"items": data["items"],
"status": "pending",
"created_at": datetime.utcnow().isoformat()
}
task_store["last_result"] = None
return jsonify({
"ok": True,
"task_id": task_id,
"items_count": len(data["items"])
}), 201
@app.route("/task", methods=["GET"])
@require_api_key
def get_task():
task = task_store["current"]
if task is None or task["status"] not in ("pending",):
return "", 204
task["status"] = "in_progress"
task["started_at"] = datetime.utcnow().isoformat()
return jsonify(task), 200
@app.route("/task/done", methods=["POST"])
@require_api_key
def task_done():
data = request.get_json(force=True, silent=True) or {}
task = task_store["current"]
if task is None:
return jsonify({"error": "No active task"}), 404
task["status"] = "done"
task["finished_at"] = datetime.utcnow().isoformat()
task["result_status"] = data.get("status", "unknown")
task["result_message"] = data.get("message", "")
task_store["last_result"] = dict(task)
return jsonify({"ok": True}), 200
@app.route("/status", methods=["GET"])
@require_api_key
def get_status():
task = task_store["current"]
if task is None:
return jsonify({"status": "idle", "task": None}), 200
return jsonify({"status": task["status"], "task": task}), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5000)