Archived
1
0
Fork 0

several bugfixes

The part of query jikiwiki still doesn't work
This commit is contained in:
yzhh 2022-01-04 23:26:37 +08:00
parent beeda64d7c
commit a5d2cd0e5d
7 changed files with 99 additions and 86 deletions

View file

@ -2,12 +2,16 @@ from .internal import Plain, Image, Voice, Embed
from core.elements.others import Secret, ErrorMessage
from typing import Union, List, Tuple
from ...logger import Logger
class MessageChain:
def __init__(self, elements: Union[str, List[Union[Plain, Image, Voice, Embed]],
Tuple[Union[Plain, Image, Voice, Embed]],
Plain, Image, Voice, Embed]):
self.value = []
if isinstance(elements, ErrorMessage):
elements = str(elements)
if isinstance(elements, str):
if elements != '':
self.value.append(Plain(elements))
@ -18,7 +22,9 @@ class MessageChain:
self.value.append(elements)
elif isinstance(elements, (list, tuple)):
for e in elements:
if isinstance(e, (Plain, Image, Voice, Embed)):
if isinstance(e, ErrorMessage):
self.value.append(str(e))
elif isinstance(e, (Plain, Image, Voice, Embed)):
self.value.append(e)
else:
self.value.append(
@ -26,6 +32,7 @@ class MessageChain:
elif isinstance(elements, MessageChain):
self.value = elements.value
else:
Logger.error(f'Unexpected message type: {elements.__dict__}')
self.value.append(
Plain(ErrorMessage('机器人尝试发送非法消息链,请联系机器人开发者解决问题。')))

View file

@ -1,6 +1,7 @@
from core.component import on_command
from core.elements import MessageSession
from core.dirty_check import check
from core.logger import Logger
from modules.meme.jiki import jiki
from modules.meme.moegirl import moegirl
from modules.meme.nbnhhsh import nbnhhsh
@ -11,6 +12,7 @@ meme = on_command(
desc='全功能梗查询。',
developers=['Dianliang233'])
@meme.handle(help_doc='<term> {在小鸡词典、萌娘百科、nbnhhsh、Urban Dictionary 中查询梗}')
async def _(msg: MessageSession):
res_jiki = await jiki(msg.parsed_msg['<term>'])
@ -22,5 +24,8 @@ async def _(msg: MessageSession):
for i in chk:
if not i['status']:
i = '[???] <REDACTED>'
res += i['res'] + '\n'
msg.sendMessage(res)
res += i + '\n'
else:
res += i['content'] + '\n'
Logger.info('res:'+res)
await msg.sendMessage(res)

View file

@ -1,3 +1,5 @@
import traceback
import ujson as json
from core.utils.bot import post_url
@ -10,6 +12,7 @@ async def jiki(term: str):
try:
url = 'https://api.jikipedia.com/go/search_entities' + term
text = await post_url(url, data={'page': 1, 'phrase': term, 'size': 1})
print(text)
data = json.loads(text)
count = data['total']
result = data['data'][0]['definitions']
@ -17,5 +20,6 @@ async def jiki(term: str):
content = result['plaintext']
link = 'https://jikipedia.com/definition/' + result['id']
return f'[小鸡百科]{count}个结果):{title}\n{content}\n{link}'
except:
except Exception:
traceback.print_exc()
return '[小鸡百科] 查询出错。'

View file

@ -1,42 +1,15 @@
from core.exceptions import AbuseWarning
from modules.wiki.wikilib_v2 import WikiLib, WhatAreUDoingError, PageInfo, InvalidWikiError
from modules.wiki import query_pages
from modules.wiki.wikilib_v2 import QueryInfo
async def moegirl(term: str):
try:
result = await WikiLib('https://zh.moegirl.org.cn/api.php').parse_page_info(term)
r: PageInfo = result
display_title = None
display_before_title = None
if r.title is not None:
display_title = r.title
if r.before_title is not None:
display_before_title = r.before_title
if r.status:
plain_slice = []
if display_before_title is not None and display_before_title != display_title:
if r.before_page_property == 'template' and r.page_property == 'page':
plain_slice.append(f'[{display_before_title}]不存在,已自动重定向至[{display_title}]')
else:
plain_slice.append(f'(重定向[{display_before_title}] -> [{display_title}]')
if r.link is not None:
plain_slice.append(r.link)
if r.desc is not None and r.desc != '':
plain_slice.append(r.desc)
else:
plain_slice = []
wait_plain_slice = []
if display_title is not None and display_before_title is not None:
wait_plain_slice.append(f'提示:[{display_before_title}]不存在,您是否想要找的是[{display_title}]')
elif r.before_title is not None:
plain_slice.append(f'提示:找不到[{display_before_title}]。')
if r.desc is not None and r.desc != '':
plain_slice.append(r.desc)
if r.invalid_namespace and r.before_title is not None:
s = r.before_title.split(":")
if len(s) > 1:
plain_slice.append(f'此Wiki上没有名为{s[0]}的命名空间,请检查拼写后再试。')
if wait_plain_slice:
page = '\n'.join(wait_plain_slice)
except WhatAreUDoingError:
raise AbuseWarning('使机器人重定向页面的次数过多。')
return '[萌娘百科]' + page
result = await query_pages(QueryInfo('https://zh.moegirl.org.cn/api.php'), term)
msg = ''
if result['msg_list']:
for msg_item in result['msg_list']:
msg += msg_item.text
if result['wait_msg_list']:
for msg_item in result['wait_msg_list']:
msg += msg_item.text
return '[萌娘百科]' + msg

View file

@ -1,3 +1,5 @@
import traceback
import ujson as json
from core.utils import post_url
@ -10,6 +12,7 @@ async def nbnhhsh(term: str):
try:
url = 'https://lab.magiconch.com/api/nbnhhsh/guess' + term
text = await post_url(url, data={'text': term})
print(text)
data = json.loads(text)['data']
result = data[0]
if 'trans' in result:
@ -22,5 +25,6 @@ async def nbnhhsh(term: str):
return f'[nbnhhsh]{count}个结果AI 猜测):{"".join(inputting)}'
else:
return '[nbnhhsh] 没有找到相关结果。'
except:
except Exception:
traceback.print_exc()
return '[nbnhhsh] 查询出错。'

View file

@ -19,7 +19,7 @@ from .utils.ab_qq import ab_qq
from .utils.newbie import newbie
from .utils.rc import rc
from .utils.rc_qq import rc_qq
from .wikilib_v2 import WikiLib, WhatAreUDoingError, PageInfo, InvalidWikiError
from .wikilib_v2 import WikiLib, WhatAreUDoingError, PageInfo, InvalidWikiError, QueryInfo
wiki = on_command('wiki',
alias={'wiki_start_site': 'wiki set', 'interwiki': 'wiki iw'},
@ -261,8 +261,7 @@ async def _(msg: MessageSession):
await msg.sendMessage(send_msgs)
legacy = False
if legacy:
wikis = []
wikis.append('现有白名单:')
wikis = ['现有白名单:']
for al in allow_list:
wikis.append(f'{al[0]}by {al[1]}')
wikis.append('现有黑名单:')
@ -311,17 +310,27 @@ async def _(msg: MessageSession):
await query_pages(msg, query_list, mediawiki=True)
async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
template=False, mediawiki=False, useprefix=True):
target = WikiTargetInfo(msg)
start_wiki = target.get_start_wiki()
interwiki_list = target.get_interwikis()
headers = target.get_headers()
prefix = target.get_prefix()
enabled_fandom_addon = BotDBUtil.Module(msg).check_target_enabled_module('wiki_fandom_addon')
async def query_pages(session: Union[MessageSession, QueryInfo], title: Union[str, list, tuple],
template=False, mediawiki=False, use_prefix=True):
if isinstance(session, MessageSession):
target = WikiTargetInfo(session)
start_wiki = target.get_start_wiki()
interwiki_list = target.get_interwikis()
headers = target.get_headers()
prefix = target.get_prefix()
enabled_fandom_addon = BotDBUtil.Module(session).check_target_enabled_module('wiki_fandom_addon')
elif isinstance(session, QueryInfo):
start_wiki = session.api
interwiki_list = []
headers = session.headers
prefix = session.prefix
enabled_fandom_addon = False
else:
raise TypeError('session must be MessageSession or QueryInfo.')
if start_wiki is None:
await msg.sendMessage('没有指定起始Wiki已默认指定为中文Minecraft Wiki发送~wiki set <域名>来设定自定义起始Wiki。'
'\n例子:~wiki set https://minecraft.fandom.com/zh/wiki/')
await session.sendMessage('没有指定起始Wiki已默认指定为中文Minecraft Wiki发送~wiki set <域名>来设定自定义起始Wiki。'
'\n例子:~wiki set https://minecraft.fandom.com/zh/wiki/')
start_wiki = 'https://minecraft.fandom.com/zh/api.php'
if isinstance(title, str):
title = [title]
@ -329,7 +338,7 @@ async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
raise AbuseWarning('一次性查询的页面超出15个。')
query_task = {start_wiki: {'query': [], 'iw_prefix': ''}}
for t in title:
if prefix is not None and useprefix:
if prefix is not None and use_prefix:
t = prefix + t
if t[0] == ':':
if len(t) > 1:
@ -418,7 +427,7 @@ async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
if r.file is not None:
dl_list.append(r.file)
else:
if msg.Feature.image and r.link is not None:
if r.link is not None:
web_render_list.append({r.link: r.info.realurl})
else:
plain_slice = []
@ -441,33 +450,37 @@ async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
except WhatAreUDoingError:
raise AbuseWarning('使机器人重定向页面的次数过多。')
except InvalidWikiError as e:
await msg.sendMessage(f'发生错误:' + str(e))
if msg_list:
await msg.sendMessage(msg_list)
if web_render_list and msg.Feature.image:
infobox_msg_list = []
for i in web_render_list:
for ii in i:
get_infobox = await get_infobox_pic(i[ii], ii, headers)
if get_infobox:
infobox_msg_list.append(Image(get_infobox))
if infobox_msg_list:
await msg.sendMessage(infobox_msg_list, quote=False)
if dl_list:
for f in dl_list:
dl = await download_to_cache(f)
guess_type = filetype.guess(dl)
if guess_type is not None:
if guess_type.extension in ["png", "gif", "jpg", "jpeg", "webp", "bmp", "ico"]:
if msg.Feature.image:
await msg.sendMessage(Image(dl), quote=False)
elif guess_type.extension in ["oga", "ogg", "flac", "mp3", "wav"]:
if msg.Feature.voice:
await msg.sendMessage(Voice(dl), quote=False)
if wait_msg_list:
confirm = await msg.waitConfirm(wait_msg_list)
if confirm and wait_list:
await query_pages(msg, wait_list, useprefix=False)
await session.sendMessage(f'发生错误:' + str(e))
if isinstance(session, MessageSession):
if msg_list:
await session.sendMessage(msg_list)
if web_render_list and session.Feature.image:
infobox_msg_list = []
for i in web_render_list:
for ii in i:
get_infobox = await get_infobox_pic(i[ii], ii, headers)
if get_infobox:
infobox_msg_list.append(Image(get_infobox))
if infobox_msg_list:
await session.sendMessage(infobox_msg_list, quote=False)
if dl_list:
for f in dl_list:
dl = await download_to_cache(f)
guess_type = filetype.guess(dl)
if guess_type is not None:
if guess_type.extension in ["png", "gif", "jpg", "jpeg", "webp", "bmp", "ico"]:
if session.Feature.image:
await session.sendMessage(Image(dl), quote=False)
elif guess_type.extension in ["oga", "ogg", "flac", "mp3", "wav"]:
if session.Feature.voice:
await session.sendMessage(Voice(dl), quote=False)
if wait_msg_list:
confirm = await session.waitConfirm(wait_msg_list)
if confirm and wait_list:
await query_pages(session, wait_list, use_prefix=False)
else:
return {'msg_list': msg_list, 'web_render_list': web_render_list, 'dl_list': dl_list,
'wait_list': wait_list, 'wait_msg_list': wait_msg_list}
rc_ = on_command('rc', desc='获取默认wiki的最近更改', developers=['OasisAkari'])

View file

@ -34,6 +34,13 @@ class WhatAreUDoingError(Exception):
pass
class QueryInfo:
def __init__(self, api, headers=None, prefix=None):
self.api = api
self.headers = headers if headers is not None else {'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6'}
self.prefix = prefix
class WikiInfo:
def __init__(self,
api: str,