#!/usr/bin/env python3
"""
Incremental loader for new messages from a Telegram forum supergroup.

Reads ``last_message_id`` from every topic's ``state.json`` and fetches only
messages newer than it. Intended to be run every 6 hours from cron. When the
run finishes, a summary report is sent to Telegram.
"""
import asyncio
import json
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

sys.path.insert(0, '/home/node/.local/lib/python3.11/site-packages')

from dotenv import load_dotenv

# Must run before initial_load is imported: that module reads the credentials.
load_dotenv(os.path.expanduser('~/.openclaw/.env'))

from telethon import TelegramClient
from telethon.tl.functions.messages import GetForumTopicsRequest

# Reuse the helpers (and shared configuration) from initial_load.
sys.path.insert(0, str(Path(__file__).parent))
from initial_load import (
    API_ID, API_HASH, SESSION_PATH, CONFIG, DATA_DIR,
    BATCH_SIZE, PAUSE_MINUTES,
    ts, media_info, should_download, msg_to_dict,
    download_media, save_batch, load_state, save_state,
)

SKILL_DIR = Path(__file__).parent.parent
CONFIG_FILE = SKILL_DIR / 'config.json'
# Accepts both "telegram:<chat id>" and a bare "<chat id>".
TG_TARGET = os.getenv('VOICE_TTS_TARGET', 'telegram:126472752').removeprefix('telegram:')


def send_telegram(text):
    """Send *text* to ``TG_TARGET`` through the ``openclaw`` CLI.

    Delivery is best-effort: failures are printed, never raised, so a broken
    notifier cannot fail the cron run after the data has been loaded.

    Returns:
        True if ``openclaw`` exited with status 0, otherwise False.
    """
    cmd = ['openclaw', 'message', 'send', '--channel', 'telegram',
           '--target', TG_TARGET, '--message', text]
    try:
        proc = subprocess.run(cmd, capture_output=True)
    except OSError as err:  # e.g. openclaw is not on PATH
        print(f"[{ts()}] ⚠ Не удалось отправить отчёт: {err}")
        return False
    if proc.returncode != 0:
        stderr = proc.stderr.decode('utf-8', 'replace').strip()
        print(f"[{ts()}] ⚠ openclaw завершился с кодом {proc.returncode}: {stderr}")
        return False
    return True


def build_report(results, start_time, channel_label='@snowbikerussia'):
    """Build the text report for one incremental run.

    Args:
        results: list of dicts with ``title`` and ``new`` keys; a topic that
            failed additionally carries an ``error`` string.
        start_time: run start as a POSIX timestamp in seconds.
        channel_label: text shown in the report header.

    Returns:
        The report as a single newline-joined string.
    """
    finished = datetime.now(timezone.utc)
    now = finished.strftime('%H:%M UTC')
    duration = int(finished.timestamp() - start_time)
    grand_new = sum(r['new'] for r in results)

    lines = [f"🔄 Инкремент {channel_label} — {now} ({duration}с)\n"]
    for r in results:
        if r.get('error'):
            lines.append(f"⚠ {r['title']}: ошибка — {r['error']}")
        elif r['new'] > 0:
            lines.append(f"✅ {r['title']}: +{r['new']} новых")
        else:
            lines.append(f" {r['title']}: без изменений")

    lines.append(f"\n💬 Новых сообщений: {grand_new}")
    if grand_new == 0:
        lines.append("📭 Всё актуально")
    return '\n'.join(lines)


async def increment_topic(client, entity, topic_id, topic_title, channel_dir):
    """Fetch the messages of one forum topic that are newer than its saved state.

    Topics whose ``state.json`` is missing or lacks ``initial_load_complete``
    are skipped. Otherwise messages with id > ``last_message_id`` are
    requested newest-first in pages of ``BATCH_SIZE``. Each page is written
    as a new batch file, and media is downloaded where ``should_download``
    allows it. Between full pages the client is disconnected for
    ``PAUSE_MINUTES``.

    ``state.json`` is written only after the loop finishes. An interrupted
    run therefore does not advance ``last_message_id``: the next run
    re-fetches that range instead of silently skipping it.

    Returns:
        The number of new messages stored (0 when the topic was skipped).
    """
    topic_dir = channel_dir / str(topic_id)
    state_file = topic_dir / 'state.json'
    media_dir = topic_dir / 'media'

    state = load_state(state_file)
    if not state or not state.get('initial_load_complete'):
        print(f"[{ts()}] ⚠ Топик [{topic_id}] «{topic_title}»: первичная загрузка не завершена, пропускаем")
        return 0

    min_id = state.get('last_message_id') or 0
    print(f"[{ts()}] 🔄 Топик [{topic_id}] «{topic_title}» — новые после msg_id={min_id}")

    topic_dir.mkdir(parents=True, exist_ok=True)
    # Continue batch numbering after the files already on disk.
    batch_num = len(list(topic_dir.glob('batch_*.json')))
    total_new = 0
    offset_id = 0  # 0 = start from the newest message

    while True:
        msgs = await client.get_messages(
            entity, limit=BATCH_SIZE, reply_to=topic_id,
            offset_id=offset_id, min_id=min_id, reverse=False,
        )
        if not msgs:
            break

        batch_num += 1
        batch_data = []
        dl_count = 0
        for msg in msgs:
            m = media_info(msg)
            local_path, downloaded = None, False
            if m and should_download(m):
                local_path = await download_media(client, msg, media_dir, msg.id)
                downloaded = bool(local_path)
                if downloaded:
                    dl_count += 1
            batch_data.append(msg_to_dict(msg, topic_id, local_path, downloaded))

        save_batch(topic_dir, batch_num, batch_data)
        total_new += len(msgs)

        # Pages are newest-first, so msgs[0] is the newest message on the page.
        newest_id = msgs[0].id
        if newest_id > (state.get('last_message_id') or 0):
            state['last_message_id'] = newest_id
        state['total_messages'] = state.get('total_messages', 0) + len(msgs)

        text_count = sum(1 for d in batch_data if d.get('text'))
        media_count = sum(1 for d in batch_data if d.get('media'))
        print(f"[{ts()}] Пакет #{batch_num}: +{len(msgs)} "
              f"({text_count} текст, {media_count} медиа, {dl_count} скачано)")

        if len(msgs) < BATCH_SIZE:
            break  # short page: nothing older than min_id is left
        offset_id = msgs[-1].id
        print(f"[{ts()}] Пауза {PAUSE_MINUTES} мин...")
        await client.disconnect()
        # asyncio.sleep, not time.sleep: never block the event loop in a coroutine.
        await asyncio.sleep(PAUSE_MINUTES * 60)
        await client.connect()

    # Record the check time even when nothing new arrived.
    state['last_incremental_at'] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')
    save_state(state_file, state)

    if total_new == 0:
        print(f"[{ts()}] Нет новых сообщений")
    else:
        print(f"[{ts()}] ✅ Топик [{topic_id}] «{topic_title}»: +{total_new} новых")
    return total_new


async def _process_source(client, source):
    """Run the incremental load for every topic of one configured source.

    Resolves the channel and persists a newly learned ``channel_id`` back to
    config.json. Then refreshes ``meta.json`` and calls ``increment_topic``
    for each live topic. An error in one topic is logged and recorded in its
    result dict, and the remaining topics still run.

    Returns:
        A list of ``{'title', 'topic_id', 'new'[, 'error']}`` dicts.
    """
    username = source['username']
    channel_id = source.get('channel_id')
    print(f"\n[{ts()}] === Канал: @{username} ===")

    entity = await client.get_entity(f'@{username}')
    if not channel_id:
        channel_id = entity.id
        source['channel_id'] = channel_id  # `source` is a dict inside CONFIG
        with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
            json.dump(CONFIG, f, ensure_ascii=False, indent=2)

    channel_dir = DATA_DIR / 'raw' / str(channel_id)
    channel_dir.mkdir(parents=True, exist_ok=True)

    # NOTE(review): only the first 100 topics are requested; forums with more
    # topics need pagination via offset_date / offset_id / offset_topic.
    topics_result = await client(GetForumTopicsRequest(
        peer=entity, offset_date=None, offset_id=0, offset_topic=0, limit=100
    ))
    # Deleted topics are returned as placeholders without a title; skip them.
    topics = [t for t in topics_result.topics if hasattr(t, 'title')]

    # Refresh meta.json with the current topic list.
    topics_map = {str(t.id): t.title for t in topics}
    meta_file = channel_dir / 'meta.json'
    if meta_file.exists():
        with open(meta_file, encoding='utf-8') as f:
            meta = json.load(f)
        meta['topics'] = topics_map
    else:
        meta = {"id": channel_id, "username": username,
                "title": entity.title, "topics": topics_map}
    with open(meta_file, 'w', encoding='utf-8') as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    results = []
    for topic in topics:
        result = {'title': topic.title, 'topic_id': topic.id, 'new': 0}
        try:
            result['new'] = await increment_topic(
                client, entity, topic.id, topic.title, channel_dir
            )
        except Exception as err:  # isolate topics so the rest and the report still run
            result['error'] = f"{type(err).__name__}: {err}"
            print(f"[{ts()}] ❌ Топик [{topic.id}] «{topic.title}»: {result['error']}")
        results.append(result)
    return results


async def main():
    """Load new messages for every enabled source, then send the report."""
    start_time = datetime.now(timezone.utc).timestamp()
    print(f"[{ts()}] ▶ Инкрементальная загрузка...")

    # Check the config before connecting, so an empty config does not open a session.
    sources = [s for s in CONFIG['sources'] if s.get('enabled')]
    if not sources:
        print("Нет активных источников в config.json")
        return

    client = TelegramClient(SESSION_PATH, API_ID, API_HASH)
    await client.start()
    all_results = []
    try:
        for source in sources:
            all_results.extend(await _process_source(client, source))
    finally:
        # Always release the session, even if a source fails.
        await client.disconnect()

    grand_total = sum(r['new'] for r in all_results)
    print(f"\n[{ts()}] ✅ Инкремент завершён. Всего новых: {grand_total}")

    # Send the report to Telegram.
    channel_label = ', '.join(f"@{s['username']}" for s in sources)
    report = build_report(all_results, start_time, channel_label=channel_label)
    print(f"\n{report}")
    send_telegram(report)


if __name__ == '__main__':
    asyncio.run(main())