Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/modules/wiki/database.py
2021-06-06 16:21:24 +08:00

189 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import base64
import os
import re
import sqlite3
import traceback
dbpath = os.path.abspath('./modules/wiki/save.db')
class WD:
def __init__(self):
if not os.path.exists(dbpath):
self.initialize()
self.conn = sqlite3.connect(dbpath)
self.c = self.conn.cursor()
def initialize(self):
a = open(dbpath, 'w')
a.close()
self.conn = sqlite3.connect(dbpath)
self.c = self.conn.cursor()
self.c.execute('''CREATE TABLE start_wiki_link_Group
(ID INT PRIMARY KEY NOT NULL,
LINK TEXT);''')
self.c.execute('''CREATE TABLE custom_interwiki_Group
(ID INT PRIMARY KEY NOT NULL,
INTERWIKIS TEXT);''')
self.c.execute('''CREATE TABLE start_wiki_link_Friend
(ID INT PRIMARY KEY NOT NULL,
LINK TEXT);''')
self.c.execute('''CREATE TABLE custom_interwiki_Friend
(ID INT PRIMARY KEY NOT NULL,
INTERWIKIS TEXT);''')
self.c.execute('''CREATE TABLE wiki_info
(LINK TEXT PRIMARY KEY NOT NULL,
SITEINFO TEXT,
TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP);''')
self.c.execute('''CREATE TABLE request_headers_Group
(ID TEXT PRIMARY KEY NOT NULL,
HEADERS TEXT);''')
self.c.execute('''CREATE TABLE request_headers_Friend
(ID TEXT PRIMARY KEY NOT NULL,
HEADERS TEXT);''')
self.c.close()
def add_start_wiki(self, table, id, value):
a = self.c.execute(f"SELECT * FROM {table} WHERE ID={id}").fetchone()
if a:
self.c.execute(f"UPDATE {table} SET LINK='{value}' WHERE ID='{id}'")
self.conn.commit()
return '成功设置起始Wiki'
else:
self.c.execute(f"INSERT INTO {table} (ID, Link) VALUES (?, ?)", (id, value))
self.conn.commit()
return '成功设置起始Wiki'
def get_start_wiki(self, table, id):
a = self.c.execute(f"SELECT * FROM {table} WHERE ID={id}").fetchone()
if a:
return a[1]
else:
return False
def config_custom_interwiki(self, do, table, id, iw, link=None):
a = self.c.execute(f"SELECT * FROM {table} WHERE ID={id}").fetchone()
if do == 'add':
if a:
split_iws = a[1].split('|')
iwlist = []
for iws in split_iws:
split_iw = iws.split('>')
iwlist.append(split_iw[0])
if iw in iwlist:
for iws in split_iws:
if iws.find(iw + '>') != -1:
split_iws.remove(iws)
split_iws.append(f'{iw}>{link}')
self.c.execute(
f"UPDATE {table} SET INTERWIKIS='{'|'.join(split_iws)}' WHERE ID='{id}'")
self.conn.commit()
return '成功更新自定义Interwiki'
else:
split_iws.append(f'{iw}>{link}')
self.c.execute(
f"UPDATE {table} SET INTERWIKIS='{'|'.join(split_iws)}' WHERE ID='{id}'")
self.conn.commit()
return '成功添加自定义Interwiki'
else:
self.c.execute(f"INSERT INTO {table} (ID, INTERWIKIS) VALUES (?, ?)", (id, f'{iw}>{link}'))
self.conn.commit()
return '成功添加自定义Interwiki'
elif do == 'del':
if a:
split_iws = a[1].split('|')
iwlist = []
for iws in split_iws:
split_iw = iws.split('>')
iwlist.append(split_iw[0])
if iw in iwlist:
for iws in split_iws:
if iws.find(iw + '>') != -1:
split_iws.remove(iws)
self.c.execute(
f"UPDATE {table} SET INTERWIKIS='{'|'.join(split_iws)}' WHERE ID='{id}'")
self.conn.commit()
return '成功删除自定义Interwiki'
else:
return '失败添加过此Interwiki'
else:
return '失败未添加过任何Interwiki。'
def get_custom_interwiki(self, table, id, iw):
a = self.c.execute(f"SELECT * FROM {table} WHERE ID={id}").fetchone()
if a:
interwikis = a[1].split('|')
for iws in interwikis:
if iws.find(iw + '>') != -1:
iws = iws.split('>')
return iws[1]
else:
return False
def get_custom_interwiki_list(self, table, id):
a = self.c.execute(f"SELECT * FROM {table} WHERE ID={id}").fetchone()
if a:
interwikis = a[1].split('|')
return '\n'.join(interwikis)
else:
return False
def config_headers(self, do, table, id, header=None):
try:
a = self.c.execute(f"SELECT * FROM {table} WHERE ID=?", (id,)).fetchone()
if do == 'set':
header = base64.encodebytes(header.encode('utf-8'))
if a:
self.c.execute(f"UPDATE {table} SET HEADERS=? WHERE ID=?", (header, id))
else:
self.c.execute(f"INSERT INTO {table} (ID, HEADERS) VALUES (?, ?)", (id, header))
self.conn.commit()
return '成功更新请求所使用的Headers。'
elif do == 'reset':
if a:
self.c.execute(f"DELETE FROM {table} WHERE ID=?", (id,))
return '成功重置请求所使用的Headers。'
else:
return '当前未自定义请求所使用的Headers。'
elif do == 'get':
headers = {}
if a:
headerd = str(base64.decodebytes(a[1]).decode('utf-8'))
headersp = headerd.split('\n')
print(headersp)
for x in headersp:
if x != '':
x = x.split(':')
headers[x[0]] = re.sub(r'^ ', '', x[1])
else:
headers['accept-language'] = 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6'
print(header)
return headers
elif do == 'show':
if a:
header = str(base64.decodebytes(a[1]).decode('utf-8'))
else:
header = '默认accept-language: zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6'
return header
except Exception as e:
traceback.print_exc()
return '发生错误' + str(e)
def update_wikiinfo(self, apilink, siteinfo):
a = self.c.execute(f"SELECT * FROM wiki_info WHERE LINK='{apilink}'").fetchone()
if a:
self.c.execute("UPDATE wiki_info SET SITEINFO=? WHERE LINK=?", (siteinfo, apilink))
else:
self.c.execute(f"INSERT INTO wiki_info (LINK, SITEINFO) VALUES (?, ?)", (apilink, siteinfo))
self.conn.commit()
def get_wikiinfo(self, apilink):
a = self.c.execute(f"SELECT * FROM wiki_info WHERE LINK='{apilink}'").fetchone()
if a:
return a[1:]
else:
return False
WikiDB = WD()