#!/usr/bin/env python3
"""
Initial load of the full message history from a Telegram forum supergroup.

Layout: raw/{channel_id}/{topic_id}/batch_XXXX.json + media/
Progress is saved to state.json, so the load can be resumed after an
interruption.
"""
import asyncio
import json
import mimetypes
import os
import sys
import time
from datetime import datetime, timezone
from pathlib import Path

# Must stay ahead of the third-party imports below: it puts this
# site-packages directory first on the import path (presumably where
# dotenv and telethon are installed -- confirm for the deployment).
sys.path.insert(0, '/home/node/.local/lib/python3.11/site-packages')
from dotenv import load_dotenv

# Must run before the os.getenv() calls in the configuration section.
load_dotenv(os.path.expanduser('~/.openclaw/.env'))

from telethon import TelegramClient
from telethon.tl.functions.messages import GetForumTopicsRequest
from telethon.tl.types import (
    MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage,
    DocumentAttributeVideo, DocumentAttributeFilename, DocumentAttributeAudio
)

# --- Configuration ---
API_ID = int(os.getenv('TELEGRAM_COLLECTOR_API_ID'))
API_HASH = os.getenv('TELEGRAM_COLLECTOR_API_HASH')
SESSION = os.getenv('TELEGRAM_COLLECTOR_SESSION', 'telegram_collector')
SESSION_PATH = str(Path(__file__).parent.parent / 'telegram_collector')

SKILL_DIR = Path(__file__).parent.parent
CONFIG_FILE = SKILL_DIR / 'config.json'
# Explicit UTF-8: the config is rewritten below with ensure_ascii=False.
with open(CONFIG_FILE, encoding='utf-8') as f:
    CONFIG = json.load(f)

DATA_DIR = Path(CONFIG['data_dir'])
BATCH_SIZE = 10000
PAUSE_MINUTES = 3
MAX_FILE_SIZE = CONFIG['media']['max_file_size_mb'] * 1024 * 1024  # bytes
# NOTE(review): SKIP_VIDEO is read here but should_download() skips video and
# audio unconditionally, so this flag currently has no effect.
SKIP_VIDEO = CONFIG['media']['skip_video']
DL_PHOTOS = CONFIG['media']['download_photos']
DL_DOCS = CONFIG['media']['download_documents']


def ts():
    """Return the current local time formatted as HH:MM:SS for log lines."""
    return datetime.now().strftime('%H:%M:%S')


def _write_json_atomic(path, payload):
    """Serialize *payload* as UTF-8 JSON to *path* via a temp file + rename.

    Writing to a sibling ``.tmp`` file and swapping it in with os.replace()
    keeps the previous file intact if the process dies mid-write, which
    matters because state.json is what makes the load resumable.
    """
    path = Path(path)
    tmp_path = path.with_name(path.name + '.tmp')
    with open(tmp_path, 'w', encoding='utf-8') as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, path)


def media_info(msg):
    """Extract media metadata from a message.

    Returns None when the message has no media. For web-page previews a
    reduced dict (type/url/title/downloaded) is returned; for everything else
    a dict with the keys type, file_id, size, mime, filename, duration,
    width, height, local_path and downloaded.
    """
    if not msg.media:
        return None

    if isinstance(msg.media, MessageMediaWebPage):
        wp = msg.media.webpage
        return {
            "type": "webpage",
            "url": getattr(wp, 'url', None),
            "title": getattr(wp, 'title', None),
            "downloaded": False
        }

    m = {"type": None, "file_id": None, "size": None, "mime": None,
         "filename": None, "duration": None, "width": None, "height": None,
         "local_path": None, "downloaded": False}

    if isinstance(msg.media, MessageMediaPhoto):
        photo = msg.media.photo
        m["type"] = "photo"
        m["mime"] = "image/jpeg"
        # The photo may be absent or an empty placeholder without sizes;
        # read every attribute defensively instead of crashing.
        if photo is not None:
            m["file_id"] = str(photo.id)
            # Use the size of the largest available variant.
            byte_sizes = [s.size for s in (getattr(photo, 'sizes', None) or [])
                          if hasattr(s, 'size')]
            if byte_sizes:
                m["size"] = max(byte_sizes)
    elif isinstance(msg.media, MessageMediaDocument):
        doc = msg.media.document
        if doc is not None:
            m["file_id"] = str(doc.id)
            m["size"] = getattr(doc, 'size', None)
            m["mime"] = getattr(doc, 'mime_type', None)
        is_video = False
        for attr in (getattr(doc, 'attributes', None) or []):
            if isinstance(attr, DocumentAttributeVideo):
                m["type"] = "video"
                m["duration"] = attr.duration
                m["width"] = attr.w
                m["height"] = attr.h
                is_video = True
            elif isinstance(attr, DocumentAttributeFilename):
                m["filename"] = attr.file_name
            elif isinstance(attr, DocumentAttributeAudio):
                m["type"] = "audio"
                m["duration"] = attr.duration
        if not is_video and m["type"] != "audio":
            m["type"] = "document"
    else:
        # Any other media kind is recorded by its class name only.
        m["type"] = type(msg.media).__name__
    return m


def should_download(m):
    """Decide whether the media described by *m* should be downloaded.

    *m* is a dict produced by media_info() (or None). Web pages, video,
    audio and files larger than MAX_FILE_SIZE are never downloaded; photos
    and documents follow the DL_PHOTOS / DL_DOCS config flags.
    """
    if not m or m.get("type") == "webpage":
        return False
    if m["type"] in ("video", "audio"):
        return False  # video and audio are always skipped
    # Also check the MIME type: some videos arrive as plain documents.
    mime = m.get("mime") or ""
    if mime.startswith(("video/", "audio/")):
        return False
    if m.get("size") and m["size"] > MAX_FILE_SIZE:
        return False
    if m["type"] == "photo" and DL_PHOTOS:
        return True
    if m["type"] == "document" and DL_DOCS:
        return True
    return False


def msg_to_dict(msg, topic_id, local_path=None, downloaded=False):
    """Convert a Telethon message into a JSON-serializable dict.

    *local_path* / *downloaded* are copied into the media sub-dict when both
    media and a local path are present. Dates are emitted in UTC as
    ``YYYY-MM-DDTHH:MM:SSZ``.
    """
    m = media_info(msg)
    if m and local_path:
        m["local_path"] = local_path
        m["downloaded"] = downloaded

    reply_msg_id = None
    reply_top_id = None
    quote_text = None
    if msg.reply_to:
        rt = msg.reply_to
        # Not every reply header carries reply_to_msg_id, so read it
        # defensively rather than assuming the attribute exists.
        replied_id = getattr(rt, 'reply_to_msg_id', None)
        # For forum messages:
        # - topic root message: reply_to_msg_id == topic_id, reply_to_top_id is None
        # - reply to a message: reply_to_msg_id == message id, reply_to_top_id == topic_id
        if getattr(rt, 'forum_topic', False) and replied_id == topic_id:
            # Root-level post in the topic: not a reply to a message.
            reply_msg_id = None
        else:
            reply_msg_id = replied_id
        reply_top_id = getattr(rt, 'reply_to_top_id', None)
        quote_text = getattr(rt, 'quote_text', None)

    utc_format = '%Y-%m-%dT%H:%M:%SZ'
    return {
        "id": msg.id,
        "date": msg.date.astimezone(timezone.utc).strftime(utc_format),
        "text": msg.text or None,
        "from_id": getattr(msg.from_id, 'user_id', None),
        "reply_to_msg_id": reply_msg_id,
        "reply_to_top_id": reply_top_id,
        "quote_text": quote_text,
        "edit_date": (msg.edit_date.astimezone(timezone.utc).strftime(utc_format)
                      if msg.edit_date else None),
        "pinned": bool(msg.pinned),
        "media": m
    }


def load_state(state_file):
    """Return the parsed state dict from *state_file*, or None if it is absent."""
    if state_file.exists():
        # Read as UTF-8 to match how save_state() writes the file.
        with open(state_file, encoding='utf-8') as f:
            return json.load(f)
    return None


def save_state(state_file, state):
    """Persist *state* to *state_file* as UTF-8 JSON (atomic replace)."""
    _write_json_atomic(state_file, state)


def save_batch(batch_dir, batch_num, messages):
    """Write *messages* to ``batch_XXXX.json`` in *batch_dir*; return its path."""
    out = batch_dir / f'batch_{batch_num:04d}.json'
    _write_json_atomic(out, messages)
    return out


async def download_media(client, msg, media_dir, msg_id):
    """Download the media of *msg* into *media_dir*.

    Returns the path relative to the topic directory (``media/<file>``), or
    None when the message has no media, nothing was saved, or the download
    failed. A file that already exists is not downloaded again.
    """
    media_dir.mkdir(parents=True, exist_ok=True)
    m = media_info(msg)
    if not m:
        return None

    # Pick the extension: photo -> jpg, else the original file name's suffix,
    # else a guess from the MIME type; fall back to "bin" in every other case
    # so the file name never ends in ".None".
    ext = None
    if m["type"] == "photo":
        ext = "jpg"
    elif m.get("filename"):
        ext = Path(m["filename"]).suffix.lstrip('.')
    elif m.get("mime"):
        guessed = mimetypes.guess_extension(m["mime"], strict=False)
        if guessed:
            ext = guessed.lstrip('.')
    ext = ext or "bin"

    filename = f"{msg_id}_{m['type']}.{ext}"
    local_abs = media_dir / filename
    if local_abs.exists():
        return f"media/{filename}"  # already downloaded

    try:
        saved = await client.download_media(msg, file=str(local_abs))
    except Exception as e:
        # Remove any partial file so a later run does not mistake it for a
        # completed download.
        local_abs.unlink(missing_ok=True)
        print(f" [{ts()}] ⚠ Ошибка скачивания медиа msg {msg_id}: {e}")
        return None

    # download_media() returns None when there was nothing to download.
    if not saved:
        return None
    # Report the name actually written, in case it differs from the request.
    return f"media/{Path(saved).name}"


async def load_topic(client, entity, topic_id, topic_title, channel_dir):
    """Load every message of one forum topic into ``channel_dir/<topic_id>/``.

    Messages are fetched newest-to-oldest in batches of BATCH_SIZE, each batch
    is written to its own JSON file, and progress is stored in state.json
    after every batch so an interrupted run can continue where it stopped.
    Topics already marked ``initial_load_complete`` are skipped.
    """
    topic_dir = channel_dir / str(topic_id)
    topic_dir.mkdir(parents=True, exist_ok=True)
    media_dir = topic_dir / 'media'
    state_file = topic_dir / 'state.json'

    state = load_state(state_file)
    if state and state.get('initial_load_complete'):
        print(f"[{ts()}] ✅ Топик [{topic_id}] «{topic_title}» уже загружен, пропускаем")
        return

    # Restore progress from a previous, interrupted run.
    if state:
        offset_id = state.get('current_offset_id', 0)
        batch_num = state.get('batch_num', 0)
        total_loaded = state.get('total_loaded', 0)
        print(f"[{ts()}] ↩ Продолжаем топик [{topic_id}] «{topic_title}» с пакета #{batch_num+1}")
    else:
        offset_id = 0
        batch_num = 0
        total_loaded = 0
        state = {
            "topic_id": topic_id,
            "topic_title": topic_title,
            "first_message_id": None,
            "last_message_id": None,
            "total_messages": 0,
            "initial_load_complete": False,
            "last_incremental_at": None,
            "batch_num": 0,
            "current_offset_id": 0,
            "total_loaded": 0
        }
        print(f"[{ts()}] 📂 Топик [{topic_id}] «{topic_title}»")

    while True:
        # Backwards pagination through history:
        # offset_id=N -> messages with id < N, newest first
        # offset_id=0 -> the most recent messages
        batch_msgs_raw = await client.get_messages(
            entity,
            limit=BATCH_SIZE,
            reply_to=topic_id,
            offset_id=offset_id,
        )
        if not batch_msgs_raw:
            print(f"[{ts()}] Сообщений больше нет.")
            break

        batch_num += 1
        batch_data = []
        dl_count = 0
        for msg in batch_msgs_raw:
            info = media_info(msg)
            local_path = None
            downloaded = False
            if info and should_download(info):
                local_path = await download_media(client, msg, media_dir, msg.id)
                if local_path:
                    downloaded = True
                    dl_count += 1
            batch_data.append(msg_to_dict(msg, topic_id, local_path, downloaded))

        save_batch(topic_dir, batch_num, batch_data)
        total_loaded += len(batch_msgs_raw)

        ids = [msg.id for msg in batch_msgs_raw]
        oldest_id = min(ids)  # oldest id becomes the next offset
        newest_id = max(ids)

        # last_message_id is fixed from the first batch only (the newest
        # messages); .get() tolerates a state file that lacks the key.
        if state.get("last_message_id") is None:
            state["last_message_id"] = newest_id
        state["first_message_id"] = oldest_id
        state["batch_num"] = batch_num
        state["current_offset_id"] = oldest_id
        state["total_loaded"] = total_loaded
        save_state(state_file, state)

        text_count = sum(1 for item in batch_data if item.get('text'))
        media_count = sum(1 for item in batch_data if item.get('media'))
        print(f"[{ts()}] Пакет #{batch_num}: {len(batch_msgs_raw)} сообщений "
              f"(id {oldest_id}..{newest_id}, {text_count} текст, {media_count} медиа, {dl_count} скачано)")

        if len(batch_msgs_raw) < BATCH_SIZE:
            print(f"[{ts()}] Последний пакет.")
            break

        # Next batch: messages older than the current oldest_id.
        offset_id = oldest_id
        print(f"[{ts()}] Пауза {PAUSE_MINUTES} мин...")
        await client.disconnect()
        # Non-blocking pause: time.sleep() would stall the event loop.
        await asyncio.sleep(PAUSE_MINUTES * 60)
        await client.connect()

    # Finalize the state.
    state["initial_load_complete"] = True
    state["total_messages"] = total_loaded
    # Drop the bookkeeping fields that only matter while loading.
    state.pop("current_offset_id", None)
    state.pop("batch_num", None)
    state.pop("total_loaded", None)
    save_state(state_file, state)
    print(f"[{ts()}] ✅ Топик [{topic_id}] «{topic_title}» загружен: {total_loaded} сообщений\n")


async def main():
    """Run the initial load for every enabled source in config.json."""
    print(f"[{ts()}] Стартуем первичную загрузку...")
    client = TelegramClient(SESSION_PATH, API_ID, API_HASH)
    await client.start()

    # try/finally guarantees the client is disconnected on the early return
    # and when an exception propagates.
    try:
        # Read the sources from the config.
        sources = [s for s in CONFIG['sources'] if s.get('enabled')]
        if not sources:
            print("Нет активных источников в config.json")
            return

        for source in sources:
            username = source['username']
            print(f"\n[{ts()}] === Канал: @{username} ===")

            entity = await client.get_entity(f'@{username}')
            channel_id = entity.id
            channel_dir = DATA_DIR / 'raw' / str(channel_id)
            channel_dir.mkdir(parents=True, exist_ok=True)

            # Record channel_id in the config if it is not stored yet.
            if not source.get('channel_id'):
                source['channel_id'] = channel_id
                # Explicit UTF-8 because ensure_ascii=False emits raw
                # non-ASCII characters.
                with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
                    json.dump(CONFIG, f, ensure_ascii=False, indent=2)

            # Fetch the topic list.
            # NOTE(review): a single request with limit=100 -- forums with
            # more topics would need pagination; confirm whether that matters.
            topics_result = await client(GetForumTopicsRequest(
                peer=entity, offset_date=None, offset_id=0, offset_topic=0, limit=100
            ))
            # Skip entries without a title (e.g. deleted-topic placeholders)
            # instead of crashing on the missing attribute.
            live_topics = [t for t in topics_result.topics if hasattr(t, 'title')]
            topics = {str(t.id): t.title for t in live_topics}
            print(f"[{ts()}] Топиков: {len(topics)}: {', '.join(topics.values())}")

            # Save meta.json.
            meta = {
                "id": channel_id,
                "username": username,
                "title": entity.title,
                "topics": topics
            }
            with open(channel_dir / 'meta.json', 'w', encoding='utf-8') as f:
                json.dump(meta, f, ensure_ascii=False, indent=2)

            # Load topic after topic.
            for topic in live_topics:
                await load_topic(client, entity, topic.id, topic.title, channel_dir)
    finally:
        await client.disconnect()

    print(f"\n[{ts()}] 🎉 Первичная загрузка завершена!")


if __name__ == '__main__':
    asyncio.run(main())