2022-01-07 15:02:57 +00:00
|
|
|
import base64
|
2022-01-20 12:13:03 +00:00
|
|
|
import re
|
|
|
|
from typing import Union, List, Tuple
|
2022-01-10 15:30:41 +00:00
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
2022-01-07 15:02:57 +00:00
|
|
|
import ujson as json
|
|
|
|
|
2023-02-05 14:33:33 +00:00
|
|
|
from core.builtins.message.internal import Plain, Image, Voice, Embed, Url, ErrorMessage
|
|
|
|
from core.builtins.utils import Secret
|
2022-01-13 12:08:14 +00:00
|
|
|
from core.logger import Logger
|
2023-02-05 14:33:33 +00:00
|
|
|
from core.types.message import MessageChain as MC
|
2022-01-04 15:26:37 +00:00
|
|
|
|
2021-12-31 14:44:34 +00:00
|
|
|
|
2023-02-05 14:33:33 +00:00
|
|
|
class MessageChain(MC):
|
2022-01-17 13:28:49 +00:00
|
|
|
def __init__(self, elements: Union[str, List[Union[Plain, Image, Voice, Embed, Url]],
|
2023-04-30 03:30:59 +00:00
|
|
|
Tuple[Union[Plain, Image, Voice, Embed, Url]],
|
|
|
|
Plain, Image, Voice, Embed, Url]):
|
2021-12-31 14:44:34 +00:00
|
|
|
self.value = []
|
2022-01-04 15:32:09 +00:00
|
|
|
if isinstance(elements, ErrorMessage):
|
|
|
|
elements = str(elements)
|
2021-12-31 14:44:34 +00:00
|
|
|
if isinstance(elements, str):
|
2023-03-09 05:38:04 +00:00
|
|
|
elements = Plain(elements)
|
2022-01-17 13:28:49 +00:00
|
|
|
if isinstance(elements, (Plain, Image, Voice, Embed, Url)):
|
2022-01-07 15:02:57 +00:00
|
|
|
if isinstance(elements, Plain):
|
|
|
|
if elements.text != '':
|
|
|
|
elements = match_kecode(elements.text)
|
2021-12-31 14:44:34 +00:00
|
|
|
else:
|
2022-01-07 15:02:57 +00:00
|
|
|
elements = [elements]
|
|
|
|
if isinstance(elements, (list, tuple)):
|
2021-12-31 14:44:34 +00:00
|
|
|
for e in elements:
|
2022-01-04 15:32:09 +00:00
|
|
|
if isinstance(e, ErrorMessage):
|
2022-01-17 13:28:49 +00:00
|
|
|
self.value.append(Plain(str(e)))
|
|
|
|
elif isinstance(e, Url):
|
|
|
|
self.value.append(Plain(e.url))
|
|
|
|
elif isinstance(e, (Plain, Image, Voice, Embed)):
|
2022-01-07 15:02:57 +00:00
|
|
|
if isinstance(e, Plain):
|
|
|
|
if e.text != '':
|
|
|
|
self.value += match_kecode(e.text)
|
|
|
|
else:
|
|
|
|
self.value.append(e)
|
2023-02-17 10:57:13 +00:00
|
|
|
elif isinstance(e, str):
|
|
|
|
if e != '':
|
|
|
|
self.value += match_kecode(e)
|
2021-12-31 14:44:34 +00:00
|
|
|
else:
|
2022-01-13 12:08:14 +00:00
|
|
|
Logger.error(f'Unexpected message type: {elements}')
|
2021-12-31 14:44:34 +00:00
|
|
|
elif isinstance(elements, MessageChain):
|
|
|
|
self.value = elements.value
|
|
|
|
else:
|
2022-01-13 12:08:14 +00:00
|
|
|
Logger.error(f'Unexpected message type: {elements}')
|
2021-12-31 14:44:34 +00:00
|
|
|
|
|
|
|
@property
|
|
|
|
def is_safe(self):
|
2022-01-18 12:57:04 +00:00
|
|
|
def unsafeprompt(name, secret, text):
|
|
|
|
return f'{name} contains unsafe text "{secret}": {text}'
|
2022-01-20 12:13:03 +00:00
|
|
|
|
2021-12-31 14:44:34 +00:00
|
|
|
for v in self.value:
|
|
|
|
if isinstance(v, Plain):
|
|
|
|
for secret in Secret.list:
|
2023-07-08 05:03:46 +00:00
|
|
|
if secret in ["", None, True, False]:
|
|
|
|
continue
|
2021-12-31 14:44:34 +00:00
|
|
|
if v.text.upper().find(secret.upper()) != -1:
|
2022-01-18 12:57:04 +00:00
|
|
|
Logger.warn(unsafeprompt('Plain', secret, v.text))
|
2021-12-31 14:44:34 +00:00
|
|
|
return False
|
|
|
|
elif isinstance(v, Embed):
|
|
|
|
for secret in Secret.list:
|
2023-07-04 16:10:30 +00:00
|
|
|
if secret in ["", None, True, False]:
|
|
|
|
continue
|
2022-04-04 06:20:22 +00:00
|
|
|
if v.title is not None:
|
|
|
|
if v.title.upper().find(secret.upper()) != -1:
|
|
|
|
Logger.warn(unsafeprompt('Embed.title', secret, v.title))
|
|
|
|
return False
|
|
|
|
if v.description is not None:
|
|
|
|
if v.description.upper().find(secret.upper()) != -1:
|
|
|
|
Logger.warn(unsafeprompt('Embed.description', secret, v.description))
|
|
|
|
return False
|
|
|
|
if v.footer is not None:
|
|
|
|
if v.footer.upper().find(secret.upper()) != -1:
|
|
|
|
Logger.warn(unsafeprompt('Embed.footer', secret, v.footer))
|
|
|
|
return False
|
|
|
|
if v.author is not None:
|
|
|
|
if v.author.upper().find(secret.upper()) != -1:
|
|
|
|
Logger.warn(unsafeprompt('Embed.author', secret, v.author))
|
|
|
|
return False
|
|
|
|
if v.url is not None:
|
|
|
|
if v.url.upper().find(secret.upper()) != -1:
|
|
|
|
Logger.warn(unsafeprompt('Embed.url', secret, v.url))
|
|
|
|
return False
|
2021-12-31 14:44:34 +00:00
|
|
|
for f in v.fields:
|
|
|
|
if f.name.upper().find(secret.upper()) != -1:
|
2022-01-18 12:57:04 +00:00
|
|
|
Logger.warn(unsafeprompt('Embed.field.name', secret, f.name))
|
2021-12-31 14:44:34 +00:00
|
|
|
return False
|
|
|
|
if f.value.upper().find(secret.upper()) != -1:
|
2022-01-18 12:57:04 +00:00
|
|
|
Logger.warn(unsafeprompt('Embed.field.value', secret, f.value))
|
2021-12-31 14:44:34 +00:00
|
|
|
return False
|
|
|
|
return True
|
|
|
|
|
2023-03-05 11:15:44 +00:00
|
|
|
def asSendable(self, locale="zh_cn", embed=True):
|
2021-12-31 14:44:34 +00:00
|
|
|
value = []
|
|
|
|
for x in self.value:
|
|
|
|
if isinstance(x, Embed) and not embed:
|
2021-12-31 17:53:08 +00:00
|
|
|
value += x.to_msgchain()
|
2023-03-05 11:15:44 +00:00
|
|
|
elif isinstance(x, ErrorMessage):
|
|
|
|
value.append(ErrorMessage(x.error_message, locale=locale))
|
2023-03-09 05:38:04 +00:00
|
|
|
elif isinstance(x, Plain):
|
|
|
|
if x.text != '':
|
|
|
|
value.append(x)
|
|
|
|
else:
|
|
|
|
Plain(ErrorMessage('{error.message.chain.plain.empty}', locale=locale))
|
2021-12-31 14:44:34 +00:00
|
|
|
else:
|
|
|
|
value.append(x)
|
2023-03-09 05:38:04 +00:00
|
|
|
if not value:
|
|
|
|
value.append(Plain(ErrorMessage('{error.message.chain.empty}', locale=locale)))
|
2021-12-31 14:44:34 +00:00
|
|
|
return value
|
|
|
|
|
|
|
|
def append(self, element):
|
|
|
|
self.value.append(element)
|
|
|
|
|
|
|
|
def remove(self, element):
|
|
|
|
self.value.remove(element)
|
|
|
|
|
2023-06-10 03:59:24 +00:00
|
|
|
def insert(self, index, element):
|
|
|
|
self.value.insert(index, element)
|
|
|
|
|
2021-12-31 14:44:34 +00:00
|
|
|
def __str__(self):
|
2022-06-26 06:06:26 +00:00
|
|
|
return f'[{", ".join([x.__repr__() for x in self.value])}]'
|
2021-12-31 14:44:34 +00:00
|
|
|
|
|
|
|
def __repr__(self):
|
2022-06-26 06:06:26 +00:00
|
|
|
return self.__str__()
|
2021-12-31 14:44:34 +00:00
|
|
|
|
|
|
|
|
2022-01-10 15:30:41 +00:00
|
|
|
site_whitelist = ['http.cat']
|
|
|
|
|
|
|
|
|
2022-01-07 15:02:57 +00:00
|
|
|
def match_kecode(text: str) -> List[Union[Plain, Image, Voice, Embed]]:
|
|
|
|
split_all = re.split(r'(\[Ke:.*?])', text)
|
|
|
|
for x in split_all:
|
|
|
|
if x == '':
|
|
|
|
split_all.remove('')
|
|
|
|
elements = []
|
|
|
|
for e in split_all:
|
|
|
|
match = re.match(r'\[Ke:(.*?),(.*)]', e)
|
|
|
|
if not match:
|
|
|
|
if e != '':
|
|
|
|
elements.append(Plain(e))
|
|
|
|
else:
|
|
|
|
element_type = match.group(1).lower()
|
|
|
|
args = re.split(r',|,.\s', match.group(2))
|
|
|
|
for x in args:
|
|
|
|
if x == '':
|
|
|
|
args.remove('')
|
|
|
|
if element_type == 'plain':
|
|
|
|
for a in args:
|
|
|
|
ma = re.match(r'(.*?)=(.*)', a)
|
|
|
|
if ma:
|
|
|
|
if ma.group(1) == 'text':
|
|
|
|
elements.append(Plain(ma.group(2)))
|
|
|
|
else:
|
|
|
|
elements.append(Plain(a))
|
|
|
|
else:
|
|
|
|
elements.append(Plain(a))
|
|
|
|
elif element_type == 'image':
|
|
|
|
for a in args:
|
|
|
|
ma = re.match(r'(.*?)=(.*)', a)
|
|
|
|
if ma:
|
|
|
|
img = None
|
|
|
|
if ma.group(1) == 'path':
|
2022-01-10 15:30:41 +00:00
|
|
|
parse_url = urlparse(ma.group(2))
|
|
|
|
if parse_url[0] == 'file' or parse_url[1] in site_whitelist:
|
|
|
|
img = Image(path=ma.group(2))
|
2022-01-07 15:02:57 +00:00
|
|
|
if ma.group(1) == 'headers':
|
|
|
|
img.headers = json.loads(str(base64.b64decode(ma.group(2)), "UTF-8"))
|
2022-01-07 15:16:28 +00:00
|
|
|
if img is not None:
|
|
|
|
elements.append(img)
|
2022-01-07 15:02:57 +00:00
|
|
|
else:
|
|
|
|
elements.append(Image(a))
|
|
|
|
elif element_type == 'voice':
|
|
|
|
for a in args:
|
|
|
|
ma = re.match(r'(.*?)=(.*)', a)
|
|
|
|
if ma:
|
|
|
|
if ma.group(1) == 'path':
|
2022-01-10 15:30:41 +00:00
|
|
|
parse_url = urlparse(ma.group(2))
|
|
|
|
if parse_url[0] == 'file' or parse_url[1] in site_whitelist:
|
|
|
|
elements.append(Voice(path=ma.group(2)))
|
2022-01-07 15:02:57 +00:00
|
|
|
else:
|
|
|
|
elements.append(Voice(a))
|
|
|
|
else:
|
|
|
|
elements.append(Voice(a))
|
|
|
|
return elements
|
|
|
|
|
|
|
|
|
2021-12-31 14:44:34 +00:00
|
|
|
__all__ = ["MessageChain"]
|