Archived
1
0
Fork 0
This commit is contained in:
yzhh 2022-01-08 21:41:25 +08:00
commit 4bea41eeb3
12 changed files with 175 additions and 13 deletions

View file

@ -88,6 +88,15 @@ class MessageSession(MS):
return True
return False
async def checkNativePermission(self):
if self.target.targetFrom == 'QQ':
return True
get_member_info = await bot.call_action('get_group_member_info', group_id=self.session.target,
user_id=self.session.sender)
if get_member_info['role'] in ['owner', 'admin']:
return True
return False
def checkSuperUser(self):
return True if self.target.senderInfo.query.isSuperUser else False

View file

@ -84,6 +84,15 @@ class MessageSession(MS):
return True
return False
async def checkNativePermission(self):
match_guild = re.match(r'(.*)\|(.*)', self.session.target)
get_member_info = await bot.call_action('get_guild_members', guild_id=match_guild.group(1))
tiny_id = self.session.sender
for m in get_member_info['admins']:
if m['tiny_id'] == tiny_id:
return True
return False
def checkSuperUser(self):
return True if self.target.senderInfo.query.isSuperUser else False
@ -187,4 +196,4 @@ class FetchTarget(FT):
except Exception:
traceback.print_exc()
return send_list
"""
"""

View file

@ -84,6 +84,14 @@ class MessageSession(MS):
return True
return False
async def checkNativePermission(self):
if self.session.message.chat.type == 'private':
return True
admins = [member.user.id for member in await dp.bot.get_chat_administrators(self.session.message.chat.id)]
if self.session.sender in admins:
return True
return False
def checkSuperUser(self):
return True if self.target.senderInfo.query.isSuperUser else False

View file

@ -104,6 +104,12 @@ class MessageSession(MS):
return True
return False
async def checkNativePermission(self):
if self.session.message.channel.permissions_for(self.session.message.author).administrator \
or isinstance(self.session.message.channel, discord.DMChannel):
return True
return False
def checkSuperUser(self):
return True if self.target.senderInfo.query.isSuperUser else False

View file

@ -51,6 +51,10 @@ class Template(MessageSession):
print("(Try to check your permissions, but this is a unit test environment. Have fun!)")
return True
async def checkNativePermission(self):
print("(Try to check your native permissions, but this is a unit test environment. Have fun!)")
return True
def checkSuperUser(self):
print("(Try to check if you are superuser, but this is a unit test environment. Have fun!)")
return True

View file

@ -74,6 +74,12 @@ class MessageSession:
"""
...
async def checkNativePermission(self):
"""
用于检查消息发送者原本在聊天平台中是否具有管理员权限
"""
...
async def fake_forward_msg(self, nodelist):
"""
用于发送假转发消息QQ

View file

@ -85,6 +85,9 @@ async def parser(msg: MessageSession):
msg.trigger_msg = command # 触发该命令的消息,去除消息前缀
command_first_word = command_spilt[0].lower()
sudo = False
mute = False
if command_first_word == 'mute':
mute = True
if command_first_word == 'sudo':
if not msg.checkSuperUser():
return await msg.sendMessage('你不是本机器人的超级管理员无法使用sudo命令。')
@ -93,6 +96,11 @@ async def parser(msg: MessageSession):
command_first_word = command_spilt[0].lower()
msg.trigger_msg = ' '.join(command_spilt)
if senderInfo.query.isInBlockList and not senderInfo.query.isInAllowList and not sudo: # 如果是以 sudo 执行的命令,则不检查是否已 ban
ExecutionLockList.remove(msg)
return
in_mute = BotDBUtil.Muting(msg).check()
if in_mute and not mute:
ExecutionLockList.remove(msg)
return
if command_first_word in ModulesAliases:
command_spilt[0] = ModulesAliases[command_first_word]
@ -133,7 +141,10 @@ async def parser(msg: MessageSession):
await msg.sendMessage(f'{command_first_word}模块未启用,请发送~enable {command_first_word}启用本模块。')
continue
elif module.required_admin:
if not await msg.checkPermission():
if not await msg.checkPermission() and not mute:
if in_mute:
ExecutionLockList.remove(msg)
continue
await msg.sendMessage(f'{command_first_word}命令仅能被该群组的管理员所使用,请联系管理员执行此命令。')
continue
if not module.match_list.set:

View file

@ -62,6 +62,18 @@ async def get_url(url: str, status_code: int = False, headers: dict = None, fmt=
return text
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
async def post_url(url: str, data: any, headers: dict = None):
'''发送POST请求。
:param url: 需要发送的url
:param data: 需要发送的数据
:param headers: 请求时使用的http头
:returns: 发送请求后的响应'''
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(url, data=data, headers=headers) as req:
return await req.text()
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
async def download_to_cache(link: str) -> Union[str, bool]:
'''利用AioHttp下载指定url的内容并保存到缓存./cache目录

View file

@ -6,7 +6,7 @@ from config import Config
from core.elements.message import MessageSession
from core.elements.temp import EnabledModulesCache, SenderInfoCache
from database.orm import DBSession
from database.tables import EnabledModules, SenderInfo, TargetAdmin, CommandTriggerTime, GroupAllowList
from database.tables import EnabledModules, MuteList, SenderInfo, TargetAdmin, CommandTriggerTime, GroupAllowList
cache = Config('db_cache')
@ -226,4 +226,33 @@ class BotDBUtil:
return False
class Muting:
@retry(stop=stop_after_attempt(3))
@auto_rollback_error
def __init__(self, msg: MessageSession):
self.msg = msg
self.targetId = msg.target.targetId
self.query = session.query(MuteList).filter_by(targetId=self.targetId).first()
@retry(stop=stop_after_attempt(3))
@auto_rollback_error
def check(self):
if self.query is not None:
return True
return False
@retry(stop=stop_after_attempt(3))
@auto_rollback_error
def add(self):
session.add(MuteList(targetId=self.targetId))
session.commit()
@retry(stop=stop_after_attempt(3))
@auto_rollback_error
def remove(self):
if self.query is not None:
session.delete(self.query)
session.commit()
__all__ = ["BotDBUtil", "auto_rollback_error", "session"]

View file

@ -30,6 +30,11 @@ class TargetAdmin(Base):
targetId = Column(String(512))
class MuteList(Base):
"""禁言列表"""
__tablename__ = "MuteList"
targetId = Column(String(512), primary_key=True)
class CommandTriggerTime(Base):
"""命令触发时间"""
__tablename__ = "CommandTriggerTime"

View file

@ -424,25 +424,32 @@ async def del_su(message: MessageSession):
await message.sendMessage('操作成功:已将' + user + '移出超级用户。')
whoami = on_command('whoami', developers=['Dianliang233'], desc='获取发送命令的账号在机器人内部的识别码', base=True)
whoami = on_command('whoami', developers=['Dianliang233'], desc='获取发送命令的账号在机器人内部的 ID', base=True)
@whoami.handle()
async def _(msg: MessageSession):
await msg.sendMessage(f'你是:{msg.target.senderId}\n本对话是:{msg.target.targetId}', disable_secret_check=True)
rights = ''
if await msg.checkNativePermission():
rights += '\n(你拥有本对话的管理员权限)'
elif await msg.checkPermission():
rights += '\n(你拥有本对话的机器人管理员权限)'
if msg.checkSuperUser():
rights += '\n(你拥有本机器人的超级用户权限)'
await msg.sendMessage(f'你的 ID 是:{msg.target.senderId}\n本对话的 ID 是:{msg.target.targetId}' + rights, disable_secret_check=True)
ab = on_command('abuse', alias=['ae'], developers=['Dianliang233'], required_superuser=True)
ae = on_command('abuse', alias=['ae'], developers=['Dianliang233'], required_superuser=True)
@ab.handle('check <user>')
@ae.handle('check <user>')
async def _(msg: MessageSession):
user = msg.parsed_msg['<user>']
warns = BotDBUtil.SenderInfo(user).query.warns
await msg.sendMessage(f'{user} 已被警告 {warns} 次。')
@ab.handle('warn <user> [<count>]')
@ae.handle('warn <user> [<count>]')
async def _(msg: MessageSession):
count = int(msg.parsed_msg['<count>'] or 1)
user = msg.parsed_msg['<user>']
@ -450,7 +457,7 @@ async def _(msg: MessageSession):
await msg.sendMessage(f'成功警告 {user} {count} 次。此用户已被警告 {warn_count} 次。')
@ab.handle('revoke <user> [<count>]')
@ae.handle('revoke <user> [<count>]')
async def _(msg: MessageSession):
count = 0 - int(msg.parsed_msg['<count>'] or -1)
user = msg.parsed_msg['<user>']
@ -458,28 +465,28 @@ async def _(msg: MessageSession):
await msg.sendMessage(f'成功移除警告 {user}{count} 次警告。此用户已被警告 {warn_count} 次。')
@ab.handle('clear <user>')
@ae.handle('clear <user>')
async def _(msg: MessageSession):
user = msg.parsed_msg['<user>']
await pardon_user(user)
await msg.sendMessage(f'成功清除 {user} 的警告。')
@ab.handle('untempban <user>')
@ae.handle('untempban <user>')
async def _(msg: MessageSession):
user = msg.parsed_msg['<user>']
await remove_temp_ban(user)
await msg.sendMessage(f'成功解除 {user} 的临时封禁。')
@ab.handle('ban <user>')
@ae.handle('ban <user>')
async def _(msg: MessageSession):
user = msg.parsed_msg['<user>']
if BotDBUtil.SenderInfo(user).edit('isInBlockList', True):
await msg.sendMessage(f'成功封禁 {user}')
@ab.handle('unban <user>')
@ae.handle('unban <user>')
async def _(msg: MessageSession):
user = msg.parsed_msg['<user>']
if BotDBUtil.SenderInfo(user).edit('isInBlockList', False):
@ -570,3 +577,16 @@ async def _(msg: MessageSession):
else:
target.edit('disable_typing', False)
await msg.sendMessage('成功打开输入提示。')
mute = on_command('mute', developers=['Dianliang233'], base=True, required_admin=True)
@mute.handle()
async def _(msg: MessageSession):
if BotDBUtil.Muting(msg).check():
BotDBUtil.Muting(msg).remove()
await msg.sendMessage('成功取消禁言。')
else:
BotDBUtil.Muting(msg).add()
await msg.sendMessage('成功禁言。')

View file

@ -0,0 +1,43 @@
import traceback
import ujson as json
from core.elements import MessageSession
from core.component import on_command
from core.utils import post_url
n = on_command(
bind_prefix='nbnhhsh',
desc='能不能好好说话?拼音首字母缩写释义工具',
developers=['Dianliang233'],)
@n.handle('<term>')
async def _(msg: MessageSession):
await msg.sendMessage(await nbnhhsh(msg.parsed_msg['<term>']))
async def nbnhhsh(term: str):
'''查询nbnhhsh。
:param term: 需要查询的term
:returns: 查询结果'''
try:
url = 'https://lab.magiconch.com/api/nbnhhsh/guess'
req = json.dumps({'text': term})
text = await post_url(url, data=req, headers={'Content-Type': 'application/json', 'Accept': '*/*', 'Content-Length': str(len(req))})
print(text)
data = json.loads(text)
result = data[0]
if 'trans' in result:
trans = result['trans']
count = trans.__len__()
return f'[nbnhhsh]{count}个结果,已收录):{"".join(trans)}'
elif 'inputting' in result and result['inputting'] != []:
inputting = result['inputting']
count = inputting.__len__()
return f'[nbnhhsh]{count}个结果AI 猜测):{"".join(inputting)}'
else:
return '[nbnhhsh] 没有找到相关结果。'
except Exception:
traceback.print_exc()
return '[nbnhhsh] 查询出错。'