import html import logging import os import re import ujson as json from aiocqhttp import Event, Error from bots.aiocqhttp.client import bot from bots.aiocqhttp.message import MessageSession, FetchTarget from config import Config from core.builtins import EnableDirtyWordCheck, PrivateAssets, Url from core.parser.message import parser from core.types import MsgInfo, Session from core.utils.bot import load_prompt, init_async from database import BotDBUtil PrivateAssets.set(os.path.abspath(os.path.dirname(__file__) + '/assets')) EnableDirtyWordCheck.status = True if Config('enable_dirty_check') else False Url.disable_mm = False if Config('enable_urlmanager') else True qq_account = str(Config("qq_account")) enable_listening_self_message = Config("qq_enable_listening_self_message") @bot.on_startup async def startup(): await init_async() bot.logger.setLevel(logging.WARNING) @bot.on_websocket_connection async def _(event: Event): await load_prompt(FetchTarget) async def message_handler(event: Event): if event.detail_type == 'private': if event.sub_type == 'group': if Config('qq_disable_temp_session'): return await bot.send(event, '请先添加好友后再进行命令交互。') filter_msg = re.match(r'.*?\[CQ:(?:json|xml).*?\].*?|.*?<\?xml.*?>.*?', event.message, re.MULTILINE | re.DOTALL) if filter_msg: match_json = re.match(r'.*?\[CQ:json,data=(.*?)\].*?', event.message, re.MULTILINE | re.DOTALL) if match_json: load_json = json.loads(html.unescape(match_json.group(1))) if load_json['app'] == 'com.tencent.multimsg': event.message = f'[CQ:forward,id={load_json["meta"]["detail"]["resid"]}]' else: return else: return replyId = None match_reply = re.match(r'^\[CQ:reply,id=(.*?)].*', event.message) if match_reply: replyId = int(match_reply.group(1)) prefix = None if match_at := re.match(r'^\[CQ:at,qq=(.*?)](.*)', event.message): if match_at.group(1) == qq_account: event.message = match_at.group(2) if event.message in ['', ' ']: event.message = 'help' prefix = [''] targetId = 'QQ|' + (f'Group|{str(event.group_id)}' if event.detail_type == 'group' else str(event.user_id)) msg = MessageSession(MsgInfo(targetId=targetId, senderId=f'QQ|{str(event.user_id)}', targetFrom='QQ|Group' if event.detail_type == 'group' else 'QQ', senderFrom='QQ', senderName=event.sender['nickname'], clientName='QQ', messageId=event.message_id, replyId=replyId), Session(message=event, target=event.group_id if event.detail_type == 'group' else event.user_id, sender=event.user_id)) await parser(msg, running_mention=True, prefix=prefix) if enable_listening_self_message: @bot.on('message_sent') async def _(event: Event): await message_handler(event) @bot.on_message('group', 'private') async def _(event: Event): await message_handler(event) class GuildAccountInfo: tiny_id = None @bot.on_message('guild') async def _(event): if GuildAccountInfo.tiny_id is None: profile = await bot.call_action('get_guild_service_profile') GuildAccountInfo.tiny_id = profile['tiny_id'] tiny_id = event.user_id if tiny_id == GuildAccountInfo.tiny_id: return replyId = None match_reply = re.match(r'^\[CQ:reply,id=(.*?)].*', event.message) if match_reply: replyId = int(match_reply.group(1)) targetId = f'QQ|Guild|{str(event.guild_id)}|{str(event.channel_id)}' msg = MessageSession(MsgInfo(targetId=targetId, senderId=f'QQ|Tiny|{str(event.user_id)}', targetFrom='QQ|Guild', senderFrom='QQ|Tiny', senderName=event.sender['nickname'], clientName='QQ', messageId=event.message_id, replyId=replyId), Session(message=event, target=f'{str(event.guild_id)}|{str(event.channel_id)}', sender=event.user_id)) await parser(msg, running_mention=True) """@bot.on('request.friend') async def _(event: Event): if BotDBUtil.SenderInfo('QQ|' + str(event.user_id)).query.isInBlockList: return {'approve': False} return {'approve': True}""" @bot.on('request.group.invite') async def _(event: Event): if BotDBUtil.SenderInfo('QQ|' + str(event.user_id)).query.isSuperUser: return {'approve': True} if not Config('allow_bot_auto_approve_group_invite'): await bot.send_private_msg(user_id=event.user_id, message='你好!本机器人暂时不主动同意入群请求。\n' f'请至{Config("qq_join_group_application_link")}申请入群。') else: return {'approve': True} @bot.on_notice('group_ban') async def _(event: Event): if event.user_id == int(qq_account): result = BotDBUtil.UnfriendlyActions(targetId=event.group_id, senderId=event.operator_id).add_and_check('mute', str(event.duration)) if event.duration >= 259200: result = True if result: await bot.call_action('set_group_leave', group_id=event.group_id) BotDBUtil.SenderInfo('QQ|' + str(event.operator_id)).edit('isInBlockList', True) await bot.call_action('delete_friend', friend_id=event.operator_id) """ @bot.on_message('group') async def _(event: Event): result = BotDBUtil.isGroupInAllowList(f'QQ|Group|{str(event.group_id)}') if not result: await bot.send(event=event, message='此群不在白名单中,已自动退群。' '\n如需申请白名单,请至https://github.com/Teahouse-Studios/bot/issues/new/choose发起issue。') await bot.call_action('set_group_leave', group_id=event.group_id) """ qq_host = Config("qq_host") if qq_host: host, port = qq_host.split(':') bot.run(host=host, port=port, debug=False)