Files
wiki/skills/telegram-collector/scripts/initial_load.py
2026-04-12 21:55:33 +03:00

388 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Первичная загрузка всей истории из Telegram форумной супергруппы.
Структура: raw/{channel_id}/{topic_id}/batch_XXXX.json + media/
Прогресс сохраняется в state.json — можно продолжить после прерывания.
"""
import asyncio, os, sys, json, time, mimetypes
from datetime import datetime, timezone
from pathlib import Path
sys.path.insert(0, '/home/node/.local/lib/python3.11/site-packages')
from dotenv import load_dotenv
load_dotenv(os.path.expanduser('~/.openclaw/.env'))
from telethon import TelegramClient
from telethon.tl.functions.messages import GetForumTopicsRequest
from telethon.tl.types import (
MessageMediaPhoto, MessageMediaDocument, MessageMediaWebPage,
DocumentAttributeVideo, DocumentAttributeFilename, DocumentAttributeAudio
)
# --- Configuration ---
# Telegram API credentials are read from the environment (populated by
# load_dotenv above). int() raises TypeError here if the API id is unset.
API_ID = int(os.getenv('TELEGRAM_COLLECTOR_API_ID'))
API_HASH = os.getenv('TELEGRAM_COLLECTOR_API_HASH')
SESSION = os.getenv('TELEGRAM_COLLECTOR_SESSION', 'telegram_collector')
# The Telethon session file lives next to the skill directory's config.
SESSION_PATH = str(Path(__file__).parent.parent / 'telegram_collector')
SKILL_DIR = Path(__file__).parent.parent
CONFIG_FILE = SKILL_DIR / 'config.json'
# config.json is rewritten by main() with ensure_ascii=False, so it may hold
# raw non-ASCII text; read it as UTF-8 explicitly instead of relying on the
# locale's default encoding.
with open(CONFIG_FILE, encoding='utf-8') as f:
    CONFIG = json.load(f)
DATA_DIR = Path(CONFIG['data_dir'])
BATCH_SIZE = 10000  # messages requested per get_messages() call
PAUSE_MINUTES = 3  # pause between consecutive batches of one topic
MAX_FILE_SIZE = CONFIG['media']['max_file_size_mb'] * 1024 * 1024  # bytes
SKIP_VIDEO = CONFIG['media']['skip_video']
DL_PHOTOS = CONFIG['media']['download_photos']
DL_DOCS = CONFIG['media']['download_documents']
def ts():
    """Return the current local wall-clock time as an ``HH:MM:SS`` string."""
    return f"{datetime.now():%H:%M:%S}"
def media_info(msg):
    """Extract media metadata from a message.

    Returns:
        None when the message carries no media.
        For a web page preview: a compact dict with the keys ``type``
        ("webpage"), ``url``, ``title`` and ``downloaded``.
        For everything else: a dict with the keys ``type``, ``file_id``,
        ``size``, ``mime``, ``filename``, ``duration``, ``width``, ``height``,
        ``local_path`` and ``downloaded``. ``type`` is "photo", "video",
        "audio" or "document"; for media this function cannot describe it is
        the media object's class name.
    """
    media = msg.media
    if not media:
        return None

    if isinstance(media, MessageMediaWebPage):
        wp = media.webpage
        return {
            "type": "webpage",
            "url": getattr(wp, 'url', None),
            "title": getattr(wp, 'title', None),
            "downloaded": False
        }

    m = {"type": None, "file_id": None, "size": None, "mime": None,
         "filename": None, "duration": None, "width": None, "height": None,
         "local_path": None, "downloaded": False}

    if isinstance(media, MessageMediaPhoto):
        photo = media.photo
        # The photo payload may be missing (e.g. expired media) or be an
        # empty stub without `sizes`; such media falls through to the generic
        # branch below instead of raising AttributeError.
        if photo is not None and hasattr(photo, 'sizes'):
            m["type"] = "photo"
            m["file_id"] = str(photo.id)
            m["mime"] = "image/jpeg"
            # Use the byte size of the largest variant that reports one.
            byte_sizes = [s.size for s in photo.sizes if hasattr(s, 'size')]
            if byte_sizes:
                m["size"] = max(byte_sizes)
            return m
    elif isinstance(media, MessageMediaDocument):
        doc = media.document
        # Same guard as for photos: the document may be absent or empty.
        if doc is not None and hasattr(doc, 'attributes'):
            m["file_id"] = str(doc.id)
            m["size"] = doc.size
            m["mime"] = doc.mime_type
            is_video = False
            for attr in doc.attributes:
                if isinstance(attr, DocumentAttributeVideo):
                    m["type"] = "video"
                    m["duration"] = attr.duration
                    m["width"] = attr.w
                    m["height"] = attr.h
                    is_video = True
                elif isinstance(attr, DocumentAttributeFilename):
                    m["filename"] = attr.file_name
                elif isinstance(attr, DocumentAttributeAudio):
                    m["type"] = "audio"
                    m["duration"] = attr.duration
            if not is_video and m["type"] != "audio":
                m["type"] = "document"
            return m

    # Unrecognised media kind, or a photo/document without a usable payload:
    # record the class name so the message is still described.
    m["type"] = type(media).__name__
    return m
def should_download(m):
    """Decide whether the media described by ``m`` should be downloaded.

    ``m`` is a metadata dict as produced by media_info() (or None).
    Video and audio are never downloaded, oversized files are skipped, and
    photos / documents follow the DL_PHOTOS / DL_DOCS switches.
    """
    if not m or m.get("type") == "webpage":
        return False

    kind = m["type"]
    if kind in ("video", "audio"):
        return False  # video and audio are always skipped

    # Some videos arrive typed as plain documents, so check the MIME type too.
    if (m.get("mime") or "").startswith(("video/", "audio/")):
        return False

    size = m.get("size")
    if size and size > MAX_FILE_SIZE:
        return False

    if kind == "photo":
        return bool(DL_PHOTOS)
    if kind == "document":
        return bool(DL_DOCS)
    return False
def msg_to_dict(msg, topic_id, local_path=None, downloaded=False):
    """Convert a Telethon message into a plain dict ready to be saved.

    ``local_path`` / ``downloaded`` are copied into the media metadata when
    the message has media and a local path was supplied.
    """
    def _iso_utc(moment):
        # Timestamps are stored as UTC in ISO-8601 form with a "Z" suffix.
        return moment.astimezone(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ')

    media = media_info(msg)
    if media and local_path:
        media["local_path"] = local_path
        media["downloaded"] = downloaded

    reply_msg_id = reply_top_id = quote_text = None
    reply = msg.reply_to
    if reply:
        # For forum messages:
        #   - posted directly into the topic: reply_to_msg_id == topic_id
        #     and reply_to_top_id is None;
        #   - reply to a message: reply_to_msg_id is that message's id and
        #     reply_to_top_id == topic_id.
        posted_to_topic_root = (
            getattr(reply, 'forum_topic', False)
            and reply.reply_to_msg_id == topic_id
        )
        if not posted_to_topic_root:
            # A genuine reply to another message.
            reply_msg_id = reply.reply_to_msg_id
            reply_top_id = getattr(reply, 'reply_to_top_id', None)
            quote_text = getattr(reply, 'quote_text', None)

    sender = msg.from_id
    edited = msg.edit_date
    return {
        "id": msg.id,
        "date": _iso_utc(msg.date),
        "text": msg.text or None,
        "from_id": getattr(sender, 'user_id', None) if sender else None,
        "reply_to_msg_id": reply_msg_id,
        "reply_to_top_id": reply_top_id,
        "quote_text": quote_text,
        "edit_date": _iso_utc(edited) if edited else None,
        "pinned": bool(msg.pinned),
        "media": media
    }
def load_state(state_file):
    """Load the saved progress dict for a topic.

    Args:
        state_file: pathlib.Path of the topic's state.json.

    Returns:
        The parsed JSON content, or None when the file does not exist.
    """
    if not state_file.exists():
        return None
    # The state is written as UTF-8 with raw non-ASCII characters
    # (ensure_ascii=False), so it must be read back as UTF-8 rather than in
    # the locale's default encoding.
    with open(state_file, encoding='utf-8') as f:
        return json.load(f)
def save_state(state_file, state):
    """Persist the progress dict to ``state_file`` as UTF-8 JSON.

    The data is first written to a sibling ``<name>.tmp`` file and then moved
    over the target with os.replace(), so an interruption in the middle of
    the write cannot leave a truncated state file behind.

    Args:
        state_file: destination path (str or pathlib.Path).
        state: JSON-serialisable dict.
    """
    state_file = Path(state_file)
    tmp_file = state_file.with_name(state_file.name + '.tmp')
    with open(tmp_file, 'w', encoding='utf-8') as f:
        json.dump(state, f, ensure_ascii=False, indent=2)
    os.replace(tmp_file, state_file)
def save_batch(batch_dir, batch_num, messages):
    """Write one batch of message dicts to ``batch_XXXX.json``.

    The file is created inside ``batch_dir`` with the batch number
    zero-padded to four digits. Returns the path of the written file.
    """
    target = batch_dir / f'batch_{batch_num:04d}.json'
    with target.open('w', encoding='utf-8') as fh:
        json.dump(messages, fh, ensure_ascii=False, indent=2)
    return target
def _discard_partial(path):
    """Remove a possibly incomplete download, ignoring filesystem errors."""
    try:
        path.unlink()
    except OSError:
        pass


async def download_media(client, msg, media_dir, msg_id):
    """Download the media of ``msg`` into ``media_dir``.

    Args:
        client: connected Telethon client used for the download.
        msg: the message whose media is downloaded.
        media_dir: pathlib.Path of the topic's media directory (created if
            missing; its parent must already exist).
        msg_id: message id, used as the file name prefix.

    Returns:
        The path relative to the topic directory ("media/<file>") when the
        file is present on disk, otherwise None (no media, nothing was
        downloaded, or the download failed).
    """
    media_dir.mkdir(exist_ok=True)
    m = media_info(msg)
    if not m:
        return None

    # Pick the extension: photos are JPEG, otherwise use the original file
    # name, then the MIME type, and finally fall back to "bin" so the name
    # never ends in ".None".
    ext = None
    if m["type"] == "photo":
        ext = "jpg"
    elif m.get("filename"):
        ext = Path(m["filename"]).suffix.lstrip('.')
    elif m.get("mime"):
        guessed = mimetypes.guess_extension(m["mime"], strict=False)
        if guessed:
            ext = guessed.lstrip('.')
    ext = ext or "bin"

    filename = f"{msg_id}_{m['type']}.{ext}"
    local_abs = media_dir / filename
    local_rel = f"media/{filename}"
    if local_abs.exists():
        return local_rel  # already downloaded

    try:
        saved = await client.download_media(msg, file=str(local_abs))
    except Exception as e:
        # Drop the partial file so the next run retries instead of treating
        # it as a finished download.
        _discard_partial(local_abs)
        print(f" [{ts()}] ⚠ Ошибка скачивания медиа msg {msg_id}: {e}")
        return None
    except BaseException:
        # Cancellation / Ctrl+C: clean up the partial file, then propagate.
        _discard_partial(local_abs)
        raise

    # A falsy result means the client did not download anything.
    if not saved:
        return None
    return local_rel
async def load_topic(client, entity, topic_id, topic_title, channel_dir):
    """Load every message of one forum topic into ``channel_dir/<topic_id>/``.

    Messages are fetched newest-to-oldest in batches of BATCH_SIZE; each
    batch is written to ``batch_XXXX.json`` and progress is stored in
    ``state.json`` after every batch, so an interrupted run resumes from the
    last saved offset. A topic whose state is marked complete is skipped.

    Args:
        client: Telethon client (disconnected and reconnected around pauses).
        entity: the channel / supergroup entity the topic belongs to.
        topic_id: id of the forum topic.
        topic_title: topic title, used for logging and stored in the state.
        channel_dir: pathlib.Path of the channel's raw data directory.
    """
    topic_dir = channel_dir / str(topic_id)
    topic_dir.mkdir(parents=True, exist_ok=True)
    media_dir = topic_dir / 'media'
    state_file = topic_dir / 'state.json'

    state = load_state(state_file)
    if state and state.get('initial_load_complete'):
        print(f"[{ts()}] ✅ Топик [{topic_id}] «{topic_title}» уже загружен, пропускаем")
        return

    # Restore progress from a previous interrupted run, or start from scratch.
    if state:
        offset_id = state.get('current_offset_id', 0)
        batch_num = state.get('batch_num', 0)
        total_loaded = state.get('total_loaded', 0)
        print(f"[{ts()}] ↩ Продолжаем топик [{topic_id}] «{topic_title}» с пакета #{batch_num+1}")
    else:
        offset_id = 0
        batch_num = 0
        total_loaded = 0
        state = {
            "topic_id": topic_id,
            "topic_title": topic_title,
            "first_message_id": None,
            "last_message_id": None,
            "total_messages": 0,
            "initial_load_complete": False,
            "last_incremental_at": None,
            "batch_num": 0,
            "current_offset_id": 0,
            "total_loaded": 0
        }
        print(f"[{ts()}] 📂 Топик [{topic_id}] «{topic_title}»")

    while True:
        # Paginate backwards through the history:
        #   offset_id=N -> messages with id < N, newest first
        #   offset_id=0 -> the most recent messages
        batch_msgs_raw = await client.get_messages(
            entity,
            limit=BATCH_SIZE,
            reply_to=topic_id,
            offset_id=offset_id,
        )
        if not batch_msgs_raw:
            print(f"[{ts()}] Сообщений больше нет.")
            break

        batch_num += 1
        batch_data = []
        dl_count = 0
        for msg in batch_msgs_raw:
            m = media_info(msg)
            local_path = None
            downloaded = False
            if m and should_download(m):
                local_path = await download_media(client, msg, media_dir, msg.id)
                if local_path:
                    downloaded = True
                    dl_count += 1
            batch_data.append(msg_to_dict(msg, topic_id, local_path, downloaded))

        save_batch(topic_dir, batch_num, batch_data)
        total_loaded += len(batch_msgs_raw)

        ids = [msg.id for msg in batch_msgs_raw]
        oldest_id = min(ids)  # oldest id in the batch -> next offset
        newest_id = max(ids)

        # last_message_id is fixed only once, from the first (newest) batch.
        if state["last_message_id"] is None:
            state["last_message_id"] = newest_id
        state["first_message_id"] = oldest_id
        state["batch_num"] = batch_num
        state["current_offset_id"] = oldest_id
        state["total_loaded"] = total_loaded
        save_state(state_file, state)

        text_count = sum(1 for item in batch_data if item.get('text'))
        media_count = sum(1 for item in batch_data if item.get('media'))
        print(f"[{ts()}] Пакет #{batch_num}: {len(batch_msgs_raw)} сообщений "
              f"(id {oldest_id}..{newest_id}, {text_count} текст, {media_count} медиа, {dl_count} скачано)")

        if len(batch_msgs_raw) < BATCH_SIZE:
            print(f"[{ts()}] Последний пакет.")
            break

        # Next batch: messages older than the current oldest_id.
        offset_id = oldest_id
        print(f"[{ts()}] Пауза {PAUSE_MINUTES} мин...")
        await client.disconnect()
        # Non-blocking pause: time.sleep() would freeze the whole event loop
        # for the duration of the pause.
        await asyncio.sleep(PAUSE_MINUTES * 60)
        await client.connect()

    # Finalise the state.
    state["initial_load_complete"] = True
    state["total_messages"] = total_loaded
    # Drop the bookkeeping fields that are only needed while loading.
    state.pop("current_offset_id", None)
    state.pop("batch_num", None)
    state.pop("total_loaded", None)
    save_state(state_file, state)
    print(f"[{ts()}] ✅ Топик [{topic_id}] «{topic_title}» загружен: {total_loaded} сообщений\n")
async def main():
    """Run the initial load for every enabled source in config.json.

    For each source: resolve the channel, store its id back into config.json
    if it was missing, write ``meta.json`` with the topic list, then load the
    topics one by one. The client is always disconnected on exit, including
    the early return and error paths.
    """
    print(f"[{ts()}] Стартуем первичную загрузку...")
    client = TelegramClient(SESSION_PATH, API_ID, API_HASH)
    await client.start()
    try:
        # Read the sources from the config.
        sources = [s for s in CONFIG['sources'] if s.get('enabled')]
        if not sources:
            print("Нет активных источников в config.json")
            return

        for source in sources:
            username = source['username']
            print(f"\n[{ts()}] === Канал: @{username} ===")
            entity = await client.get_entity(f'@{username}')
            channel_id = entity.id
            channel_dir = DATA_DIR / 'raw' / str(channel_id)
            channel_dir.mkdir(parents=True, exist_ok=True)

            # Record channel_id in the config if it is not there yet.
            if not source.get('channel_id'):
                source['channel_id'] = channel_id
                # ensure_ascii=False emits raw non-ASCII text, so the file
                # encoding must be pinned to UTF-8.
                with open(CONFIG_FILE, 'w', encoding='utf-8') as f:
                    json.dump(CONFIG, f, ensure_ascii=False, indent=2)

            # Fetch the topic list.
            # NOTE(review): a single request with limit=100 is made and no
            # pagination follows, so forums with more topics are presumably
            # loaded only partially - confirm.
            topics_result = await client(GetForumTopicsRequest(
                peer=entity, offset_date=None, offset_id=0,
                offset_topic=0, limit=100
            ))
            topics = {str(t.id): t.title for t in topics_result.topics}
            print(f"[{ts()}] Топиков: {len(topics)}: {', '.join(topics.values())}")

            # Save meta.json.
            meta = {
                "id": channel_id,
                "username": username,
                "title": entity.title,
                "topics": topics
            }
            with open(channel_dir / 'meta.json', 'w', encoding='utf-8') as f:
                json.dump(meta, f, ensure_ascii=False, indent=2)

            # Load topic after topic.
            for topic in topics_result.topics:
                await load_topic(client, entity, topic.id, topic.title, channel_dir)
    finally:
        await client.disconnect()
    print(f"\n[{ts()}] 🎉 Первичная загрузка завершена!")
# Script entry point: run the async loader to completion.
if __name__ == "__main__":
    asyncio.run(main())