#!/usr/bin/python
# bluetooth pair/scan tool for MiSTer
# (C) 2021 Alexey Melnikov
import signal
import sys
import time
import threading
import inspect
import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib
_DEBUG = False
g_devices = {}
g_mainloop = None
g_address = ""
g_bus = None
g_dev_path = ""
g_search_mode = 0
SERVICE_NAME = "org.bluez"
ADAPTER_INTERFACE = SERVICE_NAME + ".Adapter1"
DEVICE_INTERFACE = SERVICE_NAME + ".Device1"
AGENTMGR_INTERFACE = SERVICE_NAME + ".AgentManager1"
AGENT_INTERFACE = SERVICE_NAME + ".Agent1"
OBJMGR_INTERFACE = "org.freedesktop.DBus.ObjectManager"
PROPERTIES_INTERFACE = "org.freedesktop.DBus.Properties"
def debug_msg(message):
if _DEBUG:
caller_name = inspect.stack()[1][3]
if 'debug_' in caller_name:
caller_name = inspect.stack()[2][3]
print("DEBUG: %s: %s" % (caller_name, message))
sys.stdout.flush()
def msg(message):
print(message)
sys.stdout.flush()
def get_managed_objects():
manager = dbus.Interface(g_bus.get_object(SERVICE_NAME, "/"), OBJMGR_INTERFACE)
return manager.GetManagedObjects()
def find_adapter(pattern=None):
for path, ifaces in get_managed_objects().items():
adapter = ifaces.get(ADAPTER_INTERFACE)
if adapter is None:
continue
if not pattern or pattern == adapter["Address"] or path.endswith(pattern):
obj = g_bus.get_object(SERVICE_NAME, path)
return dbus.Interface(obj, ADAPTER_INTERFACE)
raise Exception("Bluetooth adapter not found")
def find_device(device_address, adapter, paired=False):
path_prefix = adapter.object_path
for path, ifaces in get_managed_objects().items():
device = ifaces.get(DEVICE_INTERFACE)
if device is None:
continue
if (device["Address"] == device_address and path.startswith(path_prefix)) and (not paired or device["Paired"]):
obj = g_bus.get_object(SERVICE_NAME, path)
return dbus.Interface(obj, DEVICE_INTERFACE)
return None
def register_device(path, properties):
global g_dev_path
global g_address
global g_search_mode
dev = g_devices[path] if path in g_devices else dict()
if ("Name" in properties) and (len(properties["Name"]) > 0):
dev["Name"] = properties["Name"]
if ("Address" in properties) and (len(properties["Address"]) > 0):
dev["Address"] = properties["Address"]
if "Icon" in properties:
dev["Icon"] = properties["Icon"]
g_devices[path] = dev
if g_search_mode == 1 or g_search_mode == 3:
if "Logged" in dev:
return
if ("Name" in dev) and ("Address" in dev) and ("Icon" in dev):
dev["Logged"] = True
g_devices[path] = dev
type = dev["Icon"]
isinput = type.lower().startswith("input-")
if g_search_mode == 1:
msg("NAME: %s\nMAC: %s\n" % (dev["Name"], dev["Address"]))
if isinput:
g_dev_path = path
g_address = dev["Address"]
g_search_mode = 0;
g_mainloop.quit()
else:
msg("Skipping: non-input device\n")
else:
if isinput:
msg("%s %s" % (dev["Address"], dev["Name"]))
else:
msg("%s (%s, non-input) %s " % (dev["Address"], type, dev["Name"]))
elif g_search_mode == 2:
if ("Address" in properties) and (properties["Address"] == g_address):
g_search_mode = 0;
g_mainloop.quit()
def scan_interfaces_added(path, interfaces):
if DEVICE_INTERFACE in interfaces:
register_device(path, interfaces[DEVICE_INTERFACE])
def scan_properties_changed(interface, changed, invalidated, path):
if interface == DEVICE_INTERFACE:
register_device(path, changed)
class Agent(dbus.service.Object):
exit_on_release = True
def set_exit_on_release(self, exit_on_release):
self.exit_on_release = exit_on_release
@dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="")
def Release(self):
debug_msg("Release")
if self.exit_on_release:
g_mainloop.quit()
@dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="")
def AuthorizeService(self, device, uuid):
return
@dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="s")
def RequestPinCode(self, device):
msg("Type 0000 and <Enter>")
return "0000"
@dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="u")
def RequestPasskey(self, device):
return dbus.UInt32("0000")
@dbus.service.method(AGENT_INTERFACE, in_signature="ouq", out_signature="")
def DisplayPasskey(self, device, passkey, entered):
debug_msg("DisplayPasskey (%s, %06u entered %u)" % (device, passkey, entered))
@dbus.service.method(AGENT_INTERFACE, in_signature="os", out_signature="")
def DisplayPinCode(self, device, pincode):
msg("Type "+ pincode +" and <Enter>")
@dbus.service.method(AGENT_INTERFACE, in_signature="ou", out_signature="")
def RequestConfirmation(self, device, passkey):
return
@dbus.service.method(AGENT_INTERFACE, in_signature="o", out_signature="")
def RequestAuthorization(self, device):
return
@dbus.service.method(AGENT_INTERFACE, in_signature="", out_signature="")
def Cancel(self):
debug_msg("Cancel")
def pair_reply():
try:
obj = g_bus.get_object(SERVICE_NAME, g_dev_path)
msg("Trusting...")
dbus.Interface(obj, PROPERTIES_INTERFACE).Set(DEVICE_INTERFACE, "Trusted", True)
msg("Connecting...")
dbus.Interface(obj, DEVICE_INTERFACE).Connect()
msg("Done.")
except dbus.exceptions.DBusException as e:
msg("Failed!")
g_mainloop.quit()
def pair_error(error):
err_name = error.get_dbus_name()
device = dbus.Interface(g_bus.get_object(SERVICE_NAME, g_dev_path), DEVICE_INTERFACE)
if err_name == "org.freedesktop.DBus.Error.NoReply" and device:
msg("Timed out.\nCancelling pairing.")
device.CancelPairing()
else:
msg("%s\nPair error!" % (error.get_dbus_message()))
g_mainloop.quit()
if __name__ == '__main__':
capability = "KeyboardDisplay" if sys.stdin.isatty() else "DisplayOnly"
if len(sys.argv) < 2:
msg("arguments required\n")
quit()
cmd = sys.argv[1].lower();
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
g_mainloop = GLib.MainLoop()
g_bus = dbus.SystemBus()
adapter = find_adapter()
discovery = False
g_bus.add_signal_receiver(scan_interfaces_added,
dbus_interface = OBJMGR_INTERFACE,
signal_name = "InterfacesAdded")
g_bus.add_signal_receiver(scan_properties_changed,
dbus_interface = PROPERTIES_INTERFACE,
signal_name = "PropertiesChanged",
arg0 = DEVICE_INTERFACE,
path_keyword = "path")
adapter.SetDiscoveryFilter(dict())
try:
if cmd == "remove":
if len(sys.argv) < 3:
msg("arguments required\n")
quit()
address = sys.argv[2].upper();
device = find_device(address, adapter)
if device is not None:
msg("Removing existing pair...")
adapter.RemoveDevice(device)
elif cmd == "disconnect":
if len(sys.argv) < 3:
msg("arguments required\n")
quit()
address = sys.argv[2].upper();
device = find_device(address, adapter)
if device is not None:
msg("Disconnecting device " + address)
device.Disconnect()
elif cmd == "pair":
for path, interfaces in get_managed_objects().items():
if DEVICE_INTERFACE in interfaces:
g_devices[path] = interfaces[DEVICE_INTERFACE]
agentpath = "/test/agent"
agent = Agent(g_bus, agentpath)
obj = g_bus.get_object(SERVICE_NAME, "/org/bluez");
manager = dbus.Interface(obj, AGENTMGR_INTERFACE)
manager.RegisterAgent(agentpath, capability)
agent.set_exit_on_release(False)
debug_msg("Agent registered")
discovery = True
adapter.StartDiscovery()
while True:
g_search_mode = 1;
g_mainloop.run()
debug_msg("new device: %s %s" % (g_address, g_dev_path))
device = find_device(g_address, adapter, True)
if device is not None:
msg("Removing existing pair...")
adapter.RemoveDevice(device)
msg("Searching...")
g_search_mode = 2;
g_mainloop.run()
msg("Pairing...")
device = find_device(g_address, adapter)
device.Pair(reply_handler=pair_reply, error_handler=pair_error, timeout=60000)
g_mainloop.run()
msg("")
elif cmd == "scan":
for path, interfaces in get_managed_objects().items():
if DEVICE_INTERFACE in interfaces:
g_devices[path] = interfaces[DEVICE_INTERFACE]
discovery = True
adapter.StartDiscovery()
g_search_mode = 3;
g_mainloop.run()
except KeyboardInterrupt:
debug_msg("ctrl-c")
if discovery:
adapter.StopDiscovery()