alerts_bot/main.py
Евгений Панков 42a13742cf Более точное поведение при пересылке.
* другой алгоритм отслеживания изменённых сообщений.
По событиям вместо частых запросов.
* add: portable-install.sh
* add: docker-compose.yml
2025-08-15 20:11:05 +03:00

441 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# from __future__ import annotations
from telethon import TelegramClient, events
from telethon.tl.types import PeerUser, PeerChat, PeerChannel, Message
import re
import os
import json
import logging
import asyncio
from datetime import datetime, timedelta
from dotenv import find_dotenv, load_dotenv
from typing import Optional
# ==== Настройка логирования ====
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
console_handler = logging.StreamHandler()
console_handler.setFormatter(logging.Formatter('[%(levelname)s] %(asctime)s | %(message)s'))
log.addHandler(console_handler)
# === Загрузка переменных окружения ===
env = find_dotenv()
log.info(f'Использую: {env}')
load_dotenv(env)
# === Данные юзер-бота ===
api_id=os.getenv('api_id')
api_hash=os.getenv('api_hash')
session_name=os.getenv('session_name') or 'userbot'
# === Настройки ===
source_channel_username=os.getenv('source_channel_username') # Канал, который слушаем
target_groups=os.getenv('target_groups').split(',') # Группы, куда пересылаем
for i, s in enumerate(target_groups):
if s.isdigit() or s.startswith('-') and s[1:].isdigit():
target_groups[i] = int(s)
target_entities=[]
groups_clean_srv_msgs=os.getenv('groups_clean_srv_msgs').split(',') # Группы, где удаляем сообщения о присоединившихся
if not groups_clean_srv_msgs:
groups_clean_srv_msgs = list(target_groups)
for i, s in enumerate(groups_clean_srv_msgs):
if s.isdigit() or s.startswith('-') and s[1:].isdigit():
groups_clean_srv_msgs[i] = int(s)
filter_keywords=os.getenv('filter_keywords') # Строки или регулярное выражение для поиска
filter_keywords=filter_keywords.split(',')
filter_negative_keywords=os.getenv('filter_negative_keywords') # Строки или регулярное выражение для поиска
filter_negative_keywords=filter_negative_keywords.split(',')
DATA_FILE=os.getenv('DATA_FILE') # Файл для сохранения сообщений
if not DATA_FILE:
DATA_FILE='messages.json'
MAX_MESSAGES = os.getenv('MAX_MESSAGES') # Количество хранящихся сообщений
MAX_MESSAGES=int(MAX_MESSAGES) if MAX_MESSAGES else 50
# =======================
client = TelegramClient(session_name, api_id, api_hash)
# ==== Загрузка/сохранение состояния ====
def load_state():
if os.path.exists(DATA_FILE):
with open(DATA_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
return [{},{}]
def save_state():
with open(DATA_FILE, 'w', encoding='utf-8') as f:
json.dump([last_known_msg_id, source_messages], f, ensure_ascii=False, indent=2)
log.debug('Состояние сохранено')
# ==== Хранение данных ====
last_known_msg_id, source_messages = load_state()
# last_known_msg_id {chat_id: msg_id}
# source_messages {msg_id: {'text': ..., 'forwarded_msg_id': {chat_id: message_id}}}
# =======================
def shorten(text: str, length: int = 80) -> str:
'''Возвращает короткое сообщение для логов'''
text = text.replace('\n', '')
if len(text)>length:
return f'{text[:length-1]}'
return text
async def prepare_entities(target_groups: list) -> list:
"""
Обрабатывает список target_groups и возвращает список сущностей для рассылки сообщений.
:param client: Экземпляр TelegramClient
:param target_groups: Список, содержащий chat_id, пригласительные ссылки или имена пользователей
:return: Список сущностей, готовых для отправки сообщений
"""
for item in target_groups:
try:
# Проверяем, является ли элемент числом (chat_id)
if isinstance(item, (int, str)) and str(item).isdigit() or str(item).startswith('-') and str(item)[1:].isdigit():
# Если это chat_id, преобразуем в Peer
chat_id = int(item)
entity = await client.get_entity(chat_id)
target_entities.append(entity)
# Проверяем, является ли элемент username (начинается с @)
elif isinstance(item, str) and item.startswith('@'):
entity = await client.get_entity(item)
target_entities.append(entity)
# Проверяем, является ли элемент пригласительной ссылкой
elif isinstance(item, str) and (item.startswith('t.me/') or item.startswith('https://t.me/')):
# Удаляем https://, если есть
link = item.replace('https://', '')
# Проверяем, является ли ссылка на публичный канал/группу (@username)
if re.match(r'^t\.me/@[a-zA-Z0-9_]+$', link):
entity = await client.get_entity(link.replace('t.me/', ''))
target_entities.append(entity)
# Проверяем, является ли ссылка на приватный чат/канал (t.me/joinchat/XXXX или t.me/+XXXX)
elif re.match(r'^t\.me/(joinchat/|\+)[a-zA-Z0-9_-]+$', link):
# Для приватных чатов нужно сначала присоединиться
from telethon.tl.functions.messages import ImportChatInviteRequest
hash = link.split('/')[-1].replace('+', '')
try:
result = await client(ImportChatInviteRequest(hash))
entity = result.chats[0] # Получаем сущность чата
target_entities.append(entity)
except Exception as e:
print(f"Ошибка при попытке присоединиться к чату {link}: {e}")
else:
print(f"Неподдерживаемый формат ссылки: {item}")
else:
print(f"Неподдерживаемый формат элемента: {item}")
except ValueError as e:
print(f"Ошибка при обработке {item}: {e}")
except Exception as e:
print(f"Неизвестная ошибка при обработке {item}: {e}")
# return target_entities
async def forward_to_targets(
message: Message,
targets: Optional[list[str | int]] = None,
source_reply_to_msg_id: Optional[str | int] = None
) -> list[Message]:
# Пересылаем сообщения
new_msgs = []
for entity in (targets or target_entities):
if source_reply_to_msg_id and (forwarded_info:=source_messages.get(str(source_reply_to_msg_id))):
for str_chat_id, target_msg_id in forwarded_info['forwarded_msg_id'].items():
if str_chat_id == entity.chat.id:
link = await client.get_message_link(message)
text = f'Переслано из [{message.chat.title}]({link})\n{message.text}'
new_msg = await client.send_message(str_chat_id, text, reply_to=target_msg_id, parse_mode='markdown')
else:
new_msg = await client.forward_messages(entity, message)
new_msgs.append(new_msg)
# Запоминаем пересланные данные сообщения
source_messages[str(message.id)] = {
'text': message.text,
'timestamp': message.date.timestamp(),
'forwarded_msg_id': {str(msg.chat.id): msg.id for msg in new_msgs}
}
# Обновляем последние известные id для канала
for new_msg in new_msgs:
last_known_msg_id[str(new_msg.chat.id)] = new_msg.id
# Ограничиваем размер словаря
if len(source_messages) > MAX_MESSAGES:
oldest = min(source_messages.keys(), key=lambda m_id: source_messages[m_id]['timestamp'])
del source_messages[oldest]
# Сохраняем
save_state()
return new_msgs
# ==== Обработчик сообщений из получателей ====
@client.on(events.NewMessage(chats=target_entities))
async def handler(event: events.NewMessage.Event):
last_known_msg_id[str(event.chat.id)] = event.message.id
log.debug(f'last_known_msg_id[{event.chat.id}]={event.message.id} {event.chat.title}')
def check(text) -> Optional[bool]:
text = text or ""
if any(re.search(keyword, text, re.IGNORECASE) for keyword in filter_keywords) \
and not any(re.search(keyword, text, re.IGNORECASE) for keyword in filter_negative_keywords):
return True
# ==== Обработчик сообщений из источника ====
@client.on(events.NewMessage(chats=source_channel_username))
async def handler_NewMessages(event: events.NewMessage.Event):
message_text = event.message.text or ""
last_known_msg_id[source_channel_username] = event.message.id
if message_text:
log.info(f"Новое сообщение: {shorten(message_text)}")
# Проверяем само сообщение
if check(message_text):
await forward_to_targets(event.message)
log.info(f"📩 Переслано: {shorten(message_text)}")
return
# Проверяем цепочку ответов
reply = event.message.reply_to
if reply and reply.reply_to_msg_id:
source_reply_to_msg_id = await check_reply_chain(reply.reply_to_msg_id, depth=1)
if source_reply_to_msg_id:
await forward_to_targets(event.message, source_reply_to_msg_id=source_reply_to_msg_id)
log.info(f"⛓ Переслано по цепочке: {shorten(message_text)}")
@client.on(events.MessageEdited(chats=source_channel_username))
async def handler_edited(event: events.MessageEdited.Event):
stored_m_info = source_messages.get(str(event.message.id))
if stored_m_info and stored_m_info['text'] != event.message.text:
log.info(f"📝 Сообщение изменено: {shorten(stored_m_info['text'])}{shorten(event.message.text)}")
# Редактируем пересланное сообщение
for str_chat_id, msg_id in stored_m_info['forwarded_msg_id'].items():
chat_id = int(str_chat_id)
# Получаем ID последнего сообщения в целевом чате
last_msg_id = last_known_msg_id.get(str_chat_id)
if not last_msg_id:
last_msg_id = await update_last_msg_id(chat_id)
log.debug(f'chat_id={chat_id}, stored_msg_id={msg_id}, last_msg_id_on_channel={last_msg_id}')
# Проверяем, является ли наше пересланное сообщение последним
if msg_id == last_msg_id:
# Это последнее сообщение — можно удалить и переслать заново
await client.delete_messages(chat_id, msg_id)
forward_to_targets(event.message, targets=[chat_id])
log.info(f"🔁 {chat_id} Сообщение было последним → Заменено")
else:
# Не последнее → отправляем изменённый текст как ответ
new_msg = await client.send_message(
chat_id,
f"🔄 Текст изменён:\n{event.message.text}",
reply_to=stored_m_info['forwarded_msg_id'][str_chat_id]
)
# Запоминаем пересланные данные сообщения
sm = source_messages.get(str(event.message.id))
if not sm:
source_messages[str(event.message.id)] = {
'text': event.message.text,
'timestamp': event.message.date.timestamp(),
'forwarded_msg_id': {str(new_msg.chat.id): new_msg.id}
}
else:
sm['text'] = event.message.text
sm['timestamp'] = event.message.date.timestamp()
sm['forwarded_msg_id'][str(new_msg.chat.id)] = new_msg.id
# Обновляем последние известные id для канала
last_known_msg_id[str(new_msg.chat.id)] = new_msg.id
log.info(f"💬 {chat_id} Сообщение не последнее → Отправлен ответ")
@client.on(events.MessageDeleted(chats=source_channel_username))
async def handler_deleted(event: events.MessageDeleted.Event):
for msg_id in event.deleted_ids:
stored_m_info = source_messages.get(str(msg_id))
if stored_m_info:
log.info(f"🗑 Исходное сообщение удалено. Удаляю пересланное: {shorten(stored_m_info['text'])}")
for str_chat_id, fwd_msg_id in tuple(stored_m_info['forwarded_msg_id'].items()):
chat_id = int(str_chat_id)
await client.delete_messages(chat_id, fwd_msg_id)
del stored_m_info['forwarded_msg_id'][str_chat_id]
if not stored_m_info['forwarded_msg_id']:
del source_messages[str(msg_id)]
async def get_batch_messages(chat_id, min_id, max_id):
chat_id = int(chat_id)
approx_count = max_id - min_id + 1
limit = int(approx_count * 1.05) # Увеличиваем лимит на 5%, чтобы точно зацепить все нужные
limit = min(limit, 3000) # Максимум 3000 (Telegram API limit)
log.debug(f"🔍 Запрашиваю {limit} сообщений для чата {chat_id} из диапазона {min_id}{max_id}")
try:
# Получаем batch сообщений
batch = await client.get_messages(chat_id, limit=limit)
batch_dict = {msg.id: msg for msg in batch}
return batch_dict
except Exception as e:
log.error(f"⚠ Ошибка при получении batch-сообщений: {e}")
return None
async def update_last_msg_id(chat_id: int|str) -> int:
# Получаем ID последнего сообщения в целевом чате
chat_id = int(chat_id)
last_msg = await client.get_messages(chat_id, limit=1)
last_msg_id = last_msg[0].id if last_msg else None
if last_msg_id:
last_known_msg_id[str(chat_id)] = last_msg_id
return last_msg_id
# ==== Фоновая задача: очистки старых сообщений ====
async def clear_old_messages():
while True:
await asyncio.sleep(600) # раз в 10 минут
now = datetime.now().timestamp()
cutoff_time = now - 86400 # 24 час
start_len = len(source_messages)
for str_msg_id, stored_m_info in tuple(source_messages.items()):
if stored_m_info['timestamp'] < cutoff_time:
del source_messages[str_msg_id]
if start_len != len(source_messages):
log.debug(f'Очищены старые сообщения: {start_len}{len(source_messages)}')
save_state()
async def check_reply_chain(msg_id, depth=1, max_depth=10) -> bool|int:
"""Рекурсивная проверка цепочки ответов
В случае совпадения возвращает replied_msg.id"""
if depth > max_depth:
log.warning(f"🔁 Превышена максимальная глубина цепочки ({max_depth}), остановлено.")
return False
try:
replied_msg = await client.get_messages(source_channel_username, ids=msg_id)
except Exception as e:
log.warning(f"Не удалось получить сообщение по ID {msg_id}: {e}")
return False
if not replied_msg:
return False
text = replied_msg.text or ""
if check(text):
log.info(f"✔️ Есть совпадение на уровне {depth}: {shorten(text,30)}")
return replied_msg.id
else:
log.info(f"🔍 Проверяю сообщение на уровне {depth}: {shorten(text,30)}")
if replied_msg.reply_to:
return await check_reply_chain(replied_msg.reply_to.reply_to_msg_id, depth + 1)
return False
# ==== Удаление системных сообщений о входе/выходе в целевой группе ====
@client.on(events.ChatAction(chats=groups_clean_srv_msgs))
async def del_join_leave(event: events):
actions = {
'добавлен': event.user_added,
'присодинился': event.user_joined,
'покинул': event.user_left,
}
if True in actions.values():
action = [act for act, param in actions.items() if param]
action = ', '.join(action)
full_name = ' '.join([name for name in (event.user.first_name, event.user.last_name) if name])
await event.delete()
log.info(f"Удалено сообщение: Пользователь {full_name}({event.user_id}) {action} чат {event.chat.id}")
async def fetch_system_messages(days_to_check=5) -> tuple[int, int]:
"""Собирает системные сообщения за последние N дней"""
system_messages = []
for chat_id in groups_clean_srv_msgs:
try:
chat = await client.get_entity(chat_id)
except ValueError:
log.info("Не удалось найти чат. Убедитесь, что вы состоите в группе.")
return system_messages
log.info(f"🔍 Ищем системные сообщения в '{chat.title}' за последние {days_to_check} дней…\n")
today = datetime.now()
cutoff_date = today - timedelta(days=days_to_check)
async for message in client.iter_messages(chat):
# Проверяем дату
if message.date.replace(tzinfo=None) < cutoff_date:
continue
# Проверяем, является ли сообщение системным (ChatAction)
if message.action:
time_str = message.date.strftime('%Y-%m-%d %H:%M:%S')
log.info(f"[{time_str}] | Тип действия: {message.action}")
system_messages.append((chat.id, message.id))
log.info(f"✅ Найдено {len(system_messages)} системных сообщений для удаления")
return system_messages
async def delete_system_msgs(messages_list):
"""Удаляет найденные системные сообщения"""
if not messages_list:
log.info("🚫 Нет сообщений для удаления")
return
async with client:
chat_id, _ = messages_list[0]
message_ids = [msg_id for _, msg_id in messages_list]
result = await client.delete_messages(chat_id, message_ids)
deleted_count = len(result._deleted)
log.info(f"🗑 Удалено {deleted_count} из {len(message_ids)} сообщений")
# ==== Запуск бота ====
async def main():
# log.info("🧹 Начинаем очистку старых системных сообщений…")
# messages = await fetch_system_messages(days_to_check=5)
# await delete_system_msgs(messages)
# if not client.is_connected():
# log.info("🔄 Клиент не подключён, пытаемся восстановить соединение…")
# await client.connect()
asyncio.create_task(clear_old_messages())
try:
await client.start()
await prepare_entities(target_groups)
log.info("Env Parameters:"
"\n"
f"\nsource_channel_username = {source_channel_username}"
f"\ntarget_groups = {target_groups}"
f"\ngroups_clean_srv_msgs = {groups_clean_srv_msgs}"
f"\nfilter_keywords = {filter_keywords}"
f"\nfilter_negative_keywords= {filter_negative_keywords}"
"\n"
"\n😊 Бот запущен. Ожидание событий…"
)
await client.run_until_disconnected()
except ConnectionError as e:
log.error(f"Ошибка подключения к Telegram: {e}. Повторная попытка через 10 секунд...")
await asyncio.sleep(10) # Подождать перед повторной попыткой
except KeyboardInterrupt:
log.info("🛑 Бот остановлен вручную.")
finally:
if client.is_connected():
await client.disconnect()
# =======================
if __name__ == '__main__':
client.loop.run_until_complete(main())