191 lines
8.1 KiB
Python
191 lines
8.1 KiB
Python
import asyncio
|
||
import re
|
||
import traceback
|
||
from typing import List
|
||
from pathlib import Path
|
||
|
||
from aiocqhttp import MessageSegment
|
||
from core.bots.aiocqhttp.client import bot
|
||
from core.bots.aiocqhttp.tasks import MessageTaskManager, FinishedTasks
|
||
from core.elements import Plain, Image, MessageSession as MS, MsgInfo, Session, Voice, FetchTarget as FT
|
||
from core.elements.others import confirm_command
|
||
from core.logger import Logger
|
||
from database import BotDBUtil
|
||
import html
|
||
|
||
|
||
def convert2lst(s) -> list:
    """Normalise a message argument into a list of message elements.

    * ``list``  -> returned as the very same object (no copy is made).
    * ``tuple`` -> copied into a new list.
    * ``str``   -> wrapped in a single ``Plain`` element.

    Any other type falls through and yields ``None``.
    """
    if isinstance(s, list):
        return s
    if isinstance(s, tuple):
        return [*s]
    if isinstance(s, str):
        return [Plain(s)]
    # Unsupported input type: same implicit result as before, made explicit.
    return None
|
||
|
||
|
||
class MessageSession(MS):
    """QQ (aiocqhttp) flavour of the bot's message session.

    ``self.target`` holds the ``MsgInfo`` routing metadata (``targetFrom``,
    ``targetId``, ``senderInfo``) and ``self.session`` holds the underlying
    message object together with the ``target`` / ``sender`` ids that are
    passed to the aiocqhttp API as ``group_id`` / ``user_id``.
    """

    class Feature:
        """Capability flags of this platform adapter."""
        image = True
        voice = True
        forward = True

    async def sendMessage(self, msgchain, quote=True):
        """Send a message to the chat this session belongs to.

        :param msgchain: a ``str``, or a ``list``/``tuple`` whose items are
            ``Plain``, ``Image`` or ``Voice`` elements. Items of any other
            type inside the sequence are skipped.
        :param quote: when true, the outgoing message starts with a reply
            segment that references ``self.session.message.message_id``.
        :return: a new ``MessageSession`` (``targetFrom='QQ|Bot'``) whose
            ``session.message`` is the result returned by the send call.
        """
        msg = MessageSegment.text('')
        if quote:
            msg = MessageSegment.reply(self.session.message.message_id)
        if isinstance(msgchain, str):
            if msgchain != '':
                msg = msg + MessageSegment.text(msgchain)
            else:
                # Wrapped in a text segment so both error paths build the
                # message the same way.
                msg = msg + MessageSegment.text(
                    '发生错误:机器人尝试发送空文本消息,请联系机器人开发者解决问题。'
                    '\n错误汇报地址:https://github.com/Teahouse-Studios/bot/issues/new?assignees=OasisAkari&labels=bug&template=5678.md&title=')
        elif isinstance(msgchain, (list, tuple)):
            for x in msgchain:
                if isinstance(x, Plain):
                    msg = msg + MessageSegment.text(x.text)
                elif isinstance(x, Image):
                    msg = msg + MessageSegment.image(Path(await x.get()).as_uri())
                elif isinstance(x, Voice):
                    msg = msg + MessageSegment.record(Path(x.path).as_uri())
        else:
            # NOTE(review): the source this was recovered from had lost its
            # indentation; this branch is read as pairing with the msgchain
            # type check because its text reports an illegal message chain.
            # Confirm against the canonical file.
            msg = msg + MessageSegment.text(
                '发生错误:机器人尝试发送非法消息链,请联系机器人开发者解决问题。'
                '\n错误汇报地址:https://github.com/Teahouse-Studios/bot/issues/new?assignees=OasisAkari&labels=bug&template=5678.md&title=')
        Logger.info(f'[Bot] -> [{self.target.targetId}]: {msg}')
        if self.target.targetFrom == 'QQ|Group':
            send = await bot.send_group_msg(group_id=self.session.target, message=msg)
        else:
            send = await bot.send_private_msg(user_id=self.session.target, message=msg)

        return MessageSession(target=MsgInfo(targetId=0, senderId=0, senderName='', targetFrom='QQ|Bot',
                                             senderFrom='QQ|Bot'),
                              session=Session(message=send,
                                              target=self.session.target,
                                              sender=self.session.sender))

    async def waitConfirm(self, msgchain=None, quote=True):
        """Optionally send a prompt, then wait for the sender's next reply.

        :param msgchain: prompt to send first (``str``, ``list`` or
            ``tuple``); a confirmation hint is appended to it. ``None``
            sends nothing.
        :param quote: forwarded to :meth:`sendMessage`.
        :return: ``True`` when the recorded reply is in ``confirm_command``,
            otherwise ``False``.
        """
        send = None
        if msgchain is not None:
            # convert2lst hands a list argument back uncopied; copy it so
            # appending the hint never mutates the caller's own list.
            msgchain = list(convert2lst(msgchain))
            msgchain.append(Plain('(发送“是”或符合确认条件的词语来确认)'))
            send = await self.sendMessage(msgchain, quote)
        flag = asyncio.Event()
        MessageTaskManager.add_task(self.session.sender, flag)
        await flag.wait()
        if send is not None:
            # Remove the prompt once an answer has arrived.
            await send.delete()
        return FinishedTasks.get()[self.session.sender] in confirm_command

    async def checkPermission(self):
        """Tell whether the sender may administer the bot in this chat.

        Returns ``True`` for private chats (``targetFrom == 'QQ'``), for
        senders flagged as target admin or super user in ``senderInfo``,
        and for members whose group role is ``owner`` or ``admin``.
        """
        if self.target.targetFrom == 'QQ' or self.target.senderInfo.check_TargetAdmin(
                self.target.targetId) or self.target.senderInfo.query.isSuperUser:
            return True
        get_member_info = await bot.call_action('get_group_member_info', group_id=self.session.target,
                                                user_id=self.session.sender)
        return get_member_info['role'] in ['owner', 'admin']

    def checkSuperUser(self):
        """Return ``True`` when the sender is flagged as a super user."""
        return bool(self.target.senderInfo.query.isSuperUser)

    def asDisplay(self):
        """Return the incoming message content with HTML entities unescaped."""
        return html.unescape(self.session.message.message)

    async def fake_forward_msg(self, nodelist):
        """Send ``nodelist`` as a forward message; does nothing outside groups."""
        if self.target.targetFrom == 'QQ|Group':
            await bot.call_action('send_group_forward_msg', group_id=self.session.target, messages=nodelist)

    async def delete(self):
        """Best-effort recall of the message(s) held by this session.

        ``self.session.message`` may be a single send result or a list of
        them. Failures are printed as a traceback and otherwise ignored.
        """
        try:
            if isinstance(self.session.message, list):
                for x in self.session.message:
                    await bot.call_action('delete_msg', message_id=x['message_id'])
            else:
                await bot.call_action('delete_msg', message_id=self.session.message['message_id'])
        except Exception:
            # Recall is deliberately best-effort: report and carry on.
            traceback.print_exc()

    class Typing:
        """Async context manager used as a "working on it" indicator."""

        def __init__(self, msg: MS):
            self.msg = msg

        async def __aenter__(self):
            # In group chats the indicator is a poke aimed at the sender;
            # nothing is sent for other chat types.
            if self.msg.target.targetFrom == 'QQ|Group':
                await bot.send_group_msg(group_id=self.msg.session.target,
                                         message=f'[CQ:poke,qq={self.msg.session.sender}]')

        async def __aexit__(self, exc_type, exc_val, exc_tb):
            pass
|
||
|
||
|
||
class FetchTarget(FT):
    """Resolve stored target ids into QQ sessions and broadcast to them."""

    @staticmethod
    async def _reachable_ids():
        """Return ``(group_ids, friend_ids)`` sets reported by the bot API."""
        group_list_raw = await bot.call_action('get_group_list')
        friend_list_raw = await bot.call_action('get_friend_list')
        group_ids = {g['group_id'] for g in group_list_raw}
        # Each friend entry is a mapping; collect its numeric id so that the
        # membership test against ``session.target`` can actually succeed.
        friend_ids = {f['user_id'] for f in friend_list_raw}
        return group_ids, friend_ids

    @staticmethod
    def _is_reachable(fetched, group_ids, friend_ids) -> bool:
        """Tell whether ``fetched`` points at a joined group or a known friend."""
        origin = fetched.target.targetFrom
        if origin == 'QQ|Group':
            return fetched.session.target in group_ids
        if origin == 'QQ':
            return fetched.session.target in friend_ids
        return True

    @staticmethod
    async def fetch_target(targetId) -> MessageSession:
        """Build a session for a stored id such as ``QQ|Group|<n>`` or ``QQ|<n>``.

        :param targetId: the stored target id string.
        :return: a ``MessageSession`` whose ``session.message`` is ``False``
            and whose target and sender are both the parsed integer id, or
            ``False`` when the id does not match the expected pattern or
            its trailing part is not an integer.
        """
        matchTarget = re.match(r'^((?:QQ\|Group|QQ))\|(.*)', targetId)
        if not matchTarget:
            return False
        try:
            numeric_id = int(matchTarget.group(2))
        except ValueError:
            # A non-numeric tail cannot be addressed through this adapter;
            # treat it like any other unrecognised id rather than letting
            # the exception abort a whole broadcast loop.
            return False
        origin = matchTarget.group(1)
        return MessageSession(MsgInfo(targetId=targetId, senderId=targetId, senderName='',
                                      targetFrom=origin, senderFrom=origin),
                              Session(message=False, target=numeric_id, sender=numeric_id))

    @staticmethod
    async def fetch_target_list(targetList: list) -> List[MessageSession]:
        """Resolve many target ids, keeping only joined groups and known friends.

        :param targetList: iterable of stored target id strings.
        :return: list of sessions for the ids that resolved and are reachable.
        """
        group_ids, friend_ids = await FetchTarget._reachable_ids()
        lst = []
        for x in targetList:
            fet = await FetchTarget.fetch_target(x)
            if fet and FetchTarget._is_reachable(fet, group_ids, friend_ids):
                lst.append(fet)
        return lst

    @staticmethod
    async def post_message(module_name, message, user_list: List[MessageSession] = None):
        """Send ``message`` to explicit sessions or to a module's subscribers.

        :param module_name: module whose enabled targets are looked up via
            ``BotDBUtil.Module.get_enabled_this`` when ``user_list`` is ``None``.
        :param message: message chain passed to ``sendMessage`` (unquoted).
        :param user_list: optional explicit list of sessions to send to.
        :return: list of the sessions returned by each successful send.
            Individual send failures are printed and skipped.
        """
        send_list = []
        if user_list is not None:
            for x in user_list:
                try:
                    send = await x.sendMessage(message, quote=False)
                    send_list.append(send)
                except Exception:
                    traceback.print_exc()
            return send_list

        get_target_id = BotDBUtil.Module.get_enabled_this(module_name)
        group_ids, friend_ids = await FetchTarget._reachable_ids()
        for x in get_target_id:
            fetch = await FetchTarget.fetch_target(x)
            if not fetch or not FetchTarget._is_reachable(fetch, group_ids, friend_ids):
                continue
            try:
                send = await fetch.sendMessage(message, quote=False)
            except Exception:
                traceback.print_exc()
            else:
                send_list.append(send)
                # Pause between successful sends, as the original did.
                await asyncio.sleep(0.5)
        return send_list
|