Основной функционал: * Портативная среда выполнения на виндовс * Пересылка сообщений из публичного канала по ключевым словам * Отслеживание изменений пересланных сообщений * Очистка от системных
298 lines
14 KiB
Python
298 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
from telethon import TelegramClient, events
|
||
import re
|
||
import os
|
||
import json
|
||
import logging
|
||
import asyncio
|
||
from datetime import datetime, timedelta
|
||
from dotenv import find_dotenv, load_dotenv
|
||
|
||
# ==== Настройка логирования ====
|
||
logging.basicConfig(
|
||
format='[%(levelname)s] %(asctime)s | %(message)s',
|
||
level=logging.INFO,
|
||
handlers=[
|
||
logging.StreamHandler() # Вывод в консоль
|
||
# logging.FileHandler("bot.log", encoding="utf-8") # Раскомментируй, чтобы сохранять логи в файл
|
||
]
|
||
)
|
||
log = logging.getLogger(__name__)
|
||
|
||
# === Загрузка переменных окружения ===
|
||
env = find_dotenv()
|
||
log.info(f'Использую: {env}')
|
||
load_dotenv(env)
|
||
|
||
# === Данные юзер-бота ===
|
||
api_id=os.getenv('api_id')
|
||
api_hash=os.getenv('api_hash')
|
||
session_name=os.getenv('session_name')
|
||
|
||
# === Настройки ===
|
||
source_channel_username=os.getenv('source_channel_username') # Канал, который слушаем
|
||
target_group_chat_id=int(os.getenv('target_group_chat_id')) # Группа, куда пересылаем
|
||
|
||
filter_keywords=os.getenv('filter_keywords') # Строки или регулярное выражение для поиска
|
||
filter_keywords=filter_keywords.split(',')
|
||
filter_negative_keywords=os.getenv('filter_negative_keywords') # Строки или регулярное выражение для поиска
|
||
filter_negative_keywords=filter_negative_keywords.split(',')
|
||
DATA_FILE=os.getenv('DATA_FILE') # Файл для сохранения сообщений
|
||
MAX_MESSAGES=int(os.getenv('MAX_MESSAGES')) # Количество хранящихся сообщений
|
||
CHECK_INTERVAL=int(os.getenv('CHECK_INTERVAL')) # Периодичность проверки удалений на канале
|
||
|
||
# =======================
|
||
|
||
client = TelegramClient(session_name, api_id, api_hash)
|
||
|
||
# ==== Загрузка/сохранение состояния ====
|
||
def load_state():
|
||
if os.path.exists(DATA_FILE):
|
||
with open(DATA_FILE, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return {}
|
||
|
||
def save_state(data):
|
||
with open(DATA_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||
|
||
# ==== Хранение данных ====
|
||
source_messages = load_state() # {msg_id: {'text': ..., 'forwarded_msg_id': ...}}
|
||
|
||
# =======================
|
||
def shorten(text: str, length: int = 80) -> str:
|
||
'''Возвращает короткое сообщение для логов'''
|
||
text = text.replace('\n', ' ↵ ')
|
||
if len(text)>length:
|
||
return f'{text[:length-1]}…'
|
||
return text
|
||
|
||
# ==== Обработчик сообщений из источника ====
|
||
@client.on(events.NewMessage(chats=source_channel_username))
|
||
async def handler(event):
|
||
message_text = event.message.text or ""
|
||
msg_id = event.message.id
|
||
|
||
if message_text:
|
||
log.info(f"Новое сообщение: {shorten(message_text)}")
|
||
|
||
# Проверяем само сообщение
|
||
if any(re.search(keyword, message_text, re.IGNORECASE) for keyword in filter_keywords) \
|
||
and not any(re.search(keyword, message_text, re.IGNORECASE) for keyword in filter_negative_keywords):
|
||
new_msg = await client.forward_messages(target_group_chat_id, event.message)
|
||
source_messages[msg_id] = {
|
||
'text': message_text,
|
||
'timestamp': datetime.now().timestamp(),
|
||
'forwarded_msg_id': new_msg.id
|
||
}
|
||
# Ограничиваем размер словаря
|
||
if len(source_messages) > MAX_MESSAGES:
|
||
oldest = min(source_messages.keys(), key=lambda k: source_messages[k]['timestamp'])
|
||
del source_messages[oldest]
|
||
save_state(source_messages)
|
||
log.info(f"📩 Переслано: {shorten(message_text)}")
|
||
return
|
||
|
||
# Проверяем цепочку ответов
|
||
reply = event.message.reply_to
|
||
if reply and reply.reply_to_msg_id:
|
||
founded = await check_reply_chain(event, reply.reply_to_msg_id, depth=1)
|
||
if founded:
|
||
await client.forward_messages(target_group_chat_id, event.message)
|
||
log.info(f"⛓ Переслано по цепочке: {shorten(message_text)} | Глубина: {founded}")
|
||
|
||
|
||
# ==== Фоновая задача: проверка удалений и изменений ====
|
||
async def monitor_messages():
|
||
while True:
|
||
await asyncio.sleep(CHECK_INTERVAL)
|
||
|
||
now = datetime.now().timestamp()
|
||
cutoff_time = now - 86400 # 24 часа
|
||
|
||
for str_msg_id, stored_m_info in list(source_messages.items()):
|
||
msg_id = int(str_msg_id)
|
||
|
||
# Удаление по возрасту
|
||
if stored_m_info['timestamp'] < cutoff_time:
|
||
del source_messages[str_msg_id]
|
||
continue
|
||
|
||
# Получаем данные о пересланном сообщении
|
||
try:
|
||
target_msg = await client.get_messages(target_group_chat_id, ids=stored_m_info['forwarded_msg_id'])
|
||
except Exception as e:
|
||
log.warning(f"⚠ Не удалось получить пересланное сообщение {msg_id}: {e}")
|
||
target_msg = None
|
||
continue
|
||
# Если пересланного сообщения нет (удалено)
|
||
if target_msg is None or getattr(target_msg, 'empty', False):
|
||
log.info(f"🗑 Пересланное сообщение отсутвует (удалено администратором): {shorten(stored_m_info['text'])}")
|
||
del source_messages[str_msg_id]
|
||
continue
|
||
|
||
# Получаем сообщение из источника
|
||
try:
|
||
source_msg = await client.get_messages(source_channel_username, ids=msg_id)
|
||
except Exception as e:
|
||
log.warning(f"⚠ Не удалось получить исходное сообщение {msg_id}: {e}")
|
||
source_msg = None
|
||
continue
|
||
# Если исходного сообщения нет (удалено)
|
||
if source_msg is None or getattr(source_msg, 'empty', False):
|
||
log.info(f"🗑 Исходное сообщение удалено. Удаляю пересланное: {shorten(stored_m_info['text'])}")
|
||
await client.delete_messages(target_group_chat_id, msg_id)
|
||
del source_messages[str_msg_id]
|
||
continue
|
||
|
||
# Если текст изменился
|
||
if source_msg.text != target_msg.text:
|
||
log.info(f"📝 Сообщение изменено: {shorten(target_msg.text)} → {shorten(source_msg.text)}")
|
||
# Редактируем пересланное сообщение
|
||
try:
|
||
# Получаем ID последнего сообщения в целевом чате
|
||
last_msg = await client.get_messages(target_group_chat_id, limit=1)
|
||
last_msg_id = last_msg[0].id if last_msg else None
|
||
|
||
# Проверяем, является ли наше пересланное сообщение последним
|
||
if target_msg.id == last_msg_id:
|
||
# Это последнее сообщение — можно удалить и переслать заново
|
||
await client.delete_messages(target_group_chat_id, target_msg.id)
|
||
new_msg = await client.forward_messages(target_group_chat_id, source_msg)
|
||
source_messages[str(source_msg.id)] = {
|
||
'text': source_msg.text,
|
||
'timestamp': datetime.now().timestamp(),
|
||
'forwarded_msg_id': new_msg.id
|
||
}
|
||
log.info("🔁 Сообщение было последним → Заменено")
|
||
else:
|
||
# Не последнее → отправляем изменённый текст как ответ
|
||
new_msg = await client.send_message(
|
||
target_group_chat_id,
|
||
f"🔄 Текст изменён:\n{source_msg.text}",
|
||
reply_to=stored_m_info['forwarded_msg_id']
|
||
)
|
||
source_messages[str(source_msg.id)] = {
|
||
'text': source_msg.text,
|
||
'timestamp': datetime.now().timestamp(),
|
||
'forwarded_msg_id': new_msg.id
|
||
}
|
||
log.info("💬 Сообщение не последнее → Отправлен ответ")
|
||
except Exception as e:
|
||
log.error(f"❌ Не удалось обновить: {e}")
|
||
|
||
# Сохраняем состояние
|
||
save_state(source_messages)
|
||
|
||
|
||
async def check_reply_chain(event, msg_id, depth=1, max_depth=10) -> bool|int:
|
||
"""Рекурсивная проверка цепочки ответов"""
|
||
if depth > max_depth:
|
||
log.warning(f"🔁 Превышена максимальная глубина цепочки ({max_depth}), остановлено.")
|
||
return False
|
||
|
||
try:
|
||
replied_msg = await event.get_reply_message()
|
||
except Exception as e:
|
||
log.warning(f"⚠ Не удалось получить сообщение по ID {msg_id}: {e}")
|
||
return False
|
||
|
||
if not replied_msg:
|
||
return False
|
||
|
||
text = replied_msg.text or ""
|
||
|
||
log.info(f"🔍 Проверяю сообщение на уровне {depth}: {shorten(text,30)}")
|
||
|
||
if any(re.search(keyword, text, re.IGNORECASE) for keyword in filter_keywords) \
|
||
and not any(re.search(keyword, text, re.IGNORECASE) for keyword in filter_negative_keywords):
|
||
return depth
|
||
|
||
if replied_msg.reply_to:
|
||
return await check_reply_chain(event, replied_msg.reply_to.reply_to_msg_id, depth + 1)
|
||
|
||
return False
|
||
|
||
# ==== Удаление системных сообщений о входе/выходе в целевой группе ====
|
||
@client.on(events.ChatAction(chats=target_group_chat_id))
|
||
async def del_join_leave(event: events):
|
||
actions = {
|
||
'добавлен': event.user_added,
|
||
'присодинился': event.user_joined,
|
||
'покинул': event.user_left,
|
||
}
|
||
if True in actions.values():
|
||
action = [act for act, param in actions.items() if param]
|
||
action = ', '.join(action)
|
||
full_name = ' '.join([name for name in (event.user.first_name, event.user.last_name) if name])
|
||
await event.delete()
|
||
log.info(f"Удалено сообщение: Пользователь {full_name}({event.user_id}) {action} чат {target_group_chat_id}")
|
||
|
||
|
||
async def fetch_system_messages(days_to_check=5) -> tuple[int, int]:
|
||
"""Собирает системные сообщения за последние N дней"""
|
||
system_messages = []
|
||
|
||
async with client:
|
||
try:
|
||
chat = await client.get_entity(target_group_chat_id)
|
||
except ValueError:
|
||
log.info("❌ Не удалось найти чат. Убедитесь, что вы состоите в группе.")
|
||
return system_messages
|
||
|
||
log.info(f"🔍 Ищем системные сообщения в '{chat.title}' за последние {days_to_check} дней…\n")
|
||
|
||
today = datetime.now()
|
||
cutoff_date = today - timedelta(days=days_to_check)
|
||
|
||
async for message in client.iter_messages(chat):
|
||
# Проверяем дату
|
||
if message.date.replace(tzinfo=None) < cutoff_date:
|
||
continue
|
||
|
||
# Проверяем, является ли сообщение системным (ChatAction)
|
||
if message.action:
|
||
time_str = message.date.strftime('%Y-%m-%d %H:%M:%S')
|
||
log.info(f"[{time_str}] | Тип действия: {message.action}")
|
||
system_messages.append((chat.id, message.id))
|
||
|
||
log.info(f"✅ Найдено {len(system_messages)} системных сообщений для удаления")
|
||
return system_messages
|
||
|
||
|
||
async def delete_system_msgs(messages_list):
|
||
"""Удаляет найденные системные сообщения"""
|
||
if not messages_list:
|
||
log.info("🚫 Нет сообщений для удаления")
|
||
return
|
||
|
||
async with client:
|
||
chat_id, _ = messages_list[0]
|
||
message_ids = [msg_id for _, msg_id in messages_list]
|
||
|
||
result = await client.delete_messages(chat_id, message_ids)
|
||
deleted_count = len(result._deleted)
|
||
|
||
log.info(f"🗑 Удалено {deleted_count} из {len(message_ids)} сообщений")
|
||
|
||
|
||
# ==== Запуск бота ====
|
||
async def main():
|
||
await client.start()
|
||
|
||
# log.info("🧹 Начинаем очистку старых системных сообщений…")
|
||
# messages = await fetch_system_messages(days_to_check=5)
|
||
# await delete_system_msgs(messages)
|
||
# if not client.is_connected():
|
||
# log.info("🔄 Клиент не подключён, пытаемся восстановить соединение…")
|
||
# await client.connect()
|
||
|
||
asyncio.create_task(monitor_messages())
|
||
|
||
log.info("😊 Бот запущен. Ожидание событий…")
|
||
await client.run_until_disconnected()
|
||
|
||
# =======================
|
||
if __name__ == '__main__':
|
||
client.loop.run_until_complete(main()) |