Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/console.py

174 lines
6.4 KiB
Python
Raw Normal View History

2022-06-26 06:06:26 +00:00
import json
2021-08-24 10:22:36 +00:00
import os
2022-06-18 16:21:59 +00:00
import sys
2021-08-24 10:22:36 +00:00
2021-07-30 19:32:58 +00:00
from config import Config
if not Config('db_path'):
raise AttributeError('Wait! You need to fill a valid database address into the config.cfg "db_path"\n'
'Example: \ndb_path = sqlite:///database/save.db\n'
2021-07-30 19:57:40 +00:00
'(Also you can fill in the above example directly,'
2021-07-30 19:32:58 +00:00
' bot will automatically create a SQLite database in the "./database/save.db")')
from database import BotDBUtil, session
from database.tables import DBVersion
2021-02-19 08:56:57 +00:00
import asyncio
2021-04-10 13:12:32 +00:00
import traceback
import aioconsole
2021-02-19 08:56:57 +00:00
2022-07-01 06:26:41 +00:00
from bot import init_bot
2023-02-05 14:33:33 +00:00
from core.builtins import PrivateAssets, EnableDirtyWordCheck, Plain
from core.types import MsgInfo, AutoSession
2023-04-19 07:45:29 +00:00
from core.console.template import Template as MessageSession
2021-07-30 18:06:04 +00:00
from core.parser.message import parser
2023-03-01 13:41:42 +00:00
from core.utils.bot import init_async
2022-01-20 13:31:50 +00:00
from core.logger import Logger
2021-08-24 10:22:36 +00:00
query_dbver = session.query(DBVersion).first()
if query_dbver is None:
2023-04-14 13:17:59 +00:00
session.add_all([DBVersion(value=str(BotDBUtil.database_version))])
session.commit()
query_dbver = session.query(DBVersion).first()
if (current_ver := int(query_dbver.value)) < (target_ver := BotDBUtil.database_version):
print(f'Updating database from {current_ver} to {target_ver}...')
from database.update import update_database
update_database()
print('Database updated successfully! Please restart the program.')
sys.exit()
EnableDirtyWordCheck.status = True
2021-08-24 10:22:36 +00:00
PrivateAssets.set(os.path.abspath(os.path.dirname(__file__) + '/assets'))
2021-02-19 12:20:00 +00:00
async def console_scheduler():
2023-03-01 13:41:42 +00:00
await init_async()
async def console_command():
try:
m = await aioconsole.ainput('> ')
2022-08-13 07:13:30 +00:00
asyncio.create_task(console_command())
2022-06-18 16:21:59 +00:00
await send_command(m)
except KeyboardInterrupt:
print('Exited.')
2021-10-14 15:18:47 +00:00
exit()
except Exception:
2022-01-20 13:31:50 +00:00
Logger.error(traceback.format_exc())
2021-08-23 12:44:31 +00:00
2022-06-26 06:06:26 +00:00
async def send_command(msg, interactions=None):
2022-06-18 16:21:59 +00:00
Logger.info('-------Start-------')
2022-08-27 17:51:43 +00:00
returns = await parser(MessageSession(target=MsgInfo(targetId='TEST|Console|0',
2022-06-26 06:06:26 +00:00
senderId='TEST|0',
senderName='',
targetFrom='TEST|Console',
2022-08-27 17:51:43 +00:00
senderFrom='TEST', clientName='TEST', messageId=0,
2022-07-31 08:33:20 +00:00
replyId=None),
2022-08-27 17:51:43 +00:00
session=AutoSession(message=msg, target='TEST|Console|0', sender='TEST|0',
2022-06-26 06:06:26 +00:00
auto_interactions=interactions)))
2022-07-27 14:32:16 +00:00
# print(returns)
2022-06-18 16:21:59 +00:00
Logger.info('----Process end----')
2022-06-26 06:06:26 +00:00
return returns
2022-06-18 16:21:59 +00:00
async def autotest():
test_file = './test_commands.txt'
if not os.path.exists(test_file):
Logger.error('Test file not found.')
read = open(test_file, 'r', encoding='utf-8')
commands = read.read().split('\n\n')
for command in commands:
2022-06-26 06:06:26 +00:00
sub_c = command.split('\n')
cmds = ''
results = {}
interactions = []
for sub in sub_c:
if sub.startswith('~'):
cmds += sub
elif sub.startswith('!!results'):
results = json.loads(sub.replace('!!results=', '', 1))
elif sub.startswith('!!interactions'):
interactions = json.loads(sub.replace('!!interactions=', '', 1))
else:
cmds += '\n' + sub
Logger.info(cmds)
returns = (await send_command(cmds, interactions=interactions)).sent # todo: 需要收集结果
included_texts = results.get('include_texts', [])
excluded_texts = results.get('exclude_texts', [])
if isinstance(included_texts, str):
included_texts = [included_texts]
if isinstance(excluded_texts, str):
excluded_texts = [excluded_texts]
for text in included_texts:
included = False
for r in returns:
for rr in r.value:
if isinstance(rr, Plain):
if rr.text.find(text) != -1:
Logger.info('Found included text: ' + text)
included = True
if not included:
Logger.error('Included text not found: ' + text)
for text in excluded_texts:
excluded = False
for r in returns:
for rr in r.value:
if isinstance(rr, Plain):
if rr.text.find(text) != -1:
Logger.error('Found excluded text: ' + text)
excluded = True
if not excluded:
Logger.info('Excluded text not found: ' + text)
included_elements = results.get('include_elements', [])
excluded_elements = results.get('exclude_elements', [])
if isinstance(included_elements, str):
included_elements = [included_elements]
if isinstance(excluded_elements, str):
excluded_elements = [excluded_elements]
for element in included_elements:
if isinstance(element, str):
included2 = False
for r in returns:
for rr in r.value:
if rr.__class__.__name__ == element:
Logger.info('Found included element: ' + element)
included2 = True
if not included2:
Logger.error('Included element not found: ' + element)
for element in excluded_elements:
if isinstance(element, str):
excluded2 = False
for r in returns:
for rr in r.value:
if rr.__class__.__name__ == element:
Logger.error('Found excluded element: ' + element)
excluded = True
if not excluded2:
Logger.info('Excluded element not found: ' + element)
2022-06-18 16:21:59 +00:00
if __name__ == '__main__':
init_bot()
loop = asyncio.get_event_loop()
argv = sys.argv
autotest_ = False
if len(argv) > 1:
if argv[1] == 'autotest':
autotest_ = True
if not autotest_:
loop.create_task(console_scheduler())
loop.create_task(console_command())
loop.run_forever()
else:
loop.create_task(autotest())
loop.run_forever()