aboutsummaryrefslogtreecommitdiff
path: root/FereFit_enterMakeDial_BLE.py
blob: 096e98686fcf9a8790e4a27f5fe2fb992e207632 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import asyncio
import sys
try:
    from bleak import BleakScanner, BleakClient
except ModuleNotFoundError:
    print("Error: pip3 install bleak")
    sys.exit(1)

try:
    from PIL import Image
except ModuleNotFoundError:
    print("Error: pip3 install Pillow")
    sys.exit(1)

# GATT characteristic that the header and chunk packets are written to.
WRITE_UUID = "6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E"
# GATT characteristic subscribed to for notifications; any notification
# received on it is treated as an acknowledgement by send_watchface().
NOTIFY_UUID = "6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E"
# Target image size in pixels; input images are resized to exactly this.
WIDTH = 240
HEIGHT = 296
# Initial size in bytes of the buffer allocated for each chunk packet.
MTU = 148
# Number of image bytes carried per chunk packet (also written into the header).
CHUNK_SIZE = 140

def image_to_rgb565(image_path):
    """Load an image and convert it to raw RGB565 pixel data.

    The image is resized to WIDTH x HEIGHT (the aspect ratio is not
    preserved), converted to RGB if it is in another mode, and each pixel
    is packed into 16 bits: 5 bits red, 6 bits green, 5 bits blue.

    Args:
        image_path: Path or file object accepted by ``PIL.Image.open``.

    Returns:
        ``bytes`` of length ``WIDTH * HEIGHT * 2``: pixels in row-major
        order, two bytes per pixel, high byte first.

    Raises:
        Any exception raised by Pillow while opening or decoding the
        image (for example ``FileNotFoundError`` or
        ``PIL.UnidentifiedImageError``).
    """
    # Image.open() is lazy and keeps the file open; the context manager
    # guarantees the handle is released once the pixels have been read.
    with Image.open(image_path) as source:
        img = source.resize((WIDTH, HEIGHT), Image.Resampling.LANCZOS)

        # Convert to RGB if needed
        if img.mode != 'RGB':
            img = img.convert('RGB')

    # One bulk export (R, G, B interleaved, row-major) instead of a
    # getpixel() call per pixel.
    raw = img.tobytes()

    pixels = bytearray()
    for r, g, b in zip(raw[0::3], raw[1::3], raw[2::3]):
        # RGB888 to RGB565
        rgb565 = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
        pixels.append((rgb565 >> 8) & 0xFF)  # High byte
        pixels.append(rgb565 & 0xFF)         # Low byte

    return bytes(pixels)

def make_header(image, chunk_size):
    """Build the 22-byte header packet that announces an image transfer.

    Args:
        image: Raw image bytes that will be sent afterwards.
        chunk_size: Number of image bytes carried by each chunk packet.

    Returns:
        ``bytes`` containing, in order: four fixed bytes, the packet count
        (2 bytes), the image length (4 bytes), a zero byte, the chunk size
        (2 bytes), six fixed bytes, the 16-bit sum of all image bytes
        (2 bytes) and a final fixed byte. Multi-byte fields are big-endian
        and truncated to their field width.
    """
    byte_count = len(image)
    # Ceiling division: a trailing partial chunk still needs its own packet.
    packet_count = (byte_count + chunk_size - 1) // chunk_size
    checksum = sum(image) & 0xFFFF

    header = bytearray(b"\xE4\x51\x01\x00")
    header += (packet_count & 0xFFFF).to_bytes(2, "big")
    header += (byte_count & 0xFFFFFFFF).to_bytes(4, "big")
    header.append(0x00)
    header += (chunk_size & 0xFFFF).to_bytes(2, "big")
    header += b"\x01"
    header += b"\x01\x01\x00\x00\x00"
    header += checksum.to_bytes(2, "big")
    header.append(0x01)
    return bytes(header)

def make_chunk(image, chunk_index, total_packets, chunk_size):
    """Build the data packet for one chunk of the image.

    Args:
        image: Complete raw image bytes.
        chunk_index: Zero-based index of the chunk to build.
        total_packets: Total number of chunks, used for the progress byte.
        chunk_size: Number of image bytes per chunk.

    Returns:
        ``bytes`` made of a 14-byte header followed by the chunk data.
        Header layout: four fixed bytes, the one-based chunk number
        (2 bytes), the byte offset into the image (4 bytes), a progress
        percentage, a last-chunk flag and a 16-bit checksum. Multi-byte
        fields are big-endian. The buffer starts out MTU bytes long and
        zero-filled, so a short chunk is followed by zero padding.
    """
    start = chunk_index * chunk_size
    payload = image[start:start + chunk_size]
    sequence = chunk_index + 1
    reached_end = start + len(payload) >= len(image)
    percent = (sequence * 100) // total_packets

    frame = bytearray(MTU)
    frame[0:4] = bytes([0xE4, 0x52, 0x01, 0x02])
    frame[4] = (sequence >> 8) & 0xFF
    frame[5] = sequence & 0xFF
    for position, shift in zip(range(6, 10), (24, 16, 8, 0)):
        frame[position] = (start >> shift) & 0xFF
    frame[10] = percent
    if reached_end:
        frame[11] = 0x01
    else:
        frame[11] = 0x00
    # NOTE(review): when len(payload) > MTU - 14 this slice assignment
    # grows the buffer, so the returned packet is longer than MTU bytes.
    frame[14:14 + len(payload)] = payload

    # Bytes 12-13 are still zero here, so the sum covers the rest of the
    # header plus the payload.
    checksum = (sum(frame[:14]) + sum(payload)) & 0xFFFF
    frame[12] = (checksum >> 8) & 0xFF
    frame[13] = checksum & 0xFF

    return bytes(frame)

async def send_watchface(device_name, image_path):
    """Convert an image and upload it to a watch over BLE.

    Steps: convert the image to RGB565, scan for a device advertising
    ``device_name``, connect, subscribe to NOTIFY_UUID, write the header
    packet to WRITE_UUID, then write every chunk packet, waiting up to
    five seconds for a notification after each write. Any notification
    on NOTIFY_UUID is treated as an acknowledgement; its payload is not
    inspected.

    Failures to load the image, find the device, or get the header
    acknowledged are reported on stdout and end the transfer early. A
    missing acknowledgement for a chunk is reported and the transfer
    continues with the next chunk.

    Args:
        device_name: Advertised name of the watch to connect to.
        image_path: Path of the image file to upload.

    Returns:
        None.
    """
    print(f"Loading image: {image_path}")
    try:
        image = image_to_rgb565(image_path)
    except Exception as e:
        print(f"Failed to load image: {e}")
        return
    print(f"Image converted: {len(image)} bytes")

    print(f"Scanning for {device_name}...")
    device = await BleakScanner.find_device_by_name(device_name, timeout=10)
    if not device:
        print("Watch not found!")
        return
    print(f"Found at {device.address}")

    total_packets = (len(image) + CHUNK_SIZE - 1) // CHUNK_SIZE
    header = make_header(image, CHUNK_SIZE)
    ack = asyncio.Event()

    def notification_handler(sender, data):
        ack.set()

    async with BleakClient(device) as client:
        await client.start_notify(NOTIFY_UUID, notification_handler)

        print("Sending header...")
        # Discard any notification that arrived before the header was
        # written so it is not mistaken for the header acknowledgement.
        ack.clear()
        await client.write_gatt_char(WRITE_UUID, header, response=False)
        try:
            await asyncio.wait_for(ack.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            # Report and stop like the other failure paths instead of
            # letting the timeout escape as a traceback.
            print("Timeout waiting for header acknowledgement")
            return

        print(f"Sending {total_packets} chunks...")
        missed_acks = 0
        for i in range(total_packets):
            ack.clear()
            chunk = make_chunk(image, i, total_packets, CHUNK_SIZE)
            await client.write_gatt_char(WRITE_UUID, chunk, response=False)
            try:
                await asyncio.wait_for(ack.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Best effort: keep sending, but remember the miss.
                missed_acks += 1
                print(f"Timeout on chunk {i}")

            if i % 100 == 0:
                print(f"Progress: {i}/{total_packets}")

        await asyncio.sleep(2)
        if missed_acks:
            print(f"Warning: {missed_acks} chunk(s) were not acknowledged")
        print("Done!")

if __name__ == "__main__":
    # Ask for the target device and the picture to upload; pressing Enter
    # at the first prompt selects the default device name.
    watch_name = input("Enter watch name (default: Watch ULTRA): ")
    if not watch_name:
        watch_name = "Watch ULTRA"
    image_path = input("Enter image file path: ")
    asyncio.run(send_watchface(watch_name, image_path))