自动化检测 Swagger API 接口未授权访问漏洞工具

SwaggerHound

针对大量 Swagger 目标并爬取所有接口并测试未授权漏洞, 通过阅读目前已公开相关项目代码发现一些痛点, 所以在前辈们项目基础上自己搓了一个。

痛点:

    1. 不适用于在大量 Swagger 资产场景下。

    1. 对于 Swagger API 文档参考不多且通用性差, 请求的参数格式细节也未作处理。

    1. 代码不完整且存在 BUG, 作者也不在维护。

主要功能

    • 支持单个目标和批量目标。

    • 适配目标链接为 /swagger-resources/api/docsswagger html

    • 适配多个版本 Swagger API 文档。

    • 自动爬取所有 API 链接, 对 GET 和 POST 进行请求 (危险请求方法进行跳过)。

    • 根据文档数据请求类型和参数类型填充默认值。

    • 支持爬取自定义的模型或对象并填充给参数。

    • 支持控制台显示, 生成日志文件, 导出扫描结果。

使用方法

python3 swagger-hound.py -u https://xxxx.ztna.xxxxx.com/api/swagger-ui.html
python3 swagger-hound.py -u https://xxxx.ztna.xxxxx.com/api/v2/docs
python3 swagger-hound.py -u https://xxxx.ztna.xxxxx.com/api/swagger-resources

python3 swagger-hound.py -f url.txt

图片[1]-自动化检测 Swagger API 接口未授权访问漏洞工具-李白你好
# -*- coding: utf-8 -*-
# https://github.com/evilc0deooo/SwaggerHound

import json
import sys
import csv
import argparse
import requests
import re
import random
import urllib3
from datetime import datetime
from urllib.parse import urlparse
from loguru import logger

# 禁用安全请求警告
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

logger.remove()
handler_id = logger.add(sys.stderr, level='DEBUG')  # 设置输出级别

now_time = datetime.now().strftime("%Y%m%d_%H%M%S")

proxies = {
    'https': 'http://127.0.0.1:7890',
    'http': 'http://127.0.0.1:7890'
}

# 开启代理
SET_PROXY = False

header_agents = [
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.75.14 (KHTML, like Gecko) Version/7.0.3 Safari/7046A194A',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Code/1.96.2 Chrome/128.0.6613.186 Electron/32.2.6 Safari/537.36'
]


def http_req(url, method='get', **kwargs):
    kwargs.setdefault('verify', False)
    kwargs.setdefault('timeout', (10.1, 30.1))
    kwargs.setdefault('allow_redirects', False)

    headers = kwargs.get('headers', {})
    headers.setdefault('User-Agent', random.choice(header_agents))
    # 不允许缓存,每次请求都获取服务器上最新的资源
    headers.setdefault('Cache-Control', 'max-age=0')
    kwargs['headers'] = headers
    if SET_PROXY:
        kwargs['proxies'] = proxies

    conn = getattr(requests, method)(url, **kwargs)
    return conn


def check_page(url):
    """
    检查当前页面
    """
    res = http_req(url, method='get')
    if '<html' in res.text:
        logger.debug('[+] 输入为 swagger 首页,开始解析 api 文档地址')
        return 3  # swagger-html
    elif '"parameters"' in res.text:
        logger.debug('[+] 输入为 api 文档地址,开始构造请求发包')
        return 2  # api_docs
    elif '"location"' in res.text:
        logger.debug('[+] 输入为 resource 地址,开始解析 api 文档地址')
        return 1  # resource


def fill_parameters(parameters, url):
    """
    填充测试数据并替换 URL 中的占位符
    """
    filled_params = {}
    path_params = {}
    for param in parameters:
        param_name = param['name']
        param_in = param['in']
        param_type = param['type']

        # 根据类型填充默认值
        if param_type == 'string':
            value = 'a'
        elif param_type == 'integer':
            value = 1
        elif param_type == 'boolean':
            value = True
        else:
            value = ''

        if param_in == 'query':
            filled_params[param_name] = value
        elif param_in == 'path':
            path_params[param_name] = value
            filled_params[param_name] = value
        elif param_in == 'body':
            if 'body' not in filled_params:
                filled_params['body'] = {}
            filled_params['body'][param_name] = value

    # 替换 URL 中的占位符
    for key, value in path_params.items():
        url = url.replace(f'{{{key}}}', str(value))

    return filled_params, url


def get_api_docs_path(resource_url):
    """
    输入 resource 解析 api 文档 url
    """
    domain = urlparse(resource_url)
    domain = domain.scheme + '://' + domain.netloc
    try:
        res = http_req(resource_url, method='get')
        resources = json.loads(res.text)
    except Exception as e:
        logger.error(f'[-] {resource_url} error info {e}')
        return []

    paths = []
    if isinstance(resources, tuple):
        if 'apis' in resources.keys():  # 版本不同, 格式不一样
            for api_docs in resources.get('apis', {}):
                paths.append(domain + api_docs['path'])
            return paths
    else:
        for i in resources:
            paths.append(domain + i['location'])
        return paths


def output_to_csv(data):
    _f = open(f'{now_time}.csv', 'a', newline='', encoding='utf-8')  # 使用追加模式
    writer = csv.writer(_f)
    writer.writerow(data)


def go_resources(url):
    """
    解析 swagger-resources 获取 api-docs
    """
    try:
        _domain = urlparse(url)
        domain = _domain.scheme + '://' + _domain.netloc
        domain_path = _domain.path
        stripped_path = domain_path.strip('/')
        res = http_req(url)
        data = json.loads(res.text)
        for _i in data:
            location = _i.get('location')
            # 判断如果不存在路径则直接进行拼接
            target = domain + location
            if len(stripped_path) > 0:
                # 如果存在路径则在原路径上继续拼接
                target = url.rsplit('/', 1)[0] + location
            go_api_docs(target)  # 调用 api_docs 扫描全部接口

    except Exception as e:
        logger.error(f'[-] {url} error info {e}')


def go_swagger_html(url):
    """
    解析 swagger-ui.html 获取 api 接口路径
    """
    response = http_req(url)
    response.raise_for_status()
    html_content = response.text
    # 在 swagger-initializer.js 中获取 swagger.json 接口
    initializer_pattern = r'<script\s+src=["\']([^"\']*swagger-initializer\.js[^"\']*)["\']'
    initializer_match = re.search(initializer_pattern, html_content)
    if initializer_match:
        js_file_path = initializer_match.group(1)
        if js_file_path.startswith('http'):
            js_file_url = js_file_path
        else:
            _domain = urlparse(url)
            domain = _domain.scheme + '://' + _domain.netloc
            domain_path = _domain.path
            stripped_path = domain_path.strip('/')
            if len(stripped_path) > 0:
                base_url = url.rsplit('/', 1)[0]
                js_file_url = f'{base_url}/{js_file_path.lstrip("/")}'
            else:
                js_file_url = f'{domain}/{js_file_path.lstrip("/")}'

        js_response = http_req(js_file_url)
        js_response.raise_for_status()
        js_content = js_response.text

        # 正则获取 defaultDefinitionUrl 的值 swagger.json 接口路径
        js_pattern = r'const\s+defaultDefinitionUrl\s*=\s*["\']([^"\']+)["\'];'
        js_match = re.search(js_pattern, js_content)
        if js_match:
            api_docs_path = js_match.group(1)
            go_api_docs(api_docs_path)
            return

    # 未找到 swagger-initializer.js 文件或 defaultDefinitionUrl 定义, 则尝试查找 springfox.js 文件
    springfox_pattern = r'<script\s+src=["\']([^"\']*springfox\.js[^"\']*)["\']'
    springfox_match = re.search(springfox_pattern, html_content)
    if not springfox_match:
        logger.debug('[-] 未找到 swagger-initializer.js 和 springfox.js 文件路径')
        return

    # 获取 springfox.js 文件的相对或绝对路径
    springfox_file_path = springfox_match.group(1)
    if springfox_file_path.startswith('http'):
        springfox_file_url = springfox_file_path
    else:
        base_url = url.rsplit('/', 1)[0]
        springfox_file_url = f'{base_url}/{springfox_file_path.lstrip("/")}'

    # 发送请求获取 springfox.js 文件内容
    springfox_response = http_req(springfox_file_url)
    springfox_response.raise_for_status()
    springfox_content = springfox_response.text
    if "/swagger-resources" in springfox_content:
        base_url = url.rsplit('/', 1)[0]
        resource_url = f"{base_url}/swagger-resources"
        go_resources(resource_url)
        return


def go_api_docs(url):
    """
    开始 api-docs 解析并扫描
    """
    try:
        domain = urlparse(url)
        domain = domain.scheme + '://' + domain.netloc

        res = http_req(url)
        if res.status_code != 200:
            logger.error(f'[-] {url} req status is {res.status_code}')
            return
        try:
            data = json.loads(res.text)
        except json.JSONDecodeError:
            # 遇到 html 标签内存在双引号 json.loads 无法格式化, 需要特殊处理
            data = res.text.replace("'", '"')
            result = re.sub(r'<[^>]*>', lambda match: match.group(0).replace('"', "'"), data)
            data = json.loads(result, strict=False)

        if 'basePath' in data.keys():
            base_path = data['basePath']
        elif 'servers' in data.keys():
            base_path = data['servers']['url']
        else:
            base_path = ''

        paths = data.get('paths', {})
        definitions = data.get('definitions', {})
        swagger_result = []
        for path, methods in paths.items():
            for method, details in methods.items():  # get / post / put / update / delete / head...
                if method.upper() not in ['GET', 'POST']:  # http 请求方式白名单
                    continue
                req_path = domain + base_path + path
                summary = details.get('summary', path)  # 概要信息
                consumes = details.get('consumes', [])  # 数据请求类型 application/json
                params = details.get('parameters', [])
                logger.debug(f'test on {summary} => {method} => {req_path}')
                param_info = []
                for param in params:
                    param_name = param.get('name')
                    param_in = param.get('in')
                    schema = param.get('schema')
                    # 判断是否存在自定义的模型或对象
                    if schema and '$ref' in schema:
                        ref = schema['$ref'].split('/')[-1]
                        if ref in definitions:  # 如果在 definitions 中声明了参数属性,则去 definitions 定义中获取参数及属性信息
                            # 递归处理定义中的属性
                            for prop_name, prop_details in definitions[ref].get('properties', {}).items():
                                param_info.append({
                                    'name': prop_name,
                                    'in': param_in,
                                    'type': prop_details.get('type')
                                })
                    else:
                        param_type = param.get('type')
                        param_info.append({
                            'name': param_name,
                            'in': param_in,
                            'type': param_type
                        })

                # 解析 swagger 获取到所有需要的数据
                swagger_result.append({
                    'summary': summary,
                    'req_path': req_path,
                    'method': method,
                    'consumes': consumes,
                    'parameters': param_info
                })

        black_list_status = [401, 404, 502, 503]  # 状态码黑名单
        for item in swagger_result:
            summary = item['summary']
            req_path = item['req_path']
            method = item['method']
            consumes = item['consumes']
            parameters = item['parameters']
            # 生成发送的 Body 数据
            filled_params, new_url = fill_parameters(parameters, req_path)
            headers = {}

            if 'application/json' in consumes:
                headers = {'Content-Type': 'application/json'}
            if method.lower() == 'get':
                response = http_req(new_url, method='get', params=filled_params)
                if response.status_code in black_list_status:
                    logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
                    continue
                if response.status_code == 200:
                    logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
                    write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
                    output_to_csv(write_result)

            elif method.lower() == 'post':
                if 'body' in filled_params:
                    response = http_req(new_url, method='post', json=filled_params['body'], headers=headers)
                    if response.status_code in black_list_status:
                        logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
                        continue
                    if response.status_code == 200:
                        logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
                        write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
                        output_to_csv(write_result)

                else:
                    response = http_req(new_url, method='post', params=filled_params, headers=headers)
                    if response.status_code in black_list_status:
                        logger.debug(f'[-] {method} {new_url} req status is {response.status_code}')
                        continue
                    if response.status_code == 200:
                        logger.debug(f'[+] {method} {new_url} req status is {response.status_code}')
                        write_result = [url, new_url, summary, method, consumes, filled_params, response.status_code, response.text]
                        output_to_csv(write_result)


    except Exception as e:
        logger.error(f'[-] {url} error info {e}')


def run(target):
    """
    执行程序
    """
    url_type = check_page(target)
    if url_type == 1:
        logger.success('working on {}'.format(target), 'type: source')
        go_resources(target)
    elif url_type == 2:
        logger.success('working on {}'.format(target), 'type: api-docs')
        go_api_docs(target)
    else:
        logger.success('working on {}'.format(target), 'type: html')
        go_swagger_html(target)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-u', '--url', dest='target_url', help='resource 地址 or api 文档地址 or swagger 首页地址')
    parser.add_argument('-f', '--file', dest='url_file', help='批量测试')
    args = parser.parse_args()

    logger.add('debug.log', format='{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}')
    if args.target_url:
        run(args.target_url)
    elif args.url_file:
        with open(args.url_file, 'r') as f:
            urls = [line.strip() for line in f.readlines()]
        for target_url in urls:
            print(target_url)
            run(target_url)

参考

感谢 @jayus0821 @lijiejie

https://github.com/jayus0821/swagger-hack/tree/main

https://github.com/lijiejie/swagger-exp

© 版权声明
THE END
喜欢就支持一下吧
点赞13 分享
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

取消
昵称表情代码图片

    暂无评论内容