Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/core/parser/message.py
2022-12-08 17:16:53 +08:00

514 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import difflib
import re
import traceback
from datetime import datetime
from typing import List, Dict
from aiocqhttp.exceptions import ActionFailed
from config import Config
from core.builtins.message import MessageSession
from core.elements import Command, command_prefix, ExecutionLockList, RegexCommand, ErrorMessage
from core.elements.module.component_meta import CommandMeta
from core.exceptions import AbuseWarning, FinishedException, InvalidCommandFormatError, InvalidHelpDocTypeError, \
WaitCancelException, NoReportException
from core.loader import ModulesManager
from core.logger import Logger
from core.parser.args import Template, ArgumentPattern, templates_to_str
from core.parser.command import CommandParser
from core.tos import warn_target
from core.utils import removeIneffectiveText, removeDuplicateSpace, MessageTaskManager
from database import BotDBUtil
enable_tos = Config('enable_tos')
enable_analytics = Config('enable_analytics')
counter_same = {} # 命令使用次数计数(重复使用单一命令)
counter_all = {} # 命令使用次数计数(使用所有命令)
temp_ban_counter = {} # 临时限制计数
async def remove_temp_ban(msg: MessageSession):
is_temp_banned = temp_ban_counter.get(msg.target.senderId)
if is_temp_banned is not None:
del temp_ban_counter[msg.target.senderId]
async def msg_counter(msg: MessageSession, command: str):
same = counter_same.get(msg.target.senderId)
if same is None or datetime.now().timestamp() - same['ts'] > 300 or same['command'] != command:
# 检查是否滥用(重复使用同一命令)
counter_same[msg.target.senderId] = {'command': command, 'count': 1,
'ts': datetime.now().timestamp()}
else:
same['count'] += 1
if same['count'] > 10:
raise AbuseWarning('一段时间内使用相同命令的次数过多')
all_ = counter_all.get(msg.target.senderId)
if all_ is None or datetime.now().timestamp() - all_['ts'] > 300: # 检查是否滥用(重复使用同一命令)
counter_all[msg.target.senderId] = {'count': 1,
'ts': datetime.now().timestamp()}
else:
all_['count'] += 1
if all_['count'] > 20:
raise AbuseWarning('一段时间内使用命令的次数过多')
async def temp_ban_check(msg: MessageSession):
is_temp_banned = temp_ban_counter.get(msg.target.senderId)
if is_temp_banned is not None:
ban_time = datetime.now().timestamp() - is_temp_banned['ts']
if ban_time < 300:
if is_temp_banned['count'] < 2:
is_temp_banned['count'] += 1
return await msg.finish('提示:\n'
'由于你的行为触发了警告,我们已对你进行临时限制。\n'
f'距离解封时间还有{str(int(300 - ban_time))}秒。')
elif is_temp_banned['count'] <= 5:
is_temp_banned['count'] += 1
return await msg.finish('即使是触发了临时限制,继续使用命令还是可能会导致你被再次警告。\n'
f'距离解封时间还有{str(int(300 - ban_time))}秒。')
else:
raise AbuseWarning('无视临时限制警告')
async def parser(msg: MessageSession, require_enable_modules: bool = True, prefix: list = None,
running_mention: bool = False):
"""
接收消息必经的预处理器
:param msg: 从监听器接收到的dict该dict将会经过此预处理器传入下游
:param require_enable_modules: 是否需要检查模块是否已启用
:param prefix: 使用的命令前缀。如果为None则使用默认的命令前缀存在''值的情况下则代表无需命令前缀
:param running_mention: 消息内若包含机器人名称,则检查是否有命令正在运行
:return: 无返回
"""
identify_str = f'[{msg.target.senderId}{f" ({msg.target.targetId})" if msg.target.targetFrom != msg.target.senderFrom else ""}]'
# Logger.info(f'{identify_str} -> [Bot]: {display}')
try:
MessageTaskManager.check(msg)
modules = ModulesManager.return_modules_list_as_dict(msg.target.targetFrom)
modulesAliases = ModulesManager.return_modules_alias_map()
modulesRegex: Dict[str, RegexCommand] = ModulesManager.return_specified_type_modules(RegexCommand,
targetFrom=msg.target.targetFrom)
display = removeDuplicateSpace(msg.asDisplay()) # 将消息转换为一般显示形式
if len(display) == 0:
return
msg.trigger_msg = display
msg.target.senderInfo = senderInfo = BotDBUtil.SenderInfo(msg.target.senderId)
if senderInfo.query.isInBlockList and not senderInfo.query.isInAllowList:
return ExecutionLockList.remove(msg)
msg.prefixes = command_prefix.copy() # 复制一份作为基础命令前缀
get_custom_alias = msg.options.get('command_alias')
if get_custom_alias is not None:
get_display_alias = get_custom_alias.get(msg.trigger_msg)
if get_display_alias is not None:
msg.trigger_msg = display = get_display_alias
get_custom_prefix = msg.options.get('command_prefix') # 获取自定义命令前缀
if get_custom_prefix is not None:
msg.prefixes = get_custom_prefix + msg.prefixes # 混合
disable_prefix = False
if prefix is not None: # 如果上游指定了命令前缀,则使用指定的命令前缀
if '' in prefix:
disable_prefix = True
msg.prefixes.clear()
msg.prefixes.extend(prefix)
display_prefix = ''
in_prefix_list = False
for cp in msg.prefixes: # 判断是否在命令前缀列表中
if display.startswith(cp):
display_prefix = cp
in_prefix_list = True
break
if in_prefix_list or disable_prefix: # 检查消息前缀
if len(display) <= 1 or display[:2] == '~~': # 排除 ~~xxx~~ 的情况
return
Logger.info(
f'{identify_str} -> [Bot]: {display}')
if disable_prefix and not in_prefix_list:
command = display
else:
command = display[len(display_prefix):]
command_list = removeIneffectiveText(display_prefix, command.split('&&')) # 并行命令处理
if len(command_list) > 5 and not senderInfo.query.isSuperUser:
return await msg.sendMessage('你不是本机器人的超级管理员最多只能并排执行5个命令。')
if not ExecutionLockList.check(msg): # 加锁
ExecutionLockList.add(msg)
else:
return await msg.sendMessage('您有命令正在执行,请稍后再试。')
for command in command_list:
no_alias = False
for moduleName in modules:
if command.startswith(moduleName): # 判断此命令是否匹配一个实际的模块
no_alias = True
if not no_alias: # 如果没有匹配到模块,则判断是否匹配命令别名
alias_list = []
for alias in modulesAliases:
if command.startswith(alias) and not command.startswith(modulesAliases[alias]):
alias_list.append(alias)
if alias_list:
max_ = max(alias_list, key=len)
command = command.replace(max_, modulesAliases[max_], 1)
command_split: list = command.split(' ') # 切割消息
msg.trigger_msg = command # 触发该命令的消息,去除消息前缀
command_first_word = command_split[0].lower()
if command_first_word not in modules:
"""if msg.options.get('typo_check', True): # 判断是否开启错字检查 todo: alias检查
nmsg, command_first_word, command_split = await typo_check(msg, display_prefix, modules,
command_first_word, command_split)
if nmsg is None:
return ExecutionLockList.remove(msg)
msg = nmsg"""
sudo = False
mute = False
if command_first_word == 'mute':
mute = True
if command_first_word == 'sudo':
if not msg.checkSuperUser():
return await msg.sendMessage('你不是本机器人的超级管理员无法使用sudo命令。')
sudo = True
del command_split[0]
command_first_word = command_split[0].lower()
msg.trigger_msg = ' '.join(command_split)
in_mute = msg.muted
if in_mute and not mute:
return ExecutionLockList.remove(msg)
if command_first_word in modules: # 检查触发命令是否在模块列表中
time_start = datetime.now()
try:
if enable_tos:
await temp_ban_check(msg)
module = modules[command_first_word]
if not isinstance(module, Command): # 如果不是Command类则展示模块简介
if module.desc is not None:
desc = f'介绍:\n{module.desc}'
if command_first_word not in msg.enabled_modules:
desc += f'\n{command_first_word}模块未启用,请发送{msg.prefixes[0]}enable {command_first_word}启用本模块。'
await msg.sendMessage(desc)
continue
if module.required_superuser:
if not msg.checkSuperUser():
await msg.sendMessage('你没有使用该命令的权限。')
continue
elif not module.base:
if command_first_word not in msg.enabled_modules and not sudo and require_enable_modules: # 若未开启
await msg.sendMessage(
f'{command_first_word}模块未启用,请发送{msg.prefixes[0]}enable {command_first_word}启用本模块。')
continue
elif module.required_admin:
if not await msg.checkPermission():
await msg.sendMessage(f'{command_first_word}命令仅能被该群组的管理员所使用,请联系管理员执行此命令。')
continue
if not module.match_list.set:
await msg.sendMessage(ErrorMessage(f'{command_first_word}未绑定任何命令,请联系开发者处理。'))
continue
none_doc = True # 检查模块绑定的命令是否有文档
for func in module.match_list.get(msg.target.targetFrom):
if func.help_doc:
none_doc = False
if not none_doc: # 如果有,送入命令解析
async def execute_submodule(msg: MessageSession, command_first_word, command_split):
try:
command_parser = CommandParser(module, msg=msg, bind_prefix=command_first_word,
command_prefixes=msg.prefixes)
try:
parsed_msg = command_parser.parse(msg.trigger_msg) # 解析命令对应的子模块
submodule = parsed_msg[0]
msg.parsed_msg = parsed_msg[1] # 使用命令模板解析后的消息
Logger.debug(msg.parsed_msg)
if submodule.required_superuser:
if not msg.checkSuperUser():
await msg.sendMessage('你没有使用该命令的权限。')
return
elif submodule.required_admin:
if not await msg.checkPermission():
await msg.sendMessage(
f'此命令仅能被该群组的管理员所使用,请联系管理员执行此命令。')
return
if not senderInfo.query.disable_typing:
async with msg.Typing(msg):
await parsed_msg[0].function(msg) # 将msg传入下游模块
else:
await parsed_msg[0].function(msg)
raise FinishedException(msg.sent) # if not using msg.finish
except InvalidCommandFormatError:
await msg.sendMessage(f'语法错误。\n使用~help {command_first_word}查看帮助。')
"""if msg.options.get('typo_check', True): # 判断是否开启错字检查
nmsg, command_first_word, command_split = await typo_check(msg,
display_prefix,
modules,
command_first_word,
command_split)
if nmsg is None:
return ExecutionLockList.remove(msg)
msg = nmsg
await execute_submodule(msg, command_first_word, command_split)"""
return
except InvalidHelpDocTypeError:
Logger.error(traceback.format_exc())
await msg.sendMessage(ErrorMessage(f'{command_first_word}模块的帮助信息有误,请联系开发者处理。'))
return
await execute_submodule(msg, command_first_word, command_split)
else: # 如果没有,直接传入下游模块
msg.parsed_msg = None
for func in module.match_list.set:
if not func.help_doc:
if not senderInfo.query.disable_typing:
async with msg.Typing(msg):
await func.function(msg) # 将msg传入下游模块
else:
await func.function(msg)
raise FinishedException(msg.sent) # if not using msg.finish
except ActionFailed:
ExecutionLockList.remove(msg)
await msg.sendMessage('消息发送失败,可能被风控,请稍后再试。')
continue
except FinishedException as e:
time_used = datetime.now() - time_start
Logger.info(f'Successfully finished session from {identify_str}, returns: {str(e)}. '
f'Times take up: {str(time_used)}')
if msg.target.targetFrom != 'QQ|Guild' or command_first_word != 'module' and enable_tos:
await msg_counter(msg, msg.trigger_msg)
else:
Logger.debug(f'Tos is disabled, check the configuration is correct.')
ExecutionLockList.remove(msg)
if enable_analytics:
BotDBUtil.Analytics(msg).add(msg.trigger_msg, command_first_word, 'normal')
continue
except NoReportException as e:
Logger.error(traceback.format_exc())
ExecutionLockList.remove(msg)
await msg.sendMessage('执行命令时发生错误:\n' + str(e) + '\n此问题并非机器人程序错误API请求出错等'
'请勿将此消息报告给机器人开发者。')
continue
except Exception as e:
Logger.error(traceback.format_exc())
ExecutionLockList.remove(msg)
await msg.sendMessage(ErrorMessage('执行命令时发生错误,请报告机器人开发者:\n' + str(e)))
continue
ExecutionLockList.remove(msg)
return msg
if running_mention:
if display.find('小可') != -1:
if ExecutionLockList.check(msg):
return await msg.sendMessage('您先前的命令正在执行中。')
for regex in modulesRegex: # 遍历正则模块列表
try:
if regex in msg.enabled_modules: # 如果模块已启用
regex_module = modulesRegex[regex]
if regex_module.required_superuser:
if not msg.checkSuperUser():
continue
elif regex_module.required_admin:
if not await msg.checkPermission():
continue
for rfunc in regex_module.match_list.set: # 遍历正则模块的表达式
time_start = datetime.now()
try:
msg.matched_msg = False
matched = False
if rfunc.mode.upper() in ['M', 'MATCH']:
msg.matched_msg = re.match(rfunc.pattern, display, flags=rfunc.flags)
if msg.matched_msg is not None:
matched = True
elif rfunc.mode.upper() in ['A', 'FINDALL']:
msg.matched_msg = re.findall(rfunc.pattern, display, flags=rfunc.flags)
if msg.matched_msg and msg.matched_msg is not None:
matched = True
if matched: # 如果匹配成功
if rfunc.logging:
Logger.info(
f'{identify_str} -> [Bot]: {display}')
if enable_tos and rfunc.show_typing:
await temp_ban_check(msg)
if regex_module.required_superuser:
if not msg.checkSuperUser():
continue
elif regex_module.required_admin:
if not await msg.checkPermission():
continue
if not ExecutionLockList.check(msg):
ExecutionLockList.add(msg)
else:
return await msg.sendMessage('您有命令正在执行,请稍后再试。')
if rfunc.show_typing and not senderInfo.query.disable_typing:
async with msg.Typing(msg):
await rfunc.function(msg) # 将msg传入下游模块
else:
await rfunc.function(msg) # 将msg传入下游模块
raise FinishedException(msg.sent) # if not using msg.finish
except FinishedException as e:
time_used = datetime.now() - time_start
if rfunc.logging:
Logger.info(
f'Successfully finished session from {identify_str}, returns: {str(e)}. '
f'Times take up: {time_used}')
ExecutionLockList.remove(msg)
if enable_analytics and rfunc.show_typing:
BotDBUtil.Analytics(msg).add(msg.trigger_msg, regex, 'regex')
if enable_tos and rfunc.show_typing:
await msg_counter(msg, msg.trigger_msg)
else:
Logger.debug(f'Tos is disabled.')
continue
except ActionFailed:
ExecutionLockList.remove(msg)
await msg.sendMessage('消息发送失败,可能被风控,请稍后再试。')
continue
ExecutionLockList.remove(msg)
return msg
except AbuseWarning as e:
if enable_tos:
await warn_target(msg, str(e))
temp_ban_counter[msg.target.senderId] = {'count': 1,
'ts': datetime.now().timestamp()}
return
except WaitCancelException: # 出现于等待被取消的情况
Logger.warn('Waiting task cancelled by user.')
except Exception:
Logger.error(traceback.format_exc())
ExecutionLockList.remove(msg)
async def typo_check(msg: MessageSession, display_prefix, modules, command_first_word, command_split):
enabled_modules = []
for m in msg.enabled_modules:
if m in modules and isinstance(modules[m], Command):
enabled_modules.append(m)
match_close_module: list = difflib.get_close_matches(command_first_word, enabled_modules, 1, 0.6)
if match_close_module:
module = modules[match_close_module[0]]
none_doc = True # 检查模块绑定的命令是否有文档
for func in module.match_list.get(msg.target.targetFrom):
if func.help_doc is not None:
none_doc = False
len_command_split = len(command_split)
if not none_doc and len_command_split > 1:
get_submodules: List[CommandMeta] = module.match_list.get(msg.target.targetFrom)
docs = {} # 根据命令模板的空格数排序命令
for func in get_submodules:
help_doc: List[Template] = copy.deepcopy(func.help_doc)
if not help_doc:
... # todo: ...此处应该有一个处理例外情况的逻辑
for h_ in help_doc:
h_.args_ = [a for a in h_.args if isinstance(a, ArgumentPattern)]
if (len_args := len(h_.args)) not in docs:
docs[len_args] = [h_]
else:
docs[len_args].append(h_)
if len_command_split - 1 > len(docs): # 如果空格数远大于命令模板的空格数
select_docs = docs[max(docs)]
else:
select_docs = docs[len_command_split - 1] # 选择匹配的命令组
match_close_command: list = difflib.get_close_matches(' '.join(command_split[1:]), templates_to_str(select_docs),
1, 0.3) # 进一步匹配命令
if match_close_command:
match_split = match_close_command[0]
m_split_options = filter(None, re.split(r'(\[.*?])', match_split)) # 切割可选参数
old_command_split = command_split.copy()
del old_command_split[0]
new_command_split = [match_close_module[0]]
for m_ in m_split_options:
if m_.startswith('['): # 如果是可选参数
m_split = m_.split(' ') # 切割可选参数中的空格(说明存在多个子必须参数)
if len(m_split) > 1:
match_close_options = difflib.get_close_matches(m_split[0][1:], old_command_split, 1,
0.3) # 进一步匹配可选参数
if match_close_options:
position = old_command_split.index(match_close_options[0]) # 定位可选参数的位置
new_command_split.append(m_split[0][1:]) # 将可选参数插入到新命令列表中
new_command_split += old_command_split[position + 1: position + len(m_split)]
del old_command_split[position: position + len(m_split)] # 删除原命令列表中的可选参数
else:
if m_split[0][1] == '<':
new_command_split.append(old_command_split[0])
del old_command_split[0]
else:
new_command_split.append(m_split[0][1:-1])
else:
m__ = filter(None, m_.split(' ')) # 必须参数
for mm in m__:
if len(old_command_split) > 0:
if mm.startswith('<'):
new_command_split.append(old_command_split[0])
del old_command_split[0]
else:
match_close_args = difflib.get_close_matches(old_command_split[0], [mm], 1,
0.5) # 进一步匹配参数
if match_close_args:
new_command_split.append(mm)
del old_command_split[0]
else:
new_command_split.append(old_command_split[0])
del old_command_split[0]
else:
new_command_split.append(mm)
new_command_display = " ".join(new_command_split)
if new_command_display != msg.trigger_msg:
wait_confirm = await msg.waitConfirm(
f'您是否想要输入{display_prefix}{new_command_display}')
if wait_confirm:
command_split = new_command_split
command_first_word = new_command_split[0]
msg.trigger_msg = ' '.join(new_command_split)
return msg, command_first_word, command_split
else:
if len_command_split - 1 == 1:
new_command_display = f'{match_close_module[0]} {" ".join(command_split[1:])}'
if new_command_display != msg.trigger_msg:
wait_confirm = await msg.waitConfirm(
f'您是否想要输入{display_prefix}{new_command_display}')
if wait_confirm:
command_split = [match_close_module[0]] + command_split[1:]
command_first_word = match_close_module[0]
msg.trigger_msg = ' '.join(command_split)
return msg, command_first_word, command_split
else:
new_command_display = f'{match_close_module[0] + (" " + " ".join(command_split[1:]) if len(command_split) > 1 else "")}'
if new_command_display != msg.trigger_msg:
wait_confirm = await msg.waitConfirm(
f'您是否想要输入{display_prefix}{new_command_display}')
if wait_confirm:
command_split = [match_close_module[0]]
command_first_word = match_close_module[0]
msg.trigger_msg = ' '.join(command_split)
return msg, command_first_word, command_split
return None, None, None