import asyncio import os import time import traceback import uuid from datetime import datetime, timedelta from os.path import abspath import aiohttp import ujson as json from PIL import Image, ImageEnhance, ImageFont, ImageDraw, ImageOps from aiofile import async_open from gql import Client, gql from gql.transport.aiohttp import AIOHTTPTransport from config import Config from core.logger import Logger from core.utils import get_url async def get_rating(uid, query_type): try: if query_type == 'b30': query_type = 'bestRecords' elif query_type == 'r30': query_type = 'recentRecords' Profile_url = '' + uid Profile_json = json.loads(await get_url(Profile_url, 200)) if 'statusCode' in Profile_json: if Profile_json['statusCode'] == 404: return {'status': False, 'text': '发生错误:此用户不存在。'} ProfileId = Profile_json['user']['id'] ProfileRating = Profile_json['rating'] ProfileLevel = Profile_json['exp']['currentLevel'] ProfileUid = Profile_json['user']['uid'] nick = Profile_json['user']['name'] if nick is None: nick = ProfileUid if 'avatar' in Profile_json['user']: Avatar_img = Profile_json['user']['avatar']['medium'] else: Avatar_img = None transport = AIOHTTPTransport(url='') client = Client(transport=transport, fetch_schema_from_transport=True) query = gql( f""" query StudioAnalytics($id: ID = "{ProfileId}") {{ profile(id: $id) {{ id {query_type}(limit: 30) {{ ...RecordFragment }} }} }} fragment RecordFragment on UserRecord {{ id date chart {{ id difficulty type level {{ uid title }} }} score accuracy rating details {{ perfect great good bad miss }} }} """) result = await client.execute_async(query) workdir = os.path.abspath(Config("cache_path") + str(uuid.uuid4())) os.mkdir(workdir) bestRecords = result['profile'][query_type] rank = 0 resources = [] songcards = [] async def mkresources(x, rank): thumbpath = await download_cover_thumb(x['chart']['level']['uid']) chart_type = x['chart']['type'] difficulty = x['chart']['difficulty'] chart_name = x['chart']['level']['title'] score = str(x['score']) acc = x['accuracy'] rt = x['rating'] details = x['details'] _date = datetime.strptime(x['date'], "%Y-%m-%dT%H:%M:%S.%fZ") local_time = _date + timedelta(hours=8) playtime = local_time.timestamp() nowtime = time.time() playtime = playtime - nowtime playtime = - playtime t = playtime / 60 / 60 / 24 dw = 'd' if t < 1: t = playtime / 60 / 60 dw = 'h' if t < 1: t = playtime / 60 dw = 'm' if t < 1: t = playtime dw = 's' playtime = str(int(t)) + dw if thumbpath: havecover = True else: havecover = False songcards.append( make_songcard(thumbpath, chart_type, difficulty, chart_name, score, acc, rt, playtime, rank, details, havecover)) for x in bestRecords: rank += 1 resources.append(mkresources(x, rank)) await asyncio.gather(*resources) cards_ = await asyncio.gather(*songcards) cards_d = {} for x in cards_: for k in x: cards_d[k] = x[k] cards = [cards_d[x] for x in cards_d] # b30card b30img ="RGBA", (1975, 1610), '#1e2129') avatar_path = await download_avatar_thumb(Avatar_img, ProfileId) if avatar_path: im = im = im.resize((110, 110)) try: bigsize = (im.size[0] * 3, im.size[1] * 3) mask ='L', bigsize, 0) draw = ImageDraw.Draw(mask) draw.ellipse((0, 0) + bigsize, fill=255) mask = mask.resize(im.size, Image.ANTIALIAS) im.putalpha(mask) output =, mask.size, centering=(0.5, 0.5)) output.putalpha(mask) output.convert('RGBA') b30img.alpha_composite(output, (1825, 24)) except: traceback.print_exc() font4 = ImageFont.truetype(os.path.abspath('./assets/Nunito-Regular.ttf'), 35) drawtext = ImageDraw.Draw(b30img) get_name_width = font4.getsize(nick)[0] get_img_width = b30img.width drawtext.text((get_img_width - get_name_width - 160, 30), nick, '#ffffff', font=font4) font5 = ImageFont.truetype(os.path.abspath('./assets/Noto Sans CJK DemiLight.otf'), 20) level_text = f'等级 {ProfileLevel}' level_text_width = font5.getsize(level_text)[0] level_text_height = font5.getsize(level_text)[1] img_level ="RGBA", (level_text_width + 20, 40), '#050a1a') drawtext_level = ImageDraw.Draw(img_level) drawtext_level.text(((img_level.width - level_text_width) / 2, (img_level.height - level_text_height) / 2), level_text, '#ffffff', font=font5) b30img.alpha_composite(img_level, (1825 - img_level.width - 20, 85)) font6 = ImageFont.truetype(os.path.abspath('./assets/Nunito-Light.ttf'), 20) rating_text = f'Rating {str(round(float(ProfileRating), 2))}' rating_text_width = font6.getsize(rating_text)[0] rating_text_height = font6.getsize(rating_text)[1] img_rating ="RGBA", (rating_text_width + 20, 40), '#050a1a') drawtext_level = ImageDraw.Draw(img_rating) drawtext_level.text(((img_rating.width - rating_text_width) / 2, (img_rating.height - rating_text_height) / 2), rating_text, '#ffffff', font=font6) b30img.alpha_composite(img_rating, (1825 - img_level.width - img_rating.width - 30, 85)) textdraw = ImageDraw.Draw(b30img) textdraw.text((5, 5), f'Based on CytoidAPI | Generated by Teahouse Studios "Akaribot"', 'white', font=font6) i = 0 fname = 1 t = 0 s = 0 for card in cards: try: w = 15 + 384 * i h = 135 if s == 5: s = 0 t += 1 h = h + 240 * t w = w - 384 * 5 * t i += 1 b30img.alpha_composite(card, (w, h)) fname += 1 s += 1 except Exception: traceback.print_exc() break if __name__ == '__main__': else: savefilename = os.path.abspath(f'{Config("cache_path")}{str(uuid.uuid4())}.jpg') b30img.convert("RGB").save(savefilename) # shutil.rmtree(workdir) return {'status': True, 'path': savefilename} except Exception as e: traceback.print_exc() return {'status': False, 'text': '发生错误:' + str(e)} async def download_cover_thumb(uid): try: d = abspath('./assets/cytoid-cover/' + uid + '/') if not os.path.exists(d): os.makedirs(d) path = d + '/thumbnail.png' if not os.path.exists(d): os.mkdir(d) if not os.path.exists(path): level_url = '' + uid get_level = json.loads(await get_url(level_url)) cover_thumbnail = get_level['cover']['original'] + "?h=240&w=384" async with aiohttp.ClientSession() as session: async with session.get(cover_thumbnail) as resp: async with async_open(path, 'wb+') as jpg: await jpg.write(await return path else: return path except: traceback.print_exc() return False async def download_avatar_thumb(link, id): Logger.debug(f'Downloading avatar for {str(id)}') try: d = abspath('./assets/cytoid-avatar/') if not os.path.exists(d): os.makedirs(d) path = d + f'/{id}.png' if not os.path.exists(d): os.mkdir(d) if os.path.exists(path): os.remove(path) async with aiohttp.ClientSession() as session: async with session.get(link, timeout=aiohttp.ClientTimeout(total=20)) as resp: async with async_open(path, 'wb+') as jpg: await jpg.write(await return path except: traceback.print_exc() return False async def make_songcard(coverpath, chart_type, difficulty, chart_name, score, acc, rt, playtime, rank, details, havecover=True): if havecover: try: img = except: os.remove(coverpath) img ='RGBA', (384, 240), 'black') else: img ='RGBA', (384, 240), 'black') img = img.convert('RGBA') downlight = ImageEnhance.Brightness(img) img = downlight.enhance(0.5).resize((384, 240)) img_type ='./assets/cytoid/{chart_type}.png') img_type = img_type.convert('RGBA') img_type = img_type.resize((40, 40)) img.alpha_composite(img_type, (20, 20)) font_path = './assets/Noto Sans CJK DemiLight.otf' font = ImageFont.truetype(os.path.abspath(font_path), 25) font2 = ImageFont.truetype(os.path.abspath(font_path), 15) font3 = ImageFont.truetype(os.path.abspath(font_path), 20) drawtext = ImageDraw.Draw(img) drawtext.text((20, 130), score, '#ffffff', font=font3) drawtext.text((20, 155), chart_name, '#ffffff', font=font) drawtext.text((20, 185), f'Acc: {round(acc, 4)} Perfect: {details["perfect"]} Great: {details["great"]} Good: {details["good"]}' f'\nRating: {round(rt, 4)} Bad: {details["bad"]} Miss: {details["miss"]}', font=font2) playtime = f'{playtime} #{rank}' playtime_width = font3.getsize(playtime)[0] songimg_width = 384 drawtext.text((songimg_width - playtime_width - 15, 205), playtime, '#ffffff', font=font3) type_ = str(difficulty) type_text ='RGBA', (32, 32)) draw_typetext = ImageDraw.Draw(type_text) draw_typetext.text(((32 - font3.getsize(type_)[0] - font.getoffset(type_)[0]) / 2, 0), type_, "#ffffff", font=font3) img.alpha_composite(type_text, (23, 29)) Logger.debug('Image generated: ' + str(rank)) return {int(rank): img}