diff --git a/core/dirty_check.py b/core/dirty_check.py index 3d80d894..db3bdd71 100644 --- a/core/dirty_check.py +++ b/core/dirty_check.py @@ -1,3 +1,7 @@ +'''利用阿里云API检查字符串是否合规。 + +在使用前,应该在配置中填写"Check_accessKeyId"和"Check_accessKeySecret"以便进行鉴权。 +''' import base64 import datetime import hashlib @@ -23,7 +27,12 @@ def computeMD5hash(my_string): @retry(stop=stop_after_attempt(3), wait=wait_fixed(3)) -async def check(*text): +async def check(*text) -> str: + '''检查字符串是否合规 + + :param text: 字符串(List/Union)。 + :returns: 经过审核后的字符串。不合规部分会被替换为'<吃掉了>',全部不合规则是'<全部吃掉了>'。 + ''' accessKeyId = Config("Check_accessKeyId") accessKeySecret = Config("Check_accessKeySecret") if not accessKeyId or not accessKeySecret: diff --git a/core/logger.py b/core/logger.py index 49e31664..440f8046 100644 --- a/core/logger.py +++ b/core/logger.py @@ -1,3 +1,4 @@ +'''基于logging的日志器。''' import logging from abc import ABC, abstractmethod diff --git a/core/scheduler.py b/core/scheduler.py index e059d0b5..7bd312ea 100644 --- a/core/scheduler.py +++ b/core/scheduler.py @@ -1,3 +1,4 @@ +'''基于apscheduler的计划任务。''' from apscheduler.schedulers.asyncio import AsyncIOScheduler Scheduler = AsyncIOScheduler() diff --git a/core/utils/bot.py b/core/utils/bot.py index e11158f0..5054fe6e 100644 --- a/core/utils/bot.py +++ b/core/utils/bot.py @@ -1,7 +1,9 @@ +'''编写机器人时可能会用到的一些工具类方法。''' import os import traceback import uuid from os.path import abspath +from typing import Union import aiohttp import filetype as ft @@ -16,14 +18,16 @@ class PrivateAssets: path = os.path.abspath('.') @staticmethod - def set(path): + def set(path: str) -> None: + '''创建一个文件夹以储存资源。请配合.gitignore使用。''' path = os.path.abspath(path) if not os.path.exists(path): os.mkdir(path) PrivateAssets.path = path -def init(): +def init() -> None: + '''初始化机器人。仅用于bot.py与unit_test.py。''' version = os.path.abspath(PrivateAssets.path + '/version') write_version = open(version, 'w') write_version.write(os.popen('git rev-parse HEAD', 'r').read()[0:6]) @@ -34,14 +38,23 @@ def init(): write_tag.close() -async def get_url(url: str, headers=None): +async def get_url(url: str, headers: list=None) -> str: + '''利用AioHttp获取指定url的内容。 + + :param url: 需要获取的url。 + :param headers: 请求时使用的http头。 + :returns: 指定url的内容(字符串)。''' async with RetryClient(headers=headers, retry_options=ExponentialRetry(attempts=3)) as session: async with session.get(url, timeout=aiohttp.ClientTimeout(total=20), headers=headers) as req: text = await req.text() return text -async def download_to_cache(link): +async def download_to_cache(link: str) -> Union[str, False]: + '''利用AioHttp下载指定url的内容,并保存到缓存(./cache目录)。 + + :param link: 需要获取的link。 + :returns: 文件的相对路径,若获取失败则返回False。''' try: async with RetryClient(retry_options=ExponentialRetry(attempts=3)) as session: async with session.get(link) as resp: @@ -60,7 +73,11 @@ def cache_name(): return abspath(f'./cache/{str(uuid.uuid4())}') -async def slk_converter(filepath): +async def slk_converter(filepath: str) -> str: + '''将指定文件转为slk格式。 + + :param filepath: 需要获取的link。 + :returns: 文件的相对路径。''' filepath2 = filepath + '.silk' Logger.info('Start encoding voice...') os.system('python slk_coder.py ' + filepath) @@ -68,7 +85,7 @@ async def slk_converter(filepath): return filepath2 -async def load_prompt(bot: FetchTarget): +async def load_prompt(bot: FetchTarget) -> None: author_cache = os.path.abspath(PrivateAssets.path + '/cache_restart_author') loader_cache = os.path.abspath('.cache_loader') if os.path.exists(author_cache): diff --git a/core/utils/message.py b/core/utils/message.py index 785a32c7..872ed44b 100644 --- a/core/utils/message.py +++ b/core/utils/message.py @@ -1,7 +1,12 @@ import re -def remove_ineffective_text(prefix, lst): +def remove_ineffective_text(prefix: str, lst: list) -> list: + '''删除命令首尾的空格和换行以及重复命令。 + + :param prefix: 机器人的命令前缀。 + :param lst: 字符串(List/Union)。 + :returns: 净化后的字符串。''' remove_list = ['\n', ' '] # 首尾需要移除的东西 for x in remove_list: list_cache = [] @@ -27,7 +32,11 @@ def remove_ineffective_text(prefix, lst): return lst -def RemoveDuplicateSpace(text: str): +def RemoveDuplicateSpace(text: str) -> str: + '''删除命令中间多余的空格。 + + :param text: 字符串。 + :returns: 净化后的字符串。''' strip_display_space = text.split(' ') display_list = [] # 清除指令中间多余的空格 for x in strip_display_space: diff --git a/modules/wiki/__init__.py b/modules/wiki/__init__.py index 28f89500..a753ffa1 100644 --- a/modules/wiki/__init__.py +++ b/modules/wiki/__init__.py @@ -1,6 +1,4 @@ import re -from sqlalchemy.ext.declarative import api -from sqlalchemy.sql.expression import false import ujson as json @@ -13,7 +11,7 @@ from database import BotDBUtil from modules.wiki.dbutils import WikiTargetInfo from modules.wiki.wikilib import wikilib from .getinfobox import get_infobox_pic -from .audit import audit_allow, audit_remove, audit_list, audit_query +from .audit import WikiWhitelistError, audit_allow, audit_remove, audit_list, audit_query @command('wiki', help_doc=('~wiki {搜索一个Wiki页面,若搜索random则随机一个页面。}', @@ -45,23 +43,15 @@ async def wiki(msg: MessageSession): await regex_proc(msg, command, typing=False) -async def check_whitelist(apiLink: str): - whitepair = await audit_list() - whitelist = [] - for pair in whitepair: - whitelist.append(pair[0]) - for pattern in whitelist: - if re.match(pattern, apiLink) is apiLink: - return True - return False - async def set_start_wiki(msg: MessageSession): if not await msg.checkPermission(): return await msg.sendMessage('你没有使用该命令的权限,请联系管理员进行操作。') check = await wikilib().check_wiki_available(msg.parsed_msg['']) if check[0]: - result = WikiTargetInfo(msg).add_start_wiki(check[0]) - if result: + result = await WikiTargetInfo(msg).add_start_wiki(check[0]) + if result and result is 'whitelist': + await msg.sendMessage(f'起始Wiki不在此白名单中:{check[1]}') + elif result: await msg.sendMessage(f'成功添加起始Wiki:{check[1]}') else: result = '错误:' + check[1] diff --git a/modules/wiki/audit.py b/modules/wiki/audit.py index 1e88faf5..b73537e1 100644 --- a/modules/wiki/audit.py +++ b/modules/wiki/audit.py @@ -1,3 +1,5 @@ +import re + from .orm import WikiWhitelist from database.orm import DBSession from tenacity import retry, stop_after_attempt @@ -49,3 +51,15 @@ async def audit_list(): except Exception: raise +async def check_whitelist(apiLink: str): + whitepair = await audit_list() + whitelist = [] + for pair in whitepair: + whitelist.append(pair[0]) + for pattern in whitelist: + if re.match(pattern, apiLink): + return True + return False + +class WikiWhitelistError(Exception): + '''The wiki is not in the bot whitelist''' diff --git a/modules/wiki/dbutils.py b/modules/wiki/dbutils.py index e4b6ddd0..15d013a0 100644 --- a/modules/wiki/dbutils.py +++ b/modules/wiki/dbutils.py @@ -1,3 +1,4 @@ +from modules.wiki.audit import WikiWhitelistError, check_whitelist import ujson as json from core.elements import MessageSession @@ -27,7 +28,9 @@ class WikiTargetInfo: raise @retry(stop=stop_after_attempt(3)) - def add_start_wiki(self, url): + async def add_start_wiki(self, url): + if await check_whitelist(url): + return 'whitelist' try: self.query.link = url session.commit() @@ -43,7 +46,9 @@ class WikiTargetInfo: return False @retry(stop=stop_after_attempt(3)) - def config_interwikis(self, iw: str, iwlink: str = None, let_it=True): + async def config_interwikis(self, iw: str, iwlink: str = None, let_it=True): + if not await check_whitelist(iwlink): + return 'whitelist' try: interwikis = json.loads(self.query.iws) if let_it: