165 lines
5.7 KiB
Python
165 lines
5.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Загрузка сообщений в Meilisearch.
|
||
Вход: tasks/snowbike-rag/data/messages.jsonl
|
||
"""
|
||
|
||
import json
|
||
import sys
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import meilisearch
|
||
|
||
MEILI_URL = "http://127.0.0.1:7700"
|
||
INDEX_NAME = "snowbike_messages"
|
||
BATCH_SIZE = 1000
|
||
|
||
DATA_FILE = Path(__file__).parent.parent / "data" / "messages.jsonl"
|
||
|
||
|
||
def wait_for_task(client, task_uid, timeout=300):
|
||
"""Ждём завершения задачи Meilisearch."""
|
||
start = time.time()
|
||
while time.time() - start < timeout:
|
||
task = client.get_task(task_uid)
|
||
if task.status in ("succeeded", "failed", "canceled"):
|
||
return task
|
||
time.sleep(1)
|
||
raise TimeoutError(f"Task {task_uid} не завершилась за {timeout}с")
|
||
|
||
|
||
def setup_index(client):
|
||
"""Создаём и настраиваем индекс."""
|
||
try:
|
||
index = client.get_index(INDEX_NAME)
|
||
print(f"Индекс '{INDEX_NAME}' уже существует, используем его")
|
||
except meilisearch.errors.MeilisearchApiError:
|
||
print(f"Создаём индекс '{INDEX_NAME}'...")
|
||
task = client.create_index(INDEX_NAME, {"primaryKey": "id"})
|
||
wait_for_task(client, task.task_uid)
|
||
index = client.get_index(INDEX_NAME)
|
||
print("Индекс создан")
|
||
|
||
# Настройки индекса
|
||
print("Настраиваем индекс...")
|
||
|
||
task = index.update_settings({
|
||
"searchableAttributes": ["text"],
|
||
"filterableAttributes": ["topic_id", "date", "from_id"],
|
||
"sortableAttributes": ["date"],
|
||
"typoTolerance": {
|
||
"enabled": True,
|
||
"minWordSizeForTypos": {
|
||
"oneTypo": 5,
|
||
"twoTypos": 9
|
||
}
|
||
},
|
||
"stopWords": ["и", "в", "на", "с", "для", "это", "что", "как", "не", "а",
|
||
"но", "или", "по", "из", "от", "до", "при", "за", "об", "так",
|
||
"же", "то", "ли", "бы", "да", "нет", "все", "они", "мне", "его"]
|
||
})
|
||
wait_for_task(client, task.task_uid)
|
||
print("Настройки применены")
|
||
|
||
return index
|
||
|
||
|
||
def load_messages():
|
||
"""Загружаем сообщения из JSONL."""
|
||
messages = []
|
||
if not DATA_FILE.exists():
|
||
print(f"ОШИБКА: файл {DATA_FILE} не найден. Сначала запустите parse_messages.py")
|
||
sys.exit(1)
|
||
|
||
with open(DATA_FILE, "r", encoding="utf-8") as f:
|
||
for line in f:
|
||
line = line.strip()
|
||
if line:
|
||
messages.append(json.loads(line))
|
||
|
||
print(f"Загружено {len(messages)} сообщений из JSONL")
|
||
return messages
|
||
|
||
|
||
def index_messages(index, messages):
|
||
"""Индексируем сообщения батчами."""
|
||
total = len(messages)
|
||
indexed = 0
|
||
|
||
for i in range(0, total, BATCH_SIZE):
|
||
batch = messages[i:i + BATCH_SIZE]
|
||
task = index.add_documents(batch)
|
||
|
||
# Ждём каждые 10 батчей или в конце
|
||
if (i // BATCH_SIZE) % 10 == 9 or i + BATCH_SIZE >= total:
|
||
wait_for_task(index._http_requests.session.base_url.replace("/", "").replace("http:", "").replace("https:", "").split(":")[0],
|
||
task.task_uid)
|
||
|
||
indexed += len(batch)
|
||
progress = (indexed / total) * 100
|
||
print(f" Прогресс: {indexed}/{total} ({progress:.1f}%)", end="\r")
|
||
|
||
print(f"\nИндексация завершена: {indexed} сообщений")
|
||
|
||
|
||
def index_messages_v2(client, index, messages):
|
||
"""Индексируем сообщения батчами (простой вариант)."""
|
||
total = len(messages)
|
||
task_uids = []
|
||
|
||
print(f"Загружаем {total} сообщений батчами по {BATCH_SIZE}...")
|
||
|
||
for i in range(0, total, BATCH_SIZE):
|
||
batch = messages[i:i + BATCH_SIZE]
|
||
task = index.add_documents(batch)
|
||
task_uids.append(task.task_uid)
|
||
|
||
indexed = min(i + BATCH_SIZE, total)
|
||
progress = (indexed / total) * 100
|
||
print(f" Отправлено: {indexed}/{total} ({progress:.1f}%)", end="\r")
|
||
|
||
print(f"\nОжидаем завершения индексации...")
|
||
|
||
# Ждём последнюю задачу (все предыдущие уже должны быть выполнены)
|
||
if task_uids:
|
||
last_task = wait_for_task(client, task_uids[-1], timeout=600)
|
||
if last_task.status == "succeeded":
|
||
print(f"Индексация успешна!")
|
||
else:
|
||
print(f"ОШИБКА индексации: {last_task.error}")
|
||
sys.exit(1)
|
||
|
||
# Финальная статистика
|
||
stats = index.get_stats()
|
||
print(f"Документов в индексе: {stats.number_of_documents}")
|
||
|
||
|
||
def main():
|
||
print("=== Meilisearch Индексация ===")
|
||
|
||
# Подключаемся
|
||
client = meilisearch.Client(MEILI_URL)
|
||
try:
|
||
health = client.health()
|
||
print(f"Meilisearch: {health}")
|
||
except Exception as e:
|
||
print(f"ОШИБКА: не могу подключиться к Meilisearch ({MEILI_URL}): {e}")
|
||
print("Запустите Meilisearch: ./bin/meilisearch --db-path data/meilisearch --http-addr 127.0.0.1:7700 --no-analytics &")
|
||
sys.exit(1)
|
||
|
||
# Настраиваем индекс
|
||
index = setup_index(client)
|
||
|
||
# Загружаем данные
|
||
messages = load_messages()
|
||
|
||
# Индексируем
|
||
index_messages_v2(client, index, messages)
|
||
|
||
print("\n✓ Готово!")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|