Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/core/builtins/message/chain.py

185 lines
7.5 KiB
Python
Raw Normal View History

import base64
2022-01-20 12:13:03 +00:00
import re
from typing import Union, List, Tuple
from urllib.parse import urlparse
import ujson as json
2023-02-05 14:33:33 +00:00
from core.builtins.message.internal import Plain, Image, Voice, Embed, Url, ErrorMessage
from core.builtins.utils import Secret
2022-01-13 12:08:14 +00:00
from core.logger import Logger
2023-02-05 14:33:33 +00:00
from core.types.message import MessageChain as MC
2021-12-31 14:44:34 +00:00
2023-02-05 14:33:33 +00:00
class MessageChain(MC):
    """Ordered container of message elements built from loosely-typed input.

    The constructor normalises its argument into ``self.value``, a flat list
    of element objects. Text input (``str`` or ``Plain``) is passed through
    ``match_kecode`` so that embedded ``[Ke:...]`` codes are expanded.
    """

    def __init__(self, elements: Union[str, List[Union[Plain, Image, Voice, Embed, Url]],
                                       Tuple[Union[Plain, Image, Voice, Embed, Url]],
                                       Plain, Image, Voice, Embed, Url]):
        """Build the chain from a single element, a sequence, or another chain.

        :param elements: A string, a single element, a list/tuple of elements
            and/or strings, an ``ErrorMessage`` or another ``MessageChain``.
            Anything else is reported through ``Logger.error`` and leaves the
            chain empty.
        """
        self.value = []

        # Reduce the single-value forms to a list first, so that everything
        # below only has to deal with sequences.
        if isinstance(elements, ErrorMessage):
            elements = str(elements)
        if isinstance(elements, str):
            elements = Plain(elements)
        if isinstance(elements, Plain):
            # Empty text yields an empty chain instead of falling through to
            # the "unexpected type" error below, matching how empty text is
            # skipped when it appears inside a list.
            elements = match_kecode(elements.text) if elements.text != '' else []
        elif isinstance(elements, (Image, Voice, Embed, Url)):
            elements = [elements]

        if isinstance(elements, (list, tuple)):
            for e in elements:
                if isinstance(e, ErrorMessage):
                    self.value.append(Plain(str(e)))
                elif isinstance(e, Url):
                    # Url elements are stored as plain text of their URL.
                    self.value.append(Plain(e.url))
                elif isinstance(e, Plain):
                    if e.text != '':
                        self.value += match_kecode(e.text)
                elif isinstance(e, (Image, Voice, Embed)):
                    self.value.append(e)
                elif isinstance(e, str):
                    if e != '':
                        self.value += match_kecode(e)
                else:
                    Logger.error(f'Unexpected message type: {elements}')
        elif isinstance(elements, MessageChain):
            # NOTE: this shares the very same list object with the source
            # chain; no copy is made.
            self.value = elements.value
        else:
            Logger.error(f'Unexpected message type: {elements}')

    @property
    def is_safe(self):
        """Return ``False`` if any text in the chain contains a known secret.

        Every entry of ``Secret.list`` is compared case-insensitively against
        the text of ``Plain`` elements and against the title, description,
        footer, author, url and field names/values of ``Embed`` elements.
        The first hit is logged with ``Logger.warn``. Text attributes that
        are ``None`` are skipped.
        """

        def unsafeprompt(name, secret, text):
            return f'{name} contains unsafe text "{secret}": {text}'

        for v in self.value:
            if isinstance(v, Plain):
                candidates = [('Plain', v.text)]
            elif isinstance(v, Embed):
                candidates = [
                    ('Embed.title', v.title),
                    ('Embed.description', v.description),
                    ('Embed.footer', v.footer),
                    ('Embed.author', v.author),
                    ('Embed.url', v.url),
                ]
                for f in v.fields or ():
                    candidates.append(('Embed.field.name', f.name))
                    candidates.append(('Embed.field.value', f.value))
            else:
                continue

            # Secrets form the outer loop so that the first reported match is
            # chosen in a stable secret-then-attribute order.
            for secret in Secret.list:
                needle = secret.upper()
                for name, text in candidates:
                    if text is not None and needle in text.upper():
                        Logger.warn(unsafeprompt(name, secret, text))
                        return False
        return True

    def asSendable(self, locale="zh_cn", embed=True):
        """Return the list of elements to hand to a sender.

        :param locale: Locale passed to every ``ErrorMessage`` created here.
        :param embed: When false, each ``Embed`` is replaced by the elements
            returned from its ``to_msgchain()``.
        :return: A new list. Empty ``Plain`` elements are replaced by an error
            placeholder, and a chain that would otherwise be empty yields a
            single error placeholder.
        """
        value = []
        for x in self.value:
            if isinstance(x, Embed) and not embed:
                value += x.to_msgchain()
            elif isinstance(x, ErrorMessage):
                value.append(ErrorMessage(x.error_message, locale=locale))
            elif isinstance(x, Plain):
                if x.text != '':
                    value.append(x)
                else:
                    # The placeholder used to be constructed and then thrown
                    # away; it has to be appended to take effect.
                    value.append(Plain(ErrorMessage('{error.message.chain.plain.empty}', locale=locale)))
            else:
                value.append(x)
        if not value:
            value.append(Plain(ErrorMessage('{error.message.chain.empty}', locale=locale)))
        return value

    def append(self, element):
        """Append ``element`` to the chain as-is (no normalisation)."""
        self.value.append(element)

    def remove(self, element):
        """Remove the first occurrence of ``element`` from the chain."""
        self.value.remove(element)

    def __str__(self):
        return f'[{", ".join([repr(x) for x in self.value])}]'

    def __repr__(self):
        return self.__str__()
2021-12-31 14:44:34 +00:00
# Hosts from which image/voice ``path=`` URLs are accepted.
site_whitelist = ['http.cat']


def match_kecode(text: str) -> "List[Union[Plain, Image, Voice, Embed]]":
    """Split ``text`` into message elements, expanding ``[Ke:...]`` codes.

    A code has the form ``[Ke:<type>,<arg>,<arg>...]`` where ``<type>`` is
    ``plain``, ``image`` or ``voice`` (case-insensitive) and each argument is
    either ``key=value`` or a bare value. Text outside of codes becomes
    ``Plain`` elements. Codes with any other type produce no element.

    For ``image`` and ``voice``, a ``path=`` argument is only accepted when the
    URL scheme is ``file`` or its host is listed in ``site_whitelist``.
    An image ``headers=`` argument is base64-encoded JSON; it is attached to
    the most recent image created from ``path=`` in the same code, or to the
    next one if no such image exists yet.

    :param text: Raw message text.
    :return: The elements in order of appearance.
    """
    # Build filtered lists rather than removing items while iterating, which
    # skips entries and lets consecutive empty strings survive.
    pieces = [p for p in re.split(r'(\[Ke:.*?])', text) if p != '']
    elements = []
    for piece in pieces:
        match = re.match(r'\[Ke:(.*?),(.*)]', piece)
        if not match:
            elements.append(Plain(piece))
            continue

        element_type = match.group(1).lower()
        args = [a for a in re.split(r',|,.\s', match.group(2)) if a != '']

        if element_type == 'plain':
            for a in args:
                ma = re.match(r'(.*?)=(.*)', a)
                if ma and ma.group(1) == 'text':
                    elements.append(Plain(ma.group(2)))
                else:
                    elements.append(Plain(a))
        elif element_type == 'image':
            # ``img`` must outlive a single argument: ``headers=`` is a
            # separate argument from ``path=``, and resetting the image per
            # argument made every ``headers=`` dereference ``None``.
            img = None
            pending_headers = None
            for a in args:
                ma = re.match(r'(.*?)=(.*)', a)
                if not ma:
                    # NOTE(review): bare values are not checked against
                    # site_whitelist (unchanged behaviour) - confirm intent.
                    elements.append(Image(a))
                    continue
                key, val = ma.group(1), ma.group(2)
                if key == 'path':
                    parse_url = urlparse(val)
                    if parse_url[0] == 'file' or parse_url[1] in site_whitelist:
                        img = Image(path=val)
                        if pending_headers is not None:
                            img.headers = pending_headers
                            pending_headers = None
                        elements.append(img)
                elif key == 'headers':
                    headers = json.loads(str(base64.b64decode(val), "UTF-8"))
                    if img is not None:
                        img.headers = headers
                    else:
                        pending_headers = headers
        elif element_type == 'voice':
            for a in args:
                ma = re.match(r'(.*?)=(.*)', a)
                if not ma:
                    elements.append(Voice(a))
                elif ma.group(1) == 'path':
                    parse_url = urlparse(ma.group(2))
                    if parse_url[0] == 'file' or parse_url[1] in site_whitelist:
                        elements.append(Voice(path=ma.group(2)))
                else:
                    elements.append(Voice(a))
    return elements
2021-12-31 14:44:34 +00:00
# Names exported by a star-import of this module; only MessageChain is listed.
__all__ = ["MessageChain"]