Zoey.asia

December 30, 2024

Telegram 频道克隆方案:基于 Telethon 的自动化工具

share6.1 min to read

概述

在 Telegram 中,频道是信息传播的重要载体,许多用户和组织依赖频道来分享内容、进行讨论和管理社区。然而,随着频道内容的不断增加,备份和迁移需求日益凸显。本文介绍一个基于 Telethon 的自动化工具,用于克隆 Telegram 频道,包括下载文件和转发消息等功能。

项目结构与功能概述

作业流程

主要功能

  1. 文件下载:从指定频道下载特定类型的文件(如 .pdf, .docx 等)。
  2. 消息统计:统计频道中指定类型的文件数量,并记录最近的文件信息。
  3. 消息转发:将指定频道的消息转发到另一个频道,支持批量处理和错误重试。
  4. 频道克隆:克隆整个频道的内容,包括消息和文件,并记录消息ID映射关系。

代码实现细节

1. 连接与认证

async def connect(self):
    self.client = TelegramClient('session_name', API_ID, API_HASH)
    await self.client.start(phone=PHONE_NUMBER)
    logger.info("已连接到Telegram")

该方法负责创建 TelegramClient 实例并进行认证。TelegramClient 是 Telethon 提供的客户端类,用于与 Telegram 服务器进行交互。

2. 文件下载

async def download_files(self, days_ago=7):
    # 计算起始时间
    start_date = datetime.now() - timedelta(days=days_ago)
    
    # 获取频道
    channel = await self.client.get_entity(CHANNEL_USERNAME)
    
    # 获取消息并下载文件
    async for message in self.client.iter_messages(channel, offset_date=start_date):
        if message.media and isinstance(message.media, MessageMediaDocument):
            document = message.media.document
            file_name = document.attributes[-1].file_name
            if any(file_name.lower().endswith(ext) for ext in FILE_TYPES):
                file_path = os.path.join(DOWNLOAD_PATH, file_name)
                if not os.path.exists(file_path):
                    await self.client.download_media(message, file_path)

该方法从指定频道下载指定类型且最近 days_ago 天内的文件。通过 iter_messages 方法获取消息,并检查消息是否包含文件以及文件类型是否在允许的范围内。

3. 消息统计

async def count_channel_files(self, channel_username):
    # 获取频道
    channel = await self.client.get_entity(channel_username)
    
    # 初始化计数器
    total_files = 0
    file_type_count = {ext: 0 for ext in FILE_TYPES}
    file_list = []
    
    # 获取所有消息
    async for message in self.client.iter_messages(channel):
        if message.media and isinstance(message.media, MessageMediaDocument):
            document = message.media.document
            file_name = document.attributes[-1].file_name
            if any(file_name.lower().endswith(ext) for ext in FILE_TYPES):
                total_files += 1
                file_type_count[ext] += 1
                file_list.append({
                    'name': file_name,
                    'size': f"{file_size:.2f}MB",
                    'type': ext,
                    'message_id': message.id,
                    'date': message.date.strftime('%Y-%m-%d %H:%M:%S')
                })

该方法统计指定频道中指定类型的文件数量,并记录最近的文件信息。通过 iter_messages 方法获取所有消息,并检查消息是否包含文件以及文件类型是否在允许的范围内。

4. 消息转发

async def forward_all_files(self, from_channel, to_channel):
    # 获取源频道和目标频道
    source_channel = await self.client.get_entity(from_channel)
    target_channel = await self.client.get_entity(to_channel)
    
    # 初始化计数器
    total_forwarded = 0
    failed_forwards = []
    
    # 获取所有消息
    async for message in self.client.iter_messages(source_channel):
        if message.media and isinstance(message.media, MessageMediaDocument):
            file_name = message.media.document.attributes[-1].file_name
            if any(file_name.lower().endswith(ext) for ext in FILE_TYPES):
                try:
                    await self.client.forward_messages(target_channel, messages=message, from_peer=source_channel)
                    total_forwarded += 1
                except Exception as e:
                    failed_forwards.append({'message_id': message.id, 'error': str(e)})

该方法将指定频道中的文件转发到另一个频道。通过 iter_messages 方法获取所有消息,并检查消息是否包含文件以及文件类型是否在允许的范围内。使用 forward_messages 方法进行转发,并记录成功和失败的消息ID。

5. 频道克隆

async def clone_channel(self, from_channel, to_channel, start_from_id=None, batch_size=20):
    # 获取源频道和目标频道
    source_channel = await self.client.get_entity(from_channel)
    target_channel = await self.client.get_entity(to_channel)
    
    # 初始化计数器和状态
    total_forwarded = 0
    failed_forwards = []
    last_processed_id = None
    
    # 获取所有消息
    async for message in self.client.iter_messages(source_channel, limit=None, reverse=True, min_id=min_id):
        if message.media:
            messages.append(message)
            last_processed_id = message.id
            if len(messages) >= batch_size:
                await self._process_clone_batch(messages, source_channel, target_channel, total_forwarded)
                total_forwarded += len(messages)
                messages = []
                self._save_clone_status(status_file, last_processed_id, total_forwarded)
                await asyncio.sleep(random.randint(60, 120))

该方法克隆整个频道的内容,包括消息和文件。通过 iter_messages 方法获取所有消息,并检查消息是否包含文件。使用 _process_clone_batch 方法处理消息批次,并记录进度和状态。

6. 错误处理与重试机制

async def _retry_forward(self, message, target_channel, source_channel, max_retries=3):
    for i in range(max_retries):
        try:
            await self.client.send_message(target_channel, new_text, file=message.media)
            return True
        except Exception as e:
            logger.error(f"重试 {i+1}/{max_retries} 失败: {str(e)}")
            await asyncio.sleep(30)
    return False

该方法在转发消息失败时进行重试,最多重试 max_retries 次,每次重试之间等待 30 秒。

7. 日志记录与状态保存

def _save_clone_status(self, status_file, last_id, total_count):
    with open(status_file, 'w') as f:
        json.dump({
            'last_processed_id': last_id,
            'total_forwarded': total_count,
            'last_update': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        }, f)

该方法保存克隆进度,包括最后处理的消息ID和已转发的消息数量。通过 json.dump 方法将进度信息保存到文件中。

总结

本文介绍了一个基于 Telethon 的 Telegram 频道克隆工具,该工具可以自动下载文件、统计消息、转发消息和克隆频道。通过合理的设计和实现,该工具能够高效地处理大量的消息和文件,并且具有良好的错误处理和重试机制。

tip

在使用该工具时,请确保遵守 Telegram 的使用规则和法律法规,不要用于非法用途。

未来工作

  1. 性能优化:进一步优化消息处理和转发的效率,减少等待时间。
  2. 错误处理:增强错误处理机制,提高工具的 robustness。
  3. 功能扩展:增加对更多消息类型(如图片、视频等)的支持。

参考资料