Archived
1
0
Fork 0

change db isolation level to READ UNCOMMITTED

This commit is contained in:
yzhh 2023-08-28 20:38:05 +08:00
parent bb01b26841
commit 530937858a
6 changed files with 33 additions and 18 deletions

View file

@ -2,12 +2,11 @@ from typing import List, Union
from PIL import Image from PIL import Image
from core.builtins import Plain, Image as BImage, confirm_command, Bot from core.builtins import Plain, Image as BImage, confirm_command, Bot, FetchTarget as FT, FetchedSession as FS
from core.builtins.message import MessageSession as MS from core.builtins.message import MessageSession as MS
from core.builtins.message.chain import MessageChain from core.builtins.message.chain import MessageChain
from core.logger import Logger from core.logger import Logger
from core.types import Session, MsgInfo, FetchTarget as FT, \ from core.types import Session, MsgInfo, FinishedSession as FinS, AutoSession as AS, AutoSession
FetchedSession as FS, FinishedSession as FinS, AutoSession as AS, AutoSession
class FinishedSession(FinS): class FinishedSession(FinS):
@ -132,14 +131,6 @@ class Template(MS):
class FetchedSession(FS): class FetchedSession(FS):
def __init__(self, targetFrom, targetId):
self.target = MsgInfo(targetId=f'{targetFrom}|{targetId}',
senderId=f'{targetFrom}|{targetId}',
targetFrom=targetFrom,
senderFrom=targetFrom,
senderName='', clientName='TEST', messageId=0, replyId=None)
self.session = Session(message=False, target=targetId, sender=targetId)
self.parent = Template(self.target, self.session)
async def sendMessage(self, msgchain, disable_secret_check=False): async def sendMessage(self, msgchain, disable_secret_check=False):
""" """
@ -155,8 +146,8 @@ class FetchTarget(FT):
name = 'TEST' name = 'TEST'
@staticmethod @staticmethod
async def fetch_target(targetId) -> FetchedSession: async def fetch_target(targetId, senderId=None) -> FetchedSession:
return FetchedSession('TEST|Console', targetId) return FetchedSession('TEST|Console', '0', 'TEST', '0')
@staticmethod @staticmethod
async def post_message(module_name, message, user_list: List[FetchedSession] = None, i18n=False, **kwargs): async def post_message(module_name, message, user_list: List[FetchedSession] = None, i18n=False, **kwargs):

View file

@ -1,6 +1,7 @@
import asyncio import asyncio
import ujson as json import ujson as json
from core.logger import Logger
from core.scheduler import Scheduler, IntervalTrigger from core.scheduler import Scheduler, IntervalTrigger
from core.builtins import Bot from core.builtins import Bot
from database import BotDBUtil from database import BotDBUtil
@ -38,9 +39,11 @@ async def check_job_queue():
_queue_tasks[tskid]['flag'].set() _queue_tasks[tskid]['flag'].set()
get_all = BotDBUtil.JobQueue.get_all(target_client=Bot.FetchTarget.name) get_all = BotDBUtil.JobQueue.get_all(target_client=Bot.FetchTarget.name)
for tsk in get_all: for tsk in get_all:
Logger.debug(f'Received job queue task {tsk.taskid}, action: {tsk.action}')
args = json.loads(tsk.args)
if tsk.action == 'validate_permission': if tsk.action == 'validate_permission':
fetch = await Bot.FetchTarget.fetch_target(tsk.args['target_id'], tsk.args['sender_id']) fetch = await Bot.FetchTarget.fetch_target(args['target_id'], args['sender_id'])
if fetch: if fetch:
BotDBUtil.JobQueue.return_val(tsk, json.dumps({'value': True})) BotDBUtil.JobQueue.return_val(tsk, json.dumps({'value': await fetch.parent.checkPermission()}))
return await check_job_queue() return await check_job_queue()

View file

@ -292,7 +292,7 @@ class FetchTarget:
name = '' name = ''
@staticmethod @staticmethod
async def fetch_target(targetId) -> FetchedSession: async def fetch_target(targetId, senderId=None) -> FetchedSession:
""" """
尝试从数据库记录的对象ID中取得对象消息会话实际此会话中的消息文本会被设为False因为本来就没有 尝试从数据库记录的对象ID中取得对象消息会话实际此会话中的消息文本会被设为False因为本来就没有
""" """

View file

@ -15,6 +15,7 @@ from core.logger import Logger
from core.scheduler import Scheduler from core.scheduler import Scheduler
from core.utils.http import get_url from core.utils.http import get_url
from core.utils.ip import IP from core.utils.ip import IP
from core.queue import check_job_queue
async def init_async() -> None: async def init_async() -> None:
@ -29,6 +30,7 @@ async def init_async() -> None:
await asyncio.gather(*gather_list) await asyncio.gather(*gather_list)
Scheduler.start() Scheduler.start()
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING) logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
await asyncio.create_task(check_job_queue())
await load_secret() await load_secret()

View file

@ -402,6 +402,11 @@ class BotDBUtil:
def get(taskid: str) -> JobQueueTable: def get(taskid: str) -> JobQueueTable:
return session.query(JobQueueTable).filter_by(taskid=taskid).first() return session.query(JobQueueTable).filter_by(taskid=taskid).first()
@staticmethod
@retry(stop=stop_after_attempt(3))
def get_first(target_client: str) -> JobQueueTable:
return session.query(JobQueueTable).filter_by(targetClient=target_client, hasDone=False).first()
@staticmethod @staticmethod
@retry(stop=stop_after_attempt(3)) @retry(stop=stop_after_attempt(3))
def get_all(target_client: str) -> List[JobQueueTable]: def get_all(target_client: str) -> List[JobQueueTable]:
@ -411,7 +416,7 @@ class BotDBUtil:
@retry(stop=stop_after_attempt(3)) @retry(stop=stop_after_attempt(3))
@auto_rollback_error @auto_rollback_error
def return_val(query: JobQueueTable, value): def return_val(query: JobQueueTable, value):
query.value = json.dumps(value) query.returnVal = json.dumps(value)
query.hasDone = True query.hasDone = True
session.commit() session.commit()
session.expire_all() session.expire_all()

View file

@ -1,4 +1,5 @@
from sqlalchemy import create_engine from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import sessionmaker
from config import Config from config import Config
@ -9,7 +10,7 @@ DB_LINK = Config('db_path')
class DBSession: class DBSession:
def __init__(self): def __init__(self):
self.engine = engine = create_engine(DB_LINK) self.engine = create_engine(DB_LINK, isolation_level="READ UNCOMMITTED")
self.Session = sessionmaker() self.Session = sessionmaker()
self.Session.configure(bind=self.engine) self.Session.configure(bind=self.engine)
@ -21,4 +22,17 @@ class DBSession:
Base.metadata.create_all(bind=self.engine, checkfirst=True) Base.metadata.create_all(bind=self.engine, checkfirst=True)
class AsyncDBSession:
def __init__(self):
self.engine = create_async_engine(DB_LINK, isolation_level="READ UNCOMMITTED")
self.Session = async_sessionmaker()
self.Session.configure(bind=self.engine)
async def session(self):
return self.Session()
def create(self):
Base.metadata.create_all(bind=self.engine, checkfirst=True)
Session = DBSession() Session = DBSession()