import datetime
import re
import traceback
import urllib.parse
from typing import Dict, List, Optional, Union

import html2text
import ujson as json

from core.logger import Logger
from core.utils import get_url
from core.dirty_check import check

from .dbutils import WikiSiteInfo as DBSiteInfo
from .audit import check_whitelist


class InvalidPageIDError(Exception):
    pass


class InvalidWikiError(Exception):
    pass


class DangerousContentError(Exception):
    pass


class PageNotFound(Exception):
    pass


class WhatAreUDoingError(Exception):
    pass


class WikiInfo:
    def __init__(self,
                 api: str,
                 articlepath: str,
                 extensions: list,
                 interwiki: dict,
                 realurl: str,
                 name: str,
                 namespaces: list,
                 namespaces_local: dict,
                 in_whitelist: bool):
        self.api = api
        self.articlepath = articlepath
        self.extensions = extensions
        self.interwiki = interwiki
        self.realurl = realurl
        self.name = name
        self.namespaces = namespaces
        self.namespaces_local = namespaces_local
        self.in_whitelist = in_whitelist


class WikiStatus:
    def __init__(self,
                 available: bool,
                 value: Union[WikiInfo, bool],
                 message: str):
        self.available = available
        self.value = value
        self.message = message


class PageInfo:
    def __init__(self,
                 info: WikiInfo,
                 title: str,
                 before_title: Optional[str] = None,
                 link: Optional[str] = None,
                 file: Optional[str] = None,
                 desc: Optional[str] = None,
                 args: Optional[str] = None,
                 interwiki_prefix: str = '',
                 status: bool = True,
                 before_page_property: str = 'page',
                 page_property: str = 'page',
                 invalid_namespace: bool = False
                 ):
        self.info = info
        self.title = title
        self.before_title = before_title
        self.link = link
        self.file = file
        self.desc = desc
        self.args = args
        self.interwiki_prefix = interwiki_prefix
        self.status = status
        self.before_page_property = before_page_property
        self.page_property = page_property
        self.invalid_namespace = invalid_namespace


class WikiLib:
    def __init__(self, url, headers=None):
        self.url = url
        self.wiki_info = WikiInfo(api='', articlepath='', extensions=[], interwiki={}, realurl='', name='',
                                  namespaces=[], namespaces_local={}, in_whitelist=False)
        self.headers = headers

    async def get_json(self, api, **kwargs) -> dict:
        """Fetch JSON from an API endpoint, URL-encoding kwargs into the query string."""
        if kwargs:  # kwargs is always a dict here, so test for emptiness rather than None
            api = api + '?' + urllib.parse.urlencode(kwargs)
        return await get_url(api, status_code=200, headers=self.headers, fmt="json")
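
    # Example (sketch with an illustrative endpoint): calling
    #   await self.get_json('https://example.org/w/api.php',
    #                       action='query', meta='siteinfo', format='json')
    # requests https://example.org/w/api.php?action=query&meta=siteinfo&format=json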

    def rearrange_siteinfo(self, info: Union[dict, str]) -> WikiInfo:
        """Normalize a MediaWiki siteinfo API response into a WikiInfo object."""
        if isinstance(info, str):
            info = json.loads(info)
        extensions = info['query']['extensions']
        ext_list = []
        for ext in extensions:
            ext_list.append(ext['name'])
        real_url = info['query']['general']['server']
        if real_url.startswith('//'):  # protocol-relative server URL: prepend the scheme of the queried URL
            real_url = self.url.split('//')[0] + real_url
        namespaces = []
        namespaces_local = {}
        for x in info['query']['namespaces']:
            try:
                ns = info['query']['namespaces'][x]
                if '*' in ns:
                    namespaces.append(ns['*'])
                if 'canonical' in ns:
                    namespaces.append(ns['canonical'])
                if '*' in ns and 'canonical' in ns:
                    namespaces_local.update({ns['*']: ns['canonical']})
            except Exception:
                traceback.print_exc()
        for x in info['query']['namespacealiases']:
            if '*' in x:
                namespaces.append(x['*'])
        interwiki_map = info['query']['interwikimap']
        interwiki_dict = {}
        for interwiki in interwiki_map:
            interwiki_dict[interwiki['prefix']] = interwiki['url']
        api_url = real_url + info['query']['general']['scriptpath'] + '/api.php'
        return WikiInfo(articlepath=real_url + info['query']['general']['articlepath'],
                        extensions=ext_list,
                        name=info['query']['general']['sitename'],
                        realurl=real_url,
                        api=api_url,
                        namespaces=namespaces,
                        namespaces_local=namespaces_local,
                        interwiki=interwiki_dict,
                        in_whitelist=check_whitelist(api_url))
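
    # The siteinfo payload consumed above is shaped roughly like this
    # (abridged; field names follow the MediaWiki action API, values are
    # illustrative):
    #   {"query": {"general": {"server": "//example.org", "scriptpath": "/w",
    #                          "articlepath": "/wiki/$1", "sitename": "Example"},
    #              "namespaces": {"10": {"*": "Template", "canonical": "Template"}},
    #              "namespacealiases": [{"id": 10, "*": "T"}],
    #              "interwikimap": [{"prefix": "wikipedia",
    #                                "url": "https://en.wikipedia.org/wiki/$1"}],
    #              "extensions": [{"name": "TextExtracts"}]}}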

    async def check_wiki_available(self):
        """Check whether self.url points at a working MediaWiki site and return a WikiStatus."""
        try:
            api_match = re.match(r'(https?://.*?/api\.php$)', self.url)
            wiki_api_link = api_match.group(1)
        except Exception:
            try:
                get_page = await get_url(self.url, fmt='text', headers=self.headers)
                if get_page.find('<title>Attention Required! | Cloudflare</title>') != -1:
                    return WikiStatus(available=False, value=False,
                                      message='Cloudflare blocked the request from the bot. '
                                              'Please contact the site administrator to resolve this issue.')
                m = re.findall(
                    r'(?im)<\s*link\s*rel="EditURI"\s*type="application/rsd\+xml"\s*href="([^>]+?)\?action=rsd"\s*/\s*>',
                    get_page)
                api_match = m[0]
                if api_match.startswith('//'):  # protocol-relative EditURI: prepend the scheme of the queried URL
                    api_match = self.url.split('//')[0] + api_match
                Logger.info(api_match)
                wiki_api_link = api_match
            except TimeoutError:
                return WikiStatus(available=False, value=False, message='Error: connection attempt timed out.')
            except Exception as e:
                traceback.print_exc()
                if e.args == (403,):
                    return WikiStatus(available=False, value=False,
                                      message='The server rejected the request from the bot.')
                elif not re.match(r'^(https?://).*', self.url):
                    return WikiStatus(available=False, value=False,
                                      message='The given link does not specify a protocol '
                                              '(links should start with http:// or https://).')
                else:
                    return WikiStatus(available=False, value=False,
                                      message='This site may not be a valid MediaWiki: ' + str(e))
        get_cache_info = DBSiteInfo(wiki_api_link).get()
        if get_cache_info and datetime.datetime.now().timestamp() - get_cache_info[1].timestamp() < 43200:
            # reuse cached siteinfo if it is less than 12 hours (43200 seconds) old
            return WikiStatus(available=True,
                              value=self.rearrange_siteinfo(get_cache_info[0]),
                              message='')
        try:
            get_json = await self.get_json(wiki_api_link,
                                           action='query',
                                           meta='siteinfo',
                                           siprop='general|namespaces|namespacealiases|interwikimap|extensions',
                                           format='json')
        except Exception as e:
            return WikiStatus(available=False, value=False,
                              message='Error while fetching information from the API: ' + str(e))
        DBSiteInfo(wiki_api_link).update(get_json)
        info = self.rearrange_siteinfo(get_json)
        return WikiStatus(available=True, value=info,
                          message='Warning: this wiki does not have the TextExtracts extension enabled; '
                                  'the returned page preview will be raw, unprocessed wikitext.'
                          if 'TextExtracts' not in info.extensions else '')
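
    # A WikiStatus with available=True carries the parsed WikiInfo in .value;
    # with available=False, .message holds a human-readable reason instead.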

    async def fixup_wiki_info(self):
        """Populate self.wiki_info on first use, raising InvalidWikiError if the site is unavailable."""
        if self.wiki_info.api == '':
            wiki_info = await self.check_wiki_available()
            if wiki_info.available:
                self.wiki_info = wiki_info.value
            else:
                raise InvalidWikiError(wiki_info.message)

    @staticmethod
    def parse_text(text):
        """Trim extracted page text into a short preview (at most 250 characters and 5 lines)."""
        try:
            desc = text.split('\n')
            desc_list = []
            for x in desc:
                if x != '':
                    desc_list.append(x)
            desc = '\n'.join(desc_list)
            desc_end = re.findall(r'(.*?(?:!\s|\?\s|\.\s|!|?|。)).*', desc, re.S | re.M)
            if desc_end:
                desc = desc_end[0]  # cut at the first sentence-ending punctuation mark
        except Exception:
            traceback.print_exc()
            desc = ''
        if desc in ['...', '…']:
            desc = ''
        ell = False
        if len(desc) > 250:
            desc = desc[0:250]
            ell = True
        # filter empty lines without mutating the list while iterating over it
        split_desc = [d for d in desc.split('\n') if d != '']
        if len(split_desc) > 5:
            split_desc = split_desc[0:5]
            ell = True
        return '\n'.join(split_desc) + ('...' if ell else '')
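
    # For example (illustrative input): parse_text('First sentence. Second\npart')
    # keeps only 'First sentence. ', since the regex above cuts at the first
    # sentence-ending punctuation (including the CJK marks !, ?, and 。).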

    async def get_html_to_text(self, page_name):
        """Render a page via action=parse and convert its HTML to plain text."""
        await self.fixup_wiki_info()
        get_parse = await self.get_json(self.wiki_info.api,
                                        action='parse',
                                        page=page_name,
                                        prop='text',
                                        format='json')
        h = html2text.HTML2Text()
        h.ignore_links = True
        h.ignore_images = True
        h.ignore_tables = True
        return h.handle(get_parse['parse']['text']['*'])

    async def get_wikitext(self, page_name):
        """Fetch the raw wikitext of a page; return an empty string on failure."""
        await self.fixup_wiki_info()
        try:
            load_desc = await self.get_json(self.wiki_info.api,
                                            action='parse',
                                            page=page_name,
                                            prop='wikitext',
                                            format='json')
            desc = load_desc['parse']['wikitext']['*']
        except Exception:
            traceback.print_exc()
            desc = ''
        return desc

    async def research_page(self, page_name: str):
        """Search for the closest page title; also report whether the namespace prefix is invalid."""
        await self.fixup_wiki_info()
        get_page = await self.get_json(self.wiki_info.api,
                                       action='query',
                                       list='search',
                                       srsearch=page_name,
                                       srwhat='text',
                                       srlimit='1',
                                       srenablerewrites=True,
                                       format='json')
        new_page_name = get_page['query']['search'][0]['title'] if len(get_page['query']['search']) > 0 else None
        title_split = page_name.split(':')
        is_invalid_namespace = False
        if len(title_split) > 1 and title_split[0] not in self.wiki_info.namespaces:
            is_invalid_namespace = True
        return new_page_name, is_invalid_namespace
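
    # srenablerewrites asks the search backend to also try rewritten queries
    # (e.g. typo corrections), so a near-miss title can still return a hit.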

    async def parse_page_info(self, title: str, doc_mode=False,
                              tried_iw=0, iw_prefix='') -> PageInfo:
        """Resolve a title into a PageInfo, following redirects, interwiki links,
        and template documentation pages, and fetching a short description."""
        await self.fixup_wiki_info()
        if tried_iw > 5:  # guard against interwiki redirect loops
            raise WhatAreUDoingError
        split_name = re.split(r'([#?])', title)
        title = re.sub('_', ' ', split_name[0])
        arg_list = []
        quote_code = False
        # percent-encode everything from a '#' (inclusive) until a '?' is seen;
        # the query-string parts are appended verbatim
        for a in split_name[1:]:
            if a[0] == '#':
                quote_code = True
            if a[0] == '?':
                quote_code = False
            if quote_code:
                arg_list.append(urllib.parse.quote(a))
            else:
                arg_list.append(a)
        page_info = PageInfo(info=self.wiki_info, title=title, args=''.join(arg_list), interwiki_prefix=iw_prefix)
        query_string = {'action': 'query', 'format': 'json', 'prop': 'info|imageinfo', 'inprop': 'url',
                        'iiprop': 'url', 'redirects': 'True', 'titles': title}
        use_textextracts = 'TextExtracts' in self.wiki_info.extensions
        if use_textextracts:
            query_string.update({'prop': 'info|imageinfo|extracts',
                                 'ppprop': 'description|displaytitle|disambiguation|infoboxes', 'explaintext': 'true',
                                 'exsectionformat': 'plain', 'exchars': '200'})
        get_page = await self.get_json(self.wiki_info.api, **query_string)
        query = get_page.get('query')
        redirects_: List[Dict[str, str]] = query.get('redirects')
        if redirects_ is not None:
            for r in redirects_:
                if r['from'] == title:
                    page_info.before_title = r['from']
                    page_info.title = r['to']
        normalized_: List[Dict[str, str]] = query.get('normalized')
        if normalized_ is not None:
            for n in normalized_:
                if n['from'] == title:
                    page_info.before_title = n['from']
                    page_info.title = n['to']
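
        # 'redirects' and 'normalized' entries in the response look like
        # [{'from': 'Requested title', 'to': 'Resolved title'}]; both loops
        # above record the requested form in before_title before following it.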

        pages: Dict[str, dict] = query.get('pages')
        if pages is not None:
            for page_id in pages:
                page_raw = pages[page_id]
                title = page_raw['title']
                if int(page_id) < 0:  # negative page IDs mean the title has no stored page
                    if 'missing' not in page_raw:
                        # not missing (e.g. an invalid or special title): link to the article path directly
                        full_url = re.sub(r'\$1', urllib.parse.quote(title.encode('UTF-8')),
                                          self.wiki_info.articlepath) + page_info.args
                        page_info.link = full_url
                    else:
                        split_title = title.split(':')
                        if len(split_title) > 1 and split_title[0] in self.wiki_info.namespaces_local \
                                and self.wiki_info.namespaces_local[split_title[0]] == 'Template':
                            # missing Template page: retry with the namespace prefix stripped
                            rstitle = ':'.join(split_title[1:]) + page_info.args
                            research = await self.parse_page_info(rstitle)
                            page_info.title = research.title
                            page_info.link = research.link
                            page_info.desc = research.desc
                            page_info.file = research.file
                            page_info.before_title = title
                            page_info.before_page_property = 'template'
                            page_info.status = research.status
                        else:
                            research = await self.research_page(title)
                            page_info.title = research[0]
                            page_info.before_title = title
                            page_info.invalid_namespace = research[1]
                            page_info.status = False
                else:
                    page_desc = ''
                    split_title = title.split(':')
                    get_desc = True
                    if not doc_mode and len(split_title) > 1 and split_title[0] in self.wiki_info.namespaces_local \
                            and self.wiki_info.namespaces_local[split_title[0]] == 'Template':
                        # for templates, prefer the {{documentation}} subpage as the description
                        get_all_text = await self.get_wikitext(title)
                        match_doc = re.match(r'.*{{documentation\|?(.*?)}}.*', get_all_text, re.I | re.S)
                        if match_doc:
                            match_link = re.match(r'link=(.*)', match_doc.group(1), re.I | re.S)
                            if match_link:
                                get_doc = match_link.group(1)
                            else:
                                get_doc = title + '/doc'
                            get_desc = False
                            get_doc_desc = await self.parse_page_info(get_doc, doc_mode=True)
                            page_desc = get_doc_desc.desc
                            page_info.before_page_property = page_info.page_property = 'template'
                    if get_desc:
                        if use_textextracts:
                            raw_desc = page_raw.get('extract')
                            if raw_desc is not None:
                                page_desc = self.parse_text(raw_desc)
                        else:
                            page_desc = self.parse_text(await self.get_html_to_text(title))
                    full_url = page_raw['fullurl'] + page_info.args
                    file = None
                    if 'imageinfo' in page_raw:
                        file = page_raw['imageinfo'][0]['url']
                    page_info.title = title
                    page_info.link = full_url
                    page_info.file = file
                    page_info.desc = page_desc
        interwiki_: List[Dict[str, str]] = query.get('interwiki')
        if interwiki_ is not None:
            for i in interwiki_:
                if i['title'] == title:
                    # the title belongs to another wiki: recurse into a WikiLib for
                    # the mapped interwiki URL, bumping tried_iw to cap the depth
                    iw_title = re.match(r'^' + re.escape(i['iw']) + ':(.*)', i['title'])
                    iw_title = iw_title.group(1) + page_info.args
                    iw_prefix += i['iw'] + ':'
                    iw_query = await WikiLib(url=self.wiki_info.interwiki[i['iw']]).parse_page_info(
                        iw_title, tried_iw=tried_iw + 1, iw_prefix=iw_prefix)
                    page_info = iw_query
                    page_info.before_title = title

        t = page_info.title
        if tried_iw == 0:
            page_info.title = page_info.interwiki_prefix + t
            if not self.wiki_info.in_whitelist:
                checklist = []
                if page_info.title is not None:
                    checklist.append(page_info.title)
                if page_info.before_title is not None:
                    checklist.append(page_info.before_title)
                if page_info.desc is not None:
                    checklist.append(page_info.desc)
                chk = await check(*checklist)
                for x in chk:
                    # "<吃掉了>" / "<全部吃掉了>" ("eaten" / "all eaten") are the markers
                    # the dirty-word checker substitutes for censored text
                    if x.find("<吃掉了>") != -1 or x.find("<全部吃掉了>") != -1:
                        page_info.status = True
                        page_info.before_title = '?'
                        page_info.title = '¿'
                        page_info.link = 'https://wdf.ink/6OUp'
        return page_info
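

# A minimal usage sketch (hypothetical wiki URL; runnable only inside the bot
# project, since the core.* modules must be importable):
if __name__ == '__main__':
    import asyncio

    async def _demo():
        wiki = WikiLib('https://en.wikipedia.org/w/api.php')
        status = await wiki.check_wiki_available()
        if status.available:
            page = await wiki.parse_page_info('Example')
            print(page.title, page.link, page.desc)
        else:
            print(status.message)

    asyncio.run(_demo())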