Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/core/dirty_check.py

167 lines
6.1 KiB
Python
Raw Normal View History

2021-10-01 01:18:04 +00:00
'''Check whether strings are compliant by using the Alibaba Cloud (Aliyun) API.

Before use, fill in "check_accessKeyId" and "check_accessKeySecret" in the
configuration so that requests can be authenticated.
'''
2020-08-02 01:46:20 +00:00
import base64
import datetime
import hashlib
import hmac
import json
2023-06-09 09:43:35 +00:00
import re
2023-09-03 08:50:12 +00:00
import time
2020-08-12 16:01:34 +00:00
2021-09-12 12:51:13 +00:00
import aiohttp
from tenacity import retry, wait_fixed, stop_after_attempt
2021-11-12 14:25:53 +00:00
from config import Config
2023-09-03 08:50:12 +00:00
from core.builtins import EnableDirtyWordCheck
2023-07-27 19:31:53 +00:00
from core.exceptions import NoReportException
2021-10-11 14:45:28 +00:00
from core.logger import Logger
2022-07-27 14:32:16 +00:00
from database.local import DirtyWordCache
2020-09-19 10:35:13 +00:00
2020-08-12 16:01:34 +00:00
2020-08-02 01:46:20 +00:00
def hash_hmac(key, code, sha1):
    '''Return the Base64-encoded HMAC of ``code`` keyed with ``key``.

    :param key: secret key as ``str``; encoded with ``str.encode()`` before use.
    :param code: message to sign as ``str``; encoded the same way.
    :param sha1: digest constructor (or digest name) handed to ``hmac.new``,
        e.g. ``hashlib.sha1``. ``None`` falls back to ``hashlib.sha1``.
    :returns: the raw digest, Base64-encoded and decoded to ``str``.
    '''
    # Honour the digest the caller asked for instead of silently forcing SHA-1.
    digestmod = hashlib.sha1 if sha1 is None else sha1
    hmac_code = hmac.new(key.encode(), code.encode(), digestmod)
    return base64.b64encode(hmac_code.digest()).decode('utf-8')
def computeMD5hash(my_string):
    '''Return the hexadecimal MD5 digest of ``my_string`` encoded as GB2312.

    :param my_string: text to hash; it must be representable in GB2312,
        otherwise ``str.encode`` raises ``UnicodeEncodeError``.
    :returns: the digest as a lowercase hex string.
    '''
    encoded = my_string.encode('gb2312')
    return hashlib.md5(encoded).hexdigest()
2020-08-12 16:01:34 +00:00
2021-10-11 14:45:28 +00:00
def parse_data(result: dict):
    '''Turn one scan result into the censored text plus a pass/fail flag.

    :param result: a single result item; this function reads
        ``result['content']`` and ``result['results']``, where each entry has a
        ``'suggestion'`` and (when blocked) a ``'details'`` list whose items may
        carry a ``'contexts'`` list of ``{'context': <flagged text>}``.
    :returns: ``{'content': censored text, 'status': bool, 'original': input text}``.
        ``status`` is ``False`` as soon as anything was censored.
    '''
    original_content = content = result['content']
    status = True
    for item_result in result['results']:
        if item_result['suggestion'] != 'block':
            continue
        for item_detail in item_result['details']:
            if 'contexts' in item_detail:
                for item_context in item_detail['contexts']:
                    # The flagged fragment is literal text, not a pattern: escape it
                    # so characters such as "(" or "." neither break the regex nor
                    # match more than the fragment itself.
                    pattern = re.escape(item_context['context'])
                    content = re.sub(pattern, "<吃掉了>", content, flags=re.I)
                    status = False
            else:
                # No specific fragment was reported: censor the whole text.
                content = "<全部吃掉了>"
                status = False
    return {'content': content, 'status': status, 'original': original_content}
2021-10-11 14:45:28 +00:00
2021-09-12 12:51:13 +00:00
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3))
async def check(*text) -> list:
    '''Check whether the given strings are compliant.

    The ``retry`` decorator re-runs the whole call for up to three attempts,
    waiting three seconds between attempts, whenever it raises.

    :param text: the strings to check, passed as positional arguments.
    :returns: one entry per input string, in input order. Each entry is
        ``{'content': censored text, 'status': bool, 'original': input text}``;
        non-compliant fragments are replaced with '<吃掉了>' and a fully
        non-compliant string becomes '<全部吃掉了>'. ``status`` is ``True`` when
        nothing was censored. When the filter is disabled or the credentials
        are missing, every string is returned unchanged with ``status`` True.
    :raises ValueError: when the scan endpoint answers with a non-200 status
        (the message is the response body).
    '''
    access_key_id = Config("check_accessKeyId")
    access_key_secret = Config("check_accessKeySecret")
    text = list(text)
    if not access_key_id or not access_key_secret or not EnableDirtyWordCheck.status:
        Logger.warn('Dirty words filter was disabled, skip.')
        return [{'content': t, 'status': True, 'original': t} for t in text]
    if not text:
        return []

    # query_list maps input position -> {text: verdict}; a verdict of False
    # means "not resolved yet". Empty strings are resolved immediately.
    query_list = {}
    for index, t in enumerate(text):
        if t == '':
            query_list[index] = {t: {'content': t, 'status': True, 'original': t}}
        else:
            query_list[index] = {t: False}

    # Resolve whatever the local cache already knows.
    for index in query_list:
        for content in query_list[index]:
            if not query_list[index][content]:
                cache = DirtyWordCache(content)
                if not cache.need_insert:
                    # Rebinding an existing key keeps the dict size unchanged,
                    # so this is safe while iterating.
                    query_list[index] = {content: parse_data(cache.get())}

    # Group the still-unresolved positions by text so that each distinct text
    # is sent to the API only once.
    call_api_list = {}
    for index in query_list:
        for content in query_list[index]:
            if not query_list[index][content]:
                call_api_list.setdefault(content, []).append(index)

    pending_texts = list(call_api_list)
    Logger.debug(pending_texts)
    if pending_texts:
        body = {
            "scenes": [
                "antispam"
            ],
            "tasks": [
                {
                    "dataId": "Nullcat is god {}".format(time.time()),
                    "content": pending
                }
                for pending in pending_texts
            ]
        }
        # Serialise once: the Content-MD5 that gets signed must describe exactly
        # the bytes that are posted.
        payload = json.dumps(body)

        client_info = '{}'
        root = 'https://green.cn-shanghai.aliyuncs.com'
        url = '/green/text/scan?{}'.format(client_info)

        gmt_format = '%a, %d %b %Y %H:%M:%S GMT'
        # Aware UTC "now" replaces the deprecated datetime.utcnow(); the
        # formatted string is the same.
        date = datetime.datetime.now(datetime.timezone.utc).strftime(gmt_format)
        nonce = 'LittleC sb {}'.format(time.time())
        content_md5 = base64.b64encode(hashlib.md5(payload.encode('utf-8')).digest()).decode('utf-8')

        # The x-acs-* headers are both sent and folded into the string to sign.
        acs_headers = {
            'x-acs-version': '2018-05-09',
            'x-acs-signature-nonce': nonce,
            'x-acs-signature-version': '1.0',
            'x-acs-signature-method': 'HMAC-SHA1'
        }
        headers = {
            'Accept': 'application/json',
            'Content-Type': 'application/json',
            'Content-MD5': content_md5,
            'Date': date,
            **acs_headers
        }
        # String to sign: fixed request line parts, then the x-acs-* headers
        # sorted by name, then the resource path.
        step1 = '\n'.join("{}:{}".format(name, acs_headers[name]) for name in sorted(acs_headers))
        step2 = url
        step3 = "POST\napplication/json\n{contentMd5}\napplication/json\n{date}\n{step1}\n{step2}".format(
            contentMd5=content_md5,
            date=headers['Date'], step1=step1, step2=step2)
        sign = "acs {}:{}".format(access_key_id, hash_hmac(access_key_secret, step3, hashlib.sha1))
        headers['Authorization'] = sign

        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.post('{}{}'.format(root, url), data=payload) as resp:
                if resp.status == 200:
                    result = await resp.json()
                    Logger.debug(result)
                    for item in result['data']:
                        content = item['content']
                        # Fan the verdict out to every position that held this text.
                        for index in call_api_list[content]:
                            query_list[index] = {content: parse_data(item)}
                        DirtyWordCache(content).update(item)
                else:
                    raise ValueError(await resp.text())

    Logger.debug(query_list)
    # Flatten back to a list in input order.
    return [query_list[index][content] for index in query_list for content in query_list[index]]
2023-03-18 14:08:49 +00:00
async def check_bool(*text):
    '''Tell whether any of the given strings failed the compliance check.

    :param text: the strings to check, forwarded unchanged to ``check``.
    :returns: ``True`` if at least one result has a falsy ``'status'``,
        otherwise ``False``.
    '''
    verdicts = await check(*text)
    return any(not verdict['status'] for verdict in verdicts)
2023-07-14 07:59:01 +00:00
2023-07-27 19:31:53 +00:00
def rickroll(msg):
    '''Pick the reply to use in place of an unsafe message.

    :param msg: object whose ``locale.t`` is used to look up the fallback text.
    :returns: the configured "rickroll_url" when both "enable_rickroll" and
        "rickroll_url" are set to truthy values; otherwise the localized string
        for "error.message.chain.unsafe".
    '''
    rickroll_enabled = Config("enable_rickroll") and Config("rickroll_url")
    if not rickroll_enabled:
        return msg.locale.t("error.message.chain.unsafe")
    return Config("rickroll_url")