brax3-ubports/overlay/system/usr/libexec/umtprd-manager
erascape 59281ccd87 initial brax3 device makefiles
* boots and works fine

Signed-off-by: erascape <erascape@proton.me>
2025-09-24 08:38:38 +00:00

112 lines
3.8 KiB
Python
Executable file

#!/usr/bin/python3
# Copyright (C) 2022 UBports Foundation.
# SPDX-License-Identifier: GPL-3.0-or-later
# This Python script exists because there's no convenient way for usb-moded, a
# system service, to tell a user's systemd to start something. So, the control
# is inverted: this script sits in the user's systemd and listens for a signal
# emitted by usb-moded, and then starts or stops the service as needed.
# This was further modified from /usr/libexec/mtp-server-usb-moded-watcher to
# unlock access to MTP storage devices if/when the device gets unlocked itself.
from gi.repository import GLib, Gio
from subprocess import run
# Module-level state shared by the handlers below.

# Whether the greeter (lockscreen) is active. Starts as True and is replaced
# by the value read from the greeter's "IsActive" property.
is_greeter_active: bool = True
# Set by handle_unlock() after it has run the umtprd unlock command; cleared by
# handle_current_mode() whenever the USB mode changes.
already_unlocked_storage: bool = False
# Whether the last reported USB mode is one of the MTP modes.
mtp_enabled: bool = False
# Last USB mode processed by handle_current_mode().
old_usb_mode: str = "undefined"
def handle_unlock():
    """Unlock umtprd's storage when MTP is enabled and the greeter is gone.

    Reads the module-level flags ``mtp_enabled``, ``already_unlocked_storage``
    and ``is_greeter_active``. When MTP is enabled, the storage has not been
    unlocked yet and the greeter is not active, runs ``umtprd -cmd:unlock``.

    ``already_unlocked_storage`` is only set when the command exits with
    status 0, so a failed attempt is retried the next time this function is
    called instead of being silently recorded as a success.

    Raises:
        OSError: if the ``umtprd`` executable cannot be started.
    """
    global already_unlocked_storage

    print(f"handle_unlock(): mtp_enabled={mtp_enabled}, already_unlocked_storage={already_unlocked_storage}, is_greeter_active={is_greeter_active}")

    if not mtp_enabled or already_unlocked_storage or is_greeter_active:
        # We only care about unlocking locked storage when MTP is enabled and not on lockscreen.
        return

    result = run(["umtprd", "-cmd:unlock"])
    if result.returncode != 0:
        # Keep the flag cleared so that the next greeter or USB mode event
        # triggers another attempt.
        print(f"handle_unlock(): umtprd -cmd:unlock failed with exit status {result.returncode}")
        return

    already_unlocked_storage = True
def handle_current_mode(mode: str):
    """Record a new USB mode and try to unlock MTP storage if appropriate.

    Args:
        mode: Mode name as reported by usb-moded. The transient ``"busy"``
            mode is ignored entirely.

    Updates the module-level ``old_usb_mode``, ``mtp_enabled`` and
    ``already_unlocked_storage`` flags, then calls ``handle_unlock()``.
    """
    global mtp_enabled, old_usb_mode, already_unlocked_storage

    # "busy" is transient; a further signal is expected once the final mode
    # is reached, so there is nothing to do yet.
    if mode == "busy":
        return

    mode_changed = mode != old_usb_mode
    if mode_changed:
        # Act like MTP wasn't enabled as it won't be when switching between
        # "mtp" and "mtp_adb" for example.
        already_unlocked_storage = False

    print(f"handle_current_mode(): {old_usb_mode} -> {mode}")

    # FIXME: better way for this list?
    mtp_enabled = mode == "mtp" or mode == "mtp_adb"
    old_usb_mode = mode

    handle_unlock()
def handle_dbus_properties_signal(
    obj, sender_name: str, signal_name: str, parameters: GLib.Variant
):
    """Track the greeter's ``IsActive`` property from ``PropertiesChanged``.

    Connected to the ``g-signal`` signal of the greeter's
    ``org.freedesktop.DBus.Properties`` proxy.

    Args:
        obj: Object that emitted ``g-signal`` (unused).
        sender_name: Bus name of the signal sender (unused).
        signal_name: D-Bus signal name; everything except
            ``PropertiesChanged`` is ignored.
        parameters: Signal payload. For ``PropertiesChanged`` this is
            ``(interface_name, changed_properties, invalidated_properties)``.

    Only an ``IsActive`` entry reported for the ``com.lomiri.LomiriGreeter``
    interface updates ``is_greeter_active``. Signals for other interfaces,
    for other properties, or with an empty ``changed_properties`` dict are
    ignored instead of being misread as the greeter state.
    """
    global is_greeter_active

    if signal_name != "PropertiesChanged":
        return

    interface_name, changed_properties, _invalidated = parameters.unpack()
    if interface_name != "com.lomiri.LomiriGreeter":
        return
    if "IsActive" not in changed_properties:
        return

    is_greeter_active = bool(changed_properties["IsActive"])
    handle_unlock()
def handle_usb_moded_signal(
    obj, sender_name: str, signal_name: str, parameters: GLib.Variant
):
    """Forward ``sig_usb_current_state_ind`` signals to handle_current_mode().

    Connected to the ``g-signal`` signal of the usb-moded proxy. Any other
    signal name is ignored.

    Args:
        obj: Object that emitted ``g-signal`` (unused).
        sender_name: Bus name of the signal sender (unused).
        signal_name: D-Bus signal name.
        parameters: Signal payload; its first child is read as the mode string.
    """
    if signal_name == "sig_usb_current_state_ind":
        handle_current_mode(parameters.get_child_value(0).get_string())
# Session-bus proxy for the org.freedesktop.DBus.Properties interface of the
# Lomiri greeter object. It is used for the initial "IsActive" read below and
# as the source of later signals.
dbus_properties = Gio.DBusProxy.new_for_bus_sync(
    Gio.BusType.SESSION,
    Gio.DBusProxyFlags.DO_NOT_LOAD_PROPERTIES,
    None,  # DBusInterfaceInfo
    "com.lomiri.LomiriGreeter",
    "/com/lomiri/LomiriGreeter",
    "org.freedesktop.DBus.Properties",
    None,  # Cancellable
)

# Read the current value of IsActive once at startup.
_is_active_request = GLib.Variant(
    "(ss)",
    ("com.lomiri.LomiriGreeter", "IsActive"),  # interface_name, property_name
)
_is_active_reply = dbus_properties.call_sync(
    "Get",
    _is_active_request,
    Gio.DBusCallFlags.NONE,
    500,  # TimeoutMsec
    None,  # Cancellable
)
is_greeter_active = bool(_is_active_reply.get_child_value(0))

# TODO: don't track locked state when cable unplugged?
dbus_properties.connect("g-signal", handle_dbus_properties_signal)
# System-bus proxy for usb-moded. The signal handler is connected before the
# current mode is requested.
usb_moded = Gio.DBusProxy.new_for_bus_sync(
    Gio.BusType.SYSTEM,
    Gio.DBusProxyFlags.DO_NOT_LOAD_PROPERTIES,
    None,  # DBusInterfaceInfo
    "com.meego.usb_moded",
    "/com/meego/usb_moded",
    "com.meego.usb_moded",
    None,  # Cancellable
)
usb_moded.connect("g-signal", handle_usb_moded_signal)

# Ask usb-moded for its current mode and process it like a signalled one.
_mode_reply = usb_moded.call_sync(
    "mode_request",
    None,  # Parameters
    Gio.DBusCallFlags.NONE,
    500,  # TimeoutMsec
    None,  # Cancellable
)
current_mode = _mode_reply.get_child_value(0).get_string()
handle_current_mode(current_mode)
# TODO: keep track of removable disks via UDisks2 (HintSystem=false) with MountPoints[0]
# and IdLabel/IdUUID used for e.g.
# - umtprd '-cmd:addstorage:"/media/phablet/1234-5678" "1234-5678" rw'
# - umtprd '-cmd:rmstorage:"1234-5678"'
# Run the GLib main loop so the connected "g-signal" handlers get dispatched.
GLib.MainLoop(None).run()