"""Upload an image to a BLE watch as a watch face.

The image is resized to WIDTH x HEIGHT, converted to raw big-endian RGB565
and written to a GATT characteristic as one header packet followed by
numbered data packets. After each write the script waits for a notification
on a second characteristic before sending the next packet.
"""

import asyncio
import sys

try:
    from bleak import BleakScanner, BleakClient
except ModuleNotFoundError:
    print("Error: pip3 install bleak")
    sys.exit(1)

try:
    from PIL import Image
except ModuleNotFoundError:
    print("Error: pip3 install Pillow")
    sys.exit(1)

# Characteristic that packets are written to.
WRITE_UUID = "6E40FC20-B5A3-F393-E0A9-E50E24DCCA9E"
# Characteristic whose notifications are treated as acknowledgements.
NOTIFY_UUID = "6E40FC21-B5A3-F393-E0A9-E50E24DCCA9E"

# Target image size in pixels.
WIDTH = 240
HEIGHT = 296

# Minimum size of a data packet; shorter packets are zero-padded to this.
MTU = 148
# Number of image bytes carried by each data packet.
CHUNK_SIZE = 140

# Bytes of framing that precede the payload in every data packet.
_CHUNK_HEADER_LEN = 14


def image_to_rgb565(image_path):
    """Load an image and convert it to raw RGB565 pixel data.

    Args:
        image_path: Anything accepted by ``PIL.Image.open``.

    Returns:
        ``WIDTH * HEIGHT * 2`` bytes: pixels in row-major order, each encoded
        as a big-endian 16-bit RGB565 value.
    """
    # The context manager closes the underlying file once the resized copy
    # (which owns its own pixel data) has been produced.
    with Image.open(image_path) as opened:
        # Pillow always falls back to NEAREST when resizing mode "1" or "P"
        # images, so convert those first to let LANCZOS actually apply.
        if opened.mode in ("1", "P"):
            working = opened.convert("RGB")
        else:
            working = opened
        img = working.resize((WIDTH, HEIGHT), Image.Resampling.LANCZOS)

    if img.mode != "RGB":
        img = img.convert("RGB")

    # tobytes() yields packed R, G, B bytes row by row, i.e. the same order
    # as iterating y then x with getpixel(), without a call per pixel.
    raw = img.tobytes()
    pixels = bytearray()
    for r, g, b in zip(raw[0::3], raw[1::3], raw[2::3]):
        # RGB888 -> RGB565: keep the top 5/6/5 bits of each channel.
        rgb565 = ((r >> 3) << 11) | ((g >> 2) << 5) | (b >> 3)
        pixels += rgb565.to_bytes(2, "big")

    return bytes(pixels)


def make_header(image, chunk_size):
    """Build the 22-byte header packet that announces a transfer.

    Layout (multi-byte fields are big-endian):
        0-3    E4 51 01 00 (fixed)
        4-5    total number of data packets
        6-9    total image size in bytes
        10     00 (fixed)
        11-12  chunk size
        13-18  01 01 01 00 00 00 (fixed)
        19-20  16-bit sum of all image bytes
        21     01 (fixed)

    The meaning of the fixed bytes is not visible in this file.

    Args:
        image: Raw image bytes to be sent.
        chunk_size: Number of image bytes per data packet.

    Returns:
        The header packet as ``bytes``.
    """
    total_bytes = len(image)
    total_packets = (total_bytes + chunk_size - 1) // chunk_size  # ceil div
    pixel_sum = sum(image) & 0xFFFF

    # Fields are masked to their width so out-of-range values wrap rather
    # than raise, matching per-byte shift-and-mask encoding.
    return b"".join((
        bytes([0xE4, 0x51, 0x01, 0x00]),
        (total_packets & 0xFFFF).to_bytes(2, "big"),
        (total_bytes & 0xFFFFFFFF).to_bytes(4, "big"),
        b"\x00",
        (chunk_size & 0xFFFF).to_bytes(2, "big"),
        bytes([0x01, 0x01, 0x01, 0x00, 0x00, 0x00]),
        pixel_sum.to_bytes(2, "big"),
        b"\x01",
    ))


def make_chunk(image, chunk_index, total_packets, chunk_size):
    """Build one data packet carrying a slice of the image.

    Layout (multi-byte fields are big-endian):
        0-3    E4 52 01 02 (fixed)
        4-5    1-based packet number
        6-9    byte offset of this slice within the image
        10     progress percentage after this packet
        11     01 if this is the final slice, else 00
        12-13  16-bit sum of bytes 0-11 plus all payload bytes
        14-    payload

    The packet is at least ``MTU`` bytes long (zero-padded after the
    payload). When the header plus payload exceeds ``MTU`` the packet is
    exactly ``14 + len(payload)`` bytes; with the module defaults
    (MTU=148, CHUNK_SIZE=140) a full chunk is therefore 154 bytes.
    NOTE(review): confirm against the device protocol that packets larger
    than MTU are intended.

    Args:
        image: Raw image bytes.
        chunk_index: 0-based index of the slice to send.
        total_packets: Total number of data packets (must be non-zero).
        chunk_size: Number of image bytes per packet.

    Returns:
        The packet as ``bytes``.
    """
    offset = chunk_index * chunk_size
    data = image[offset:offset + chunk_size]
    is_last = offset + len(data) >= len(image)
    progress = ((chunk_index + 1) * 100) // total_packets
    sequence = chunk_index + 1

    packet = bytearray(max(MTU, _CHUNK_HEADER_LEN + len(data)))
    packet[0:4] = bytes([0xE4, 0x52, 0x01, 0x02])
    packet[4:6] = (sequence & 0xFFFF).to_bytes(2, "big")
    packet[6:10] = (offset & 0xFFFFFFFF).to_bytes(4, "big")
    packet[10] = progress
    packet[11] = 0x01 if is_last else 0x00
    payload_end = _CHUNK_HEADER_LEN + len(data)
    packet[_CHUNK_HEADER_LEN:payload_end] = data

    # Bytes 12-13 are still zero here, so they do not affect the sum.
    checksum = (sum(packet[:_CHUNK_HEADER_LEN]) + sum(data)) & 0xFFFF
    packet[12:14] = checksum.to_bytes(2, "big")

    return bytes(packet)


async def send_watchface(device_name, image_path):
    """Convert an image and send it to the named watch over BLE.

    Failures to load the image, to find the device, or to get an
    acknowledgement for the header are reported on stdout and end the
    transfer. A missing acknowledgement for a data packet is reported and
    the transfer continues with the next packet.

    Args:
        device_name: Advertised BLE name of the watch.
        image_path: Path of the image file to upload.
    """
    print(f"Loading image: {image_path}")
    try:
        image = image_to_rgb565(image_path)
    except Exception as e:
        print(f"Failed to load image: {e}")
        return
    print(f"Image converted: {len(image)} bytes")

    print(f"Scanning for {device_name}...")
    device = await BleakScanner.find_device_by_name(device_name, timeout=10)
    if not device:
        print("Watch not found!")
        return
    print(f"Found at {device.address}")

    total_packets = (len(image) + CHUNK_SIZE - 1) // CHUNK_SIZE
    header = make_header(image, CHUNK_SIZE)
    ack = asyncio.Event()

    def notification_handler(sender, data):
        # Any notification counts as an acknowledgement; its content is
        # not inspected.
        ack.set()

    async with BleakClient(device) as client:
        await client.start_notify(NOTIFY_UUID, notification_handler)

        print("Sending header...")
        # Drop any notification that arrived before the header was sent so
        # it cannot be mistaken for the header's acknowledgement.
        ack.clear()
        await client.write_gatt_char(WRITE_UUID, header, response=False)
        try:
            await asyncio.wait_for(ack.wait(), timeout=5.0)
        except asyncio.TimeoutError:
            # Without a header acknowledgement there is no point streaming
            # the data packets; leave the context manager to disconnect.
            print("Timeout waiting for header acknowledgement")
            return

        print(f"Sending {total_packets} chunks...")
        for i in range(total_packets):
            ack.clear()
            chunk = make_chunk(image, i, total_packets, CHUNK_SIZE)
            await client.write_gatt_char(WRITE_UUID, chunk, response=False)
            try:
                await asyncio.wait_for(ack.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                print(f"Timeout on chunk {i}")
            if i % 100 == 0:
                print(f"Progress: {i}/{total_packets}")

        # Pause before the connection is closed on leaving the context.
        await asyncio.sleep(2)
        print("Done!")


if __name__ == "__main__":
    watch_name = input("Enter watch name (default: Watch ULTRA): ") or "Watch ULTRA"
    # Strip stray whitespace that pasting a path tends to add.
    image_path = input("Enter image file path: ").strip()
    asyncio.run(send_watchface(watch_name, image_path))