#!/usr/bin/env python3 """ Двухпроходный анализ сообщений Telegram-группы. Пасс 1: GPT-4o mini — извлечение структурированных фактов из чанков по 50 сообщений Пасс 2: Claude Sonnet — синтез финального knowledge_base.md Поддерживает возобновление: прогресс сохраняется в facts_partial.json. Использует OPENAI_API_KEY и OPENROUTER_API_KEY из ~/.openclaw/.env """ import os, sys, json, time, argparse from pathlib import Path from datetime import datetime sys.path.insert(0, '/home/node/.local/lib/python3.11/site-packages') from dotenv import load_dotenv load_dotenv(os.path.expanduser('~/.openclaw/.env')) try: import requests except ImportError: os.system('~/.local/bin/pip install --break-system-packages requests -q') import requests # --- Конфигурация --- OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') OPENROUTER_API_KEY = os.getenv('OPENROUTER_API_KEY') DATA_DIR = Path('/home/node/.openclaw/workspace/data/telegram-collector') CHANNEL_DIR = DATA_DIR / 'raw' / '1242788123' OUTPUT_DIR = DATA_DIR FACTS_FILE = OUTPUT_DIR / 'facts_partial.json' KB_FILE = OUTPUT_DIR / 'knowledge_base.md' CHUNK_SIZE = 50 SKIP_TOPICS = {'117112'} # Опросы — не информативны PASS1_MODEL = 'gpt-4o-mini' PASS2_MODEL = 'anthropic/claude-sonnet-4-5' # через OpenRouter FACT_CATEGORIES = ['repairs', 'models', 'locations', 'prices', 'riding_tips', 'tuning', 'donor_bikes', 'season'] PASS1_SYSTEM = """Ты анализируешь сообщения из русскоязычного Telegram-сообщества сноубайкеров. Извлеки полезные конкретные факты и верни ТОЛЬКО валидный JSON без пояснений: { "repairs": ["конкретная проблема и/или решение"], "models": ["модель/бренд сноубайка: характеристика, мнение пользователей"], "locations": ["место катания: регион/название, особенности, сезон, снег"], "prices": ["товар или услуга: цена в рублях, условия"], "riding_tips": ["конкретный совет по технике езды, безопасности"], "tuning": ["компонент или система: модификация, эффект от тюнинга"], "donor_bikes": ["мотоцикл-донор: модель, почему подходит или не подходит"], "season": ["информация о сезоне, открытии/закрытии, условиях снега"] } Правила: - Только конкретные факты, никакого флуда и эмоций - Если в чанке нет фактов по категории — пустой массив [] - Каждый факт — одна строка, максимально информативно - Пиши на русском""" PASS2_SYSTEM = """Ты составляешь экспертную базу знаний по сноубайкам на основе реальных обсуждений русскоязычного сообщества. Тебе переданы факты, извлечённые из 155 000 сообщений группы "Сноубайк Россия". Создай структурированный Markdown-документ — практическое руководство для сноубайкеров.""" def ts(): return datetime.now().strftime('%H:%M:%S') def call_gpt4o_mini(messages_text: str, topic_name: str) -> dict: """Вызов GPT-4o mini для извлечения фактов из чанка.""" prompt = f"Раздел группы: «{topic_name}»\n\nСообщения:\n{messages_text}" resp = requests.post( 'https://api.openai.com/v1/chat/completions', headers={'Authorization': f'Bearer {OPENAI_API_KEY}', 'Content-Type': 'application/json'}, json={ 'model': PASS1_MODEL, 'messages': [ {'role': 'system', 'content': PASS1_SYSTEM}, {'role': 'user', 'content': prompt} ], 'temperature': 0.1, 'max_tokens': 1000, 'response_format': {'type': 'json_object'} }, timeout=30 ) if resp.status_code != 200: raise RuntimeError(f'OpenAI API error {resp.status_code}: {resp.text[:200]}') data = resp.json() TOKEN_COUNTER['p1_input'] += data.get('usage', {}).get('prompt_tokens', 0) TOKEN_COUNTER['p1_output'] += data.get('usage', {}).get('completion_tokens', 0) content = data['choices'][0]['message']['content'] try: return json.loads(content) except json.JSONDecodeError: return {cat: [] for cat in FACT_CATEGORIES} def call_claude_sonnet(facts_summary: str) -> str: """Вызов Claude Sonnet через OpenRouter для финального синтеза.""" prompt = f"""На основе следующих фактов, извлечённых из 155 000 сообщений сообщества "Сноубайк Россия", создай подробную базу знаний. {facts_summary} Структура документа: # База знаний: Сноубайки — опыт сообщества ## 1. Модели сноубайков (популярные модели, бренды, характеристики, мнения пользователей) ## 2. Мотоциклы-доноры (какие мотоциклы используют для переделки в сноубайк, плюсы и минусы каждого) ## 3. Частые проблемы и решения (типичные поломки, болячки, как их устранять) ## 4. Тюнинг и модификации (популярные доработки, что улучшает что ухудшает) ## 5. Экипировка (что носят, что рекомендуют) ## 6. Цены и рынок (актуальные цены на технику, запчасти, услуги) ## 7. Локации (где катаются, особенности мест, регионы России) ## 8. Сезонность (когда открывается/закрывается сезон, условия снега) ## 9. Техника езды (советы по управлению, безопасности, обучению) ## 10. Электросноубайки (электрические модели, особенности, мнения) Пиши развёрнуто, используй конкретные цифры и факты из данных. Документ должен быть полезен как новичку, так и опытному сноубайкеру.""" resp = requests.post( 'https://openrouter.ai/api/v1/chat/completions', headers={'Authorization': f'Bearer {OPENROUTER_API_KEY}', 'Content-Type': 'application/json'}, json={ 'model': PASS2_MODEL, 'messages': [ {'role': 'system', 'content': PASS2_SYSTEM}, {'role': 'user', 'content': prompt} ], 'temperature': 0.3, 'max_tokens': 8000, }, timeout=120 ) if resp.status_code != 200: raise RuntimeError(f'OpenRouter API error {resp.status_code}: {resp.text[:200]}') data = resp.json() TOKEN_COUNTER['p2_input'] += data.get('usage', {}).get('prompt_tokens', 0) TOKEN_COUNTER['p2_output'] += data.get('usage', {}).get('completion_tokens', 0) return data['choices'][0]['message']['content'] def load_messages(): """Загружает все сообщения с текстом, группируя по топику.""" meta = json.load(open(CHANNEL_DIR / 'meta.json')) all_chunks = [] # [(topic_name, [msg_texts])] total = 0 for tid, tname in sorted(meta['topics'].items(), key=lambda x: int(x[0])): if tid in SKIP_TOPICS: continue tdir = CHANNEL_DIR / tid if not tdir.exists(): continue topic_msgs = [] for b in sorted(tdir.glob('batch_*.json')): for msg in json.load(open(b)): text = (msg.get('text') or '').strip() if len(text) > 5: # фильтруем совсем короткие topic_msgs.append(text) # Разбиваем на чанки for i in range(0, len(topic_msgs), CHUNK_SIZE): chunk = topic_msgs[i:i + CHUNK_SIZE] all_chunks.append((tname, chunk)) total += len(topic_msgs) print(f" {tname}: {len(topic_msgs)} сообщений → {len(topic_msgs)//CHUNK_SIZE + 1} чанков") print(f"Итого: {total} сообщений, {len(all_chunks)} чанков\n") return all_chunks def merge_facts(acc: dict, new: dict) -> dict: """Объединяет новые факты с накопленными.""" for cat in FACT_CATEGORIES: items = new.get(cat, []) if isinstance(items, list): acc[cat].extend(items) return acc def load_progress() -> tuple[dict, int]: """Загружает прогресс из файла.""" if FACTS_FILE.exists(): data = json.load(open(FACTS_FILE)) return data['facts'], data['processed_chunks'] return {cat: [] for cat in FACT_CATEGORIES}, 0 def save_progress(facts: dict, processed: int): """Сохраняет прогресс.""" with open(FACTS_FILE, 'w', encoding='utf-8') as f: json.dump({'facts': facts, 'processed_chunks': processed, 'updated_at': datetime.now().isoformat()}, f, ensure_ascii=False, indent=2) def format_facts_for_pass2(facts: dict) -> str: """Форматирует накопленные факты для передачи в Пасс 2.""" labels = { 'repairs': 'РЕМОНТ И ПРОБЛЕМЫ', 'models': 'МОДЕЛИ СНОУБАЙКОВ', 'locations': 'ЛОКАЦИИ', 'prices': 'ЦЕНЫ И РЫНОК', 'riding_tips': 'ТЕХНИКА ЕЗДЫ', 'tuning': 'ТЮНИНГ И МОДИФИКАЦИИ', 'donor_bikes': 'МОТОЦИКЛЫ-ДОНОРЫ', 'season': 'СЕЗОННОСТЬ', } lines = [] for cat, label in labels.items(): items = facts.get(cat, []) if items: # Фильтруем только строки, дедупликация str_items = [i for i in items if isinstance(i, str)] unique = list(dict.fromkeys(str_items))[:300] # макс 300 фактов на категорию lines.append(f"\n=== {label} ({len(unique)} фактов) ===") for item in unique: lines.append(f"• {item}") return '\n'.join(lines) def pass1(all_chunks: list, facts: dict, start_from: int) -> dict: """Пасс 1: извлечение фактов через GPT-4o mini.""" total = len(all_chunks) errors = 0 for i, (topic_name, chunk) in enumerate(all_chunks): if i < start_from: continue chunk_text = '\n---\n'.join(chunk) pct = (i + 1) / total * 100 try: result = call_gpt4o_mini(chunk_text, topic_name) facts = merge_facts(facts, result) new_facts = sum(len(v) for v in result.values() if isinstance(v, list)) print(f"[{ts()}] Чанк {i+1}/{total} ({pct:.0f}%) [{topic_name}]: +{new_facts} фактов") except Exception as e: errors += 1 print(f"[{ts()}] ⚠ Чанк {i+1} ошибка: {e}") if errors > 10: print("Слишком много ошибок, останавливаемся") break # Сохраняем прогресс каждые 10 чанков if (i + 1) % 10 == 0: save_progress(facts, i + 1) # Небольшая пауза чтобы не превышать rate limit time.sleep(0.3) save_progress(facts, len(all_chunks)) return facts def pass2(facts: dict) -> str: """Пасс 2: синтез knowledge_base.md через Claude Sonnet.""" facts_text = format_facts_for_pass2(facts) total_facts = sum(len(v) for v in facts.values() if isinstance(v, list)) print(f"[{ts()}] Пасс 2: передаём {total_facts} фактов в Claude Sonnet...") return call_claude_sonnet(facts_text) # Счётчики токенов для подсчёта стоимости TOKEN_COUNTER = {'p1_input': 0, 'p1_output': 0, 'p2_input': 0, 'p2_output': 0} PRICES = { 'gpt-4o-mini': {'input': 0.15, 'output': 0.60}, # $ per 1M tokens 'claude-sonnet':{'input': 3.00, 'output': 15.00}, } def calc_cost(): p1 = PRICES['gpt-4o-mini'] p2 = PRICES['claude-sonnet'] cost_p1 = (TOKEN_COUNTER['p1_input'] / 1_000_000 * p1['input'] + TOKEN_COUNTER['p1_output'] / 1_000_000 * p1['output']) cost_p2 = (TOKEN_COUNTER['p2_input'] / 1_000_000 * p2['input'] + TOKEN_COUNTER['p2_output'] / 1_000_000 * p2['output']) return cost_p1, cost_p2, cost_p1 + cost_p2 def main(): parser = argparse.ArgumentParser() parser.add_argument('--pass1-only', action='store_true', help='Только пасс 1') parser.add_argument('--pass2-only', action='store_true', help='Только пасс 2 (из готовых фактов)') parser.add_argument('--reset', action='store_true', help='Начать заново, игнорировать прогресс') args = parser.parse_args() print(f"[{ts()}] === Анализ @snowbikerussia ===\n") if args.pass2_only: if not FACTS_FILE.exists(): print("Нет файла фактов, сначала запусти пасс 1") return facts, _ = load_progress() total_facts = sum(len(v) for v in facts.values() if isinstance(v, list)) print(f"[{ts()}] Загружено {total_facts} фактов из {FACTS_FILE}") else: # Пасс 1 if args.reset and FACTS_FILE.exists(): FACTS_FILE.unlink() print(f"[{ts()}] Прогресс сброшен") facts, start_from = load_progress() if start_from > 0: total_facts = sum(len(v) for v in facts.values() if isinstance(v, list)) print(f"[{ts()}] Продолжаем с чанка #{start_from+1} ({total_facts} фактов уже собрано)") print(f"[{ts()}] Загружаю сообщения...") all_chunks = load_messages() print(f"[{ts()}] === ПАСС 1: GPT-4o mini ===") facts = pass1(all_chunks, facts, start_from) total_facts = sum(len(v) for v in facts.values() if isinstance(v, list)) print(f"\n[{ts()}] Пасс 1 завершён. Извлечено {total_facts} фактов:") for cat in FACT_CATEGORIES: print(f" {cat}: {len(facts.get(cat, []))}") if args.pass1_only: print(f"\nФакты сохранены в {FACTS_FILE}") return # Пасс 2 print(f"\n[{ts()}] === ПАСС 2: Claude Sonnet ===") kb_content = pass2(facts) # Сохраняем результат header = f"\n" header += f"\n\n" with open(KB_FILE, 'w', encoding='utf-8') as f: f.write(header + kb_content) cost_p1, cost_p2, total_cost = calc_cost() print(f"\n[{ts()}] ✅ База знаний сохранена: {KB_FILE}") print(f"[{ts()}] Размер: {KB_FILE.stat().st_size / 1024:.1f} КБ") print(f"\n[{ts()}] 💰 Стоимость анализа:") print(f" Пасс 1 (GPT-4o mini): ${cost_p1:.3f} ({TOKEN_COUNTER['p1_input']:,} in / {TOKEN_COUNTER['p1_output']:,} out tokens)") print(f" Пасс 2 (Claude Sonnet): ${cost_p2:.3f} ({TOKEN_COUNTER['p2_input']:,} in / {TOKEN_COUNTER['p2_output']:,} out tokens)") print(f" ИТОГО: ${total_cost:.3f}") if __name__ == '__main__': main()