Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/modules/core/su_utils.py
2023-03-04 16:51:56 +08:00

335 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
import os
import sys
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
import numpy as np
import ujson as json
from config import Config
from core.builtins import Bot, PrivateAssets, Image, Plain, ExecutionLockList, Temp, MessageTaskManager
from core.component import module
from core.loader import ModulesManager
from core.parser.message import remove_temp_ban
from core.tos import pardon_user, warn_user
from core.utils.cache import random_cache_path
from database import BotDBUtil
su = module('superuser', alias=['su'], developers=['OasisAkari', 'Dianliang233'], required_superuser=True)
@su.handle('add <user>')
async def add_su(message: Bot.MessageSession):
user = message.parsed_msg['<user>']
if not user.startswith(f'{message.target.senderFrom}|'):
await message.finish(f'ID格式错误请对象使用{message.prefixes[0]}whoami命令查看用户ID。')
if user:
if BotDBUtil.SenderInfo(user).edit('isSuperUser', True):
await message.finish('操作成功:已将' + user + '设置为超级用户。')
@su.handle('del <user>')
async def del_su(message: Bot.MessageSession):
user = message.parsed_msg['<user>']
if not user.startswith(f'{message.target.senderFrom}|'):
await message.finish(f'ID格式错误请对象使用{message.prefixes[0]}whoami命令查看用户ID。')
if user:
if BotDBUtil.SenderInfo(user).edit('isSuperUser', False):
await message.finish('操作成功:已将' + user + '移出超级用户。')
ana = module('analytics', required_superuser=True)
@ana.handle()
async def _(msg: Bot.MessageSession):
if Config('enable_analytics'):
first_record = BotDBUtil.Analytics.get_first()
get_counts = BotDBUtil.Analytics.get_count()
await msg.finish(f'机器人已执行命令次数(自{str(first_record.timestamp)}开始统计):{get_counts}')
else:
await msg.finish('机器人未开启命令统计功能。')
@ana.handle('days [<name>]')
async def _(msg: Bot.MessageSession):
if Config('enable_analytics'):
first_record = BotDBUtil.Analytics.get_first()
module_ = None
if '<name>' in msg.parsed_msg:
module_ = msg.parsed_msg['<name>']
data_ = {}
for d in range(30):
new = datetime.now().replace(hour=0, minute=0, second=0) + timedelta(days=1) - timedelta(days=30 - d - 1)
old = datetime.now().replace(hour=0, minute=0, second=0) + timedelta(days=1) - timedelta(days=30 - d)
get_ = BotDBUtil.Analytics.get_count_by_times(new, old, module_)
data_[old.day] = get_
data_x = []
data_y = []
for x in data_:
data_x.append(str(x))
data_y.append(data_[x])
plt.plot(data_x, data_y, "-o")
plt.plot(data_x[-1], data_y[-1], "-ro")
plt.xlabel('Days')
plt.ylabel('Counts')
plt.tick_params(axis='x', labelrotation=45, which='major', labelsize=10)
plt.gca().yaxis.get_major_locator().set_params(integer=True)
for xitem, yitem in np.nditer([data_x, data_y]):
plt.annotate(yitem, (xitem, yitem), textcoords="offset points", xytext=(0, 10), ha="center")
path = random_cache_path() + '.png'
plt.savefig(path)
plt.close()
await msg.finish(
[Plain(
f'最近30天的{module_ if module_ is not None else ""}命令调用次数统计(自{str(first_record.timestamp)}开始统计):'),
Image(path)])
set_ = module('set', required_superuser=True)
@set_.handle('modules <targetId> <modules> ...')
async def _(msg: Bot.MessageSession):
target = msg.parsed_msg['<targetId>']
if not target.startswith(f'{msg.target.targetFrom}|'):
await msg.finish(f'ID格式错误。')
target_data = BotDBUtil.TargetInfo(target)
if target_data.query is None:
wait_confirm = await msg.waitConfirm('该群未初始化,确认进行操作吗?')
if not wait_confirm:
return
modules = [m for m in [msg.parsed_msg['<modules>']] + msg.parsed_msg.get('...', [])
if m in ModulesManager.return_modules_list_as_dict(msg.target.targetFrom)]
target_data.enable(modules)
await msg.finish(f'成功为对象配置了以下模块:{", ".join(modules)}')
@set_.handle('option <targetId> <k> <v>')
async def _(msg: Bot.MessageSession):
target = msg.parsed_msg['<targetId>']
k = msg.parsed_msg['<k>']
v = msg.parsed_msg['<v>']
if not target.startswith(f'{msg.target.targetFrom}|'):
await msg.finish(f'ID格式错误。')
target_data = BotDBUtil.TargetInfo(target)
if target_data.query is None:
wait_confirm = await msg.waitConfirm('该群未初始化,确认进行操作吗?')
if not wait_confirm:
return
if v.startswith(('[', '{')):
v = json.loads(v)
elif v.upper() == 'TRUE':
v = True
elif v.upper() == 'FALSE':
v = False
target_data.edit_option(k, v)
await msg.finish(f'成功为对象设置了以下参数:{k} -> {str(v)}')
ae = module('abuse', alias=['ae'], developers=['Dianliang233'], required_superuser=True)
@ae.handle('check <user>')
async def _(msg: Bot.MessageSession):
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
warns = BotDBUtil.SenderInfo(user).query.warns
await msg.finish(f'{user} 已被警告 {warns} 次。')
@ae.handle('warn <user> [<count>]')
async def _(msg: Bot.MessageSession):
count = int(msg.parsed_msg.get('<count>', 1))
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
warn_count = await warn_user(user, count)
await msg.finish(f'成功警告 {user} {count} 次。此用户已被警告 {warn_count} 次。')
@ae.handle('revoke <user> [<count>]')
async def _(msg: Bot.MessageSession):
count = 0 - int(msg.parsed_msg.get('<count>', 1))
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
warn_count = await warn_user(user, count)
await msg.finish(f'成功移除警告 {user}{abs(count)} 次警告。此用户已被警告 {warn_count} 次。')
@ae.handle('clear <user>')
async def _(msg: Bot.MessageSession):
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
await pardon_user(user)
await msg.finish(f'成功清除 {user} 的警告。')
@ae.handle('untempban <user>')
async def _(msg: Bot.MessageSession):
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
await remove_temp_ban(user)
await msg.finish(f'成功解除 {user} 的临时限制。')
@ae.handle('ban <user>')
async def _(msg: Bot.MessageSession):
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
if BotDBUtil.SenderInfo(user).edit('isInBlockList', True):
await msg.finish(f'成功封禁 {user}')
@ae.handle('unban <user>')
async def _(msg: Bot.MessageSession):
user = msg.parsed_msg['<user>']
if not user.startswith(f'{msg.target.senderFrom}|'):
await msg.finish(f'ID格式错误。')
if BotDBUtil.SenderInfo(user).edit('isInBlockList', False):
await msg.finish(f'成功解除 {user} 的封禁。')
rst = module('restart', developers=['OasisAkari'], required_superuser=True)
def restart():
sys.exit(233)
def write_version_cache(msg: Bot.MessageSession):
update = os.path.abspath(PrivateAssets.path + '/cache_restart_author')
write_version = open(update, 'w')
write_version.write(json.dumps({'From': msg.target.targetFrom, 'ID': msg.target.targetId}))
write_version.close()
restart_time = []
async def wait_for_restart(msg: Bot.MessageSession):
get = ExecutionLockList.get()
if datetime.now().timestamp() - restart_time[0] < 60:
if len(get) != 0:
await msg.sendMessage(f'{len(get)} 个命令正在执行中,将于执行完毕后重启。')
await asyncio.sleep(10)
return await wait_for_restart(msg)
else:
await msg.sendMessage('重启中...')
get_wait_list = MessageTaskManager.get()
for x in get_wait_list:
for y in get_wait_list[x]:
if get_wait_list[x][y]['active']:
await get_wait_list[x][y]['original_session'].sendMessage('由于机器人正在重启,您此次执行命令的后续操作已被强制取消。'
'请稍后重新执行命令,对此带来的不便,我们深感抱歉。')
else:
await msg.sendMessage('等待已超时,强制重启中...')
@rst.handle()
async def restart_bot(msg: Bot.MessageSession):
await msg.sendMessage('你确定吗?')
confirm = await msg.waitConfirm()
if confirm:
restart_time.append(datetime.now().timestamp())
await wait_for_restart(msg)
write_version_cache(msg)
restart()
upd = module('update', developers=['OasisAkari'], required_superuser=True)
def pull_repo():
return os.popen('git pull', 'r').read()[:-1]
def update_dependencies():
return os.popen('poetry install').read()[:-1]
@upd.handle()
async def update_bot(msg: Bot.MessageSession):
await msg.sendMessage('你确定吗?')
confirm = await msg.waitConfirm()
if confirm:
await msg.sendMessage(pull_repo())
await msg.sendMessage(update_dependencies())
upds = module('update&restart', developers=['OasisAkari'], required_superuser=True)
@upds.handle()
async def update_and_restart_bot(msg: Bot.MessageSession):
await msg.sendMessage('你确定吗?')
confirm = await msg.waitConfirm()
if confirm:
restart_time.append(datetime.now().timestamp())
await wait_for_restart(msg)
write_version_cache(msg)
await msg.sendMessage(pull_repo())
await msg.sendMessage(update_dependencies())
restart()
if Bot.FetchTarget.name == 'QQ':
resume = module('resume', developers=['OasisAkari'], required_superuser=True)
@resume.handle()
async def resume_sending_group_message(msg: Bot.MessageSession):
Temp.data['is_group_message_blocked'] = False
if targets := Temp.data['waiting_for_send_group_message']:
await msg.sendMessage(f'正在重发 {len(targets)} 条消息...')
for x in targets:
await x['fetch'].sendDirectMessage(x['message'])
Temp.data['waiting_for_send_group_message'].remove(x)
await msg.sendMessage('重发完成。')
else:
await msg.sendMessage('没有需要重发的消息。')
@resume.handle('continue')
async def resume_sending_group_message(msg: Bot.MessageSession):
del Temp.data['waiting_for_send_group_message'][0]
Temp.data['is_group_message_blocked'] = False
if targets := Temp.data['waiting_for_send_group_message']:
await msg.sendMessage(f'跳过一条消息,正在重发 {len(targets)} 条消息...')
for x in targets:
await x['fetch'].sendDirectMessage(x['message'])
Temp.data['waiting_for_send_group_message'].remove(x)
await msg.sendMessage('重发完成。')
else:
await msg.sendMessage('没有需要重发的消息。')
echo = module('echo', developers=['OasisAkari'], required_superuser=True)
@echo.handle('<display_msg>')
async def _(msg: Bot.MessageSession):
await msg.finish(msg.parsed_msg['<display_msg>'])
say = module('say', developers=['OasisAkari'], required_superuser=True)
@say.handle('<display_msg>')
async def _(msg: Bot.MessageSession):
await msg.finish(msg.parsed_msg['<display_msg>'], quote=False)
if Config('enable_eval'):
_eval = module('eval', developers=['Dianliang233'], required_superuser=True)
@_eval.handle('<display_msg>')
async def _(msg: Bot.MessageSession):
await msg.finish(str(eval(msg.parsed_msg['<display_msg>'], {'msg': msg})))