Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/unit_test.py

53 lines
1.9 KiB
Python
Raw Normal View History

2021-07-30 19:32:58 +00:00
from config import Config
if not Config('db_path'):
raise AttributeError('Wait! You need to fill a valid database address into the config.cfg "db_path"\n'
'Example: \ndb_path = sqlite:///database/save.db\n'
2021-07-30 19:57:40 +00:00
'(Also you can fill in the above example directly,'
2021-07-30 19:32:58 +00:00
' bot will automatically create a SQLite database in the "./database/save.db")')
2021-02-19 08:56:57 +00:00
import asyncio
2021-04-10 13:12:32 +00:00
import traceback
import aioconsole
2021-02-19 08:56:57 +00:00
2021-08-20 16:32:46 +00:00
from init import init_bot
2021-08-07 07:56:48 +00:00
from core.elements import Module
2021-08-01 14:54:25 +00:00
from core.elements.message import MsgInfo, Session
2021-08-07 07:56:48 +00:00
from core.unit_test.template import Template as MessageSession, FetchTarget
2021-07-30 18:06:04 +00:00
from core.parser.message import parser
from core.scheduler import Scheduler
2021-07-30 18:06:04 +00:00
from core.loader import Modules
2021-02-19 12:20:00 +00:00
async def unit_test_scheduler():
gather_list = []
for x in Modules:
if isinstance(Modules[x], Module) and Modules[x].autorun:
2021-08-07 07:56:48 +00:00
gather_list.append(asyncio.ensure_future(Modules[x].function(FetchTarget)))
await asyncio.gather(*gather_list)
Scheduler.start()
async def unit_test_command():
try:
m = await aioconsole.ainput('> ')
2021-08-07 07:56:48 +00:00
await parser(MessageSession(target=MsgInfo(targetId='TEST|0',
senderId='TEST|0',
senderName='',
targetFrom='TEST|Console',
senderFrom='TEST|Console'),
2021-08-07 07:56:48 +00:00
session=Session(message=m, target='TEST|0', sender='TEST|0')))
print('----Process end----')
await unit_test_command()
except KeyboardInterrupt:
print('Exited.')
except Exception:
traceback.print_exc()
2021-08-23 12:44:31 +00:00
2021-08-07 12:55:07 +00:00
init_bot()
loop = asyncio.get_event_loop()
loop.create_task(unit_test_scheduler())
loop.create_task(unit_test_command())
loop.run_forever()