Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/bots/discord/message.py
2023-12-14 11:11:00 +08:00

249 lines
10 KiB
Python

import datetime
import re
import traceback
from typing import List, Union
import discord
import filetype
from bots.discord.client import client
from bots.discord.info import client_name
from config import Config
from core.builtins import Bot, Plain, Image, MessageSession as MessageSessionT, MessageTaskManager
from core.builtins.message.chain import MessageChain
from core.builtins.message.internal import Embed, ErrorMessage, Voice
from core.logger import Logger
from core.types import FetchTarget as FetchTargetT, FinishedSession as FinS
from core.utils.http import download_to_cache
from database import BotDBUtil
enable_analytics = Config('enable_analytics')
async def convert_embed(embed: Embed):
if isinstance(embed, Embed):
files = []
embeds = discord.Embed(title=embed.title if embed.title is not None else discord.Embed.Empty,
description=embed.description if embed.description is not None else discord.Embed.Empty,
color=embed.color if embed.color is not None else discord.Embed.Empty,
url=embed.url if embed.url is not None else discord.Embed.Empty,
timestamp=datetime.datetime.fromtimestamp(
embed.timestamp) if embed.timestamp is not None else discord.Embed.Empty, )
if embed.image is not None:
upload = discord.File(await embed.image.get(), filename="image.png")
files.append(upload)
embeds.set_image(url="attachment://image.png")
if embed.thumbnail is not None:
upload = discord.File(await embed.thumbnail.get(), filename="thumbnail.png")
files.append(upload)
embeds.set_thumbnail(url="attachment://thumbnail.png")
if embed.author is not None:
embeds.set_author(name=embed.author)
if embed.footer is not None:
embeds.set_footer(text=embed.footer)
if embed.fields is not None:
for field in embed.fields:
embeds.add_field(name=field.name, value=field.value, inline=field.inline)
return embeds, files
class FinishedSession(FinS):
async def delete(self):
"""
用于删除这条消息。
"""
try:
for x in self.result:
await x.delete()
except Exception:
Logger.error(traceback.format_exc())
class MessageSession(MessageSessionT):
class Feature:
image = True
voice = True
embed = True
forward = False
delete = True
quote = True
wait = True
async def send_message(self, message_chain, quote=True, disable_secret_check=False, allow_split_image=True,
callback=None
) -> FinishedSession:
message_chain = MessageChain(message_chain)
if not message_chain.is_safe and not disable_secret_check:
return await self.send_message(Plain(ErrorMessage(self.locale.t("error.message.chain.unsafe"))))
self.sent.append(message_chain)
count = 0
send = []
for x in message_chain.as_sendable(self):
if isinstance(x, Plain):
send_ = await self.session.target.send(x.text,
reference=self.session.message if quote and count == 0
and self.session.message else None)
Logger.info(f'[Bot] -> [{self.target.target_id}]: {x.text}')
elif isinstance(x, Image):
send_ = await self.session.target.send(file=discord.File(await x.get()),
reference=self.session.message if quote and count == 0
and self.session.message else None)
Logger.info(f'[Bot] -> [{self.target.target_id}]: Image: {str(x.__dict__)}')
elif isinstance(x, Voice):
send_ = await self.session.target.send(file=discord.File(x.path),
reference=self.session.message if quote and count == 0
and self.session.message else None)
Logger.info(f'[Bot] -> [{self.target.target_id}]: Voice: {str(x.__dict__)}')
elif isinstance(x, Embed):
embeds, files = await convert_embed(x)
send_ = await self.session.target.send(embed=embeds,
reference=self.session.message if quote and count == 0
and self.session.message else None,
files=files)
Logger.info(f'[Bot] -> [{self.target.target_id}]: Embed: {str(x.__dict__)}')
else:
send_ = None
if send_:
send.append(send_)
count += 1
msg_ids = []
for x in send:
msg_ids.append(x.id)
if callback:
MessageTaskManager.add_callback(x.id, callback)
return FinishedSession(self, msg_ids, send)
async def check_native_permission(self):
if not self.session.message:
channel = await client.fetch_channel(self.session.target)
author = await channel.guild.fetch_member(self.session.sender)
else:
channel = self.session.message.channel
author = self.session.message.author
try:
if channel.permissions_for(author).administrator \
or isinstance(channel, discord.DMChannel):
return True
except Exception:
Logger.error(traceback.format_exc())
return False
async def to_message_chain(self):
lst = []
lst.append(Plain(self.session.message.content))
for x in self.session.message.attachments:
d = await download_to_cache(x.url)
if filetype.is_image(d):
lst.append(Image(d))
return MessageChain(lst)
def as_display(self, text_only=False):
msg = self.session.message.content
msg = re.sub(r'<@(.*?)>', r'Discord|Client|\1', msg)
return msg
async def delete(self):
try:
await self.session.message.delete()
except Exception:
Logger.error(traceback.format_exc())
sendMessage = send_message
asDisplay = as_display
toMessageChain = to_message_chain
checkNativePermission = check_native_permission
class Typing:
def __init__(self, msg: MessageSessionT):
self.msg = msg
async def __aenter__(self):
async with self.msg.session.target.typing() as typing:
return typing
async def __aexit__(self, exc_type, exc_val, exc_tb):
pass
class FetchedSession(Bot.FetchedSession):
async def send_direct_message(self, message_chain, disable_secret_check=False, allow_split_image=True):
try:
get_channel = await client.fetch_channel(self.session.target)
except Exception:
Logger.error(traceback.format_exc())
return False
self.session.target = self.session.sender = self.parent.session.target = self.parent.session.sender = get_channel
return await self.parent.send_direct_message(message_chain, disable_secret_check=disable_secret_check)
Bot.FetchedSession = FetchedSession
class FetchTarget(FetchTargetT):
name = client_name
@staticmethod
async def fetch_target(target_id, sender_id=None) -> Union[Bot.FetchedSession]:
match_channel = re.match(r'^(Discord\|(?:DM\||)Channel)\|(.*)', target_id)
if match_channel:
target_from = sender_from = match_channel.group(1)
target_id = match_channel.group(2)
if sender_id:
match_sender = re.match(r'^(Discord\|Client)\|(.*)', sender_id)
if match_sender:
sender_from = match_sender.group(1)
sender_id = match_sender.group(2)
else:
sender_id = target_id
return Bot.FetchedSession(target_from, target_id, sender_from, sender_id)
@staticmethod
async def fetch_target_list(target_list: list) -> List[Bot.FetchedSession]:
lst = []
for x in target_list:
fet = await FetchTarget.fetch_target(x)
if fet:
lst.append(fet)
return lst
@staticmethod
async def post_message(module_name, message, user_list: List[Bot.FetchedSession] = None, i18n=False, **kwargs):
if user_list is not None:
for x in user_list:
try:
msgchain = message
if isinstance(message, str):
if i18n:
msgchain = MessageChain([Plain(x.parent.locale.t(message, **kwargs))])
else:
msgchain = MessageChain([Plain(message)])
await x.send_direct_message(msgchain)
if enable_analytics:
BotDBUtil.Analytics(x).add('', module_name, 'schedule')
except Exception:
Logger.error(traceback.format_exc())
else:
get_target_id = BotDBUtil.TargetInfo.get_enabled_this(module_name, "Discord")
for x in get_target_id:
fetch = await FetchTarget.fetch_target(x.targetId)
if fetch:
try:
msgchain = message
if isinstance(message, str):
if i18n:
msgchain = MessageChain([Plain(fetch.parent.locale.t(message, **kwargs))])
else:
msgchain = MessageChain([Plain(message)])
await fetch.send_direct_message(msgchain)
if enable_analytics:
BotDBUtil.Analytics(fetch).add('', module_name, 'schedule')
except Exception:
Logger.error(traceback.format_exc())
Bot.MessageSession = MessageSession
Bot.FetchTarget = FetchTarget