Archived
1
0
Fork 0
This commit is contained in:
yzhh 2022-01-08 23:14:49 +08:00
commit 5620d3cd78
8 changed files with 201 additions and 55 deletions

View file

@ -7,6 +7,10 @@ from core.elements.others import Secret, ErrorMessage
from typing import Union, List, Tuple
from core.logger import Logger
from ...logger import Logger
from ...logger import Logger
class MessageChain:
def __init__(self, elements: Union[str, List[Union[Plain, Image, Voice, Embed]],
@ -43,6 +47,7 @@ class MessageChain:
elif isinstance(elements, MessageChain):
self.value = elements.value
else:
Logger.error(f'Unexpected message type: {elements.__dict__}')
self.value.append(
Plain(ErrorMessage('机器人尝试发送非法消息链,请联系机器人开发者解决问题。')))
if not self.value:

33
modules/meme/__init__.py Normal file
View file

@ -0,0 +1,33 @@
from core.component import on_command
from core.elements import MessageSession
from core.dirty_check import check
from core.logger import Logger
from modules.meme.jiki import jiki
from modules.meme.moegirl import moegirl
from modules.meme.nbnhhsh import nbnhhsh
from modules.meme.urban import urban
meme = on_command(
bind_prefix='meme',
desc='全功能梗查询。',
developers=['Dianliang233'])
@meme.handle(help_doc='<term> {在萌娘百科、nbnhhsh、Urban Dictionary 中查询梗}')
async def _(msg: MessageSession):
# jiki is not going to be supported as least for a while due to the strict anti-bot config.
# res_jiki = await jiki(msg.parsed_msg['<term>'])
res_moegirl = await moegirl(msg.parsed_msg['<term>'])
res_nbnhhsh = await nbnhhsh(msg.parsed_msg['<term>'])
res_urban = await urban(msg.parsed_msg['<term>'])
# chk = await check(res_jiki, res_moegirl, res_nbnhhsh, res_urban)
chk = await check(res_moegirl, res_nbnhhsh, res_urban)
res = ''
for i in chk:
if not i['status']:
i = '[???] <REDACTED>'
res += i + '\n'
else:
res += i['content'] + '\n'
Logger.info('res:'+res)
await msg.sendMessage(res)

29
modules/meme/jiki.py Normal file
View file

@ -0,0 +1,29 @@
import traceback
import ujson as json
from core.utils.bot import post_url
async def jiki(term: str):
'''查询小鸡百科。
:param term: 需要查询的term
:returns: 查询结果'''
try:
url = 'https://api.jikipedia.com/go/search_entities' + term
text = await post_url(url, data={'page': 1, 'phrase': term, 'size': 1}, headers={'accept': '*/*',
'accept-encoding': 'gzip, deflate',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,en-GB;q=0.6',
'content-type': 'application/json',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36 Edg/96.0.1054.62'})
print(text)
data = json.loads(text)
count = data['total']
result = data['data'][0]['definitions']
title = result['term']['title']
content = result['plaintext']
link = 'https://jikipedia.com/definition/' + result['id']
return f'[小鸡百科]{count}个结果):{title}\n{content}\n{link}'
except Exception:
traceback.print_exc()
return '[小鸡百科] 查询出错。'

30
modules/meme/moegirl.py Normal file
View file

@ -0,0 +1,30 @@
import re
from modules.wiki import query_pages
from modules.wiki.wikilib_v2 import QueryInfo
async def moegirl(term: str):
result = await query_pages(QueryInfo('https://zh.moegirl.org.cn/api.php', headers={'accept': '*/*',
'accept-encoding': 'gzip, deflate',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,en-GB;q=0.6',
'content-type': 'application/json',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36 Edg/96.0.1054.62'}), term)
msg = ''
if result['msg_list']:
for msg_item in result['msg_list']:
msg += msg_item.text
if result['wait_msg_list']:
for msg_item in result['wait_msg_list']:
print('msg_item.text: ', msg_item.text)
redirect = re.search(r'(?<=是\[)(.*?)(?=\])', msg_item.text)[0]
print(redirect)
if redirect:
wait = await query_pages(QueryInfo('https://zh.moegirl.org.cn/api.php', headers={'accept': '*/*',
'accept-encoding': 'gzip, deflate',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,en-GB;q=0.6',
'content-type': 'application/json',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36 Edg/96.0.1054.62'
}), redirect)
msg += wait['msg_list'][0].text
return '[萌娘百科] ' + msg

View file

@ -2,23 +2,10 @@ import traceback
import ujson as json
from core.elements import MessageSession
from core.component import on_command
from core.utils import post_url
n = on_command(
bind_prefix='nbnhhsh',
desc='能不能好好说话?拼音首字母缩写释义工具',
developers=['Dianliang233'],)
@n.handle('<term>')
async def _(msg: MessageSession):
await msg.sendMessage(await nbnhhsh(msg.parsed_msg['<term>']))
async def nbnhhsh(term: str):
'''查询nbnhhsh。
:param term: 需要查询的term
:returns: 查询结果'''
try:

42
modules/meme/urban.py Normal file
View file

@ -0,0 +1,42 @@
import traceback
import ujson as json
from config import Config
from core.utils import get_url
async def urban(term: str):
'''查询urban dictionary。
:param term: 需要查询的term
:returns: 查询结果'''
try:
url = 'http://api.urbandictionary.com/v0/define?term=' + term
webrender = Config('web_render')
if webrender:
url = webrender + 'source?url=' + url
text = await get_url(url, headers={'accept': '*/*',
'accept-encoding': 'gzip, deflate',
'accept-language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,en-GB;q=0.6',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36 Edg/96.0.1054.62'})
print(text)
data = json.loads(text)['list']
if data == []:
return '[Urban Dictionary] 没有找到相关结果。'
else:
count = data.__len__()
word = data[0]['word']
definition = limit_length(data[0]['definition'])
example = limit_length(data[0]['example'])
link = data[0]['permalink']
return f'[Urban Dictionary]{count}个结果):\n{word}\n{definition}\nExample: {example}\n{link}'
except Exception:
traceback.print_exc()
return '[Urban Dictionary] 查询出错。'
def limit_length(text, limit=50):
new = text
length = new.split(' ').__len__()
if length > limit:
new = ' '.join(new.split(' ')[0:limit]) + ''
return new

View file

@ -19,7 +19,7 @@ from .utils.ab_qq import ab_qq
from .utils.newbie import newbie
from .utils.rc import rc
from .utils.rc_qq import rc_qq
from .wikilib_v2 import WikiLib, WhatAreUDoingError, PageInfo, InvalidWikiError
from .wikilib_v2 import WikiLib, WhatAreUDoingError, PageInfo, InvalidWikiError, QueryInfo
wiki = on_command('wiki',
alias={'wiki_start_site': 'wiki set', 'interwiki': 'wiki iw'},
@ -261,8 +261,7 @@ async def _(msg: MessageSession):
await msg.sendMessage(send_msgs)
legacy = False
if legacy:
wikis = []
wikis.append('现有白名单:')
wikis = ['现有白名单:']
for al in allow_list:
wikis.append(f'{al[0]}by {al[1]}')
wikis.append('现有黑名单:')
@ -311,17 +310,27 @@ async def _(msg: MessageSession):
await query_pages(msg, query_list, mediawiki=True)
async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
template=False, mediawiki=False, useprefix=True):
target = WikiTargetInfo(msg)
start_wiki = target.get_start_wiki()
interwiki_list = target.get_interwikis()
headers = target.get_headers()
prefix = target.get_prefix()
enabled_fandom_addon = BotDBUtil.Module(msg).check_target_enabled_module('wiki_fandom_addon')
async def query_pages(session: Union[MessageSession, QueryInfo], title: Union[str, list, tuple],
template=False, mediawiki=False, use_prefix=True):
if isinstance(session, MessageSession):
target = WikiTargetInfo(session)
start_wiki = target.get_start_wiki()
interwiki_list = target.get_interwikis()
headers = target.get_headers()
prefix = target.get_prefix()
enabled_fandom_addon = BotDBUtil.Module(session).check_target_enabled_module('wiki_fandom_addon')
elif isinstance(session, QueryInfo):
start_wiki = session.api
interwiki_list = []
headers = session.headers
prefix = session.prefix
enabled_fandom_addon = False
else:
raise TypeError('session must be MessageSession or QueryInfo.')
if start_wiki is None:
await msg.sendMessage('没有指定起始Wiki已默认指定为中文Minecraft Wiki发送~wiki set <域名>来设定自定义起始Wiki。'
'\n例子:~wiki set https://minecraft.fandom.com/zh/wiki/')
await session.sendMessage('没有指定起始Wiki已默认指定为中文Minecraft Wiki发送~wiki set <域名>来设定自定义起始Wiki。'
'\n例子:~wiki set https://minecraft.fandom.com/zh/wiki/')
start_wiki = 'https://minecraft.fandom.com/zh/api.php'
if isinstance(title, str):
title = [title]
@ -329,7 +338,7 @@ async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
raise AbuseWarning('一次性查询的页面超出15个。')
query_task = {start_wiki: {'query': [], 'iw_prefix': ''}}
for t in title:
if prefix is not None and useprefix:
if prefix is not None and use_prefix:
t = prefix + t
if t[0] == ':':
if len(t) > 1:
@ -418,7 +427,7 @@ async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
if r.file is not None:
dl_list.append(r.file)
else:
if msg.Feature.image and r.link is not None:
if r.link is not None:
web_render_list.append({r.link: r.info.realurl})
else:
plain_slice = []
@ -441,33 +450,37 @@ async def query_pages(msg: MessageSession, title: Union[str, list, tuple],
except WhatAreUDoingError:
raise AbuseWarning('使机器人重定向页面的次数过多。')
except InvalidWikiError as e:
await msg.sendMessage(f'发生错误:' + str(e))
if msg_list:
await msg.sendMessage(msg_list)
if web_render_list and msg.Feature.image:
infobox_msg_list = []
for i in web_render_list:
for ii in i:
get_infobox = await get_infobox_pic(i[ii], ii, headers)
if get_infobox:
infobox_msg_list.append(Image(get_infobox))
if infobox_msg_list:
await msg.sendMessage(infobox_msg_list, quote=False)
if dl_list:
for f in dl_list:
dl = await download_to_cache(f)
guess_type = filetype.guess(dl)
if guess_type is not None:
if guess_type.extension in ["png", "gif", "jpg", "jpeg", "webp", "bmp", "ico"]:
if msg.Feature.image:
await msg.sendMessage(Image(dl), quote=False)
elif guess_type.extension in ["oga", "ogg", "flac", "mp3", "wav"]:
if msg.Feature.voice:
await msg.sendMessage(Voice(dl), quote=False)
if wait_msg_list:
confirm = await msg.waitConfirm(wait_msg_list)
if confirm and wait_list:
await query_pages(msg, wait_list, useprefix=False)
await session.sendMessage(f'发生错误:' + str(e))
if isinstance(session, MessageSession):
if msg_list:
await session.sendMessage(msg_list)
if web_render_list and session.Feature.image:
infobox_msg_list = []
for i in web_render_list:
for ii in i:
get_infobox = await get_infobox_pic(i[ii], ii, headers)
if get_infobox:
infobox_msg_list.append(Image(get_infobox))
if infobox_msg_list:
await session.sendMessage(infobox_msg_list, quote=False)
if dl_list:
for f in dl_list:
dl = await download_to_cache(f)
guess_type = filetype.guess(dl)
if guess_type is not None:
if guess_type.extension in ["png", "gif", "jpg", "jpeg", "webp", "bmp", "ico"]:
if session.Feature.image:
await session.sendMessage(Image(dl), quote=False)
elif guess_type.extension in ["oga", "ogg", "flac", "mp3", "wav"]:
if session.Feature.voice:
await session.sendMessage(Voice(dl), quote=False)
if wait_msg_list:
confirm = await session.waitConfirm(wait_msg_list)
if confirm and wait_list:
await query_pages(session, wait_list, use_prefix=False)
else:
return {'msg_list': msg_list, 'web_render_list': web_render_list, 'dl_list': dl_list,
'wait_list': wait_list, 'wait_msg_list': wait_msg_list}
rc_ = on_command('rc', desc='获取默认wiki的最近更改', developers=['OasisAkari'])

View file

@ -34,6 +34,13 @@ class WhatAreUDoingError(Exception):
pass
class QueryInfo:
def __init__(self, api, headers=None, prefix=None):
self.api = api
self.headers = headers if headers is not None else {'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6'}
self.prefix = prefix
class WikiInfo:
def __init__(self,
api: str,