add auto-parse infobox, section, image and sound in urls to wiki_inline module
This commit is contained in:
parent
061e2e9149
commit
d4d06c767a
5 changed files with 112 additions and 14 deletions
19
console.py
19
console.py
|
@ -10,6 +10,9 @@ if not Config('db_path'):
|
|||
'(Also you can fill in the above example directly,'
|
||||
' bot will automatically create a SQLite database in the "./database/save.db")')
|
||||
|
||||
from database import BotDBUtil, session
|
||||
from database.tables import DBVersion
|
||||
|
||||
import asyncio
|
||||
import traceback
|
||||
import aioconsole
|
||||
|
@ -21,6 +24,22 @@ from core.parser.message import parser
|
|||
from core.utils import init, init_async
|
||||
from core.logger import Logger
|
||||
|
||||
|
||||
query_dbver = session.query(DBVersion).first()
|
||||
if query_dbver is None:
|
||||
session.add_all([DBVersion(value='2')])
|
||||
session.commit()
|
||||
query_dbver = session.query(DBVersion).first()
|
||||
|
||||
if (current_ver := int(query_dbver.value)) < (target_ver := BotDBUtil.database_version):
|
||||
print(f'Updating database from {current_ver} to {target_ver}...')
|
||||
from database.update import update_database
|
||||
|
||||
update_database()
|
||||
print('Database updated successfully! Please restart the program.')
|
||||
sys.exit()
|
||||
|
||||
|
||||
EnableDirtyWordCheck.status = True
|
||||
PrivateAssets.set(os.path.abspath(os.path.dirname(__file__) + '/assets'))
|
||||
init()
|
||||
|
|
|
@ -51,7 +51,8 @@ class Bind:
|
|||
def __init__(self, bind_prefix):
|
||||
self.bind_prefix = bind_prefix
|
||||
|
||||
def handle(self, pattern: str, mode: str = "M", flags: re.RegexFlag = 0, show_typing: bool = True):
|
||||
def handle(self, pattern: Union[str, re.Pattern], mode: str = "M", flags: re.RegexFlag = 0,
|
||||
show_typing: bool = True):
|
||||
def decorator(function):
|
||||
ModulesManager.bind_to_module(self.bind_prefix, RegexMeta(function=function,
|
||||
pattern=pattern,
|
||||
|
|
|
@ -7,6 +7,7 @@ from core.elements import Plain, Image as BImage, Session, MsgInfo, FetchTarget
|
|||
FetchedSession as FS, FinishedSession as FinS, AutoSession as AS
|
||||
from core.elements.message.chain import MessageChain
|
||||
from core.elements.others import confirm_command
|
||||
from core.logger import Logger
|
||||
|
||||
|
||||
class FinishedSession(FinS):
|
||||
|
@ -36,9 +37,12 @@ class Template(MS):
|
|||
if isinstance(x, Plain):
|
||||
msg_list.append(x.text)
|
||||
print(x.text)
|
||||
Logger.info(f'[Bot] -> [{self.target.targetId}]: {x.text}')
|
||||
if isinstance(x, BImage):
|
||||
img = Image.open(await x.get())
|
||||
image_path = await x.get()
|
||||
img = Image.open(image_path)
|
||||
img.show()
|
||||
Logger.info(f'[Bot] -> [{self.target.targetId}]: Image: {image_path}')
|
||||
return FinishedSession([0], ['There should be a callable here... hmm...'])
|
||||
|
||||
async def waitConfirm(self, msgchain=None, quote=True, delete=True):
|
||||
|
|
|
@ -45,7 +45,7 @@ class CommandMeta:
|
|||
class RegexMeta:
|
||||
def __init__(self,
|
||||
function: Callable = None,
|
||||
pattern: str = None,
|
||||
pattern: Union[str, re.Pattern] = None,
|
||||
mode: str = None,
|
||||
flags: re.RegexFlag = 0,
|
||||
show_typing: bool = True,
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import asyncio
|
||||
import re
|
||||
import traceback
|
||||
import urllib.parse
|
||||
from typing import Union
|
||||
|
||||
import filetype
|
||||
|
@ -8,6 +9,7 @@ import ujson as json
|
|||
|
||||
from core.builtins.message import MessageSession
|
||||
from core.component import on_command, on_regex
|
||||
from core.dirty_check import check
|
||||
from core.elements import Plain, Image, Voice, Url, confirm_command
|
||||
from core.exceptions import AbuseWarning
|
||||
from core.logger import Logger
|
||||
|
@ -246,7 +248,7 @@ async def _(msg: MessageSession):
|
|||
req = msg.parsed_msg
|
||||
api = req['<apiLink>']
|
||||
check = await WikiLib(api).check_wiki_available()
|
||||
if check:
|
||||
if check.available:
|
||||
api = check.value.api
|
||||
if req.get('distrust', False):
|
||||
res = Audit(api).remove_from_AllowList()
|
||||
|
@ -269,7 +271,7 @@ async def _(msg: MessageSession):
|
|||
req = msg.parsed_msg
|
||||
api = req['<apiLink>']
|
||||
check = await WikiLib(api).check_wiki_available()
|
||||
if check:
|
||||
if check.available:
|
||||
api = check.value.api
|
||||
audit = Audit(api)
|
||||
allow = audit.inAllowList
|
||||
|
@ -333,7 +335,7 @@ wiki_inline = on_regex('wiki_inline',
|
|||
alias='wiki_regex', developers=['OasisAkari'])
|
||||
|
||||
|
||||
@wiki_inline.handle(r'\[\[(.*?)]]', mode='A', flags=re.I)
|
||||
@wiki_inline.handle(re.compile(r'\[\[(.*?)]]', flags=re.I), mode='A')
|
||||
async def _(msg: MessageSession):
|
||||
query_list = []
|
||||
for x in msg.matched_msg:
|
||||
|
@ -343,10 +345,9 @@ async def _(msg: MessageSession):
|
|||
await query_pages(msg, query_list, inline_mode=True)
|
||||
|
||||
|
||||
@wiki_inline.handle(r'\{\{(.*?)}}', mode='A', flags=re.I)
|
||||
@wiki_inline.handle(re.compile(r'\{\{(.*?)}}', flags=re.I), mode='A')
|
||||
async def _(msg: MessageSession):
|
||||
query_list = []
|
||||
Logger.debug(msg.matched_msg)
|
||||
for x in msg.matched_msg:
|
||||
if x != '' and x not in query_list and x[0] != '#' and x.find("{") == -1:
|
||||
query_list.append(x.split("|")[0])
|
||||
|
@ -354,10 +355,9 @@ async def _(msg: MessageSession):
|
|||
await query_pages(msg, query_list, template=True, inline_mode=True)
|
||||
|
||||
|
||||
@wiki_inline.handle(r'≺(.*?)≻|⧼(.*?)⧽', mode='A', flags=re.I, show_typing=False)
|
||||
@wiki_inline.handle(re.compile(r'≺(.*?)≻|⧼(.*?)⧽', flags=re.I), mode='A', show_typing=False)
|
||||
async def _(msg: MessageSession):
|
||||
query_list = []
|
||||
Logger.debug(msg.matched_msg)
|
||||
for x in msg.matched_msg:
|
||||
for y in x:
|
||||
if y != '' and y not in query_list and y[0] != '#':
|
||||
|
@ -366,6 +366,79 @@ async def _(msg: MessageSession):
|
|||
await query_pages(msg, query_list, mediawiki=True, inline_mode=True)
|
||||
|
||||
|
||||
@wiki_inline.handle(re.compile(
|
||||
r'(https?://[-a-zA-Z0-9@:%._+~#=]{2,256}\.[a-z]{2,4}\b[-a-zA-Z0-9@:%_+.~#?&/=]*)', flags=re.I),
|
||||
mode='A', show_typing=False)
|
||||
async def _(msg: MessageSession):
|
||||
async def bgtask(msg: MessageSession):
|
||||
query_list = []
|
||||
target = WikiTargetInfo(msg)
|
||||
headers = target.get_headers()
|
||||
iws = target.get_interwikis()
|
||||
wikis = [target.get_start_wiki()] + [iws[w] for w in iws]
|
||||
Logger.debug(msg.matched_msg)
|
||||
for x in msg.matched_msg:
|
||||
if wiki_ := await WikiLib(x).check_wiki_available():
|
||||
if wiki_.available and wiki_.value.api in wikis:
|
||||
query_list.append({x: wiki_.value})
|
||||
if query_list:
|
||||
for q in query_list:
|
||||
async def infobox():
|
||||
if msg.Feature.image:
|
||||
for qq in q:
|
||||
get_infobox = await get_pic(q[qq].realurl, qq, headers, allow_special_page=q[qq].in_allowlist)
|
||||
if get_infobox:
|
||||
await msg.sendMessage(Image(get_infobox), quote=False)
|
||||
|
||||
async def section():
|
||||
if msg.Feature.image:
|
||||
for qq in q:
|
||||
section_ = []
|
||||
quote_code = False
|
||||
for qs in qq:
|
||||
if qs == '#':
|
||||
quote_code = True
|
||||
if qs == '?':
|
||||
quote_code = False
|
||||
if quote_code:
|
||||
section_.append(qs)
|
||||
if section_:
|
||||
s = urllib.parse.unquote(''.join(section_)[1:])
|
||||
if q[qq].realurl:
|
||||
get_section = await get_pic(q[qq].realurl, qq, headers, section=s)
|
||||
if get_section:
|
||||
await msg.sendMessage(Image(get_section))
|
||||
|
||||
async def image_and_voice():
|
||||
for qq in q:
|
||||
articlepath = q[qq].articlepath.replace('$1', '(.*)')
|
||||
get_title = re.sub(r'' + articlepath, '\\1', qq)
|
||||
if get_title != '':
|
||||
title = urllib.parse.unquote(get_title)
|
||||
if not q[qq].in_allowlist:
|
||||
for result in await check(title):
|
||||
if not result['status']:
|
||||
return
|
||||
|
||||
wiki_ = WikiLib(qq)
|
||||
get_page = await wiki_.parse_page_info(title)
|
||||
if get_page.status and get_page.file is not None:
|
||||
dl = await download_to_cache(get_page.file)
|
||||
guess_type = filetype.guess(dl)
|
||||
if guess_type is not None:
|
||||
if guess_type.extension in ["png", "gif", "jpg", "jpeg", "webp", "bmp", "ico"]:
|
||||
if msg.Feature.image:
|
||||
await msg.sendMessage(Image(dl), quote=False)
|
||||
elif guess_type.extension in ["oga", "ogg", "flac", "mp3", "wav"]:
|
||||
if msg.Feature.voice:
|
||||
await msg.sendMessage(Voice(dl), quote=False)
|
||||
|
||||
asyncio.create_task(infobox())
|
||||
asyncio.create_task(section())
|
||||
asyncio.create_task(image_and_voice())
|
||||
asyncio.create_task(bgtask(msg))
|
||||
|
||||
|
||||
async def search_pages(session: MessageSession, title: Union[str, list, tuple], use_prefix=True):
|
||||
target = WikiTargetInfo(session)
|
||||
start_wiki = target.get_start_wiki()
|
||||
|
@ -677,9 +750,10 @@ async def query_pages(session: Union[MessageSession, QueryInfo], title: Union[st
|
|||
section_msg_list = []
|
||||
for i in render_section_list:
|
||||
for ii in i:
|
||||
get_section = await get_pic(i[ii]['url'], ii, headers, section=i[ii]['section'])
|
||||
if get_section:
|
||||
section_msg_list.append(Image(get_section))
|
||||
if i[ii]['in_allowlist']:
|
||||
get_section = await get_pic(i[ii]['url'], ii, headers, section=i[ii]['section'])
|
||||
if get_section:
|
||||
section_msg_list.append(Image(get_section))
|
||||
if section_msg_list:
|
||||
await session.sendMessage(section_msg_list, quote=False)
|
||||
|
||||
|
|
Reference in a new issue