Archived
1
0
Fork 0

Aduit test

This commit is contained in:
Dianliang233 2021-10-01 09:18:04 +08:00
parent 92bc00d604
commit e6c4bee841
No known key found for this signature in database
GPG key ID: 6C56F399D872F19C
8 changed files with 72 additions and 26 deletions

View file

@ -1,3 +1,7 @@
'''利用阿里云API检查字符串是否合规。
在使用前应该在配置中填写"Check_accessKeyId""Check_accessKeySecret"以便进行鉴权
'''
import base64
import datetime
import hashlib
@ -23,7 +27,12 @@ def computeMD5hash(my_string):
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3))
async def check(*text):
async def check(*text) -> str:
'''检查字符串是否合规
:param text: 字符串List/Union
:returns: 经过审核后的字符串不合规部分会被替换为'<吃掉了>'全部不合规则是'<全部吃掉了>'
'''
accessKeyId = Config("Check_accessKeyId")
accessKeySecret = Config("Check_accessKeySecret")
if not accessKeyId or not accessKeySecret:

View file

@ -1,3 +1,4 @@
'''基于logging的日志器。'''
import logging
from abc import ABC, abstractmethod

View file

@ -1,3 +1,4 @@
'''基于apscheduler的计划任务。'''
from apscheduler.schedulers.asyncio import AsyncIOScheduler
Scheduler = AsyncIOScheduler()

View file

@ -1,7 +1,9 @@
'''编写机器人时可能会用到的一些工具类方法。'''
import os
import traceback
import uuid
from os.path import abspath
from typing import Union
import aiohttp
import filetype as ft
@ -16,14 +18,16 @@ class PrivateAssets:
path = os.path.abspath('.')
@staticmethod
def set(path):
def set(path: str) -> None:
'''创建一个文件夹以储存资源。请配合.gitignore使用。'''
path = os.path.abspath(path)
if not os.path.exists(path):
os.mkdir(path)
PrivateAssets.path = path
def init():
def init() -> None:
'''初始化机器人。仅用于bot.py与unit_test.py。'''
version = os.path.abspath(PrivateAssets.path + '/version')
write_version = open(version, 'w')
write_version.write(os.popen('git rev-parse HEAD', 'r').read()[0:6])
@ -34,14 +38,23 @@ def init():
write_tag.close()
async def get_url(url: str, headers=None):
async def get_url(url: str, headers: list=None) -> str:
'''利用AioHttp获取指定url的内容。
:param url: 需要获取的url
:param headers: 请求时使用的http头
:returns: 指定url的内容字符串'''
async with RetryClient(headers=headers, retry_options=ExponentialRetry(attempts=3)) as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=20), headers=headers) as req:
text = await req.text()
return text
async def download_to_cache(link):
async def download_to_cache(link: str) -> Union[str, False]:
'''利用AioHttp下载指定url的内容并保存到缓存./cache目录
:param link: 需要获取的link
:returns: 文件的相对路径若获取失败则返回False'''
try:
async with RetryClient(retry_options=ExponentialRetry(attempts=3)) as session:
async with session.get(link) as resp:
@ -60,7 +73,11 @@ def cache_name():
return abspath(f'./cache/{str(uuid.uuid4())}')
async def slk_converter(filepath):
async def slk_converter(filepath: str) -> str:
'''将指定文件转为slk格式。
:param filepath: 需要获取的link
:returns: 文件的相对路径'''
filepath2 = filepath + '.silk'
Logger.info('Start encoding voice...')
os.system('python slk_coder.py ' + filepath)
@ -68,7 +85,7 @@ async def slk_converter(filepath):
return filepath2
async def load_prompt(bot: FetchTarget):
async def load_prompt(bot: FetchTarget) -> None:
author_cache = os.path.abspath(PrivateAssets.path + '/cache_restart_author')
loader_cache = os.path.abspath('.cache_loader')
if os.path.exists(author_cache):

View file

@ -1,7 +1,12 @@
import re
def remove_ineffective_text(prefix, lst):
def remove_ineffective_text(prefix: str, lst: list) -> list:
'''删除命令首尾的空格和换行以及重复命令。
:param prefix: 机器人的命令前缀
:param lst: 字符串List/Union
:returns: 净化后的字符串'''
remove_list = ['\n', ' '] # 首尾需要移除的东西
for x in remove_list:
list_cache = []
@ -27,7 +32,11 @@ def remove_ineffective_text(prefix, lst):
return lst
def RemoveDuplicateSpace(text: str):
def RemoveDuplicateSpace(text: str) -> str:
'''删除命令中间多余的空格。
:param text: 字符串
:returns: 净化后的字符串'''
strip_display_space = text.split(' ')
display_list = [] # 清除指令中间多余的空格
for x in strip_display_space:

View file

@ -1,6 +1,4 @@
import re
from sqlalchemy.ext.declarative import api
from sqlalchemy.sql.expression import false
import ujson as json
@ -13,7 +11,7 @@ from database import BotDBUtil
from modules.wiki.dbutils import WikiTargetInfo
from modules.wiki.wikilib import wikilib
from .getinfobox import get_infobox_pic
from .audit import audit_allow, audit_remove, audit_list, audit_query
from .audit import WikiWhitelistError, audit_allow, audit_remove, audit_list, audit_query
@command('wiki', help_doc=('~wiki <PageName> {搜索一个Wiki页面若搜索random则随机一个页面。}',
@ -45,23 +43,15 @@ async def wiki(msg: MessageSession):
await regex_proc(msg, command, typing=False)
async def check_whitelist(apiLink: str):
whitepair = await audit_list()
whitelist = []
for pair in whitepair:
whitelist.append(pair[0])
for pattern in whitelist:
if re.match(pattern, apiLink) is apiLink:
return True
return False
async def set_start_wiki(msg: MessageSession):
if not await msg.checkPermission():
return await msg.sendMessage('你没有使用该命令的权限,请联系管理员进行操作。')
check = await wikilib().check_wiki_available(msg.parsed_msg['<WikiUrl>'])
if check[0]:
result = WikiTargetInfo(msg).add_start_wiki(check[0])
if result:
result = await WikiTargetInfo(msg).add_start_wiki(check[0])
if result and result is 'whitelist':
await msg.sendMessage(f'起始Wiki不在此白名单中{check[1]}')
elif result:
await msg.sendMessage(f'成功添加起始Wiki{check[1]}')
else:
result = '错误:' + check[1]

View file

@ -1,3 +1,5 @@
import re
from .orm import WikiWhitelist
from database.orm import DBSession
from tenacity import retry, stop_after_attempt
@ -49,3 +51,15 @@ async def audit_list():
except Exception:
raise
async def check_whitelist(apiLink: str):
whitepair = await audit_list()
whitelist = []
for pair in whitepair:
whitelist.append(pair[0])
for pattern in whitelist:
if re.match(pattern, apiLink):
return True
return False
class WikiWhitelistError(Exception):
'''The wiki is not in the bot whitelist'''

View file

@ -1,3 +1,4 @@
from modules.wiki.audit import WikiWhitelistError, check_whitelist
import ujson as json
from core.elements import MessageSession
@ -27,7 +28,9 @@ class WikiTargetInfo:
raise
@retry(stop=stop_after_attempt(3))
def add_start_wiki(self, url):
async def add_start_wiki(self, url):
if await check_whitelist(url):
return 'whitelist'
try:
self.query.link = url
session.commit()
@ -43,7 +46,9 @@ class WikiTargetInfo:
return False
@retry(stop=stop_after_attempt(3))
def config_interwikis(self, iw: str, iwlink: str = None, let_it=True):
async def config_interwikis(self, iw: str, iwlink: str = None, let_it=True):
if not await check_whitelist(iwlink):
return 'whitelist'
try:
interwikis = json.loads(self.query.iws)
if let_it: