Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/core/utils/bot.py
2023-03-01 21:41:42 +08:00

99 lines
3.4 KiB
Python

import asyncio
import logging
import os
import traceback
from configparser import ConfigParser
from os.path import abspath
import ujson as json
from core.builtins import PrivateAssets, Secret
from core.exceptions import ConfigFileNotFound
from core.loader import load_modules, ModulesManager
from core.logger import Logger
from core.scheduler import Scheduler
from core.types import Schedule
from core.utils.http import get_url
from core.utils.ip import IP
bot_version = 'v4.0.10'
async def init_async() -> None:
load_modules()
version = os.path.abspath(PrivateAssets.path + '/version')
write_version = open(version, 'w')
try:
write_version.write(os.popen('git rev-parse HEAD', 'r').read()[0:6])
except Exception as e:
write_version.write(bot_version)
write_version.close()
tag = os.path.abspath(PrivateAssets.path + '/version_tag')
write_tag = open(tag, 'w')
try:
write_tag.write(os.popen('git tag -l', 'r').read().split('\n')[-2])
except Exception as e:
write_tag.write(bot_version)
write_tag.close()
gather_list = []
Modules = ModulesManager.return_modules_list_as_dict()
for x in Modules:
if isinstance(Modules[x], Schedule):
Scheduler.add_job(func=Modules[x].execute, trigger=Modules[x].trigger, misfire_grace_time=30,
max_instance=1)
await asyncio.gather(*gather_list)
Scheduler.start()
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)
await load_secret()
async def load_secret():
config_filename = 'config.cfg'
config_path = abspath('./config/' + config_filename)
cp = ConfigParser()
cp.read(config_path)
section = cp.sections()
if len(section) == 0:
raise ConfigFileNotFound(config_path) from None
section = section[0]
options = cp.options(section)
for option in options:
value = cp.get(section, option)
if value.upper() not in ['', 'TRUE', 'FALSE']:
Secret.add(value.upper())
try:
async def append_ip():
ip = await get_url('https://api.ip.sb/geoip', timeout=10, fmt='json')
if ip:
Secret.add(ip['ip'])
IP.country = ip['country']
IP.address = ip['ip']
Logger.info('Fetching IP information...')
await asyncio.create_task(append_ip())
Logger.info('Successfully fetched IP information.')
except Exception:
Logger.info('Failed to get IP information.')
Logger.error(traceback.format_exc())
async def load_prompt(bot) -> None:
author_cache = os.path.abspath(PrivateAssets.path + '/cache_restart_author')
loader_cache = os.path.abspath(PrivateAssets.path + '/.cache_loader')
if os.path.exists(author_cache):
open_author_cache = open(author_cache, 'r')
author = json.loads(open_author_cache.read())['ID']
open_loader_cache = open(loader_cache, 'r')
m = await bot.fetch_target(author)
if m:
if (read := open_loader_cache.read()) != '':
await m.sendDirectMessage('加载模块中发生了以下错误,对应模块未加载:\n' + read)
else:
await m.sendDirectMessage('所有模块已正常加载。')
open_loader_cache.close()
open_author_cache.close()
os.remove(author_cache)
os.remove(loader_cache)
__all__ = ['init_async', 'load_prompt']