Archived
1
0
Fork 0

use embed

This commit is contained in:
yzhh 2024-02-21 19:31:25 +08:00
parent 2aaefa6f40
commit 6e98441c1d
3 changed files with 32 additions and 19 deletions

View file

@ -1,5 +1,5 @@
import asyncio
from datetime import datetime
from datetime import datetime, UTC
from typing import List
from config import Config
@ -172,7 +172,7 @@ class MessageSession(MessageSessionT):
ftime_template.append("(UTC)")
else:
ftime_template.append(f"(UTC{self._tz_offset})")
return (datetime.utcfromtimestamp(timestamp) + self.timezone_offset).strftime(' '.join(ftime_template))
return (datetime.fromtimestamp(timestamp, UTC) + self.timezone_offset).strftime(' '.join(ftime_template))
__all__ = ["MessageSession"]

View file

@ -250,7 +250,15 @@ class Embed(EmbedT):
self.thumbnail = thumbnail
self.author = author
self.footer = footer
self.fields = fields
self.fields = []
if fields:
for f in fields:
if isinstance(f, EmbedField):
self.fields.append(f)
elif isinstance(f, dict):
self.fields.append(EmbedField(**f))
else:
raise TypeError(f"Invalid type {type(f)} for EmbedField")
def to_message_chain(self):
text_lst = []
@ -282,9 +290,9 @@ class Embed(EmbedT):
def __repr__(self):
return f'Embed(title="{self.title}", description="{self.description}", url="{self.url}", ' \
f'timestamp={self.timestamp}, color={self.color}, image={self.image.__repr__()}, ' \
f'thumbnail={self.thumbnail.__repr__()}, author="{self.author}", footer="{self.footer}", ' \
f'fields={self.fields})'
f'timestamp={self.timestamp}, color={self.color}, image={self.image.__repr__()}, ' \
f'thumbnail={self.thumbnail.__repr__()}, author="{self.author}", footer="{self.footer}", ' \
f'fields={self.fields})'
def to_dict(self):
return {
@ -299,7 +307,7 @@ class Embed(EmbedT):
'thumbnail': self.thumbnail,
'author': self.author,
'footer': self.footer,
'fields': self.fields}}
'fields': [f.to_dict() for f in self.fields]}}
__all__ = ["Plain", "Image", "Voice", "Embed", "EmbedField", "Url", "ErrorMessage", "FormattedTime", "I18NContext"]

View file

@ -4,7 +4,7 @@ from datetime import datetime, timezone
from database.local import CrowdinActivityRecords
from config import Config
from core.builtins import Plain
from core.builtins import Plain, Embed, EmbedField
from core.logger import Logger
from core.queue import JobQueue
from core.scheduler import Scheduler, IntervalTrigger
@ -32,13 +32,13 @@ async def check_crowdin():
if 'error' in get_json:
raise Exception(get_json['msg'])
for act in get_json['activity']:
m = html2text(act["message"], baseurl=base_url).strip()
m = html2text(act["message"], baseurl=base_url).strip().replace('\n', '')
if not any(x in m for x in filter_words):
continue
if act['count'] == 1:
identify = f'{act["user_id"]}{str(act['timestamp'])}{m}'
if not first and not CrowdinActivityRecords.check(identify):
await JobQueue.trigger_hook_all('mc_crowdin', message=[Plain(m).to_dict()])
await JobQueue.trigger_hook_all('mc_crowdin', message=[Embed(title=m).to_dict()])
else:
detail_url = f"https://crowdin.com/backend/project_actions/activity_stream_details?request_type=project&type={
act["type"]}&timestamp={
@ -50,19 +50,24 @@ async def check_crowdin():
Logger.info(get_detail_json)
if get_detail_json:
for detail_num in get_detail_json['activity']:
identify_ = []
identify_ = {}
if not isinstance(get_detail_json['activity'][detail_num], list):
continue
for detail in get_detail_json['activity'][detail_num]:
identify = f'{
detail["title"]}: {
html2text(
detail["content"],
baseurl=base_url)}'.strip()
identify_.append(identify)
identify = "\n".join(identify_)
content = detail["content"]
if 'icon-thumbs-up' in content:
content = '👍'
elif 'icon-thumbs-down' in content:
content = '👎'
else:
content = html2text(content, baseurl=base_url).strip()
identify = {detail['title']: content}
identify_.update(identify)
identify = f'{act["user_id"]}{str(act['timestamp'])}{m}{
"\n".join(f'{i}: {identify_[i]}' for i in identify_)}'
if not first and not CrowdinActivityRecords.check(identify):
await JobQueue.trigger_hook_all('mc_crowdin', message=[Plain(m + '\n' + identify).to_dict()])
await JobQueue.trigger_hook_all('mc_crowdin', message=[Embed(title=m, fields=[EmbedField(name=k, value=v) for k, v in identify_.items()]).to_dict()])
except Exception:
if Config('debug'):
Logger.error(traceback.format_exc())