Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/core/loader/__init__.py
2023-07-06 17:40:04 +08:00

203 lines
7.3 KiB
Python

import importlib
import os
import re
import sys
import traceback
from typing import Dict, Union
from config import Config
from core.builtins import PrivateAssets
from core.logger import Logger
from core.types import Module
from core.types.module.component_meta import CommandMeta, RegexMeta, ScheduleMeta
from core.utils.i18n import load_locale_file
load_dir_path = os.path.abspath('./modules/')
current_unloaded_modules = []
def load_modules():
unloaded_modules = Config('unloaded_modules')
err_prompt = []
locale_err = load_locale_file()
if locale_err:
locale_err.append('i18n:')
err_prompt.append('\n'.join(locale_err))
fun_file = None
dir_list = os.listdir(load_dir_path)
for file_name in dir_list:
try:
file_path = os.path.join(load_dir_path, file_name)
fun_file = None
if os.path.isdir(file_path):
if file_name[0] != '_':
fun_file = file_name
elif os.path.isfile(file_path):
if file_name[0] != '_' and file_name.endswith('.py'):
fun_file = file_name[:-3]
if fun_file is not None:
Logger.info(f'Loading modules.{fun_file}...')
if fun_file in unloaded_modules:
Logger.info(f'Skipped modules.{fun_file}!')
current_unloaded_modules.append(fun_file)
continue
modules = 'modules.' + fun_file
importlib.import_module(modules)
Logger.info(f'Succeeded loaded modules.{fun_file}!')
except Exception:
tb = traceback.format_exc()
errmsg = f'Failed to load modules.{fun_file}: \n{tb}'
Logger.error(errmsg)
err_prompt.append(errmsg)
loadercache = os.path.abspath(PrivateAssets.path + '/.cache_loader')
openloadercache = open(loadercache, 'w')
if err_prompt:
err_prompt = re.sub(r' File \"<frozen importlib.*?>\", .*?\n', '', '\n'.join(err_prompt))
openloadercache.write(err_prompt)
else:
openloadercache.write('')
openloadercache.close()
modules = ModulesManager.modules
for m in modules:
module = modules[m]
if module.alias:
ModulesManager.modules_aliases.update(module.alias)
class ModulesManager:
modules: Dict[str, Module] = {}
modules_aliases: Dict[str, str] = {}
modules_origin: Dict[str, str] = {}
@classmethod
def add_module(cls, module: Module, py_module_name: str):
if module.bind_prefix not in ModulesManager.modules:
cls.modules.update({module.bind_prefix: module})
cls.modules_origin.update({module.bind_prefix: py_module_name})
else:
raise ValueError(f'Duplicate bind prefix "{module.bind_prefix}"')
@classmethod
def remove_modules(cls, modules):
for module in modules:
if module in cls.modules:
Logger.info(f'Removing...{module}')
cls.modules.pop(module)
cls.modules_origin.pop(module)
else:
raise ValueError(f'Module "{module}" is not exist')
@classmethod
def search_related_module(cls, module, includeSelf=True):
if module in cls.modules_origin:
modules = []
py_module = cls.return_py_module(module)
for m in cls.modules_origin:
if cls.modules_origin[m].startswith(py_module):
modules.append(m)
if not includeSelf:
modules.remove(module)
return modules
else:
raise ValueError(f'Could not find "{module}" in modules_origin dict')
@classmethod
def return_py_module(cls, module):
if module in cls.modules_origin:
return re.match(r'^modules(\.[a-zA-Z0-9_]*)?', cls.modules_origin[module]).group()
else:
return None
@classmethod
def bind_to_module(cls, bind_prefix: str, meta: Union[CommandMeta, RegexMeta, ScheduleMeta]):
if bind_prefix in cls.modules:
if isinstance(meta, CommandMeta):
cls.modules[bind_prefix].command_list.add(meta)
elif isinstance(meta, RegexMeta):
cls.modules[bind_prefix].regex_list.add(meta)
elif isinstance(meta, ScheduleMeta):
cls.modules[bind_prefix].schedule_list.add(meta)
_return_cache = {}
@classmethod
def return_modules_list(cls, targetFrom: str = None) -> \
Dict[str, Module]:
if targetFrom is not None:
if targetFrom in cls._return_cache:
return cls._return_cache[targetFrom]
returns = {}
for m in cls.modules:
if isinstance(cls.modules[m], Module):
if targetFrom in cls.modules[m].exclude_from:
continue
available = cls.modules[m].available_for
if targetFrom in available or '*' in available:
returns.update({m: cls.modules[m]})
cls._return_cache.update({targetFrom: returns})
return returns
return cls.modules
@classmethod
def reload_module(cls, module_name: str):
"""
重载该小可模块(以及该模块所在文件的其它模块)
"""
py_module = cls.return_py_module(module_name)
unbind_modules = cls.search_related_module(module_name)
cls.remove_modules(unbind_modules)
cls._return_cache.clear()
return cls.reload_py_module(py_module)
@classmethod
def load_module(cls, module_name: str):
"""
加载该小可模块(以及该模块所在文件的其它模块)
"""
if module_name not in current_unloaded_modules:
return False
modules = 'modules.' + module_name
if modules in sys.modules:
cls.return_py_module(module_name)
else:
importlib.import_module(modules)
Logger.info(f'Succeeded loaded modules.{module_name}!')
cls._return_cache.clear()
current_unloaded_modules.remove(module_name)
return True
@classmethod
def unload_module(cls, module_name: str):
"""
卸载该小可模块(以及该模块所在文件的其它模块)
"""
origin_module = cls.modules_origin[module_name]
unbind_modules = cls.search_related_module(module_name)
cls.remove_modules(unbind_modules)
cls._return_cache.clear()
current_unloaded_modules.append(origin_module)
return True
@classmethod
def reload_py_module(cls, module_name: str):
"""
重载该py模块
"""
try:
Logger.info(f'Reloading {module_name} ...')
module = sys.modules[module_name]
cnt = 0
loadedModList = list(sys.modules.keys())
for mod in loadedModList:
if mod.startswith(f'{module_name}.'):
cnt += cls.reload_py_module(mod)
importlib.reload(module)
Logger.info(f'Succeeded reloaded {module_name}')
return cnt + 1
except BaseException:
tb = traceback.format_exc()
errmsg = f'Failed to reload {module_name}: \n{tb}'
Logger.error(errmsg)
return -999