aboutsummaryrefslogtreecommitdiff
path: root/FereFit_configRealTimeMeasure_BLE.py
diff options
context:
space:
mode:
authorArslaan Pathan <[email protected]>2026-03-29 17:00:40 +1300
committerArslaan Pathan <[email protected]>2026-03-29 17:00:40 +1300
commit83c3b04e32813500b3e0431795c90242349f0172 (patch)
treeed67ecf6a443a9440c9009ba960a43c264f2f771 /FereFit_configRealTimeMeasure_BLE.py
parent66a2d552090998d45a923fde72303006c220f8c6 (diff)
downloadzwzn-freefit-re-83c3b04e32813500b3e0431795c90242349f0172.tar.xz
zwzn-freefit-re-83c3b04e32813500b3e0431795c90242349f0172.zip
feat: configRealTimeMeasure
Diffstat (limited to 'FereFit_configRealTimeMeasure_BLE.py')
-rw-r--r--FereFit_configRealTimeMeasure_BLE.py53
1 files changed, 53 insertions, 0 deletions
diff --git a/FereFit_configRealTimeMeasure_BLE.py b/FereFit_configRealTimeMeasure_BLE.py
new file mode 100644
index 0000000..3059263
--- /dev/null
+++ b/FereFit_configRealTimeMeasure_BLE.py
@@ -0,0 +1,53 @@
+import asyncio
+import sys
+try:
+ from bleak import BleakScanner, BleakClient
+except ModuleNotFoundError:
+ print("Error importing bleak, are you sure you installed it?")
+ print("Try running the following command: \"pip3 install bleak\"")
+ print("If that fails, try this: \"pip3 install bleak --break-system-packages\"")
+ sys.exit(1)
+
+async def sync(device_name: str):
+ print(f"Scanning for {device_name}...")
+ device = await BleakScanner.find_device_by_name(device_name, timeout=10)
+ if not device:
+ print("Watch not found!")
+ return
+ print(f"Found at {device.address}")
+
+ got_aggregate = False
+ got_generic = False
+
+ def notification_handler(sender, data):
+ nonlocal got_aggregate, got_generic
+ if data[0] == 0xE1:
+ aggregate_rates = [data[1], data[5], data[6], data[7]] # skip 2-4 because they seem to be zero padding or something redundant
+ print(f"Got aggregate rates: {aggregate_rates}")
+ got_aggregate = True
+ if data[0] == 0x94:
+ print(f"Got generic measurement: {data[1]}")
+ got_generic = True
+
+ try:
+ async with BleakClient(device) as client:
+ await client.start_notify("6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E", notification_handler)
+ # byte[1] = 0x00 for heart rate, TODO add more measurements later, maybe split into separate scripts if logic differs
+ packet = bytes([0x60, 0x00, 0x01])
+ await client.write_gatt_char("6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E", packet)
+ while not got_generic:
+ await asyncio.sleep(1)
+ while not got_aggregate:
+ await asyncio.sleep(1)
+ # byte[2] = 0x00 for stop
+ packet = bytes([0x60, 0x00, 0x00])
+ await client.write_gatt_char("6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E", packet)
+ await client.stop_notify("6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E")
+ except Exception as e:
+ print(f"Failed: {e}")
+
+if __name__ == "__main__":
+ watch_name = input("Enter watch name shown in BLE discovery (default: Watch ULTRA): ")
+ if watch_name == "":
+ watch_name = "Watch ULTRA"
+ asyncio.run(sync(watch_name))