概述
在 Telegram 中,频道是信息传播的重要载体,许多用户和组织依赖频道来分享内容、进行讨论和管理社区。然而,随着频道内容的不断增加,备份和迁移需求日益凸显。本文介绍一个基于 Telethon 的自动化工具,用于克隆 Telegram 频道,包括下载文件和转发消息等功能。
项目结构与功能概述
作业流程
主要功能
- 文件下载:从指定频道下载特定类型的文件(如
.pdf
,.docx
等)。 - 消息统计:统计频道中指定类型的文件数量,并记录最近的文件信息。
- 消息转发:将指定频道的消息转发到另一个频道,支持批量处理和错误重试。
- 频道克隆:克隆整个频道的内容,包括消息和文件,并记录消息ID映射关系。
代码实现细节
1. 连接与认证
async def connect(self):
self.client = TelegramClient('session_name', API_ID, API_HASH)
await self.client.start(phone=PHONE_NUMBER)
logger.info("已连接到Telegram")
该方法负责创建 TelegramClient 实例并进行认证。TelegramClient
是 Telethon 提供的客户端类,用于与 Telegram 服务器进行交互。
2. 文件下载
async def download_files(self, days_ago=7):
# 计算起始时间
start_date = datetime.now() - timedelta(days=days_ago)
# 获取频道
channel = await self.client.get_entity(CHANNEL_USERNAME)
# 获取消息并下载文件
async for message in self.client.iter_messages(channel, offset_date=start_date):
if message.media and isinstance(message.media, MessageMediaDocument):
document = message.media.document
file_name = document.attributes[-1].file_name
if any(file_name.lower().endswith(ext) for ext in FILE_TYPES):
file_path = os.path.join(DOWNLOAD_PATH, file_name)
if not os.path.exists(file_path):
await self.client.download_media(message, file_path)
该方法从指定频道下载指定类型且最近 days_ago
天内的文件。通过 iter_messages
方法获取消息,并检查消息是否包含文件以及文件类型是否在允许的范围内。
3. 消息统计
async def count_channel_files(self, channel_username):
# 获取频道
channel = await self.client.get_entity(channel_username)
# 初始化计数器
total_files = 0
file_type_count = {ext: 0 for ext in FILE_TYPES}
file_list = []
# 获取所有消息
async for message in self.client.iter_messages(channel):
if message.media and isinstance(message.media, MessageMediaDocument):
document = message.media.document
file_name = document.attributes[-1].file_name
if any(file_name.lower().endswith(ext) for ext in FILE_TYPES):
total_files += 1
file_type_count[ext] += 1
file_list.append({
'name': file_name,
'size': f"{file_size:.2f}MB",
'type': ext,
'message_id': message.id,
'date': message.date.strftime('%Y-%m-%d %H:%M:%S')
})
该方法统计指定频道中指定类型的文件数量,并记录最近的文件信息。通过 iter_messages
方法获取所有消息,并检查消息是否包含文件以及文件类型是否在允许的范围内。
4. 消息转发
async def forward_all_files(self, from_channel, to_channel):
# 获取源频道和目标频道
source_channel = await self.client.get_entity(from_channel)
target_channel = await self.client.get_entity(to_channel)
# 初始化计数器
total_forwarded = 0
failed_forwards = []
# 获取所有消息
async for message in self.client.iter_messages(source_channel):
if message.media and isinstance(message.media, MessageMediaDocument):
file_name = message.media.document.attributes[-1].file_name
if any(file_name.lower().endswith(ext) for ext in FILE_TYPES):
try:
await self.client.forward_messages(target_channel, messages=message, from_peer=source_channel)
total_forwarded += 1
except Exception as e:
failed_forwards.append({'message_id': message.id, 'error': str(e)})
该方法将指定频道中的文件转发到另一个频道。通过 iter_messages
方法获取所有消息,并检查消息是否包含文件以及文件类型是否在允许的范围内。使用 forward_messages
方法进行转发,并记录成功和失败的消息ID。
5. 频道克隆
async def clone_channel(self, from_channel, to_channel, start_from_id=None, batch_size=20):
# 获取源频道和目标频道
source_channel = await self.client.get_entity(from_channel)
target_channel = await self.client.get_entity(to_channel)
# 初始化计数器和状态
total_forwarded = 0
failed_forwards = []
last_processed_id = None
# 获取所有消息
async for message in self.client.iter_messages(source_channel, limit=None, reverse=True, min_id=min_id):
if message.media:
messages.append(message)
last_processed_id = message.id
if len(messages) >= batch_size:
await self._process_clone_batch(messages, source_channel, target_channel, total_forwarded)
total_forwarded += len(messages)
messages = []
self._save_clone_status(status_file, last_processed_id, total_forwarded)
await asyncio.sleep(random.randint(60, 120))
该方法克隆整个频道的内容,包括消息和文件。通过 iter_messages
方法获取所有消息,并检查消息是否包含文件。使用 _process_clone_batch
方法处理消息批次,并记录进度和状态。
6. 错误处理与重试机制
async def _retry_forward(self, message, target_channel, source_channel, max_retries=3):
for i in range(max_retries):
try:
await self.client.send_message(target_channel, new_text, file=message.media)
return True
except Exception as e:
logger.error(f"重试 {i+1}/{max_retries} 失败: {str(e)}")
await asyncio.sleep(30)
return False
该方法在转发消息失败时进行重试,最多重试 max_retries
次,每次重试之间等待 30 秒。
7. 日志记录与状态保存
def _save_clone_status(self, status_file, last_id, total_count):
with open(status_file, 'w') as f:
json.dump({
'last_processed_id': last_id,
'total_forwarded': total_count,
'last_update': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}, f)
该方法保存克隆进度,包括最后处理的消息ID和已转发的消息数量。通过 json.dump
方法将进度信息保存到文件中。
总结
本文介绍了一个基于 Telethon 的 Telegram 频道克隆工具,该工具可以自动下载文件、统计消息、转发消息和克隆频道。通过合理的设计和实现,该工具能够高效地处理大量的消息和文件,并且具有良好的错误处理和重试机制。
tip
在使用该工具时,请确保遵守 Telegram 的使用规则和法律法规,不要用于非法用途。
未来工作
- 性能优化:进一步优化消息处理和转发的效率,减少等待时间。
- 错误处理:增强错误处理机制,提高工具的 robustness。
- 功能扩展:增加对更多消息类型(如图片、视频等)的支持。