466 lines
23 KiB
Python
466 lines
23 KiB
Python
# -*- coding: utf-8 -*-
|
||
# from __future__ import annotations
|
||
|
||
from telethon import TelegramClient, events
|
||
from telethon.tl.types import PeerUser, PeerChat, PeerChannel, Message
|
||
import re
|
||
import os
|
||
import json
|
||
import logging
|
||
import asyncio
|
||
from datetime import datetime, timedelta
|
||
from dotenv import find_dotenv, load_dotenv
|
||
from typing import Optional
|
||
|
||
# ==== Настройка логирования ====
|
||
log = logging.getLogger(__name__)
|
||
log.setLevel(logging.DEBUG)
|
||
console_handler = logging.StreamHandler()
|
||
console_handler.setFormatter(logging.Formatter('[%(levelname)s] %(asctime)s | %(message)s'))
|
||
log.addHandler(console_handler)
|
||
|
||
# === Загрузка переменных окружения ===
|
||
env = find_dotenv()
|
||
log.info(f'Использую: {env}')
|
||
load_dotenv(env)
|
||
|
||
# === Данные юзер-бота ===
|
||
api_id=os.getenv('api_id')
|
||
api_hash=os.getenv('api_hash')
|
||
session_name=os.getenv('session_name') or 'userbot'
|
||
if not all([api_id, api_hash, session_name]):
|
||
log.error('Не получены api_id, api_hash или session_name. Они должны быть в переменных окружения или в файле .evg')
|
||
exit(1)
|
||
|
||
# === Настройки ===
|
||
source_channel_username=os.getenv('source_channel_username') # Канал, который слушаем
|
||
target_groups=os.getenv('target_groups').split(',') # Группы, куда пересылаем
|
||
for i, s in enumerate(target_groups):
|
||
if s.isdigit() or s.startswith('-') and s[1:].isdigit():
|
||
target_groups[i] = int(s)
|
||
target_entities: list[PeerUser|PeerChat|PeerChannel] = list()
|
||
target_entities_dict: dict[int: PeerUser|PeerChat|PeerChannel] = dict()
|
||
groups_clean_srv_msgs=os.getenv('groups_clean_srv_msgs') # Группы, где удаляем сообщения о присоединившихся
|
||
if groups_clean_srv_msgs:
|
||
groups_clean_srv_msgs = groups_clean_srv_msgs.split(',')
|
||
for i, s in enumerate(groups_clean_srv_msgs):
|
||
if s.isdigit() or s.startswith('-') and s[1:].isdigit():
|
||
groups_clean_srv_msgs[i] = int(s)
|
||
else:
|
||
groups_clean_srv_msgs = list(target_groups)
|
||
filter_keywords=os.getenv('filter_keywords') # Строки или регулярное выражение для поиска
|
||
filter_keywords=filter_keywords.split(',') if filter_keywords else []
|
||
filter_negative_keywords=os.getenv('filter_negative_keywords') # Строки или регулярное выражение для поиска
|
||
filter_negative_keywords=filter_negative_keywords.split(',') if filter_negative_keywords else []
|
||
DATA_FILE=os.getenv('DATA_FILE') or 'messages.json' # Файл для сохранения сообщений
|
||
MAX_MESSAGES = os.getenv('MAX_MESSAGES') # Количество хранящихся сообщений
|
||
MAX_MESSAGES=int(MAX_MESSAGES) if MAX_MESSAGES and MAX_MESSAGES.isdigit() else 50
|
||
USER_DATA_DIR = os.getenv('USER_DATA_DIR') or 'userdata' # директория для Volume
|
||
|
||
session_name = os.path.join(USER_DATA_DIR, session_name)
|
||
DATA_FILE = os.path.join(USER_DATA_DIR, DATA_FILE)
|
||
if not os.path.exists(USER_DATA_DIR):
|
||
os.mkdir(USER_DATA_DIR, mode=666)
|
||
required_fields = {
|
||
'source_channel_username':source_channel_username,
|
||
'target_groups':target_groups,
|
||
'filter_keywords':filter_keywords,
|
||
}
|
||
missing_fields = [field_name for field_name, field_val in required_fields.items() if not field_val]
|
||
if missing_fields:
|
||
missing_fields = ', '.join(missing_fields)
|
||
log.info('api_id, api_hash и session_name успешно загружены.')
|
||
log.error(f'Не получены обязательные {missing_fields}. Они должны быть в переменных окружения или в файле .env')
|
||
exit(1)
|
||
|
||
# =======================
|
||
|
||
client = TelegramClient(session_name, api_id, api_hash)
|
||
|
||
# ==== Загрузка/сохранение состояния ====
|
||
def load_state():
|
||
if os.path.exists(DATA_FILE):
|
||
with open(DATA_FILE, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
return [{},{}]
|
||
|
||
def save_state():
|
||
with open(DATA_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump([last_known_msg_id, source_messages], f, ensure_ascii=False, indent=2)
|
||
log.debug('Состояние сохранено')
|
||
|
||
# ==== Хранение данных ====
|
||
last_known_msg_id, source_messages = load_state()
|
||
# last_known_msg_id {chat_id: msg_id}
|
||
# source_messages {msg_id: {'text': ..., 'forwarded_msg_id': {chat_id: message_id}}}
|
||
|
||
# =======================
|
||
def shorten(text: str, length: int = 80) -> str:
|
||
'''Возвращает короткое сообщение для логов'''
|
||
text = text.replace('\n', ' ↵ ')
|
||
if len(text)>length:
|
||
return f'{text[:length-1]}…'
|
||
return text
|
||
|
||
async def prepare_entities(target_groups: list) -> list:
|
||
"""
|
||
Обрабатывает список target_groups и возвращает список сущностей для рассылки сообщений.
|
||
|
||
:param client: Экземпляр TelegramClient
|
||
:param target_groups: Список, содержащий chat_id, пригласительные ссылки или имена пользователей
|
||
:return: Список сущностей, готовых для отправки сообщений
|
||
"""
|
||
for item in target_groups:
|
||
try:
|
||
# Проверяем, является ли элемент числом (chat_id)
|
||
if isinstance(item, (int, str)) and str(item).isdigit() or str(item).startswith('-') and str(item)[1:].isdigit():
|
||
# Если это chat_id, преобразуем в Peer
|
||
chat_id = int(item)
|
||
entity = await client.get_entity(chat_id)
|
||
target_entities_dict[int(item)] = entity
|
||
target_entities.append(entity)
|
||
|
||
# Проверяем, является ли элемент username (начинается с @)
|
||
elif isinstance(item, str) and item.startswith('@'):
|
||
entity = await client.get_entity(item)
|
||
target_entities_dict[entity.chat.id] = entity
|
||
target_entities.append(entity)
|
||
|
||
# Проверяем, является ли элемент пригласительной ссылкой
|
||
elif isinstance(item, str) and (item.startswith('t.me/') or item.startswith('https://t.me/')):
|
||
# Удаляем https://, если есть
|
||
link = item.replace('https://', '')
|
||
# Проверяем, является ли ссылка на публичный канал/группу (@username)
|
||
if re.match(r'^t\.me/@[a-zA-Z0-9_]+$', link):
|
||
entity = await client.get_entity(link.replace('t.me/', ''))
|
||
target_entities_dict[entity.chat.id] = entity
|
||
target_entities.append(entity)
|
||
# Проверяем, является ли ссылка на приватный чат/канал (t.me/joinchat/XXXX или t.me/+XXXX)
|
||
elif re.match(r'^t\.me/(joinchat/|\+)[a-zA-Z0-9_-]+$', link):
|
||
# Для приватных чатов нужно сначала присоединиться
|
||
from telethon.tl.functions.messages import ImportChatInviteRequest
|
||
hash = link.split('/')[-1].replace('+', '')
|
||
try:
|
||
result = await client(ImportChatInviteRequest(hash))
|
||
entity = result.chats[0] # Получаем сущность чата
|
||
target_entities_dict[entity.chat.id] = entity
|
||
target_entities.append(entity)
|
||
except Exception as e:
|
||
print(f"Ошибка при попытке присоединиться к чату {link}: {e}")
|
||
else:
|
||
print(f"Неподдерживаемый формат ссылки: {item}")
|
||
|
||
else:
|
||
print(f"Неподдерживаемый формат элемента: {item}")
|
||
|
||
except ValueError as e:
|
||
print(f"Ошибка при обработке {item}: {e}")
|
||
except Exception as e:
|
||
print(f"Неизвестная ошибка при обработке {item}: {e}")
|
||
|
||
|
||
async def forward_to_targets(
|
||
message: Message,
|
||
targets: Optional[list[str | int]] = None,
|
||
source_reply_to_msg_id: Optional[str | int] = None
|
||
) -> list[Message]:
|
||
# Пересылаем сообщения
|
||
new_msgs = []
|
||
for entity in (targets or target_entities):
|
||
if isinstance(entity, (str, int)):
|
||
entity = target_entities_dict[int(entity)]
|
||
if source_reply_to_msg_id and (forwarded_info:=source_messages.get(str(source_reply_to_msg_id))):
|
||
for str_chat_id, target_msg_id in forwarded_info['forwarded_msg_id'].items():
|
||
if str_chat_id == str(entity.id):
|
||
link = await client.get_message_link(message)
|
||
text = f'Переслано из [{message.chat.title}]({link})\n{message.text}'
|
||
new_msg = await client.send_message(str_chat_id, text, reply_to=target_msg_id, parse_mode='markdown')
|
||
else:
|
||
new_msg = await client.forward_messages(entity, message)
|
||
new_msgs.append(new_msg)
|
||
# Запоминаем пересланные данные сообщения
|
||
source_messages[str(message.id)] = {
|
||
'text': message.text,
|
||
'timestamp': message.date.timestamp(),
|
||
'forwarded_msg_id': {str(msg.chat.id): msg.id for msg in new_msgs}
|
||
}
|
||
log.info(f"📩 Переслано: {shorten(message.text)}")
|
||
# Обновляем последние известные id для канала
|
||
for new_msg in new_msgs:
|
||
last_known_msg_id[str(new_msg.chat.id)] = new_msg.id
|
||
# Ограничиваем размер словаря
|
||
if len(source_messages) > MAX_MESSAGES:
|
||
oldest = min(source_messages.keys(), key=lambda m_id: source_messages[m_id]['timestamp'])
|
||
del source_messages[oldest]
|
||
# Сохраняем
|
||
save_state()
|
||
return new_msgs
|
||
|
||
|
||
# ==== Обработчик сообщений из получателей ====
|
||
@client.on(events.NewMessage(chats=target_entities))
|
||
async def handler(event: events.NewMessage.Event):
|
||
last_known_msg_id[str(event.chat.id)] = event.message.id
|
||
log.debug(f'last_known_msg_id[{event.chat.id}]={event.message.id} {event.chat.title}')
|
||
|
||
|
||
def check(text) -> Optional[bool]:
|
||
text = text or ""
|
||
if any(re.search(keyword, text, re.IGNORECASE) for keyword in filter_keywords) \
|
||
and not any(re.search(keyword, text, re.IGNORECASE) for keyword in filter_negative_keywords):
|
||
return True
|
||
|
||
|
||
# ==== Обработчик сообщений из источника ====
|
||
@client.on(events.NewMessage(chats=source_channel_username))
|
||
async def handler_NewMessages(event: events.NewMessage.Event):
|
||
message_text = event.message.text or ""
|
||
last_known_msg_id[source_channel_username] = event.message.id
|
||
|
||
if message_text:
|
||
log.info(f"Новое сообщение: {shorten(message_text)}")
|
||
|
||
# Проверяем само сообщение
|
||
if check(message_text):
|
||
await forward_to_targets(event.message)
|
||
return
|
||
|
||
# Проверяем цепочку ответов
|
||
reply = event.message.reply_to
|
||
if reply and reply.reply_to_msg_id:
|
||
source_reply_to_msg_id = await check_reply_chain(reply.reply_to_msg_id, depth=1)
|
||
if source_reply_to_msg_id:
|
||
await forward_to_targets(event.message, source_reply_to_msg_id=source_reply_to_msg_id)
|
||
log.info(f"⛓ Переслано по цепочке: {shorten(message_text)}")
|
||
|
||
|
||
@client.on(events.MessageEdited(chats=source_channel_username))
|
||
async def handler_edited(event: events.MessageEdited.Event):
|
||
stored_m_info = source_messages.get(str(event.message.id))
|
||
if stored_m_info and stored_m_info['text'] != event.message.text:
|
||
log.info(f"📝 Сообщение изменено: {shorten(stored_m_info['text'])} → {shorten(event.message.text)}")
|
||
# Редактируем пересланное сообщение
|
||
for str_chat_id, msg_id in stored_m_info['forwarded_msg_id'].items():
|
||
chat_id = int(str_chat_id)
|
||
# Получаем ID последнего сообщения в целевом чате
|
||
last_msg_id = last_known_msg_id.get(str_chat_id)
|
||
if not last_msg_id:
|
||
last_msg_id = await update_last_msg_id(chat_id)
|
||
log.debug(f'chat_id={chat_id}, stored_msg_id={msg_id}, last_msg_id_on_channel={last_msg_id}')
|
||
# Проверяем, является ли наше пересланное сообщение последним
|
||
if msg_id == last_msg_id:
|
||
# Это последнее сообщение — можно удалить и переслать заново
|
||
await forward_to_targets(event.message, targets=[chat_id])
|
||
await client.delete_messages(chat_id, msg_id)
|
||
log.info(f"🔁 {chat_id} Сообщение было последним → Заменено")
|
||
else:
|
||
# Не последнее → отправляем изменённый текст как ответ
|
||
new_msg = await client.send_message(
|
||
chat_id,
|
||
f"🔄 Текст изменён:\n{event.message.text}",
|
||
reply_to=stored_m_info['forwarded_msg_id'][str_chat_id]
|
||
)
|
||
# Запоминаем пересланные данные сообщения
|
||
sm = source_messages.get(str(event.message.id))
|
||
if not sm:
|
||
source_messages[str(event.message.id)] = {
|
||
'text': event.message.text,
|
||
'timestamp': event.message.date.timestamp(),
|
||
'forwarded_msg_id': {str(new_msg.chat.id): new_msg.id}
|
||
}
|
||
else:
|
||
sm['text'] = event.message.text
|
||
sm['timestamp'] = event.message.date.timestamp()
|
||
sm['forwarded_msg_id'][str(new_msg.chat.id)] = new_msg.id
|
||
# Обновляем последние известные id для канала
|
||
last_known_msg_id[str(new_msg.chat.id)] = new_msg.id
|
||
|
||
log.info(f"💬 {chat_id} Сообщение не последнее → Отправлен ответ")
|
||
|
||
|
||
@client.on(events.MessageDeleted(chats=source_channel_username))
|
||
async def handler_deleted(event: events.MessageDeleted.Event):
|
||
for msg_id in event.deleted_ids:
|
||
stored_m_info = source_messages.get(str(msg_id))
|
||
if stored_m_info:
|
||
log.info(f"🗑 Исходное сообщение удалено. Удаляю пересланное: {shorten(stored_m_info['text'])}")
|
||
for str_chat_id, fwd_msg_id in tuple(stored_m_info['forwarded_msg_id'].items()):
|
||
chat_id = int(str_chat_id)
|
||
await client.delete_messages(chat_id, fwd_msg_id)
|
||
del stored_m_info['forwarded_msg_id'][str_chat_id]
|
||
if not stored_m_info['forwarded_msg_id']:
|
||
del source_messages[str(msg_id)]
|
||
|
||
|
||
async def get_batch_messages(chat_id, min_id, max_id):
|
||
chat_id = int(chat_id)
|
||
approx_count = max_id - min_id + 1
|
||
limit = int(approx_count * 1.05) # Увеличиваем лимит на 5%, чтобы точно зацепить все нужные
|
||
limit = min(limit, 3000) # Максимум 3000 (Telegram API limit)
|
||
|
||
log.debug(f"🔍 Запрашиваю {limit} сообщений для чата {chat_id} из диапазона {min_id}–{max_id}")
|
||
|
||
try:
|
||
# Получаем batch сообщений
|
||
batch = await client.get_messages(chat_id, limit=limit)
|
||
batch_dict = {msg.id: msg for msg in batch}
|
||
return batch_dict
|
||
except Exception as e:
|
||
log.error(f"⚠ Ошибка при получении batch-сообщений: {e}")
|
||
return None
|
||
|
||
|
||
async def update_last_msg_id(chat_id: int|str) -> int:
|
||
# Получаем ID последнего сообщения в целевом чате
|
||
chat_id = int(chat_id)
|
||
last_msg = await client.get_messages(chat_id, limit=1)
|
||
last_msg_id = last_msg[0].id if last_msg else None
|
||
if last_msg_id:
|
||
last_known_msg_id[str(chat_id)] = last_msg_id
|
||
return last_msg_id
|
||
|
||
|
||
# ==== Фоновая задача: очистки старых сообщений ====
|
||
async def clear_old_messages():
|
||
while True:
|
||
await asyncio.sleep(600) # раз в 10 минут
|
||
now = datetime.now().timestamp()
|
||
cutoff_time = now - 86400 # 24 час
|
||
start_len = len(source_messages)
|
||
for str_msg_id, stored_m_info in tuple(source_messages.items()):
|
||
if stored_m_info['timestamp'] < cutoff_time:
|
||
del source_messages[str_msg_id]
|
||
if start_len != len(source_messages):
|
||
log.debug(f'Очищены старые сообщения: {start_len} → {len(source_messages)}')
|
||
save_state()
|
||
|
||
|
||
async def check_reply_chain(msg_id, depth=1, max_depth=10) -> bool|int:
|
||
"""Рекурсивная проверка цепочки ответов
|
||
В случае совпадения возвращает replied_msg.id"""
|
||
if depth > max_depth:
|
||
log.warning(f"🔁 Превышена максимальная глубина цепочки ({max_depth}), остановлено.")
|
||
return False
|
||
|
||
try:
|
||
replied_msg = await client.get_messages(source_channel_username, ids=msg_id)
|
||
except Exception as e:
|
||
log.warning(f"⚠ Не удалось получить сообщение по ID {msg_id}: {e}")
|
||
return False
|
||
|
||
if not replied_msg:
|
||
return False
|
||
|
||
text = replied_msg.text or ""
|
||
|
||
if check(text):
|
||
log.info(f"✔️ Есть совпадение на уровне {depth}: {shorten(text,30)}")
|
||
return replied_msg.id
|
||
else:
|
||
log.info(f"🔍 Проверяю сообщение на уровне {depth}: {shorten(text,30)}")
|
||
|
||
if replied_msg.reply_to:
|
||
return await check_reply_chain(replied_msg.reply_to.reply_to_msg_id, depth + 1)
|
||
|
||
return False
|
||
|
||
# ==== Удаление системных сообщений о входе/выходе в целевой группе ====
|
||
@client.on(events.ChatAction(chats=groups_clean_srv_msgs))
|
||
async def del_join_leave(event: events):
|
||
actions = {
|
||
'добавлен': event.user_added,
|
||
'присодинился': event.user_joined,
|
||
'покинул': event.user_left,
|
||
}
|
||
if True in actions.values():
|
||
action = [act for act, param in actions.items() if param]
|
||
action = ', '.join(action)
|
||
full_name = ' '.join([name for name in (event.user.first_name, event.user.last_name) if name])
|
||
await event.delete()
|
||
log.info(f"Удалено сообщение: Пользователь {full_name}({event.user_id}) {action} чат {event.chat.id}")
|
||
|
||
|
||
async def fetch_system_messages(days_to_check=5) -> tuple[int, int]:
|
||
"""Собирает системные сообщения за последние N дней"""
|
||
system_messages = []
|
||
for chat_id in groups_clean_srv_msgs:
|
||
try:
|
||
chat = await client.get_entity(chat_id)
|
||
except ValueError:
|
||
log.info("❌ Не удалось найти чат. Убедитесь, что вы состоите в группе.")
|
||
return system_messages
|
||
|
||
log.info(f"🔍 Ищем системные сообщения в '{chat.title}' за последние {days_to_check} дней…\n")
|
||
|
||
today = datetime.now()
|
||
cutoff_date = today - timedelta(days=days_to_check)
|
||
|
||
async for message in client.iter_messages(chat):
|
||
# Проверяем дату
|
||
if message.date.replace(tzinfo=None) < cutoff_date:
|
||
continue
|
||
|
||
# Проверяем, является ли сообщение системным (ChatAction)
|
||
if message.action:
|
||
time_str = message.date.strftime('%Y-%m-%d %H:%M:%S')
|
||
log.info(f"[{time_str}] | Тип действия: {message.action}")
|
||
system_messages.append((chat.id, message.id))
|
||
|
||
log.info(f"✅ Найдено {len(system_messages)} системных сообщений для удаления")
|
||
|
||
return system_messages
|
||
|
||
|
||
async def delete_system_msgs(messages_list):
|
||
"""Удаляет найденные системные сообщения"""
|
||
if not messages_list:
|
||
log.info("🚫 Нет сообщений для удаления")
|
||
return
|
||
|
||
async with client:
|
||
chat_id, _ = messages_list[0]
|
||
message_ids = [msg_id for _, msg_id in messages_list]
|
||
|
||
result = await client.delete_messages(chat_id, message_ids)
|
||
deleted_count = len(result._deleted)
|
||
|
||
log.info(f"🗑 Удалено {deleted_count} из {len(message_ids)} сообщений")
|
||
|
||
|
||
# ==== Запуск бота ====
|
||
async def main():
|
||
|
||
# log.info("🧹 Начинаем очистку старых системных сообщений…")
|
||
# messages = await fetch_system_messages(days_to_check=5)
|
||
# await delete_system_msgs(messages)
|
||
# if not client.is_connected():
|
||
# log.info("🔄 Клиент не подключён, пытаемся восстановить соединение…")
|
||
# await client.connect()
|
||
|
||
asyncio.create_task(clear_old_messages())
|
||
try:
|
||
await client.start()
|
||
await prepare_entities(target_groups)
|
||
log.info("Env Parameters:"
|
||
"\n"
|
||
f"\nsource_channel_username = {source_channel_username}"
|
||
f"\ntarget_groups = {target_groups}"
|
||
f"\ngroups_clean_srv_msgs = {groups_clean_srv_msgs}"
|
||
f"\nfilter_keywords = {filter_keywords}"
|
||
f"\nfilter_negative_keywords= {filter_negative_keywords}"
|
||
"\n"
|
||
"\n😊 Бот запущен. Ожидание событий…"
|
||
)
|
||
await client.run_until_disconnected()
|
||
except ConnectionError as e:
|
||
log.error(f"Ошибка подключения к Telegram: {e}. Повторная попытка через 10 секунд...")
|
||
await asyncio.sleep(10) # Подождать перед повторной попыткой
|
||
except KeyboardInterrupt:
|
||
log.info("🛑 Бот остановлен вручную.")
|
||
finally:
|
||
if client.is_connected():
|
||
await client.disconnect()
|
||
|
||
# =======================
|
||
if __name__ == '__main__':
|
||
client.loop.run_until_complete(main()) |