Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/bots/aiocqhttp/client.py

53 lines
1.4 KiB
Python
Raw Normal View History

2023-05-30 13:14:00 +00:00
import traceback
from typing import Dict, Any, Optional
2021-10-08 11:54:27 +00:00
2023-05-30 13:14:00 +00:00
from aiocqhttp import CQHttp, Error
from aiocqhttp.event import Event
class EventModded(Event):
@staticmethod
def from_payload(payload: Dict[str, Any]) -> 'Optional[Event]':
"""
OneBot 事件数据构造 `Event` 对象
"""
try:
e = EventModded(payload)
_ = e.type, e.detail_type
return e
except KeyError:
traceback.print_exc()
return None
@property
def detail_type(self) -> str:
"""
事件具体类型 `type` 的不同而不同 ``message`` 类型为例
``private````group````discuss``
"""
if self.type == 'message_sent':
return self['message_type']
return self[f'{self.type}_type']
class CQHttpModded(CQHttp):
async def _handle_event(self, payload: Dict[str, Any]) -> Any:
ev = EventModded.from_payload(payload)
if not ev:
return
event_name = ev.name
self.logger.info(f'received event: {event_name}')
if self._message_class and 'message' in ev:
ev['message'] = self._message_class(ev['message'])
results = list(
filter(lambda r: r is not None, await
self._bus.emit(event_name, ev)))
# return the first non-none result
return results[0] if results else None
bot = CQHttpModded()