Archived
1
0
Fork 0

SECURITY: Deny private IP requests

This commit is contained in:
Dianliang233 2022-07-22 22:59:28 +08:00
parent c26990e7fa
commit ecebb36ef1
No known key found for this signature in database
GPG key ID: 6C56F399D872F19C

View file

@ -1,13 +1,26 @@
import traceback
from typing import Union
import aiohttp
from aiofile import async_open
import filetype as ft
from tenacity import retry, wait_fixed, stop_after_attempt
from core.logger import Logger
from .cache import random_cache_path
from core.logger import Logger
from tenacity import retry, wait_fixed, stop_after_attempt
import filetype as ft
from aiofile import async_open
import aiohttp
from typing import Union
import traceback
import socket
import re
import urllib
def private_ip_check(hostname: str) -> bool:
'''检查是否为私有IP。
:param hostname: 需要检查的主机名
returns: 是否为私有IP'''
addr_info = socket.getaddrinfo(hostname, 80)
private_ips = re.compile(
r'^(?:127\.|0?10\.|172\.0?1[6-9]\.|172\.0?2[0-9]\.172\.0?3[01]\.|192\.168\.|169\.254\.|::1|[fF][cCdD][0-9a-fA-F]{2}:|[fF][eE][89aAbB][0-9a-fA-F]:)')
addr = addr_info[0][4][0]
return private_ips.match(addr)
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
@ -22,12 +35,19 @@ async def get_url(url: str, status_code: int = False, headers: dict = None, fmt=
:param timeout: 超时时间
:returns: 指定url的内容字符串
"""
hostname = urllib.parse.urlparse(url).hostname
check = private_ip_check(hostname)
if check:
raise ValueError(
f'Attempt of requesting private IP addresses is not allowed, requesting {hostname}.')
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout), headers=headers) as req:
if log:
Logger.info(await req.read())
if status_code and req.status != status_code:
raise ValueError(f'{str(req.status)}[Ke:Image,path=https://http.cat/{str(req.status)}.jpg]')
raise ValueError(
f'{str(req.status)}[Ke:Image,path=https://http.cat/{str(req.status)}.jpg]')
if fmt is not None:
if hasattr(req, fmt):
return await getattr(req, fmt)()
@ -38,24 +58,36 @@ async def get_url(url: str, status_code: int = False, headers: dict = None, fmt=
return text
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
@ retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
async def post_url(url: str, data: any, headers: dict = None):
'''发送POST请求。
:param url: 需要发送的url
:param data: 需要发送的数据
:param headers: 请求时使用的http头
:returns: 发送请求后的响应'''
hostname = urllib.parse.urlparse(url).hostname
check = private_ip_check(hostname)
if check:
raise ValueError(
'Attempt of requesting private IP addresses is not allowed.')
async with aiohttp.ClientSession(headers=headers) as session:
async with session.post(url, data=data, headers=headers) as req:
return await req.text()
@retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
@ retry(stop=stop_after_attempt(3), wait=wait_fixed(3), reraise=True)
async def download_to_cache(link: str) -> Union[str, bool]:
'''利用AioHttp下载指定url的内容并保存到缓存./cache目录
:param link: 需要获取的link
:returns: 文件的相对路径若获取失败则返回False'''
hostname = urllib.parse.urlparse(url).hostname
check = private_ip_check(hostname)
if check:
raise ValueError(
'Attempt of requesting private IP addresses is not allowed.')
try:
async with aiohttp.ClientSession() as session:
async with session.get(link) as resp: