Archived
1
0
Fork 0
This repository has been archived on 2024-04-26. You can view files and clone it, but cannot push or open issues or pull requests.
akari-bot/modules/chemical_code/__init__.py

169 lines
5.2 KiB
Python
Raw Normal View History

2022-06-17 05:59:15 +00:00
import asyncio
import random
from bs4 import BeautifulSoup
from sqlalchemy import create_engine, Column, String, Text, Integer
2022-06-17 05:59:15 +00:00
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from tenacity import retry, stop_after_attempt
from datetime import datetime
from core.component import on_command
from core.elements import MessageSession, Image, Plain
from core.utils import get_url
2022-06-17 06:35:59 +00:00
from core.logger import Logger
2022-06-17 05:59:15 +00:00
2022-06-18 05:18:45 +00:00
2022-06-17 05:59:15 +00:00
Base = declarative_base()
DB_LINK = 'sqlite:///modules/chemical_code/answer.db'
class Answer(Base):
__tablename__ = "Answer"
id = Column(Integer, primary_key=True)
cas = Column(String(512))
answer = Column(Text)
class MSGDBSession:
def __init__(self):
self.engine = engine = create_engine(DB_LINK)
Base.metadata.create_all(bind=engine, checkfirst=True)
self.Session = sessionmaker()
self.Session.configure(bind=self.engine)
@property
def session(self):
return self.Session()
session = MSGDBSession().session
def auto_rollback_error(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
session.rollback()
raise e
return wrapper
@retry(stop=stop_after_attempt(3))
@auto_rollback_error
def randcc():
num = random.randint(1, 20000)
query = session.query(Answer).filter_by(id=num).first()
return query.cas, query.answer
csr_link = 'https://www.chemspider.com'
async def search_csr(keyword: str):
2022-06-19 06:20:41 +00:00
try:
get = await get_url(csr_link + '/Search.aspx?q=' + keyword, 200, fmt='text')
2022-06-19 06:26:23 +00:00
# Logger.info(get)
2022-06-19 06:20:41 +00:00
soup = BeautifulSoup(get, 'html.parser')
results = soup.find_all('tbody')[0].find_all('tr')
2022-06-19 06:20:41 +00:00
rlist = []
2022-06-19 06:20:41 +00:00
for x in results:
sub = x.find_all('td')[0:4]
name = sub[2].text
image = sub[1].find_all('img')[0].get('src')
rlist.append({'name': name, 'image': csr_link + image})
2022-06-19 06:20:41 +00:00
return rlist
except Exception:
return False
2022-06-17 12:44:04 +00:00
cc = on_command('chemical_code', alias=['cc', 'chemicalcode'], desc='化学式验证码测试', developers=['OasisAkari'])
play_state = {}
2022-06-17 05:59:15 +00:00
@cc.handle()
async def _(msg: MessageSession):
if msg.target.targetId in play_state and play_state[msg.target.targetId]['active']:
2022-06-17 05:59:15 +00:00
await msg.finish('当前有一局游戏正在进行中。')
play_state.update({msg.target.targetId: {'active': True}})
2022-06-17 05:59:15 +00:00
get_rand = randcc()
csr = await search_csr(get_rand[1])
2022-06-19 06:20:41 +00:00
if not csr:
play_state[msg.target.targetId]['active'] = False
await msg.finish('发生错误:拉取题目失败,请重新发起游戏。')
choice = random.choice(csr)
play_state[msg.target.targetId]['answer'] = choice['name']
Logger.info(f'Answer: {choice["name"]}')
"""get_image = await download_to_cache(f'https://www.chemicalbook.com/CAS/GIF/{get_rand[0]}.gif')
2022-06-17 06:35:59 +00:00
Logger.info(get_rand[1])
play_state[msg.target.targetId]['answer'] = get_rand[1]
2022-06-18 05:18:45 +00:00
with PILImage.open(get_image) as im:
2022-06-19 04:45:52 +00:00
if im.size[0] < 10:
del play_state[msg.target.targetId]
2022-06-18 05:18:45 +00:00
return await _(msg)
im.seek(0)
image = im.convert("RGBA")
datas = image.getdata()
newData = []
for item in datas:
if item[3] == 0: # if transparent
2022-06-19 03:42:32 +00:00
newData.append((230, 230, 230)) # set transparent color in jpg
2022-06-18 05:18:45 +00:00
else:
newData.append(tuple(item[:3]))
2022-06-19 03:28:18 +00:00
image = PILImage.new("RGBA", im.size)
2022-06-18 05:18:45 +00:00
image.getdata()
image.putdata(newData)
2022-06-19 03:28:18 +00:00
newpath = random_cache_path() + '.png'
image.save(newpath)"""
2022-06-18 05:18:45 +00:00
await msg.sendMessage([Image(choice['image']),
2022-06-18 05:18:45 +00:00
Plain('请于2分钟内发送正确答案。请使用字母表顺序CHBrClF')])
2022-06-17 05:59:15 +00:00
time_start = datetime.now().timestamp()
async def ans(msg: MessageSession, answer):
wait = await msg.waitAnyone()
if play_state[msg.target.targetId]['active']:
if wait.asDisplay() != answer:
2022-06-19 04:45:52 +00:00
Logger.info(f'{wait.asDisplay()} != {answer}')
return await ans(wait, answer)
else:
await wait.sendMessage('回答正确。')
play_state[msg.target.targetId]['active'] = False
async def timer(start):
if play_state[msg.target.targetId]['active']:
if datetime.now().timestamp() - start > 120:
await msg.sendMessage(f'已超时,正确答案是 {play_state[msg.target.targetId]["answer"]}', quote=False)
play_state[msg.target.targetId]['active'] = False
else:
await asyncio.sleep(1)
await timer(start)
2022-06-17 05:59:15 +00:00
await asyncio.gather(ans(msg, get_rand[1]), timer(time_start))
2022-06-17 05:59:15 +00:00
2022-06-18 06:19:11 +00:00
@cc.handle('stop {停止当前的游戏。}')
async def s(msg: MessageSession):
state = play_state.get(msg.target.targetId, False)
if state:
if state['active']:
play_state[msg.target.targetId]['active'] = False
await msg.sendMessage(f'已停止,正确答案是 {state["answer"]}', quote=False)
else:
await msg.sendMessage('当前无活跃状态的游戏。')
else:
await msg.sendMessage('当前无活跃状态的游戏。')
2022-06-17 05:59:15 +00:00