Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/core/elements/message/chain.py

164 lines
6.3 KiB
Python
Raw Normal View History

import base64
from urllib.parse import urlparse
import ujson as json
import re
2021-12-31 14:44:34 +00:00
from .internal import Plain, Image, Voice, Embed
from core.elements.others import Secret, ErrorMessage
from typing import Union, List, Tuple
2022-01-13 12:08:14 +00:00
from core.logger import Logger
2021-12-31 14:44:34 +00:00
class MessageChain:
def __init__(self, elements: Union[str, List[Union[Plain, Image, Voice, Embed]],
Tuple[Union[Plain, Image, Voice, Embed]],
Plain, Image, Voice, Embed]):
self.value = []
2022-01-04 15:32:09 +00:00
if isinstance(elements, ErrorMessage):
elements = str(elements)
2021-12-31 14:44:34 +00:00
if isinstance(elements, str):
if elements != '':
elements = Plain(elements)
else:
elements = Plain(ErrorMessage('机器人尝试发送空文本消息,请联系机器人开发者解决问题。'))
if isinstance(elements, (Plain, Image, Voice, Embed)):
if isinstance(elements, Plain):
if elements.text != '':
elements = match_kecode(elements.text)
2021-12-31 14:44:34 +00:00
else:
elements = [elements]
if isinstance(elements, (list, tuple)):
2021-12-31 14:44:34 +00:00
for e in elements:
2022-01-04 15:32:09 +00:00
if isinstance(e, ErrorMessage):
self.value.append(str(e))
2021-12-31 14:44:34 +00:00
if isinstance(e, (Plain, Image, Voice, Embed)):
if isinstance(e, Plain):
if e.text != '':
self.value += match_kecode(e.text)
else:
self.value.append(e)
2021-12-31 14:44:34 +00:00
else:
2022-01-13 12:08:14 +00:00
Logger.error(f'Unexpected message type: {elements}')
2021-12-31 14:44:34 +00:00
self.value.append(
Plain(ErrorMessage('机器人尝试发送非法消息链,请联系机器人开发者解决问题。')))
elif isinstance(elements, MessageChain):
self.value = elements.value
else:
2022-01-13 12:08:14 +00:00
Logger.error(f'Unexpected message type: {elements}')
2021-12-31 14:44:34 +00:00
self.value.append(
Plain(ErrorMessage('机器人尝试发送非法消息链,请联系机器人开发者解决问题。')))
if not self.value:
self.value.append(Plain(ErrorMessage('机器人尝试发送空消息链,请联系机器人开发者解决问题。')))
2021-12-31 14:44:34 +00:00
@property
def is_safe(self):
for v in self.value:
if isinstance(v, Plain):
for secret in Secret.list:
if v.text.upper().find(secret.upper()) != -1:
return False
elif isinstance(v, Embed):
for secret in Secret.list:
if v.title.upper().find(secret.upper()) != -1:
return False
if v.description.upper().find(secret.upper()) != -1:
return False
if v.footer.upper().find(secret.upper()) != -1:
return False
if v.author.upper().find(secret.upper()) != -1:
return False
if v.url.upper().find(secret.upper()) != -1:
return False
for f in v.fields:
if f.name.upper().find(secret.upper()) != -1:
return False
if f.value.upper().find(secret.upper()) != -1:
return False
return True
def asSendable(self, embed=True):
value = []
for x in self.value:
if isinstance(x, Embed) and not embed:
2021-12-31 17:53:08 +00:00
value += x.to_msgchain()
2021-12-31 14:44:34 +00:00
else:
value.append(x)
return value
def append(self, element):
self.value.append(element)
def remove(self, element):
self.value.remove(element)
def __str__(self):
return f'[{", ".join([str(x.__dict__) for x in self.value])}]'
def __repr__(self):
return self.value
site_whitelist = ['http.cat']
def match_kecode(text: str) -> List[Union[Plain, Image, Voice, Embed]]:
split_all = re.split(r'(\[Ke:.*?])', text)
for x in split_all:
if x == '':
split_all.remove('')
elements = []
for e in split_all:
match = re.match(r'\[Ke:(.*?),(.*)]', e)
if not match:
if e != '':
elements.append(Plain(e))
else:
element_type = match.group(1).lower()
args = re.split(r',|,.\s', match.group(2))
for x in args:
if x == '':
args.remove('')
if element_type == 'plain':
for a in args:
ma = re.match(r'(.*?)=(.*)', a)
if ma:
if ma.group(1) == 'text':
elements.append(Plain(ma.group(2)))
else:
elements.append(Plain(a))
else:
elements.append(Plain(a))
elif element_type == 'image':
for a in args:
ma = re.match(r'(.*?)=(.*)', a)
if ma:
img = None
if ma.group(1) == 'path':
parse_url = urlparse(ma.group(2))
if parse_url[0] == 'file' or parse_url[1] in site_whitelist:
img = Image(path=ma.group(2))
if ma.group(1) == 'headers':
img.headers = json.loads(str(base64.b64decode(ma.group(2)), "UTF-8"))
2022-01-07 15:16:28 +00:00
if img is not None:
elements.append(img)
else:
elements.append(Image(a))
elif element_type == 'voice':
for a in args:
ma = re.match(r'(.*?)=(.*)', a)
if ma:
if ma.group(1) == 'path':
parse_url = urlparse(ma.group(2))
if parse_url[0] == 'file' or parse_url[1] in site_whitelist:
elements.append(Voice(path=ma.group(2)))
else:
elements.append(Voice(a))
else:
elements.append(Voice(a))
return elements
2021-12-31 14:44:34 +00:00
__all__ = ["MessageChain"]