From 83c3b04e32813500b3e0431795c90242349f0172 Mon Sep 17 00:00:00 2001 From: Arslaan Pathan Date: Sun, 29 Mar 2026 17:00:40 +1300 Subject: feat: configRealTimeMeasure --- FereFit_configRealTimeMeasure_BLE.py | 53 ++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 FereFit_configRealTimeMeasure_BLE.py (limited to 'FereFit_configRealTimeMeasure_BLE.py') diff --git a/FereFit_configRealTimeMeasure_BLE.py b/FereFit_configRealTimeMeasure_BLE.py new file mode 100644 index 0000000..3059263 --- /dev/null +++ b/FereFit_configRealTimeMeasure_BLE.py @@ -0,0 +1,53 @@ +import asyncio +import sys +try: + from bleak import BleakScanner, BleakClient +except ModuleNotFoundError: + print("Error importing bleak, are you sure you installed it?") + print("Try running the following command: \"pip3 install bleak\"") + print("If that fails, try this: \"pip3 install bleak --break-system-packages\"") + sys.exit(1) + +async def sync(device_name: str): + print(f"Scanning for {device_name}...") + device = await BleakScanner.find_device_by_name(device_name, timeout=10) + if not device: + print("Watch not found!") + return + print(f"Found at {device.address}") + + got_aggregate = False + got_generic = False + + def notification_handler(sender, data): + nonlocal got_aggregate, got_generic + if data[0] == 0xE1: + aggregate_rates = [data[1], data[5], data[6], data[7]] # skip 2-4 because they seem to be zero padding or something redundant + print(f"Got aggregate rates: {aggregate_rates}") + got_aggregate = True + if data[0] == 0x94: + print(f"Got generic measurement: {data[1]}") + got_generic = True + + try: + async with BleakClient(device) as client: + await client.start_notify("6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E", notification_handler) + # byte[1] = 0x00 for heart rate, TODO add more measurements later, maybe split into separate scripts if logic differs + packet = bytes([0x60, 0x00, 0x01]) + await client.write_gatt_char("6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E", packet) + while not got_generic: + await asyncio.sleep(1) + while not got_aggregate: + await asyncio.sleep(1) + # byte[2] = 0x00 for stop + packet = bytes([0x60, 0x00, 0x00]) + await client.write_gatt_char("6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E", packet) + await client.stop_notify("6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E") + except Exception as e: + print(f"Failed: {e}") + +if __name__ == "__main__": + watch_name = input("Enter watch name shown in BLE discovery (default: Watch ULTRA): ") + if watch_name == "": + watch_name = "Watch ULTRA" + asyncio.run(sync(watch_name)) -- cgit v1.2.3