import json import subprocess import pydantic as pyd from .dfwindow import DFWINDOW from .mylogging import logger, setup_logging # def sendKey( # thekey: str, count: int = 1, cycle_delay: float = 0.1 # ): # | Key | KeyCode, input_ctx: Controller = Controller()): # codes = {"w": 17, "a": 30, "s": 31, "d": 32} # for _ in range(count): # subprocess.run(f"ydotool key {codes[thekey]}:1 {codes[thekey]}:0", shell=True) # if count > 1: # time.sleep(cycle_delay) # logger.debug(f"Tapped '{thekey}' {count} times") # def kd_getWindowGeo( # query: str = "dwarfort", exception_on_missing: bool = False # ) -> tuple[tuple[int, int], tuple[int, int]]: # try: # result = subprocess.run( # ["kdotool", "search", query, "getwindowgeometry"], check=True, capture_output=True, text=True # ) # except subprocess.CalledProcessError as e: # logger.error(f"Error: Could not find window '{query}'", e) # return ((0, 0), (0, 0)) # gx = 0 # gy = 0 # px = 0 # py = 0 # got_pos = False # got_geo = False # for line in result.stdout.split("\n"): # if m := re_window_geometry.match(line): # gx = math.floor(float(m.group(1))) # gy = math.floor(float(m.group(2))) # got_geo = True # if m := re_window_position.match(line): # px = math.floor(float(m.group(1))) # py = math.floor(float(m.group(2))) # got_pos = True # if (exception_on_missing) and not (got_geo and got_pos): # raise ValueError("Incomplete window information", query, got_pos, got_geo) # return ((px, py), (gx, gy)) # def kd_getDesktopForWindow(query: str = "dwarfort") -> str: # try: # result = subprocess.run( # ["kdotool", "search", query, "get_desktop_for_window"], check=True, capture_output=True, text=True # ) # except subprocess.CalledProcessError as e: # logger.error("Could not detect the desktop of the window", query, e) # return "" # return result.stdout # def focusWindow(query: str = "dwarfort") -> bool: # """Uses kdotool to reliably focus the window on Wayland.""" # try: # subprocess.run(["kdotool", "search", query, "windowactivate"], 
#         check=True)
#         return True
#     except subprocess.CalledProcessError:
#         logger.error(f"Error: Could not find window '{query}'")
#         return False


# def moveMouse(x: int, y: int):
#     subprocess.run(["ydotool", "mousemove", "-x", str(x), "-y", str(y)], check=True)
#     return


class WAYMONITOR_SIZE(pyd.BaseModel):
    """Width and height of a monitor, as integers."""

    width: int
    height: int


class WAYMONITOR_POINT(pyd.BaseModel):
    """Integer x/y coordinate pair; used for a monitor's position."""

    x: int
    y: int


class WAYMONITOR(pyd.BaseModel):
    """A single enabled output taken from the ``kscreen-doctor -j`` report."""

    id: int
    name: str
    # Treated by WAYMONITORS as the minimum-x / minimum-y corner of the monitor.
    pos: WAYMONITOR_POINT
    size: WAYMONITOR_SIZE


class WAYMONITORS:
    """Snapshot of the enabled monitors reported by ``kscreen-doctor -j``.

    Attributes:
        active_outputs: Raw JSON dicts of every output whose ``enabled`` is truthy.
        monitors: Parsed ``WAYMONITOR`` models, in report order.
        monitor_ids: ``WAYMONITOR`` lookup keyed by integer id.
        monitor_names: ``WAYMONITOR`` lookup keyed by output name.
        maximum_x: Largest ``pos.x + size.width`` over all monitors (0 if none).
        maximum_y: Largest ``pos.y + size.height`` over all monitors (0 if none).
    """

    def __init__(self) -> None:
        """Run ``kscreen-doctor -j`` and parse its enabled outputs.

        Raises:
            subprocess.CalledProcessError: ``kscreen-doctor`` exited non-zero.
            json.JSONDecodeError: The command output is not valid JSON.
            KeyError: An output lacks one of the fields read below.
        """
        _result = subprocess.run(["kscreen-doctor", "-j"], check=True, capture_output=True, text=True)
        self._jobj = json.loads(_result.stdout)

        logger.debug(f"All Outputs {len(self._jobj['outputs'])}")
        self.active_outputs = [o for o in self._jobj["outputs"] if o["enabled"]]
        logger.debug(f"Active Outputs {len(self.active_outputs)}")

        self.monitors: list[WAYMONITOR] = []
        self.monitor_ids: dict[int, WAYMONITOR] = {}
        self.monitor_names: dict[str, WAYMONITOR] = {}

        for o in self.active_outputs:
            logger.info(
                f"id: {o['id']} pos: {o['pos']['x']}, {o['pos']['y']} size: {o['size']['width']} x {o['size']['height']}"
            )
            # TODO: Copy over everything
            monitor = WAYMONITOR(
                id=int(o.get("id", -1)),
                name=str(o.get("name", "")),
                pos=WAYMONITOR_POINT(x=int(o["pos"]["x"]), y=int(o["pos"]["y"])),
                size=WAYMONITOR_SIZE(width=int(o["size"]["width"]), height=int(o["size"]["height"])),
            )
            # Key the lookups by the normalised model values so that
            # pointInMonitor(x, y, m.id) always finds the monitor, even if the
            # JSON report encodes the id as something other than an int.
            self.monitor_ids[monitor.id] = monitor
            self.monitor_names[monitor.name] = monitor
            self.monitors.append(monitor)

        # Extent of the whole layout: the furthest right/bottom edge of any monitor.
        self.maximum_x = max((m.pos.x + m.size.width for m in self.monitors), default=0)
        self.maximum_y = max((m.pos.y + m.size.height for m in self.monitors), default=0)

    @staticmethod
    def _contains(monitor: WAYMONITOR, x: int, y: int) -> bool:
        """Return True if (x, y) lies inside the monitor's rectangle.

        The rectangle is half-open: ``pos`` is inclusive and ``pos + size`` is
        exclusive, so a ``width``-wide monitor covers exactly ``width`` columns.
        """
        return (
            monitor.pos.x <= x < monitor.pos.x + monitor.size.width
            and monitor.pos.y <= y < monitor.pos.y + monitor.size.height
        )

    def pointInMonitor(self, x: int, y: int, monitor_id: int | str) -> bool:
        """Return whether the absolute point (x, y) falls on the given monitor.

        Args:
            x: Absolute x coordinate.
            y: Absolute y coordinate.
            monitor_id: Monitor id (int) or output name (str).

        Raises:
            KeyError: No monitor with that id / name is known.
            ValueError: ``monitor_id`` is neither an int nor a str.
        """
        if isinstance(monitor_id, int):
            monitor = self.monitor_ids[monitor_id]
        elif isinstance(monitor_id, str):
            monitor = self.monitor_names[monitor_id]
        else:
            raise ValueError("id parameter to WAYMONITORS.pointInMonitor must be int or str")
        return self._contains(monitor, x, y)

    def absolute2relative(self, x: int, y: int) -> tuple[int, tuple[int, int]]:
        """Translate an absolute point into monitor-relative coordinates.

        Returns:
            ``(monitor_id, (rel_x, rel_y))`` for the first monitor containing
            the point, or ``(-1, (-9999, -9999))`` when no monitor contains it.
        """
        for m in self.monitors:
            if self._contains(m, x, y):
                return (m.id, (x - m.pos.x, y - m.pos.y))
        return (-1, (-9999, -9999))


# def moveMouseAway():
#     moveMouse(-9999, -9999)
#     moveMouse(-9999, -9999)
#     moveMouse(-9999, -9999)
#     moveMouse(-9999, -9999)


# def kd_getMonitorLayoutInfo():
#     result = subprocess.run(["kscreen-doctor", "-j"], check=True, capture_output=True, text=True)
#     jobj = json.loads(result.stdout)
#     logger.info(f"All Outputs {len(jobj['outputs'])}")
#     active_outputs = [o for o in jobj["outputs"] if o["enabled"]]
#     logger.info(f"Active Outputs {len(active_outputs)}")
#     all_geo_pos = []
#     for o in active_outputs:
#         logger.info(
#             f"id: {o['id']} pos: {o['pos']['x']}, {o['pos']['y']} size: {o['size']['width']} x {o['size']['height']}"
#         )
#         monitor = {}
#         monitor["id"] = o["id"]
#         monitor["name"] = o["name"]
#         monitor["pos"] = o["pos"]
#         monitor["size"] = o["size"]
#         all_geo_pos.append(monitor)
#     # for output in jobj["outputs"]:
#     pass


# def capActiveWindow():
#     with tempfile.TemporaryDirectory() as tdir:
#         img_file = Path(tdir).joinpath("cap_image.png")
#         subprocess.run(
#             ["spectacle", "-b", "-n", "-a", "-o", str(img_file.resolve())],
#             check=True,
#         )
#         if not img_file.exists():
#             raise FileNotFoundError("captured image does not exist", img_file)
#         img = cv2.imread(str(img_file.resolve()), cv2.IMREAD_UNCHANGED)
#         if img is None:
#             raise Exception("Unable to load image", img_file)
#         alpha = img[:, :, 3]
#         _, thresh = cv2.threshold(alpha, 230, 255, cv2.THRESH_BINARY)
#         coords = cv2.findNonZero(thresh)
#         x, y, w, h = cv2.boundingRect(coords)
#         logger.debug(f"Bounding box: (x={x}, y={y}), size: {w}x{h}")
#         cropped = img[y : y + h, x : x + w]
#         return cropped


# def cv_find_black_border(img_in) -> tuple[int, int]:
#     # grab a greyscale sliver at
the top # num_rows = 55 # num_cols = 35 # ignore_cols = 20 # test_strip = cv2.cvtColor(img_in[0:num_rows, ignore_cols:-ignore_cols], cv2.COLOR_BGR2GRAY) # test_mean = np.mean(test_strip, axis=1) # y_black_start = np.where(test_mean < 4)[0][0] # logger.debug(f"{y_black_start=}") # test_strip = cv2.cvtColor(img_in[y_black_start + 1 : -y_black_start, 0:num_cols], cv2.COLOR_BGR2GRAY) # test_mean = np.mean(test_strip, axis=0) # x_black_start = np.where(test_mean < 4)[0][0] # logger.debug(f"{x_black_start=}") # return (x_black_start, y_black_start) # def cv_find_right_black(img, num_columns: int = 20) -> bool: # test_strip = cv2.cvtColor(img[:, -num_columns:], cv2.COLOR_BGR2GRAY) # _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY) # return not np.any(thresh) # def cv_find_left_black(img, num_columns: int = 20) -> bool: # test_strip = cv2.cvtColor(img[:, :num_columns], cv2.COLOR_BGR2GRAY) # _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY) # return not np.any(thresh) # def cv_find_bottom_black(img, num_rows: int = 20) -> bool: # test_strip = cv2.cvtColor(img[-num_rows:, :], cv2.COLOR_BGR2GRAY) # _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY) # return not np.any(thresh) # def cv_find_top_black(img, num_rows: int = 20) -> bool: # test_strip = cv2.cvtColor(img[:num_rows, :], cv2.COLOR_BGR2GRAY) # _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY) # return not np.any(thresh) # def cv_trim_windowframe(img_in): # # find Y that isnt all black # y_black_start = 0 # for y in range(img_in.shape[0]): # if np.any(img_in[y]): # y_blank_end = y # break # logger.debug(f"{y_blank_end=}") # # find X that isnt all black # x_blank_end = 0 # for x in range(img_in.shape[1]): # if np.any(img_in[:, x]): # x_blank_end = x # break # logger.debug(f"{x_blank_end=}") # # Find bottom end of blank # logger.info(f"Blank borders are {x_blank_end} wide and {y_blank_end} tall") # # top_strip = img_in[5:55, 20:-20] # # for y in 
#     #     range(5,img_in.shape[0]):
#     #     if not np.any(img_in[y, 20:-20]):
#     #         top_border = y


# def cv_image_diff(img1, img2):
#     if img1.shape != img2.shape:
#         return float("inf")
#     grey1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
#     grey2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
#     # Apparently this is the Mean Squared Error (MSE)
#     err = np.sum((grey1.astype("float") - grey2.astype("float")) ** 2)
#     err /= float(grey1.shape[0] * grey1.shape[0])
#     return err


def test5():
    """Build a ``DFWINDOW`` and request its panorama map.

    Manual scratch entry point: the map is fetched but not used further here.
    """
    dfw = DFWINDOW()
    # Result is kept in a local only; nothing below consumes it yet.
    panorama = dfw.getPanoramaMap()
    # img = dfw.capWindow()[60:-60,20:-20]
    # gg = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # mm = np.mean(gg, axis=1)
    # dt = img
    # logger.debug(f"{img=}")


# def test4():
#     bottom_to_ignore = 120
#     save_dir = Path()
#     image_name_base = "grid_base_"
#     image_name_ext = "png"
#     moveMouseAway()  # Prob best to move mouse then focus
#     focusWindow("dwarfort")
#     logger.debug("Window focused and mouse moved away.")
#     # Configure the cropping area
#     # slam to upper left
#     sendKey("w", 30)
#     sendKey("a", 30)
#     time.sleep(1)
#     img = capActiveWindow()
#     i_left, i_top = cv_find_black_border(img)
#     i_bottom = img.shape[0] - bottom_to_ignore
#     i_right = img.shape[1] - i_left
#     img = img[i_top:i_bottom, i_left:i_right]
#     # go down till the top of the map scrolls off
#     while cv_find_top_black(img):
#         sendKey("s")
#         img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
#     sendKey("w")
#     img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
#     if not cv_find_top_black(img):
#         raise Exception("Lost top of map while searching for it")
#     steps_down = 0
#     while not cv_find_bottom_black(img):
#         sendKey("s")
#         steps_down += 1
#         img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
#     steps_vertical = steps_down
#     logger.debug(f"Map is about {steps_vertical} steps vertical.
Current step is {steps_down}") # # go right until the left scrolls off # while cv_find_left_black(img): # sendKey("d") # img = capActiveWindow()[i_top:i_bottom, i_left:i_right] # sendKey("a") # img = capActiveWindow()[i_top:i_bottom, i_left:i_right] # if not cv_find_left_black(img): # raise Exception("Lost left of map while searching for it") # steps_right = 0 # while not cv_find_right_black(img): # sendKey("d") # steps_right += 1 # img = capActiveWindow()[i_top:i_bottom, i_left:i_right] # steps_horrizontal = steps_right # logger.debug(f"Map is about {steps_horrizontal} steps horrizontal. Current step is {steps_right}") # def test3(): # bottom_to_ignore = 120 # save_dir = Path() # image_name_base = "grid_base_" # image_name_ext = "png" # taps_to_reset_x = 30 # taps_to_reset_y = 30 # moveMouseAway() # Prob best to move mouse then focus # focusWindow("dwarfort") # logger.debug("Window focused and mouse moved away.") # # Configure the croppin area # # slam to upper left # sendKey("w", taps_to_reset_x) # sendKey("a", taps_to_reset_y) # time.sleep(1) # img = capActiveWindow() # i_left, i_top = cv_find_black_border(img) # i_bottom = img.shape[0] - bottom_to_ignore # i_right = img.shape[1] - i_left # # Measure size of map # img = img[i_top:i_bottom, i_left:i_right] # # Ensure we can see the left edge of the map # if not (cv_find_left_black(img) and cv_find_top_black(img)): # raise Exception("Something is wrong, we should be looking past the upper left edge of map") # steps_right = 0 # while not cv_find_right_black(img): # sendKey("d", 2) # steps_right += 2 # time.sleep(1) # img = capActiveWindow()[i_top:i_bottom, i_left:i_right] # logger.debug(f"Total of {steps_right} steps to right border") # # Reset to top, and count down # sendKey("w", 5) # time.sleep(1) # img = capActiveWindow()[i_top:i_bottom, i_left:i_right] # if not (cv_find_right_black(img) and cv_find_top_black(img)): # raise Exception("Something is wrong, we should be looking past the upper right edge of map") # 
steps_down = 0 # while not cv_find_bottom_black(img): # sendKey("s", 2) # steps_down += 2 # time.sleep(1) # img = capActiveWindow()[i_top:i_bottom, i_left:i_right] # logger.debug(f"Total of {steps_down} steps to bottom border") # # Now move back to grid 0,0 # # cur_grid = [steps_right, steps_down] # # tgt_grid = (0,0) # # while cur_grid != tgt_grid: # # if cur_grid[0] > tgt_grid[0] and cur_grid[0] > 0: # # sendKey("a") # # cur_grid[0] = cur_grid[0] - 1 # # elif cur_grid[0] < tgt_grid[0] and cur_grid[0] < steps_ # def test2(): # bottom_to_ignore = 120 # save_dir = Path() # image_name_base = "grid_base_" # image_name_ext = "png" # # find size and shape of df # window_pos, window_geo = getWindowGeo("dwarfort", True) # logger.debug(f"Pos: {window_pos} Geo: {window_geo}") # moveMouseAway() # Prob best to move mouse then focus # focusWindow("dwarfort") # logger.debug("Window focused and mouse moved away.") # time.sleep(1) # taps_to_reset_x = 18 # taps_to_reset_y = 18 # logger.debug(f"Resetting map view to upper left. {taps_to_reset_y}x'w' and {taps_to_reset_x}x'a'") # sendKey("w", taps_to_reset_x) # sendKey("a", taps_to_reset_y) # time.sleep(1) # first_image = capActiveWindow() # logger.debug(f"Took screen shot. {first_image.shape[1]} x {first_image.shape[0]}") # # skip the next capture after this # skip_capture = True # cv_find_black_border(first_image) # i_left, i_top = cv_find_black_border(first_image) # i_bottom = first_image.shape[0] - bottom_to_ignore # i_right = first_image.shape[1] - i_left # logger.debug(f"Local Box: ({i_left},{i_top}) -- ({i_right},{i_bottom})") # taps_per_grid_x = 4 # last_x = -1 # for x in range(8): # logger.debug(f"Processing grid piece {x}, (?)") # if not skip_capture: # old_img = img # type: ignore # noqa: F821 # img = capActiveWindow() # logger.debug(f"Took screen shot. {img.shape[1]} x {img.shape[0]}") # # Compare to the old image to see if we even moved. 
# mes = cv_image_diff(old_img, img) # logger.debug(f"{mes=}") # if mes < 5: # break # else: # skip_capture = False # img = first_image # crop = img[i_top:i_bottom, i_left:i_right] # found_left_margin = cv_find_left_black(crop[15:-15]) # found_right_margin = cv_find_right_black(crop[15:-15]) # found_top_margin = cv_find_top_black(crop[:, 15:-15]) # found_bottom_margin = cv_find_bottom_black(crop[:, 15:-15]) # logger.debug(f"{found_left_margin=} {found_right_margin=} {found_top_margin=} {found_bottom_margin=}") # savedfile = save_dir.joinpath(f"{image_name_base}{x}.{image_name_ext}") # cv2.imwrite(str(savedfile), crop) # logger.info(f"Saved grid piece {str(savedfile)}") # sendKey("d", taps_per_grid_x) # logger.debug(f"Tapped {taps_per_grid_x} 'd' to move right") # last_x = x # time.sleep(1) # def test1(): # time.sleep(0.1) # window_pos, window_geo = getWindowGeo("dwarfort", True) # logger.info(f"Pos: {window_pos} Geo: {window_geo}") # logger.info(f"{kd_getDesktopForWindow('dwarfort')}") # # time.sleep(1) # # kd_focusWindow("dwarfort") # # subprocess.run(["ydotool", "mousemove", "-a", "0", "0"], check=True) # # subprocess.run(["ydotool", "mousemove", str(window_pos[0]), str(window_pos[1])], check=True) # waym = WAYMONITORS() # logger.debug(f"1: {waym.pointInMonitor(10, 10, 1)}") # logger.debug(f"2: {waym.pointInMonitor(10, 10, 2)}") # m_id, rpos = waym.absolute2relative(1239, 1238) # logger.debug(f"10,10: #{m_id} {rpos[0]}, {rpos[1]}") # if 1 == 0: # focusWindow("dwarfort") # time.sleep(1) # moveMouseAway() # # f"{window_pos[0]},{window_pos[1]} {window_geo[0]}x{window_geo[1]}", # img = capActiveWindow() # else: # img = cv2.imread("./test_img.png", cv2.IMREAD_UNCHANGED) # alpha = img[:, :, 3] # _, thresh = cv2.threshold(alpha, 230, 255, cv2.THRESH_BINARY) # coords = cv2.findNonZero(thresh) # x, y, w, h = cv2.boundingRect(coords) # logger.debug(f"Bounding box: (x={x}, y={y}), size: {w}x{h}") # img = img[y : y + h, x : x + w] # left_x, top_y = cv_find_black_border(img) # 
#         right_x = left_x
#         bottom_y = 120
#         clean_section = img[top_y:-bottom_y, left_x:-right_x]
#         logger.debug(f"{clean_section.shape[1]} x {clean_section.shape[0]}")
#     # subprocess.run(["xdg-open", "./test_img.png"], check=True)
#     # kd_getMonitorLayoutInfo()
#     # for i in range(3):
#     #     # tap("w")
#     #     tap("d")
#     #     time.sleep(1)
#     # kd_movemouse(-99999, -99999)
#     # kd_movemouse(-99999, -99999)
#     # kd_movemouse(-99999, -99999)
#     # kd_movemouse(10, 800)
#     # kd_movemouse(99999, 0)
#     # kd_movemouse(10, 0)
#     # kd_movemouse(-999, -999)
#     # kd_movemouse(200, 200)
#     # for i in range(2):
#     #     kd_movemouse(100, 100)
#     #     time.sleep(0.1)
#     sys.exit()
#     # filename = os.path.join(TILES_DIR, f"tile_{i:03d}.png")


# Script entry point: configure logging, then run the current scratch test.
if __name__ == "__main__":
    setup_logging(
        level=5,  # level 5 is TRACE
        enqueue=False,
        console_show_time=False,
        console_tracebacks=True,
    )
    test5()