# akari-bot/modules/wiki/wikilib.py
import re
import traceback
import urllib.parse

import aiohttp

import core.dirty_check
from .helper import check_wiki_available


class wikilib:
    async def get_data(self, url: str, fmt: str):
        # Fetch a URL and decode it with the named aiohttp response method
        # ('json', 'text', ...); returns False on any failure.
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=20)) as req:
                    if hasattr(req, fmt):
                        return await getattr(req, fmt)()
                    else:
                        raise ValueError(f"NoSuchMethod: {fmt}")
            except Exception:
                traceback.print_exc()
                return False

    def danger_wiki_check(self):
        # Wikis whose output must pass the dirty-word filter before being sent.
        link = self.wikilink.upper()
        return any(keyword in link for keyword in (
            'WIKIPEDIA', 'UNCYCLOPEDIA', 'HMOEGIRL', 'EVCHK',
            'HONGKONG.FANDOM', 'WIKILEAKS'))

    async def danger_text_check(self, text):
        if not self.danger_wiki_check():
            return False
        check = await core.dirty_check.check([text])
        print(check)
        # The filter replaces blocked words with the markers below ("eaten" /
        # "all eaten"); if either appears, the text failed the check.
        return check.find('<吃掉了>') != -1 or check.find('<全部吃掉了>') != -1

    async def get_interwiki(self, url):
        interwiki_list = url + '?action=query&meta=siteinfo&siprop=interwikimap&sifilteriw=local&format=json'
        json = await self.get_data(interwiki_list, 'json')
        interwikimap = json['query']['interwikimap']
        interwiki_dict = {}
        for interwiki in interwikimap:
            # Strip the '$1' article-path placeholder (and a leading 'wiki/')
            # so the stored value can be used as a base link.
            interwiki_dict[interwiki['prefix']] = re.sub(r'(?:wiki/|)\$1', '', interwiki['url'])
        return interwiki_dict
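
    # For reference, each interwikimap entry returned by the siteinfo API looks
    # roughly like this (an illustrative example, not from a specific wiki):
    #   {"prefix": "wikipedia", "local": "", "url": "https://en.wikipedia.org/wiki/$1"}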

    async def get_image(self, pagename):
        try:
            url = self.wikilink + f'?action=query&titles={pagename}&prop=imageinfo&iiprop=url&format=json'
            json = await self.get_data(url, 'json')
            parsepageid = self.parsepageid(json)
            imagelink = json['query']['pages'][parsepageid]['imageinfo'][0]['url']
            return imagelink
        except Exception:
            traceback.print_exc()
            return False

    async def getpage(self):
        getlinkurl = self.wikilink + '?action=query&format=json&prop=info&inprop=url&redirects&titles=' + self.pagename
        getpage = await self.get_data(getlinkurl, "json")
        return getpage

    def parsepageid(self, pageraw):
        # Return the first page id from a query result's 'pages' mapping.
        pageraw = pageraw['query']['pages']
        pagelist = iter(pageraw)
        pageid = next(pagelist)
        return pageid
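
    # The 'pages' object is keyed by page id, e.g. (illustrative shape only):
    #   {"query": {"pages": {"6678": {"title": "...", "fullurl": "..."}}}}
    # A missing title comes back under the pseudo id "-1" together with a
    # 'missing' key, which step1's pageid == '-1' branch relies on.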

    async def researchpage(self):
        try:
            # First try the search generator with exact-match sorting.
            searchurl = self.wikilink + '?action=query&generator=search&gsrsearch=' + self.pagename + '&gsrsort=just_match&gsrenablerewrites&prop=info&gsrlimit=1&format=json'
            getsecjson = await self.get_data(searchurl, "json")
            secpageid = self.parsepageid(getsecjson)
            sectitle = getsecjson['query']['pages'][secpageid]['title']
            if self.interwiki == '':
                target = ''
            else:
                target = f'{self.interwiki}:'
            # "Could not find <page>, did you mean [[<suggestion>]]?"
            prompt = f'找不到{target}{self.pagename},您是否要找的是:[[{target}{sectitle}]]'
            if self.templateprompt:
                prompt = self.templateprompt + prompt
            if await self.danger_text_check(prompt):
                return {'status': 'done', 'text': 'https://wdf.ink/6OUp'}
            return {'status': 'wait', 'title': f'{target}{sectitle}', 'text': prompt}
        except Exception:
            # Fall back to a plain full-text search.
            try:
                searchurl = self.wikilink + '?action=query&list=search&srsearch=' + self.pagename + '&srwhat=text&srlimit=1&srenablerewrites=&format=json'
                getsecjson = await self.get_data(searchurl, "json")
                sectitle = getsecjson['query']['search'][0]['title']
                if self.interwiki == '':
                    target = ''
                else:
                    target = f'{self.interwiki}:'
                prompt = f'找不到{target}{self.pagename},您是否要找的是:[[{target}{sectitle}]]'
                if self.templateprompt:
                    prompt = self.templateprompt + prompt
                if await self.danger_text_check(prompt):
                    return {'status': 'done', 'text': 'https://wdf.ink/6OUp'}
                return {'status': 'wait', 'title': f'{target}{sectitle}', 'text': prompt}
            except Exception:
                traceback.print_exc()
                return {'status': 'done', 'text': '找不到条目。'}  # "Page not found."

    async def nullpage(self):
        if 'invalid' in self.psepgraw:
            # Localize MediaWiki's invalid-title error message.
            rs1 = re.sub('The requested page title contains invalid characters:', '请求的页面标题包含非法字符:',
                         self.psepgraw['invalidreason'])
            rs = '发生错误:“' + rs1 + '”。'
            rs = re.sub('".”', '"', rs)  # collapse the doubled punctuation before the closing quote
            return {'status': 'done', 'text': rs}
        if 'missing' in self.psepgraw:
            self.rspt = await self.researchpage()
            return self.rspt
        # Otherwise reply with a direct link to the page.
        self.orginwikilink = re.sub('api.php', '', self.orginwikilink)
        # 'sentyouprompt' is never assigned in main(), so read it defensively.
        if not getattr(self, 'sentyouprompt', False):
            msg = self.orginwikilink + urllib.parse.quote(self.pagename.encode('UTF-8'))
        else:
            msg = '您要的' + self.pagename + self.orginwikilink + urllib.parse.quote(self.pagename.encode('UTF-8'))
        return {'status': 'done', 'text': msg}

    async def getdesc(self):
        # Pull a one-sentence plain-text extract via the TextExtracts API.
        try:
            descurl = self.wikilink + '?action=query&prop=extracts&exsentences=1&explaintext&exsectionformat=wiki' \
                                      '&format=json&titles=' + self.pagename
            loadtext = await self.get_data(descurl, "json")
            pageid = self.parsepageid(loadtext)
            desc = loadtext['query']['pages'][pageid]['extract']
        except Exception:
            traceback.print_exc()
            desc = ''
        return desc

    async def getfirstline(self):
        # Fallback when there is no extract: parse the first section's wikitext
        # and cut it at the first sentence-ending punctuation mark (half- or
        # full-width).
        try:
            descurl = self.wikilink + f'?action=parse&page={self.gflpagename}&prop=wikitext&section=1&format=json'
            loaddesc = await self.get_data(descurl, 'json')
            descraw = loaddesc['parse']['wikitext']['*']
            cutdesc = re.findall(r'(.*(?:!|\?|\.|;|!|?|。|;))', descraw, re.S | re.M)
            desc = cutdesc[0]
        except Exception:
            traceback.print_exc()
            desc = ''
        return desc

    async def step1(self):
        if self.template:
            self.pagename = 'Template:' + self.pagename
        self.pageraw = await self.getpage()
        if not self.pageraw:
            return {'status': 'done', 'text': '发生错误:无法获取到页面。'}
        # Follow any redirect reported by the API.
        if 'redirects' in self.pageraw['query']:
            self.pagename = self.pageraw['query']['redirects'][0]['to']
        try:
            self.pageid = self.parsepageid(self.pageraw)
        except Exception:
            return {'status': 'done', 'text': '发生错误:无法获取到页面,请检查是否设置了对应Interwiki。'}
        self.psepgraw = self.pageraw['query']['pages'][self.pageid]
        if self.pageid == '-1':
            if not self.igmessage:
                if self.template:
                    # [Template:X] does not exist: strip the prefix and retry
                    # as a plain page search, noting the rollback in the prompt.
                    self.pagename = self.orginpagename = re.sub(r'^Template:', '', self.pagename)
                    self.template = False
                    self.templateprompt = f'提示:[Template:{self.pagename}]不存在,已自动回滚搜索页面。\n'
                    return await self.step1()
                return await self.nullpage()
        else:
            return await self.step2()

    async def step2(self):
        fullurl = self.psepgraw['fullurl']
        # Split the full URL into the wiki's article path and the page name.
        geturlpagename = re.match(r'(https?://.*?/(?:index.php/|wiki/|.*/wiki/|))(.*)', fullurl, re.M | re.I)
        desc = await self.getdesc()
        if desc == '':
            self.gflpagename = geturlpagename.group(2)
            desc = await self.getfirstline()
        print(desc)
        try:
            # Re-append a section anchor ('#...') if the query contained one.
            section = re.match(r'.*(\#.*)', self.pagename)
            finpgname = geturlpagename.group(2) + urllib.parse.quote(section.group(1).encode('UTF-8'))
            fullurl = self.psepgraw['fullurl'] + urllib.parse.quote(section.group(1).encode('UTF-8'))
        except Exception:
            finpgname = geturlpagename.group(2)
        finpgname = urllib.parse.unquote(finpgname)
        finpgname = re.sub('_', ' ', finpgname)
        if finpgname == self.orginpagename:
            rmlstlb = re.sub('\n$', '', desc)
        else:
            # The title changed along the way, so mark the result as a redirect.
            rmlstlb = re.sub('\n$', '',
                             f'(重定向[{self.orginpagename}] -> [{finpgname}])\n{desc}')
        rmlstlb = re.sub('\n\n', '\n', rmlstlb)
        rmlstlb = re.sub('\n\n', '\n', rmlstlb)  # run twice to collapse leftover blank lines
        try:
            # Keep at most five lines of the description.
            rm5lline = re.findall(r'.*\n.*\n.*\n.*\n.*\n', rmlstlb)
            result = rm5lline[0] + '...行数过多已截断。'
        except Exception:
            result = rmlstlb
        msgs = {'status': 'done', 'url': fullurl, 'text': result, 'apilink': self.wikilink}
        matchimg = re.match(r'File:.*?\.(?:png|gif|jpg|jpeg|webp|bmp|ico)', self.pagename, re.I)
        if matchimg:
            getimg = await self.get_image(self.pagename)
            if getimg:
                msgs['net_image'] = getimg
        if await self.danger_text_check(result):
            return {'status': 'done', 'text': 'https://wdf.ink/6OUp'}
        return msgs

    async def main(self, wikilink, pagename, interwiki=None, igmessage=False, template=False, tryiw=0):
        print(wikilink)
        print(pagename)
        print(interwiki)
        if pagename == '':
            return {'status': 'done', 'text': '错误:需要查询的页面为空。'}
        pagename = re.sub('_', ' ', pagename)
        pagename = pagename.split('|')[0]
        self.orginwikilink = wikilink
        self.wikilink = re.sub('index.php/', '', self.orginwikilink)  # normalize path-style API links
        if self.danger_wiki_check():
            if await self.danger_text_check(pagename):
                return {'status': 'done', 'text': 'https://wdf.ink/6OUp'}
        self.orginpagename = pagename
        self.pagename = pagename
        self.interwiki = '' if interwiki is None else interwiki
        self.igmessage = igmessage
        self.template = template
        self.templateprompt = None
        try:
            # 'prefix:Page' may be an interwiki link; if the prefix exists in
            # this wiki's interwiki map, retry against the target wiki.
            matchinterwiki = re.match(r'(.*?):(.*)', self.pagename)
            if matchinterwiki:
                iwlist = await self.get_interwiki(self.wikilink)
                print(iwlist)
                if matchinterwiki.group(1) in iwlist:
                    if tryiw <= 5:
                        interwiki_link = iwlist[matchinterwiki.group(1)]
                        check = await check_wiki_available(interwiki_link)
                        if check:
                            return await self.main(check[0], matchinterwiki.group(2),
                                                   matchinterwiki.group(1),
                                                   self.igmessage, self.template, tryiw + 1)
                        else:
                            return {'status': 'done',
                                    'text': f'发生错误:指向的interwiki不是一个有效的MediaWiki:{interwiki_link}{matchinterwiki.group(2)}'}
                    else:
                        return {'status': 'warn', 'text': '警告:尝试重定向已超过5次,继续尝试将有可能导致你被机器人加入黑名单。'}
            return await self.step1()
        except Exception as e:
            traceback.print_exc()
            if not igmessage:
                # Unlike the other paths, this error path returns a bare string.
                return f'发生错误:{str(e)}\n'
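

# A minimal usage sketch (assumptions: 'core.dirty_check' and '.helper' are
# importable, i.e. this runs inside the bot; the endpoint is only an example):
#
#     import asyncio
#
#     async def demo():
#         result = await wikilib().main('https://example.fandom.com/api.php', 'Help:Contents')
#         print(result)  # e.g. {'status': 'done', 'url': ..., 'text': ..., 'apilink': ...}
#
#     asyncio.run(demo())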