Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/modules/wiki/wikilib.py

669 lines
32 KiB
Python
Raw Normal View History

2021-11-12 14:25:53 +00:00
import asyncio
2021-10-02 07:05:46 +00:00
import datetime
2021-11-12 14:25:53 +00:00
import re
2021-10-02 07:05:46 +00:00
import traceback
2021-11-12 14:25:53 +00:00
import urllib.parse
2021-10-02 07:05:46 +00:00
from typing import Union, Dict, List
import ujson as json
2022-06-12 07:07:53 +00:00
import core.utils.html2text as html2text
2022-07-31 08:27:58 +00:00
from config import Config
2021-11-12 14:25:53 +00:00
from core.dirty_check import check
2022-01-29 11:25:23 +00:00
from core.elements import Url
2022-01-26 14:13:36 +00:00
from core.logger import Logger
2021-10-02 07:05:46 +00:00
from core.utils import get_url
from .dbutils import WikiSiteInfo as DBSiteInfo, Audit
2021-10-02 07:05:46 +00:00
class InvalidPageIDError(Exception):
pass
class InvalidWikiError(Exception):
pass
class DangerousContentError(Exception):
pass
class PageNotFound(Exception):
pass
2021-10-02 17:27:45 +00:00
class WhatAreUDoingError(Exception):
pass
class QueryInfo:
def __init__(self, api, headers=None, prefix=None):
self.api = api
2022-01-20 12:13:03 +00:00
self.headers = headers if headers is not None else {
'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6'}
self.prefix = prefix
2021-10-02 07:05:46 +00:00
class WikiInfo:
def __init__(self,
2022-02-09 12:25:25 +00:00
api: str = '',
articlepath: str = '',
extensions=None,
interwiki=None,
realurl: str = '',
name: str = '',
namespaces=None,
namespaces_local=None,
namespacealiases=None,
2022-02-09 12:25:25 +00:00
in_allowlist=False,
in_blocklist=False,
script: str = '',
logo_url: str = ''):
if extensions is None:
extensions = []
if interwiki is None:
interwiki = {}
2021-10-02 07:05:46 +00:00
self.api = api
self.articlepath = articlepath
self.extensions = extensions
self.interwiki = interwiki
self.realurl = realurl
self.name = name
self.namespaces = namespaces
self.namespaces_local = namespaces_local
self.namespacealiases = namespacealiases
self.in_allowlist = in_allowlist
2021-11-17 13:17:13 +00:00
self.in_blocklist = in_blocklist
2022-01-26 16:01:49 +00:00
self.script = script
2022-02-09 12:25:25 +00:00
self.logo_url = logo_url
2021-10-02 07:05:46 +00:00
class WikiStatus:
def __init__(self,
available: bool,
value: Union[WikiInfo, bool],
message: str):
self.available = available
self.value = value
self.message = message
class PageInfo:
def __init__(self,
info: WikiInfo,
title: str,
2022-01-26 16:01:49 +00:00
id: int = -1,
2021-10-02 17:27:45 +00:00
before_title: str = None,
link: str = None,
file: str = None,
desc: str = None,
args: str = None,
2022-01-29 11:25:23 +00:00
section: str = None,
2021-10-02 17:27:45 +00:00
interwiki_prefix: str = '',
2021-10-28 15:38:22 +00:00
status: bool = True,
2021-10-28 15:43:43 +00:00
before_page_property: str = 'page',
page_property: str = 'page',
2022-03-26 14:56:58 +00:00
invalid_namespace: Union[str, bool] = False
2021-10-02 07:05:46 +00:00
):
self.info = info
2022-01-26 16:01:49 +00:00
self.id = id
2021-10-02 07:05:46 +00:00
self.title = title
self.before_title = before_title
self.link = link
self.file = file
self.desc = desc
2021-10-02 17:27:45 +00:00
self.args = args
2022-01-29 11:25:23 +00:00
self.section = section
2021-10-02 17:27:45 +00:00
self.interwiki_prefix = interwiki_prefix
2021-10-02 07:05:46 +00:00
self.status = status
2021-10-28 15:38:22 +00:00
self.before_page_property = before_page_property
self.page_property = page_property
2021-10-28 15:43:43 +00:00
self.invalid_namespace = invalid_namespace
2021-10-02 07:05:46 +00:00
class WikiLib:
2021-11-08 13:08:43 +00:00
def __init__(self, url: str, headers=None):
2021-10-02 07:05:46 +00:00
self.url = url
2022-02-09 12:25:25 +00:00
self.wiki_info = WikiInfo()
2021-10-02 07:05:46 +00:00
self.headers = headers
2021-11-12 14:43:04 +00:00
async def get_json_from_api(self, api, log=False, **kwargs) -> dict:
2021-10-24 10:55:45 +00:00
if kwargs is not None:
2021-11-10 15:05:21 +00:00
api = api + '?' + urllib.parse.urlencode(kwargs) + '&format=json'
2022-08-04 07:52:42 +00:00
Logger.debug(api)
2021-11-12 09:38:45 +00:00
else:
raise ValueError('kwargs is None')
2022-04-29 16:49:30 +00:00
try:
return await get_url(api, status_code=200, headers=self.headers, fmt="json", log=log)
except Exception as e:
if api.find('moegirl.org.cn') != -1:
raise InvalidWikiError('尝试请求萌娘百科失败,可能站点正在遭受攻击。请直接访问萌娘百科以获取信息。')
raise e
2021-10-02 07:05:46 +00:00
2022-01-26 16:01:49 +00:00
def rearrange_siteinfo(self, info: Union[dict, str], wiki_api_link) -> WikiInfo:
2021-10-02 07:05:46 +00:00
if isinstance(info, str):
info = json.loads(info)
extensions = info['query']['extensions']
ext_list = []
for ext in extensions:
ext_list.append(ext['name'])
real_url = info['query']['general']['server']
if real_url.startswith('//'):
real_url = self.url.split('//')[0] + real_url
2022-03-20 12:57:01 +00:00
namespaces = {}
2021-10-02 07:05:46 +00:00
namespaces_local = {}
namespacealiases = {}
2021-10-02 07:05:46 +00:00
for x in info['query']['namespaces']:
try:
ns = info['query']['namespaces'][x]
if '*' in ns:
2022-03-20 12:57:01 +00:00
namespaces[ns['*']] = ns['id']
2021-10-02 07:05:46 +00:00
if 'canonical' in ns:
2022-03-20 12:57:01 +00:00
namespaces[ns['canonical']] = ns['id']
2021-10-02 07:05:46 +00:00
if '*' in ns and 'canonical' in ns:
namespaces_local.update({ns['*']: ns['canonical']})
except Exception:
traceback.print_exc()
for x in info['query']['namespacealiases']:
2021-10-03 06:54:16 +00:00
if '*' in x:
2022-03-20 12:57:01 +00:00
namespaces[x['*']] = x['id']
namespacealiases[x['*'].lower()] = x['*']
2021-10-02 07:05:46 +00:00
interwiki_map = info['query']['interwikimap']
interwiki_dict = {}
for interwiki in interwiki_map:
2021-10-03 07:30:17 +00:00
interwiki_dict[interwiki['prefix']] = interwiki['url']
2022-01-26 16:01:49 +00:00
api_url = wiki_api_link
2021-11-17 13:17:13 +00:00
audit = Audit(api_url)
2021-10-03 07:30:17 +00:00
return WikiInfo(articlepath=real_url + info['query']['general']['articlepath'],
2021-10-02 07:05:46 +00:00
extensions=ext_list,
name=info['query']['general']['sitename'],
realurl=real_url,
api=api_url,
2021-10-02 07:05:46 +00:00
namespaces=namespaces,
namespaces_local=namespaces_local,
namespacealiases=namespacealiases,
interwiki=interwiki_dict,
2021-11-17 13:17:13 +00:00
in_allowlist=audit.inAllowList,
2022-01-26 16:01:49 +00:00
in_blocklist=audit.inBlockList,
2022-02-09 12:25:25 +00:00
script=real_url + info['query']['general']['script'],
logo_url=info['query']['general'].get('logo'))
2021-10-02 07:05:46 +00:00
async def check_wiki_available(self):
try:
2021-11-08 13:16:59 +00:00
api_match = re.match(r'(https?://.*?/api.php$)', self.url)
2021-10-02 07:05:46 +00:00
wiki_api_link = api_match.group(1)
except Exception:
try:
get_page = await get_url(self.url, fmt='text', headers=self.headers)
if get_page.find('<title>Attention Required! | Cloudflare</title>') != -1:
return WikiStatus(available=False, value=False, message='CloudFlare拦截了机器人的请求请联系站点管理员解决此问题。')
m = re.findall(
r'(?im)<\s*link\s*rel="EditURI"\s*type="application/rsd\+xml"\s*href="([^>]+?)\?action=rsd"\s*/\s*>',
get_page)
api_match = m[0]
if api_match.startswith('//'):
api_match = self.url.split('//')[0] + api_match
2022-01-05 11:14:45 +00:00
# Logger.info(api_match)
2021-10-02 07:05:46 +00:00
wiki_api_link = api_match
2021-11-12 14:25:53 +00:00
except (TimeoutError, asyncio.TimeoutError):
2021-10-02 07:05:46 +00:00
return WikiStatus(available=False, value=False, message='错误:尝试建立连接超时。')
except Exception as e:
traceback.print_exc()
if e.args == (403,):
2021-11-08 13:08:43 +00:00
message = '服务器拒绝了机器人的请求。'
2021-10-02 07:05:46 +00:00
elif not re.match(r'^(https?://).*', self.url):
2021-11-08 13:08:43 +00:00
message = '所给的链接没有指明协议头链接应以http://或https://开头)。'
2021-10-02 07:05:46 +00:00
else:
2021-11-08 13:08:43 +00:00
message = '此站点也许不是一个有效的Mediawiki' + str(e)
if self.url.find('moegirl.org.cn') != -1:
message += '\n萌娘百科的api接口不稳定请稍后再试或直接访问站点。'
return WikiStatus(available=False, value=False, message=message)
2021-10-02 07:05:46 +00:00
get_cache_info = DBSiteInfo(wiki_api_link).get()
if get_cache_info and datetime.datetime.now().timestamp() - get_cache_info[1].timestamp() < 43200:
return WikiStatus(available=True,
2022-01-26 16:01:49 +00:00
value=self.rearrange_siteinfo(get_cache_info[0], wiki_api_link),
2021-10-02 07:05:46 +00:00
message='')
try:
2021-11-12 14:43:04 +00:00
get_json = await self.get_json_from_api(wiki_api_link, log=True,
2021-11-12 09:38:45 +00:00
action='query',
meta='siteinfo',
siprop='general|namespaces|namespacealiases|interwikimap|extensions')
2021-10-02 07:05:46 +00:00
except Exception as e:
2021-11-12 14:25:53 +00:00
traceback.print_exc()
2021-11-12 14:43:04 +00:00
message = '从API获取信息时出错' + str(e)
if self.url.find('moegirl.org.cn') != -1:
message += '\n萌娘百科的api接口不稳定请稍后再试或直接访问站点。'
return WikiStatus(available=False, value=False, message=message)
2021-10-02 07:05:46 +00:00
DBSiteInfo(wiki_api_link).update(get_json)
2022-01-26 16:01:49 +00:00
info = self.rearrange_siteinfo(get_json, wiki_api_link)
2021-10-02 07:05:46 +00:00
return WikiStatus(available=True, value=info,
message='警告此wiki没有启用TextExtracts扩展返回的页面预览内容将为未处理的原始Wikitext文本。'
2021-10-02 17:27:45 +00:00
if 'TextExtracts' not in info.extensions else '')
2021-10-02 07:05:46 +00:00
2021-10-03 06:54:16 +00:00
async def fixup_wiki_info(self):
if self.wiki_info.api == '':
wiki_info = await self.check_wiki_available()
if wiki_info.available:
self.wiki_info = wiki_info.value
else:
raise InvalidWikiError(wiki_info.message if wiki_info.message != '' else '')
2021-10-02 07:05:46 +00:00
2021-11-12 09:38:45 +00:00
async def get_json(self, **kwargs) -> dict:
await self.fixup_wiki_info()
api = self.wiki_info.api
return await self.get_json_from_api(api, **kwargs)
2021-10-02 07:05:46 +00:00
@staticmethod
def parse_text(text):
try:
desc = text.split('\n')
desc_list = []
for x in desc:
if x != '':
desc_list.append(x)
desc = '\n'.join(desc_list)
desc_end = re.findall(r'(.*?(?:!\s|\?\s|\.\s|||。)).*', desc, re.S | re.M)
if desc_end:
2021-11-24 13:07:03 +00:00
if re.findall(r'[({\[>\"\'《【‘“「(]', desc_end[0]):
desc_end = re.findall(r'(.*?[)}\]>\"\'》】’”」)].*?(?:!\s|\?\s|\.\s|||。)).*', desc, re.S | re.M)
2021-10-02 07:05:46 +00:00
desc = desc_end[0]
except Exception:
traceback.print_exc()
desc = ''
if desc in ['...', '']:
desc = ''
2021-10-02 17:27:45 +00:00
ell = False
if len(desc) > 250:
desc = desc[0:250]
ell = True
split_desc = desc.split('\n')
for d in split_desc:
if d == '':
split_desc.remove('')
if len(split_desc) > 5:
split_desc = split_desc[0:5]
ell = True
2021-10-03 06:54:16 +00:00
return '\n'.join(split_desc) + ('...' if ell else '')
2021-10-02 07:05:46 +00:00
2022-01-28 12:55:50 +00:00
async def get_html_to_text(self, page_name, section=None):
2021-10-03 06:54:16 +00:00
await self.fixup_wiki_info()
2021-11-12 09:38:45 +00:00
get_parse = await self.get_json(action='parse',
2021-10-24 10:55:45 +00:00
page=page_name,
2021-11-10 15:05:21 +00:00
prop='text')
2021-10-02 07:05:46 +00:00
h = html2text.HTML2Text()
h.ignore_links = True
h.ignore_images = True
h.ignore_tables = True
2021-11-20 15:30:03 +00:00
h.single_line_break = True
2022-01-28 12:55:50 +00:00
t = h.handle(get_parse['parse']['text']['*'])
if section is not None:
s = re.split(r'(.*##[^#].*\[.*?])', t, re.M | re.S)
ls = len(s)
if ls > 1:
i = 0
for x in s:
i += 1
if re.match(r'##[^#]' + section + r'\[.*?]', x):
break
if i != ls:
t = ''.join(s[i:])
return t
2021-10-02 07:05:46 +00:00
async def get_wikitext(self, page_name):
2021-10-03 06:54:16 +00:00
await self.fixup_wiki_info()
2021-10-02 07:05:46 +00:00
try:
2021-11-12 09:38:45 +00:00
load_desc = await self.get_json(action='parse',
2021-10-24 10:55:45 +00:00
page=page_name,
2021-11-10 15:05:21 +00:00
prop='wikitext')
2021-10-02 07:05:46 +00:00
desc = load_desc['parse']['wikitext']['*']
except Exception:
traceback.print_exc()
desc = ''
return desc
2022-06-28 06:59:25 +00:00
async def search_page(self, search_text, namespace='*', limit=10):
2021-10-03 06:54:16 +00:00
await self.fixup_wiki_info()
2022-06-28 06:59:25 +00:00
title_split = search_text.split(':')
if title_split[0] in self.wiki_info.interwiki:
search_text = ':'.join(title_split[1:])
q_site = WikiLib(self.wiki_info.interwiki[title_split[0]], self.headers)
result = await q_site.search_page(search_text, namespace, limit)
2022-06-28 06:59:25 +00:00
result_ = []
for r in result:
result_.append(title_split[0] + ':' + r)
return result_
2021-11-12 09:38:45 +00:00
get_page = await self.get_json(action='query',
2021-10-24 10:55:45 +00:00
list='search',
2022-06-28 06:59:25 +00:00
srsearch=search_text,
2022-03-20 12:57:01 +00:00
srnamespace=namespace,
2021-10-24 10:55:45 +00:00
srwhat='text',
2022-06-28 06:59:25 +00:00
srlimit=limit,
2021-11-10 15:05:21 +00:00
srenablerewrites=True)
2022-06-28 06:59:25 +00:00
pagenames = []
for x in get_page['query']['search']:
pagenames.append(x['title'])
return pagenames
async def research_page(self, page_name: str, namespace='*'):
await self.fixup_wiki_info()
get_titles = await self.search_page(page_name, namespace=namespace, limit=1)
new_page_name = get_titles[0] if len(get_titles) > 0 else None
2021-10-02 07:05:46 +00:00
title_split = page_name.split(':')
2022-03-26 14:56:58 +00:00
invalid_namespace = False
if len(title_split) > 1 and title_split[0] not in self.wiki_info.namespaces \
and title_split[0].lower() not in self.wiki_info.namespacealiases:
2022-03-26 14:56:58 +00:00
invalid_namespace = title_split[0]
return new_page_name, invalid_namespace
2021-10-02 07:05:46 +00:00
async def parse_page_info(self, title: str = None, pageid: int = None, inline=False, lang=None, _doc=False,
2022-04-29 16:49:30 +00:00
_tried=0, _prefix='', _iw=False, _search=False) -> PageInfo:
2021-11-10 15:05:21 +00:00
"""
2022-01-27 13:16:53 +00:00
:param title: 页面标题如果为None则使用pageid
:param pageid: 页面id
2022-04-29 16:49:30 +00:00
:param inline: 是否为inline模式
:param lang: 所需的对应语言版本
2022-04-29 16:49:30 +00:00
:param _doc: 是否为文档模式仅用作内部递归调用判断
:param _tried: 尝试iw跳转的次数仅用作内部递归调用判断
:param _prefix: iw前缀仅用作内部递归调用判断
:param _iw: 是否为iw模式仅用作内部递归调用判断
:param _search: 是否为搜索模式仅用作内部递归调用判断
2021-11-10 15:05:21 +00:00
:return:
"""
2021-11-12 14:25:53 +00:00
try:
await self.fixup_wiki_info()
except InvalidWikiError as e:
2021-11-12 14:43:04 +00:00
link = None
2021-11-12 14:25:53 +00:00
if self.url.find('$1') != -1:
link = self.url.replace('$1', title)
2022-01-27 13:16:53 +00:00
return PageInfo(title=title if title is not None else pageid, id=pageid,
link=link, desc='发生错误:' + str(e), info=self.wiki_info)
2021-11-17 13:17:13 +00:00
ban = False
if self.wiki_info.in_blocklist and not self.wiki_info.in_allowlist:
ban = True
2022-04-29 16:49:30 +00:00
if _tried > 5:
2022-06-16 11:49:41 +00:00
if Config('enable_tos'):
raise WhatAreUDoingError
2022-01-28 12:55:50 +00:00
section = None
2022-01-27 13:16:53 +00:00
if title is not None:
if title == '':
return PageInfo(title='', link=self.wiki_info.articlepath.replace("$1", ""), info=self.wiki_info,
interwiki_prefix=_prefix)
2022-04-29 16:49:30 +00:00
if inline:
split_name = re.split(r'([#])', title)
else:
split_name = re.split(r'([#?])', title)
2022-01-27 13:16:53 +00:00
title = re.sub('_', ' ', split_name[0])
arg_list = []
2022-04-29 16:49:30 +00:00
_arg_list = []
2022-01-28 12:55:50 +00:00
section_list = []
2022-01-27 13:16:53 +00:00
quote_code = False
for a in split_name[1:]:
2022-04-29 16:49:30 +00:00
if len(a) > 0:
if a[0] == '#':
quote_code = True
if a[0] == '?':
quote_code = False
if quote_code:
2022-08-03 09:31:04 +00:00
arg_list.append(urllib.parse.quote(a))
2022-08-03 08:54:51 +00:00
section_list.append(a)
2022-04-29 16:49:30 +00:00
else:
_arg_list.append(a)
_arg = ''.join(_arg_list)
if _arg.find('=') != -1:
arg_list.append(_arg)
else:
2022-04-30 04:20:54 +00:00
if len(arg_list) > 0:
arg_list[-1] += _arg
else:
title += _arg
2022-01-28 12:55:50 +00:00
if len(section_list) > 1:
section = ''.join(section_list)[1:]
2022-04-29 16:49:30 +00:00
page_info = PageInfo(info=self.wiki_info, title=title, args=''.join(arg_list), interwiki_prefix=_prefix)
2022-01-29 11:25:23 +00:00
page_info.section = section
query_string = {'action': 'query', 'prop': 'info|imageinfo|langlinks', 'llprop': 'url',
'inprop': 'url', 'iiprop': 'url',
2022-01-27 13:16:53 +00:00
'redirects': 'True', 'titles': title}
elif pageid is not None:
2022-04-29 16:49:30 +00:00
page_info = PageInfo(info=self.wiki_info, title=title, args='', interwiki_prefix=_prefix)
query_string = {'action': 'query', 'prop': 'info|imageinfo|langlinks', 'llprop': 'url', 'inprop': 'url',
'iiprop': 'url', 'redirects': 'True', 'pageids': pageid}
2022-01-27 13:16:53 +00:00
else:
raise ValueError('title and pageid cannot be both None')
2021-10-02 07:05:46 +00:00
use_textextracts = True if 'TextExtracts' in self.wiki_info.extensions else False
2022-01-28 12:55:50 +00:00
if use_textextracts and section is None:
query_string.update({'prop': 'info|imageinfo|langlinks|extracts|pageprops',
2021-10-02 07:05:46 +00:00
'ppprop': 'description|displaytitle|disambiguation|infoboxes', 'explaintext': 'true',
'exsectionformat': 'plain', 'exchars': '200'})
2021-11-12 09:38:45 +00:00
get_page = await self.get_json(**query_string)
2021-10-02 07:05:46 +00:00
query = get_page.get('query')
2021-12-14 04:55:42 +00:00
if query is None:
2021-12-14 05:01:59 +00:00
return PageInfo(title=title, link=None, desc='发生错误API未返回任何内容请联系此站点管理员获取原因。',
info=self.wiki_info)
2021-10-30 15:55:57 +00:00
redirects_: List[Dict[str, str]] = query.get('redirects')
2021-10-02 07:05:46 +00:00
if redirects_ is not None:
for r in redirects_:
2021-10-30 15:55:57 +00:00
if r['from'] == title:
page_info.before_title = r['from']
page_info.title = r['to']
normalized_: List[Dict[str, str]] = query.get('normalized')
2021-10-02 07:05:46 +00:00
if normalized_ is not None:
for n in normalized_:
2021-10-30 15:55:57 +00:00
if n['from'] == title:
page_info.before_title = n['from']
page_info.title = n['to']
pages: Dict[str, dict] = query.get('pages')
# print(pages)
2021-10-02 07:05:46 +00:00
if pages is not None:
for page_id in pages:
page_info.status = False
2022-01-26 16:01:49 +00:00
page_info.id = int(page_id)
2021-10-02 07:05:46 +00:00
page_raw = pages[page_id]
2022-07-26 11:28:21 +00:00
if 'invalid' in page_raw:
rs1 = re.sub('The requested page title contains invalid characters:', '请求的页面标题包含非法字符:',
page_raw['invalidreason'])
rs = '发生错误:“' + rs1 + '”。'
rs = re.sub('".”', '"', rs)
page_info.desc = rs
elif 'missing' in page_raw:
2022-01-27 13:16:53 +00:00
if 'title' in page_raw:
2022-07-26 11:28:21 +00:00
if 'known' in page_raw:
2022-02-09 12:25:25 +00:00
full_url = re.sub(r'\$1', urllib.parse.quote(title.encode('UTF-8')),
self.wiki_info.articlepath) \
2022-01-27 13:16:53 +00:00
+ page_info.args
page_info.link = full_url
file = None
if 'imageinfo' in page_raw:
file = page_raw['imageinfo'][0]['url']
page_info.file = file
page_info.status = True
else:
split_title = title.split(':')
reparse = False
if (len(split_title) > 1 and split_title[0] in self.wiki_info.namespaces_local
and self.wiki_info.namespaces_local[split_title[0]] == 'Template'):
2022-01-27 13:16:53 +00:00
rstitle = ':'.join(split_title[1:]) + page_info.args
reparse = await self.parse_page_info(rstitle)
2022-01-27 13:16:53 +00:00
page_info.before_page_property = 'template'
elif len(split_title) > 1 and split_title[
0].lower() in self.wiki_info.namespacealiases and not _search:
rstitle = f'{self.wiki_info.namespacealiases[split_title[0].lower()]}:' \
+ ':'.join(split_title[1:]) + page_info.args
2022-04-29 16:49:30 +00:00
reparse = await self.parse_page_info(rstitle, _search=True)
if reparse:
page_info.title = reparse.title
page_info.link = reparse.link
page_info.desc = reparse.desc
page_info.file = reparse.file
page_info.before_title = title
page_info.status = reparse.status
2022-03-26 14:56:58 +00:00
page_info.invalid_namespace = reparse.invalid_namespace
2022-01-27 13:16:53 +00:00
else:
2022-03-20 12:57:01 +00:00
if len(split_title) > 1 and split_title[0] in self.wiki_info.namespaces:
research = await self.research_page(title,
self.wiki_info.namespaces[split_title[0]])
2022-03-20 12:57:01 +00:00
else:
research = await self.research_page(title)
2022-01-27 13:16:53 +00:00
page_info.title = research[0]
page_info.before_title = title
page_info.invalid_namespace = research[1]
else:
page_info.status = True
2022-01-27 13:16:53 +00:00
if 'special' in page_raw:
2021-10-24 10:55:45 +00:00
full_url = re.sub(r'\$1', urllib.parse.quote(title.encode('UTF-8')), self.wiki_info.articlepath) \
2021-10-30 15:55:57 +00:00
+ page_info.args
page_info.link = full_url
2022-01-20 10:37:27 +00:00
page_info.status = True
2021-10-02 07:05:46 +00:00
else:
query_langlinks = False
if lang is not None:
langlinks_ = {}
for x in page_raw['langlinks']:
langlinks_[x['lang']] = x['url']
if lang in langlinks_:
query_wiki = WikiLib(url=self.wiki_info.interwiki[lang], headers=self.headers)
await query_wiki.fixup_wiki_info()
query_wiki_info = query_wiki.wiki_info
q_articlepath = query_wiki_info.articlepath.replace('$1', '(.*)')
get_title = re.sub(r'' + q_articlepath, '\\1', langlinks_[lang])
2022-07-25 05:55:25 +00:00
query_langlinks = await query_wiki.parse_page_info(urllib.parse.unquote(get_title))
if 'WikibaseClient' in self.wiki_info.extensions and not query_langlinks:
2022-06-28 15:00:17 +00:00
title = (await self.parse_page_info(title)).title
qc_string = {'action': 'query', 'meta': 'wikibase', 'wbprop': 'url|siteid'}
query_client_info = await self.get_json(**qc_string)
repo_url = query_client_info['query']['wikibase']['repo']['url']['base']
siteid = query_client_info['query']['wikibase']['siteid']
query_target_site = WikiLib(self.wiki_info.interwiki[lang], headers=self.headers)
target_siteid = (await query_target_site.get_json(**qc_string))['query']['wikibase']['siteid']
qr_wiki_info = WikiLib(repo_url)
2022-06-28 15:00:17 +00:00
qr_string = {'action': 'wbgetentities', 'sites': siteid, 'titles': title,
'props': 'sitelinks/urls', 'redirects': 'yes'}
qr = await qr_wiki_info.get_json(**qr_string)
if 'entities' in qr:
qr_result = qr['entities']
for x in qr_result:
if 'missing' not in qr_result[x]:
target_site_page_title = qr_result[x]['sitelinks'][target_siteid]['title']
q_target = await query_target_site.parse_page_info(target_site_page_title)
if q_target.status:
query_langlinks = q_target
break
if lang in self.wiki_info.interwiki and not query_langlinks:
query_wiki = WikiLib(url=self.wiki_info.interwiki[lang], headers=self.headers)
await query_wiki.fixup_wiki_info()
query_wiki_info = query_wiki.wiki_info
q_articlepath = query_wiki_info.articlepath
get_title_schema = re.sub(r'' + q_articlepath.replace('$1', '(.*)'), '\\1',
self.wiki_info.interwiki[lang])
query_langlinks_ = await query_wiki.parse_page_info(
get_title_schema.replace('$1', title))
if query_langlinks_.status:
query_langlinks = query_langlinks_
if not query_langlinks:
title = page_raw['title']
page_desc = ''
split_title = title.split(':')
get_desc = True
if not _doc and len(split_title) > 1 and split_title[0] in self.wiki_info.namespaces_local \
and self.wiki_info.namespaces_local[split_title[0]] == 'Template':
get_all_text = await self.get_wikitext(title)
match_doc = re.match(r'.*{{documentation\|?(.*?)}}.*', get_all_text, re.I | re.S)
if match_doc:
match_link = re.match(r'link=(.*)', match_doc.group(1), re.I | re.S)
if match_link:
get_doc = match_link.group(1)
else:
get_doc = title + '/doc'
get_desc = False
get_doc_desc = await self.parse_page_info(get_doc, _doc=True)
page_desc = get_doc_desc.desc
page_info.before_page_property = page_info.page_property = 'template'
if get_desc:
if use_textextracts and section is None:
raw_desc = page_raw.get('extract')
if raw_desc is not None:
page_desc = self.parse_text(raw_desc)
2022-01-27 13:16:53 +00:00
else:
page_desc = self.parse_text(await self.get_html_to_text(title, section))
full_url = page_raw['fullurl'] + page_info.args
file = None
if 'imageinfo' in page_raw:
file = page_raw['imageinfo'][0]['url']
page_info.title = title
page_info.link = full_url
page_info.file = file
page_info.desc = page_desc
if not _iw and page_info.args == '':
page_info.link = self.wiki_info.script + f'?curid={page_info.id}'
else:
page_info.title = query_langlinks.title
2022-06-28 15:12:03 +00:00
page_info.before_title = query_langlinks.title
page_info.link = query_langlinks.link
page_info.file = query_langlinks.file
page_info.desc = query_langlinks.desc
2021-10-30 15:55:57 +00:00
interwiki_: List[Dict[str, str]] = query.get('interwiki')
if interwiki_ is not None:
for i in interwiki_:
2022-01-29 11:25:23 +00:00
if i['title'] == page_info.title:
2021-10-30 15:55:57 +00:00
iw_title = re.match(r'^' + i['iw'] + ':(.*)', i['title'])
2021-11-23 15:02:07 +00:00
iw_title = iw_title.group(1)
2022-04-29 16:49:30 +00:00
_prefix += i['iw'] + ':'
_iw = True
iw_query = await WikiLib(url=self.wiki_info.interwiki[i['iw']], headers=self.headers) \
.parse_page_info(iw_title, lang=lang,
_tried=_tried + 1,
_prefix=_prefix,
_iw=_iw)
2021-11-23 15:02:07 +00:00
before_page_info = page_info
2021-10-30 15:55:57 +00:00
page_info = iw_query
2021-11-12 09:38:45 +00:00
if iw_query.title == '':
2022-03-26 14:56:58 +00:00
page_info.title = ''
2021-11-12 09:38:45 +00:00
else:
2022-01-29 11:25:23 +00:00
page_info.before_title = before_page_info.title
2021-12-04 04:39:57 +00:00
t = page_info.title
2022-07-26 13:49:17 +00:00
if t != '' and t is not None:
2022-01-08 13:41:20 +00:00
if before_page_info.args is not None:
page_info.before_title += urllib.parse.unquote(before_page_info.args)
t += urllib.parse.unquote(before_page_info.args)
if page_info.link is not None:
page_info.link += before_page_info.args
2022-01-26 16:29:47 +00:00
else:
page_info.link = self.wiki_info.script + f'?curid={page_info.id}'
2022-04-29 16:49:30 +00:00
if _tried == 0:
2022-07-26 14:11:07 +00:00
if lang is not None and page_info.status:
2022-06-17 12:44:04 +00:00
page_info.before_title = page_info.title
2022-07-26 14:11:07 +00:00
else:
page_info.title = page_info.interwiki_prefix + t
2022-01-29 11:25:23 +00:00
if before_page_info.section is not None:
page_info.section = before_page_info.section
if not self.wiki_info.in_allowlist:
checklist = []
if page_info.title is not None:
checklist.append(page_info.title)
if page_info.before_title is not None:
checklist.append(page_info.before_title)
if page_info.desc is not None:
checklist.append(page_info.desc)
chk = await check(*checklist)
for x in chk:
2021-11-12 14:25:53 +00:00
if not x['status']:
2021-11-17 13:17:13 +00:00
ban = True
if ban:
2022-01-29 11:25:23 +00:00
page_info.status = False
page_info.title = page_info.before_title = None
page_info.id = -1
page_info.desc = str(Url(page_info.link, use_mm=True))
page_info.link = None
2021-10-30 15:55:57 +00:00
return page_info
2021-11-10 15:05:21 +00:00
async def random_page(self) -> PageInfo:
"""
获取随机页面
:return: 页面信息
"""
await self.fixup_wiki_info()
2021-11-12 09:38:45 +00:00
random_url = await self.get_json(action='query', list='random')
2021-11-10 15:05:21 +00:00
page_title = random_url['query']['random'][0]['title']
return await self.parse_page_info(page_title)