# Source: root/FereFit_configRealTimeMeasure_BLE.py
# Blob: 3059263f80d8e5e0eb8c443ee25cf9a68b3c75c6

import asyncio
import sys
try:
    from bleak import BleakScanner, BleakClient
except ModuleNotFoundError:
    print("Error importing bleak, are you sure you installed it?")
    print("Try running the following command: \"pip3 install bleak\"")
    print("If that fails, try this: \"pip3 install bleak --break-system-packages\"")
    sys.exit(1)

async def sync(device_name: str, timeout=None):
    """Trigger a real-time measurement on the watch and print the results.

    Scans for a BLE device advertising ``device_name``, connects to it,
    subscribes to the notify characteristic, writes the "start measurement"
    command, and waits until both a generic measurement packet (first byte
    0x94) and an aggregate-rates packet (first byte 0xE1) have been received.
    The "stop" command is then written and notifications are stopped.

    Args:
        device_name: Name the watch advertises in BLE discovery.
        timeout: Optional number of seconds to wait for both result packets.
            ``None`` (the default) waits indefinitely, as long as the watch
            stays connected.

    Returns:
        None. Progress, results and failures are reported via ``print``.
    """
    print(f"Scanning for {device_name}...")
    device = await BleakScanner.find_device_by_name(device_name, timeout=10)
    if not device:
        print("Watch not found!")
        return
    print(f"Found at {device.address}")

    notify_uuid = "6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E"
    write_uuid = "6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E"

    # Events instead of polled booleans: the waiter wakes up as soon as the
    # packet arrives, and a disconnect can interrupt the wait.
    got_aggregate = asyncio.Event()
    got_generic = asyncio.Event()
    disconnected = asyncio.Event()

    def notification_handler(sender, data):
        # Guard every index access: a truncated packet must not raise inside
        # the notification callback.
        if not data:
            return
        if data[0] == 0xE1:
            if len(data) < 8:
                print(f"Ignoring short aggregate packet: {bytes(data).hex()}")
                return
            # skip 2-4 because they seem to be zero padding or something redundant
            aggregate_rates = [data[1], data[5], data[6], data[7]]
            print(f"Got aggregate rates: {aggregate_rates}")
            got_aggregate.set()
        elif data[0] == 0x94:
            if len(data) < 2:
                print(f"Ignoring short measurement packet: {bytes(data).hex()}")
                return
            print(f"Got generic measurement: {data[1]}")
            got_generic.set()

    async def wait_for_results():
        await got_generic.wait()
        await got_aggregate.wait()

    try:
        async with BleakClient(
            device, disconnected_callback=lambda _client: disconnected.set()
        ) as client:
            await client.start_notify(notify_uuid, notification_handler)
            try:
                # byte[1] = 0x00 for heart rate, TODO add more measurements later,
                # maybe split into separate scripts if logic differs
                # byte[2] = 0x01 presumably means "start" (0x00 is used for stop)
                await client.write_gatt_char(write_uuid, bytes([0x60, 0x00, 0x01]))

                results = asyncio.ensure_future(wait_for_results())
                dropped = asyncio.ensure_future(disconnected.wait())
                try:
                    done, _ = await asyncio.wait(
                        {results, dropped},
                        timeout=timeout,
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                finally:
                    # Cancelling an already-finished task is a no-op.
                    results.cancel()
                    dropped.cancel()

                if results not in done:
                    if dropped in done:
                        print("Watch disconnected before the measurement finished")
                    else:
                        print("Timed out waiting for the measurement")
            finally:
                # Always try to stop the measurement so the watch is not left
                # measuring, unless the link is already gone.
                if not disconnected.is_set():
                    # byte[2] = 0x00 for stop
                    await client.write_gatt_char(write_uuid, bytes([0x60, 0x00, 0x00]))
                    await client.stop_notify(notify_uuid)
    except Exception as e:
        print(f"Failed: {e}")

if __name__ == "__main__":
    # An empty answer (just pressing Enter) falls back to the default name.
    watch_name = (
        input("Enter watch name shown in BLE discovery (default: Watch ULTRA): ")
        or "Watch ULTRA"
    )
    asyncio.run(sync(watch_name))