SwaggerHound
SwaggerHound targets large numbers of Swagger endpoints, crawls every API they document, and tests those APIs for unauthorized-access vulnerabilities. Reading the code of the publicly available related projects revealed some pain points, so I built my own tool on top of my predecessors' work.
Pain points:
- Not designed for scenarios with a large number of Swagger assets.
- Limited coverage of Swagger API document formats and poor generality; the finer details of request parameter formats are not handled.
- Incomplete, buggy code that the authors no longer maintain.
Main features
- Supports both single targets and batch targets.
- Handles target links in the /swagger-resources, /api/docs, and Swagger HTML page styles.
- Handles multiple versions of Swagger API documents.
- Automatically crawls all API links and requests them with GET and POST (dangerous request methods are skipped).
- Fills default values based on the document's request content types and parameter types (see the sketch after this list).
- Crawls custom models or objects and fills them into the parameters.
- Console output, log file generation, and export of scan results.
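To make the default-value filling concrete, here is a minimal sketch of what the fill_parameters function in the source below returns for a hypothetical endpoint (the host and parameter names are placeholders):

```python
# Hypothetical Swagger parameter list: one path parameter, one query parameter
params = [
    {'name': 'id', 'in': 'path', 'type': 'integer'},
    {'name': 'keyword', 'in': 'query', 'type': 'string'},
]
filled, url = fill_parameters(params, 'https://example.com/api/user/{id}')
# filled -> {'id': 1, 'keyword': 'a'}   (integer -> 1, string -> 'a', boolean -> True)
# url    -> 'https://example.com/api/user/1'   (the {id} placeholder is replaced)
```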
Usage
```bash
python3 swagger-hound.py -u https://xxxx.ztna.xxxxx.com/api/swagger-ui.html
python3 swagger-hound.py -u https://xxxx.ztna.xxxxx.com/api/v2/docs
python3 swagger-hound.py -u https://xxxx.ztna.xxxxx.com/api/swagger-resources
python3 swagger-hound.py -f url.txt
```
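For batch mode, url.txt is expected to contain one target per line; the three entry-point styles above can be mixed (the hosts below are placeholders):

```
https://host1.example.com/api/swagger-resources
https://host2.example.com/api/v2/docs
https://host3.example.com/api/swagger-ui.html
```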
![Automated detection of unauthorized access to Swagger API endpoints](https://www.libaisec.com/wp-content/uploads/2025/03/20250312094111596-图片.png)
```python
# -*- coding: utf-8 -*-
# https://github.com/evilc0deooo/SwaggerHound
import json
import sys
import csv
import argparse
import requests
import re
import random
import urllib3
from datetime import datetime
from urllib.parse import urlparse
from loguru import logger

# Suppress insecure-request warnings (verify=False is used throughout)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger.remove()
handler_id = logger.add(sys.stderr, level='DEBUG')  # console log level
now_time = datetime.now().strftime('%Y%m%d_%H%M%S')

proxies = {
    'https': 'http://127.0.0.1:7890',
    'http': 'http://127.0.0.1:7890'
}

# Route all requests through the proxies above when True
SET_PROXY = False

header_agents = [
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Code/1.96.2 Chrome/128.0.6613.186 Electron/32.2.6 Safari/537.36'
]
def http_req(url, method='get', **kwargs):
    kwargs.setdefault('verify', False)
    kwargs.setdefault('timeout', (10.1, 30.1))
    kwargs.setdefault('allow_redirects', False)
    headers = kwargs.get('headers', {})
    headers.setdefault('User-Agent', random.choice(header_agents))
    # Disallow caching so every request fetches the latest resource from the server
    headers.setdefault('Cache-Control', 'max-age=0')
    kwargs['headers'] = headers
    if SET_PROXY:
        kwargs['proxies'] = proxies
    conn = getattr(requests, method)(url, **kwargs)
    return conn
def check_page(url):
    """
    Classify the type of the target page.
    """
    res = http_req(url, method='get')
    if '<html' in res.text:
        logger.debug('[+] Input is a Swagger homepage, parsing the API docs URL')
        return 3  # swagger-html
    elif '"parameters"' in res.text:
        logger.debug('[+] Input is an API docs URL, building and sending requests')
        return 2  # api-docs
    elif '"location"' in res.text:
        logger.debug('[+] Input is a resource URL, parsing the API docs URL')
        return 1  # resource
def fill_parameters(parameters, url):
    """
    Fill in test data and replace placeholders in the URL.
    """
    filled_params = {}
    path_params = {}
    for param in parameters:
        param_name = param['name']
        param_in = param['in']
        param_type = param.get('type')  # may be absent for $ref/schema parameters
        # Fill a default value based on the declared type
        if param_type == 'string':
            value = 'a'
        elif param_type == 'integer':
            value = 1
        elif param_type == 'boolean':
            value = True
        else:
            value = ''
        if param_in == 'query':
            filled_params[param_name] = value
        elif param_in == 'path':
            path_params[param_name] = value
            filled_params[param_name] = value
        elif param_in == 'body':
            if 'body' not in filled_params:
                filled_params['body'] = {}
            filled_params['body'][param_name] = value
    # Replace {placeholder} segments in the URL with the path values
    for key, value in path_params.items():
        url = url.replace(f'{{{key}}}', str(value))
    return filled_params, url
def get_api_docs_path(resource_url):
    """
    Parse API docs URLs from a resource URL.
    """
    domain = urlparse(resource_url)
    domain = domain.scheme + '://' + domain.netloc
    try:
        res = http_req(resource_url, method='get')
        resources = json.loads(res.text)
    except Exception as e:
        logger.error(f'[-] {resource_url} error info {e}')
        return []
    paths = []
    if isinstance(resources, dict):
        if 'apis' in resources.keys():  # formats differ between Swagger versions
            for api_docs in resources.get('apis', {}):
                paths.append(domain + api_docs['path'])
    else:
        for i in resources:
            paths.append(domain + i['location'])
    return paths
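# For reference, the two swagger-resources response shapes parsed above
# (values are illustrative, not from a real target):
#   older Swagger resource listing: {"apis": [{"path": "/user-api"}], ...}
#   springfox /swagger-resources:   [{"name": "default", "location": "/v2/api-docs"}]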
def output_to_csv(data):
    # Append mode so results from multiple targets accumulate in one file
    with open(f'{now_time}.csv', 'a', newline='', encoding='utf-8') as _f:
        writer = csv.writer(_f)
        writer.writerow(data)
def go_resources(url):
    """
    Parse swagger-resources to obtain the api-docs URLs.
    """
    try:
        _domain = urlparse(url)
        domain = _domain.scheme + '://' + _domain.netloc
        domain_path = _domain.path
        stripped_path = domain_path.strip('/')
        res = http_req(url)
        data = json.loads(res.text)
        for _i in data:
            location = _i.get('location')
            # If the resource URL has no extra path, join location onto the bare domain
            target = domain + location
            if len(stripped_path) > 0:
                # Otherwise join location onto the existing base path
                target = url.rsplit('/', 1)[0] + location
            go_api_docs(target)  # scan every endpoint in the api-docs
    except Exception as e:
        logger.error(f'[-] {url} error info {e}')
def go_swagger_html(url):
    """
    Parse swagger-ui.html to find the API docs path.
    """
    response = http_req(url)
    response.raise_for_status()
    html_content = response.text
    # Look for the swagger.json path inside swagger-initializer.js
    initializer_pattern = r'<script\s+src=["\']([^"\']*swagger-initializer\.js[^"\']*)["\']'
    initializer_match = re.search(initializer_pattern, html_content)
    if initializer_match:
        js_file_path = initializer_match.group(1)
        if js_file_path.startswith('http'):
            js_file_url = js_file_path
        else:
            _domain = urlparse(url)
            domain = _domain.scheme + '://' + _domain.netloc
            domain_path = _domain.path
            stripped_path = domain_path.strip('/')
            if len(stripped_path) > 0:
                base_url = url.rsplit('/', 1)[0]
                js_file_url = f'{base_url}/{js_file_path.lstrip("/")}'
            else:
                js_file_url = f'{domain}/{js_file_path.lstrip("/")}'
        js_response = http_req(js_file_url)
        js_response.raise_for_status()
        js_content = js_response.text
        # Extract the swagger.json path from the defaultDefinitionUrl constant
        js_pattern = r'const\s+defaultDefinitionUrl\s*=\s*["\']([^"\']+)["\'];'
        js_match = re.search(js_pattern, js_content)
        if js_match:
            api_docs_path = js_match.group(1)
            go_api_docs(api_docs_path)
            return
    # No swagger-initializer.js or defaultDefinitionUrl found, fall back to springfox.js
    springfox_pattern = r'<script\s+src=["\']([^"\']*springfox\.js[^"\']*)["\']'
    springfox_match = re.search(springfox_pattern, html_content)
    if not springfox_match:
        logger.debug('[-] Neither swagger-initializer.js nor springfox.js path was found')
        return
    # Get the relative or absolute path of springfox.js
    springfox_file_path = springfox_match.group(1)
    if springfox_file_path.startswith('http'):
        springfox_file_url = springfox_file_path
    else:
        base_url = url.rsplit('/', 1)[0]
        springfox_file_url = f'{base_url}/{springfox_file_path.lstrip("/")}'
    # Fetch the springfox.js file content
    springfox_response = http_req(springfox_file_url)
    springfox_response.raise_for_status()
    springfox_content = springfox_response.text
    if '/swagger-resources' in springfox_content:
        base_url = url.rsplit('/', 1)[0]
        resource_url = f'{base_url}/swagger-resources'
        go_resources(resource_url)
        return
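# The regexes above expect markup/JS of roughly this shape (illustrative):
#   <script src="./swagger-initializer.js" charset="UTF-8"></script>
#   const defaultDefinitionUrl = "/v3/api-docs";
#   <script src="springfox.js"></script>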
def go_api_docs(url):
    """
    Parse api-docs and scan every endpoint.
    """
    try:
        domain = urlparse(url)
        domain = domain.scheme + '://' + domain.netloc
        res = http_req(url)
        if res.status_code != 200:
            logger.error(f'[-] {url} req status is {res.status_code}')
            return
        try:
            data = json.loads(res.text)
        except json.JSONDecodeError:
            # Double quotes inside embedded HTML tags break json.loads,
            # so swap them to single quotes within tags and parse again
            data = res.text.replace("'", '"')
            result = re.sub(r'<[^>]*>', lambda match: match.group(0).replace('"', "'"), data)
            data = json.loads(result, strict=False)
        if 'basePath' in data.keys():  # Swagger 2.0
            base_path = data['basePath']
        elif 'servers' in data.keys():  # OpenAPI 3.x: servers is a list of objects
            base_path = data['servers'][0]['url']
        else:
            base_path = ''
        paths = data.get('paths', {})
        definitions = data.get('definitions', {})
        swagger_result = []
        for path, methods in paths.items():
            for method, details in methods.items():  # get / post / put / patch / delete / head ...
                if method.upper() not in ['GET', 'POST']:  # HTTP method whitelist, dangerous methods skipped
                    continue
                req_path = domain + base_path + path
                summary = details.get('summary', path)  # endpoint summary
                consumes = details.get('consumes', [])  # request content type, e.g. application/json
                params = details.get('parameters', [])
                logger.debug(f'test on {summary} => {method} => {req_path}')
                param_info = []
                for param in params:
                    param_name = param.get('name')
                    param_in = param.get('in')
                    schema = param.get('schema')
                    # Check whether the parameter references a custom model/object
                    if schema and '$ref' in schema:
                        ref = schema['$ref'].split('/')[-1]
                        if ref in definitions:
                            # The parameter is declared in definitions, so pull the
                            # property names and types from its definition
                            for prop_name, prop_details in definitions[ref].get('properties', {}).items():
                                param_info.append({
                                    'name': prop_name,
                                    'in': param_in,
                                    'type': prop_details.get('type')
                                })
                    else:
                        param_type = param.get('type')
                        param_info.append({
                            'name': param_name,
                            'in': param_in,
                            'type': param_type
                        })
                # Everything needed from the Swagger document for this endpoint
                swagger_result.append({
                    'summary': summary,
                    'req_path': req_path,
                    'method': method,
                    'consumes': consumes,
                    'parameters': param_info
                })
        black_list_status = [401, 404, 502, 503]  # status code blacklist
        for item in swagger_result:
            summary = item['summary']
            req_path = item['req_path']
            method = item['method']
            consumes = item['consumes']
            parameters = item['parameters']
            # Build the request data to send
            filled_params, new_url = fill_parameters(parameters, req_path)
            headers = {}
            if 'application/json' in consumes:
                headers = {'Content-Type': 'application/json'}
            if method.lower() == 'get':
                response = http_req(new_url, method='get', params=filled_params)
            elif 'body' in filled_params:  # POST with a JSON body
                response = http_req(new_url, method='post', json=filled_params['body'], headers=headers)
            else:  # POST with query-style parameters
                response = http_req(new_url, method='post', params=filled_params, headers=headers)
            if response.status_code in black_list_status:
                logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
                continue
            if response.status_code == 200:
                logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
                write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
                output_to_csv(write_result)
    except Exception as e:
        logger.error(f'[-] {url} error info {e}')
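# Illustrative Swagger 2.0 fragment of the structure walked above
# (basePath + paths + per-method parameters + definitions; values are made up):
#   {"basePath": "/api",
#    "paths": {"/user/{id}": {"get": {"summary": "Get user",
#        "parameters": [{"name": "id", "in": "path", "type": "integer"}]}}},
#    "definitions": {"User": {"properties": {"name": {"type": "string"}}}}}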
def run(target):
    """
    Entry point for a single target.
    """
    url_type = check_page(target)
    if url_type == 1:
        logger.success('working on {} type: resource'.format(target))
        go_resources(target)
    elif url_type == 2:
        logger.success('working on {} type: api-docs'.format(target))
        go_api_docs(target)
    else:
        logger.success('working on {} type: html'.format(target))
        go_swagger_html(target)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-u', '--url', dest='target_url', help='resource URL, api-docs URL, or Swagger homepage URL')
    parser.add_argument('-f', '--file', dest='url_file', help='batch testing, one URL per line')
    args = parser.parse_args()
    logger.add('debug.log', format='{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}')
    if args.target_url:
        run(args.target_url)
    elif args.url_file:
        with open(args.url_file, 'r') as f:
            urls = [line.strip() for line in f.readlines() if line.strip()]
        for target_url in urls:
            print(target_url)
            run(target_url)
```
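Each HTTP 200 response is appended to a timestamped CSV named {now_time}.csv. The columns, in the order written by output_to_csv, are: api-docs URL, tested URL, summary, method, consumes, filled parameters, status code, and response body.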
References
Thanks to @jayus0821 and @lijiejie.