First (way too late) commit

Dumping my fever dream coding session into source control finally.
This commit is contained in:
Doc
2026-01-12 19:15:18 -05:00
commit f386b154ab
22 changed files with 3267 additions and 0 deletions

124
src/df_tools/__init__.py Normal file
View File

@@ -0,0 +1,124 @@
import importlib.metadata
# Refer to pyproject.toml or package metadata for metadata
def _get_version(meta_name: str | None) -> str:
try:
if meta_name:
return importlib.metadata.version(meta_name)
except importlib.metadata.PackageNotFoundError:
pass
except Exception:
return "0.0.0-error"
return "0.0.0-dev"
def _get_license_type(metadata: importlib.metadata.PackageMetadata) -> str:
"""Return SPDX short identifier or Unknown"""
try:
_license = metadata["License-Expression"]
# Special case for my preferred license
if not _license:
_license = metadata["License"]
if _license and _license.lower().startswith("mit license"):
return "MIT"
else:
return _license
except Exception:
pass
return "Unknown"
def _get_project_name(metadata: importlib.metadata.PackageMetadata) -> str:
"""Return project name"""
return metadata.get("Name") or __package__ or __name__ or "Unknown Project"
# Define failsafe values
PROJECT_AUTHOR = "Unknown"
PROJECT_COPYRIGHT = "Copyright (c) Unknown"
PROJECT_EMAIL = ""
PROJECT_LICENSE = "Unknown"
PROJECT_NAME = "Uknown Project"
PROJECT_VERSION = "0.0.0-error"
# Try to set values, but avoid failing
attempts = [
__package__,
__name__,
__package__.replace("_", "-") if __package__ else None,
__package__.replace("-", "_") if __package__ else None,
]
_mdata: importlib.metadata.PackageMetadata | None = None
_meta_name = None
for chosen_name in attempts:
try:
if chosen_name:
_meta_name = chosen_name
_mdata = importlib.metadata.metadata(_meta_name)
break
except importlib.metadata.PackageNotFoundError:
continue
except Exception:
pass
__version__ = _get_version(_meta_name)
PROJECT_VERSION = __version__
if _mdata:
try:
PROJECT_NAME = _get_project_name(_mdata)
except Exception:
PROJECT_NAME = "Unknown"
try:
PROJECT_LICENSE = _get_license_type(_mdata)
except Exception:
PROJECT_LICENSE = "Unknown"
if "Author" in _mdata:
__author__ = _mdata["Author"]
PROJECT_AUTHOR = __author__
else:
PROJECT_AUTHOR = "Unknown"
if "Author-Email" in _mdata:
__email__ = _mdata["Author-Email"]
try:
if "<" in __email__:
# Split at the first "<"
_split = __email__.split("<", 1)
# Don't capture after ">", as there may be multiple emails
PROJECT_EMAIL = _split[-1].split(">", 1)[0]
__email__ = PROJECT_EMAIL
if PROJECT_AUTHOR == "Unknown" and len(_split[0].strip()) > 0:
PROJECT_AUTHOR = _split[0].strip()
__author__ = PROJECT_AUTHOR
else:
PROJECT_EMAIL = __email__
except Exception:
pass
else:
PROJECT_EMAIL = ""
try:
PROJECT_COPYRIGHT = f"Copyright (c) {PROJECT_AUTHOR} ({PROJECT_EMAIL})\nLicense: {PROJECT_LICENSE}"
except Exception:
PROJECT_COPYRIGHT = "Copyright (c) Unknown"
try:
del _get_license_type
del _get_project_name
del _get_version
globals().pop("_mdata", None)
globals().pop("_meta_name", None)
globals().pop("_split", None)
except Exception:
pass

4
src/df_tools/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
from .cli import app
if __name__ == "__main__":
app()

88
src/df_tools/cli.py Normal file
View File

@@ -0,0 +1,88 @@
# from logging import log
import re
from pathlib import Path
import typer
#from loguru import logger
from rich import print # noqa: A004
from . import PROJECT_COPYRIGHT, PROJECT_NAME, PROJECT_VERSION, __version__ # noqa: F401
from .configuration import Settings
from .mylogging import setup_logging, logger
# from pydantic import BaseModel, SecretStr
# class Config(BaseModel):
# api_key: SecretStr
# db_password: SecretStr
app = typer.Typer(name=PROJECT_NAME, help=f"A command line interface for {PROJECT_NAME}")
def version_callback(value: bool) -> None:
if value:
print(f"{PROJECT_NAME} {__version__}")
raise typer.Exit()
def do_configure_logging(is_verbose: bool, is_debug: bool, do_log_file: bool = True) -> None:
console_level = "WARNING"
log_level = "WARNING"
console_tracebacks = False
console_show_locals = False
console_show_path = False
console_show_time = False
if is_verbose:
console_level = "INFO"
log_level = "INFO"
if is_debug:
console_level = "DEBUG"
log_level = "DEBUG"
console_tracebacks = True
console_show_locals = True
console_show_path = True
console_show_time = True
logfile_name: Path = Path().joinpath(re.sub(r"[^a-zA-Z0-9]", "_", PROJECT_NAME.lower()) + ".log")
setup_logging(
level=log_level,
console_level=console_level,
log_file=logfile_name.resolve(),
console_tracebacks=console_tracebacks,
console_show_time=console_show_time,
console_locals=console_show_locals,
console_show_path=console_show_path,
loguru_tracebacks=console_tracebacks,
loguru_locals=console_show_locals,
)
@app.command()
def main(
version: bool | None = typer.Option(
None, "--version", callback=version_callback, help="Print the version and exit", is_eager=True
),
verbose: bool = typer.Option(False, "--verbose", "-v", help="Enable verbose output"),
debug: bool = typer.Option(False, "--debug", "-d", help="Enable debug output"),
) -> None:
do_configure_logging(verbose, debug)
logger.debug("Debug logging is enabled")
logger.info(f"Starting {PROJECT_NAME} version {PROJECT_VERSION}")
logger.debug("Loading configuration")
config = Settings.load()
print(f"\n[cyan]This is the default action of [bold magenta]{PROJECT_NAME}[/bold magenta][/cyan]")
print(f"\nReplace [green]this message[/green] by putting your code into {__package__}.cli:main")
print("See Typer documentation at https://typer.tiangolo.com/")
logger.debug("This is a debug message")
logger.info("This is an info message")
logger.warning("This is a warning message")
logger.error("This is an error message")
if __name__ == "__main__":
app()

View File

@@ -0,0 +1,65 @@
import tomllib
import tomli_w
from typing import Optional
from pathlib import Path
from pydantic import SecretStr, Field, BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict
class ExampleSection(BaseModel):
example_key: str = "example_value"
maybe_value: Optional[int] = None
class Settings(BaseSettings):
default_config_dir: Path = Field(default=Path.home() / ".config", # / "python-boilerplate",
description="Default path to user configuration file.", exclude=True)
config_file_name: Path = Field(default=Path("python_boilerplate.toml"),
description="Name of the configuration file.", exclude=True)
logfile: str = "python-boilerplate.log"
example_string: str = Field(default="Example Default", description="An example string value.")
example_int: Optional[int] = None
example_float: Optional[float] = None
example_bool: bool = True
example_list: list[int] = [1, 2, 3]
example_dict: dict[str, str] = {"key": "value", "foo": "bar"}
example_section: ExampleSection = ExampleSection()
model_config = SettingsConfigDict(
env_prefix="PYTHON_BOILERPLATE_",
env_nested_delimiter="__",
env_file_encoding="utf-8",
extra="ignore",
)
@classmethod
def load(cls, custom_file: Optional[Path | str] = None) -> "Settings":
# Use the provided custom file, or look for config file in standard locations
if custom_file is not None and Path(custom_file).is_file():
config_file = Path(custom_file)
elif Path(Path.cwd() / cls().config_file_name).is_file():
# Check for config in current directory
config_file = Path.cwd() / cls().config_file_name
elif Path(cls().default_config_dir / cls().config_file_name).is_file():
# Wherever the heck we specified
config_file = cls().default_config_dir / cls().config_file_name
else:
# Return default settings if no config file found
return cls()
if config_file.is_file():
with config_file.open("rb") as f:
config_data = tomllib.load(f)
return cls.model_validate(config_data)
return cls()
def save(self, custom_file: Optional[Path] = None) -> None:
# Use the provided custom file, or look for config file in standard locations
if custom_file is not None:
config_file = custom_file
else:
config_file = self.default_config_dir / self.config_file_name
config_file.parent.mkdir(parents=True, exist_ok=True)
with config_file.open("wb") as f:
tomli_w.dump(self.model_dump(mode="json",exclude_none=True), f)

356
src/df_tools/dfwindow.py Normal file
View File

@@ -0,0 +1,356 @@
import time
from pathlib import Path
import cv2
import numpy as np
from loguru import logger
from .waytools import capActiveWindow, focusWindow, moveMouse
from .waytools import sendKey as _sendKey
class DFWINDOW:
class TOOLS:
@staticmethod
def moveMouseAway() -> None:
moveMouse(-9999, -9999)
moveMouse(-9999, -9999)
moveMouse(-9999, -9999)
moveMouse(-9999, -9999)
@staticmethod
def find_content_origin(
image_in,
num_rows: int = 55,
num_cols: int = 35,
ignore_rows: int = 55,
ignore_cols: int = 20,
mean_threshold: int = 4,
) -> tuple[int, int]:
# Check the first (num_rows) rows at the top of the image,
# ignoring (ignore_cols) number of pixels at each end of teh line.
test_mean = np.mean(
cv2.cvtColor(image_in[0:num_rows, ignore_cols:-ignore_cols], cv2.COLOR_BGR2GRAY),
axis=1, # get the mean along the x-axis
)
# TODO: handle when 0 results return
# Test the mean darkness, get the first row darker than 4
content_y = np.where(test_mean < mean_threshold)[0][0]
_ignore_rows = max(ignore_rows, content_y + 1)
test_mean = np.mean(
cv2.cvtColor(image_in[_ignore_rows:-_ignore_rows, 0:num_cols], cv2.COLOR_BGR2GRAY),
axis=0, # get the mean along the y-axis
)
content_x = np.where(test_mean < mean_threshold)[0][0]
logger.debug(f"Content origin ({content_x}, {content_y})")
return (content_x, content_y)
@staticmethod
def isRightBorder(img, num_columns=20, border_threshold: int = 10) -> bool:
# grab a greyscale strip to look at
test_strip = cv2.cvtColor(img[:, -num_columns:], cv2.COLOR_BGR2GRAY)
# polarize the values around the threshold
_, thresh = cv2.threshold(test_strip, border_threshold, 255, cv2.THRESH_BINARY)
# Are all pixels in the strip "black"
return not np.any(thresh)
@staticmethod
def isLeftBorder(img, num_columns=20, border_threshold: int = 10) -> bool:
# grab a greyscale strip to look at
test_strip = cv2.cvtColor(img[:, :num_columns], cv2.COLOR_BGR2GRAY)
# polarize the values around the threshold
_, thresh = cv2.threshold(test_strip, border_threshold, 255, cv2.THRESH_BINARY)
# Are all pixels in the strip "black"
return not np.any(thresh)
@staticmethod
def isBottomBorder(img, num_rows=20, border_threshold: int = 10) -> bool:
# grab a greyscale strip to look at
test_strip = cv2.cvtColor(img[-num_rows:, :], cv2.COLOR_BGR2GRAY)
# polarize the values around the threshold
_, thresh = cv2.threshold(test_strip, border_threshold, 255, cv2.THRESH_BINARY)
# Are all pixels in the strip "black"
return not np.any(thresh)
@staticmethod
def isTopBorder(img, num_rows=20, border_threshold: int = 10) -> bool:
# grab a greyscale strip to look at
test_strip = cv2.cvtColor(img[:num_rows, :], cv2.COLOR_BGR2GRAY)
# polarize the values around the threshold
_, thresh = cv2.threshold(test_strip, border_threshold, 255, cv2.THRESH_BINARY)
# Are all pixels in the strip "black"
return not np.any(thresh)
@staticmethod
def firstNotBlackX(img):
first_x = np.where(np.mean(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), axis=0) > 15)[0][0]
return first_x
@staticmethod
def firstNotBlackY(img):
first_y = np.where(np.mean(cv2.cvtColor(img, cv2.COLOR_BGR2GRAY), axis=1) > 15)[0][0]
return first_y
bottom_to_ignore = 120
sleep_after_mouse = 0.2
sleep_after_key = 0.08
sleep_after_focus = 0.3
sleep_after_panning = 0.3
query_for_window = "dwarfort"
def __init__(self) -> None:
self._gridx = -999
self._gridy = -999
self._gridx_max = -999
self._gridy_max = -999
self._step_size_x = -999
self._step_size_y = -999
self._content_top = -999
self._content_bottom = -999
self._content_left = -999
self._content_right = -999
@property
def curGridPos(self) -> tuple[int, int]:
return (self._gridx, self._gridy)
@property
def curGridX(self) -> int:
return self._gridx
@property
def curGridY(self) -> int:
return self._gridy
@property
def maxGridX(self) -> int:
return self._gridx_max
@property
def maxGridY(self) -> int:
return self._gridy_max
@property
def contentWidth(self) -> int:
return int(self._content_right - self._content_left)
@property
def contentHeight(self) -> int:
return int(self._content_bottom - self._content_top)
def setGridPos(self, x: int, y: int, xstep: int = 1, ystep: int = 1):
# Are we NOT properly calibrated?
if self._gridx_max < 0 or self._gridy_max < 0 or self._gridx < 0 or self._gridy < 0:
logger.debug(
f"Grid system does seem to have been calibrated. x: {self._gridx}/{self._gridx_max}, y: {self._gridy}/{self._gridy_max}"
)
raise Exception("Not yet calibrated")
while (x, y) != self.curGridPos:
if self._gridx > x and self._gridx > 0:
# Move Left
self.sendKeys("a")
self._gridx -= 1
elif self._gridx < x and self._gridx < self._gridx_max:
# Move Right
self.sendKeys("d")
self._gridx += 1
if self._gridy > y and self._gridy > 0:
# Move Up
self.sendKeys("w")
self._gridy -= 1
elif self._gridy < y and self._gridy < self._gridy_max:
# Move Down
self.sendKeys("s")
self._gridy += 1
# time.sleep(self.sleep_after_mouse)
def sendKeys(
self,
thekey: str | int,
count: int = 1,
modifier: str | int | list[str | int] | None = None,
cycle_delay: float = 0.1,
sub_cycle_delay: float = 0.05,
custom_lookup: dict[str, int] | None = None,
):
self.focusWindow()
_sendKey(
thekey=thekey,
count=count,
modifier=modifier,
cycle_delay=self.sleep_after_key,
sub_cycle_delay=sub_cycle_delay,
custom_lookup=custom_lookup,
)
def capWindow(self, filename: str | Path | None = None, query: str = ""):
if query == "":
query = self.query_for_window
if focusWindow(query):
time.sleep(self.sleep_after_focus)
img = capActiveWindow(trim_transparent_border=True)
if filename is not None:
cv2.imwrite(str(Path(filename).resolve()), img)
return img
def focusWindow(self, query: str = ""):
if query == "":
query = self.query_for_window
return focusWindow(query)
def capContent(self, filename: str | Path | None = None, query: str = ""):
if query == "":
query = self.query_for_window
img = self.capWindow(filename=None, query=query)
img = img[self._content_top : self._content_bottom, self._content_left : self._content_right] # pyright: ignore[reportOptionalSubscript]
if filename is not None:
cv2.imwrite(str(Path(filename).resolve()), img)
return img
def calibrateGrid(self):
logger.info("Beggining grid calibration")
self.TOOLS.moveMouseAway()
time.sleep(self.sleep_after_mouse)
self.focusWindow()
time.sleep(self.sleep_after_focus)
self.sendKeys("w", 30)
self.sendKeys("a", 30)
img = self.capWindow()
self._content_left, self._content_top = self.TOOLS.find_content_origin(img)
self._content_right = img.shape[1] - self._content_left
self._content_bottom = img.shape[0] - self.bottom_to_ignore
img = img[self._content_top : self._content_bottom, self._content_left : self._content_right] # pyright: ignore[reportOptionalSubscript]
logger.debug(f"Content width {self.contentWidth}. Content height {self.contentHeight}.")
# Try to measure steps
logger.debug("Measuring step sizes")
mx1 = self.TOOLS.firstNotBlackX(img)
my1 = self.TOOLS.firstNotBlackY(img)
self.sendKeys("s")
self.sendKeys("d")
time.sleep(self.sleep_after_panning)
img = self.capContent()
mx2 = self.TOOLS.firstNotBlackX(img)
my2 = self.TOOLS.firstNotBlackY(img)
self._step_size_x = mx2 - mx1
self._step_size_y = my2 - my1
logger.info(f"Step sizes calculated: x={self._step_size_x} and y={self._step_size_y}")
self.sendKeys("w")
self.sendKeys("a")
time.sleep(self.sleep_after_panning)
img = self.capContent()
logger.debug("Measuring number of steps")
# Scroll down till top border is no longer visible
while self.TOOLS.isTopBorder(img):
self.sendKeys("s")
time.sleep(self.sleep_after_panning)
img = self.capContent()
# Bump up to reveal the top border again
self.sendKeys("w")
img = self.capContent()
time.sleep(self.sleep_after_panning)
if not self.TOOLS.isTopBorder(img):
raise Exception("Lost top of map while searching for it")
steps_down = 0
while not self.TOOLS.isBottomBorder(img):
self.sendKeys("s")
steps_down += 1
time.sleep(self.sleep_after_panning)
img = self.capContent()
# We now know how many steps the map is vertically
steps_vertical = steps_down
logger.debug(f"Map is about {steps_vertical} steps vertical. Current step is {steps_down}")
# go right until the left map edge disappears
while self.TOOLS.isLeftBorder(img):
self.sendKeys("d")
time.sleep(self.sleep_after_panning)
img = self.capContent()
# bump back left one
self.sendKeys("a")
time.sleep(self.sleep_after_panning)
img = self.capContent()
if not self.TOOLS.isLeftBorder(img):
raise Exception("Lost left of map while searching for it")
steps_right = 0
while not self.TOOLS.isRightBorder(img):
self.sendKeys("d")
steps_right += 1
time.sleep(self.sleep_after_panning)
img = self.capContent()
# And those are the horrizontal steps
steps_horrizontal = steps_right
logger.debug(f"Map is about {steps_horrizontal} steps horrizontal. Current step is {steps_right}")
self._gridx_max = steps_horrizontal
self._gridy_max = steps_vertical
self._gridx = steps_right
self._gridy = steps_down
# TODO: Use seek tests to calculate mapsize in pixels
# at (0,0) save left_edge_offset and top_edge_offset
# at (max,max) save right_edge_offset and bottom_edge_offset
# width = (contentWidth - l_e_o) + (gridyx_max * _step_size_x) - abs(r_e_o)
# | <==|====|====|==> |
# (max*size) is too far, so we subract the ofset/border from the right map edge
# Test going to 0,0
self.setGridPos(0, 0)
time.sleep(self.sleep_after_panning)
img = self.capContent()
if not (self.TOOLS.isLeftBorder(img) and self.TOOLS.isTopBorder(img)):
logger.debug("Calibration error. Not at requested upper left of map")
raise Exception("Calibration error. Not at requested upper left of map")
# Test going to (max,max)
self.setGridPos(self.maxGridX, self.maxGridY)
time.sleep(self.sleep_after_panning)
img = self.capContent()
if not (self.TOOLS.isRightBorder(img) and self.TOOLS.isBottomBorder(img)):
logger.debug("Calibration error. Not at requested lower right of map")
raise Exception("Calibration error. Not at requested lower right of map")
logger.info(
f"Grid calibration complete. Grid steps ({self._gridy_max + 1},{self._gridy_max + 1}), step sizes({self._step_size_x},{self._step_size_y})"
)
def getPanoramaMap(self):
self.calibrateGrid()
# Test getting pieces and stitching
stitcher = cv2.Stitcher.create(cv2.STITCHER_SCANS)
stitcher.setPanoConfidenceThresh(0.1) # Dont be confident
imgs_in_row = []
# Get a row
self.setGridPos(0, 0)
time.sleep(self.sleep_after_panning)
for x in range(0, self.maxGridX + 1, 3):
self.setGridPos(x, self.curGridY)
time.sleep(self.sleep_after_panning)
img = self.capContent()
if img.shape[2] == 4:
img = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
imgs_in_row.append(img)
status, strip = stitcher.stitch(imgs_in_row)
logger.debug(f"{len(imgs_in_row)} images. {status=} {strip=} {status == cv2.Stitcher_OK}")
return None

566
src/df_tools/imgtools.py Normal file
View File

@@ -0,0 +1,566 @@
import json
import subprocess
import pydantic as pyd
import cv2
import numpy as np
from .dfwindow import DFWINDOW
from .mylogging import logger, setup_logging
# def sendKey(
# thekey: str, count: int = 1, cycle_delay: float = 0.1
# ): # | Key | KeyCode, input_ctx: Controller = Controller()):
# codes = {"w": 17, "a": 30, "s": 31, "d": 32}
# for _ in range(count):
# subprocess.run(f"ydotool key {codes[thekey]}:1 {codes[thekey]}:0", shell=True)
# if count > 1:
# time.sleep(cycle_delay)
# logger.debug(f"Tapped '{thekey}' {count} times")
# def kd_getWindowGeo(
# query: str = "dwarfort", exception_on_missing: bool = False
# ) -> tuple[tuple[int, int], tuple[int, int]]:
# try:
# result = subprocess.run(
# ["kdotool", "search", query, "getwindowgeometry"], check=True, capture_output=True, text=True
# )
# except subprocess.CalledProcessError as e:
# logger.error(f"Error: Could not find window '{query}'", e)
# return ((0, 0), (0, 0))
# gx = 0
# gy = 0
# px = 0
# py = 0
# got_pos = False
# got_geo = False
# for line in result.stdout.split("\n"):
# if m := re_window_geometry.match(line):
# gx = math.floor(float(m.group(1)))
# gy = math.floor(float(m.group(2)))
# got_geo = True
# if m := re_window_position.match(line):
# px = math.floor(float(m.group(1)))
# py = math.floor(float(m.group(2)))
# got_pos = True
# if (exception_on_missing) and not (got_geo and got_pos):
# raise ValueError("Incomplete window information", query, got_pos, got_geo)
# return ((px, py), (gx, gy))
# def kd_getDesktopForWindow(query: str = "dwarfort") -> str:
# try:
# result = subprocess.run(
# ["kdotool", "search", query, "get_desktop_for_window"], check=True, capture_output=True, text=True
# )
# except subprocess.CalledProcessError as e:
# logger.error("Could not detect the desktop of the window", query, e)
# return ""
# return result.stdout
# def focusWindow(query: str = "dwarfort") -> bool:
# """Uses kdotool to reliably focus the window on Wayland."""
# try:
# subprocess.run(["kdotool", "search", query, "windowactivate"], check=True)
# return True
# except subprocess.CalledProcessError:
# logger.error(f"Error: Could not find window '{query}'")
# return False
# def moveMouse(x: int, y: int):
# subprocess.run(["ydotool", "mousemove", "-x", str(x), "-y", str(y)], check=True)
# return
class WAYMONITOR_SIZE(pyd.BaseModel):
width: int
height: int
class WAYMONITOR_POINT(pyd.BaseModel):
x: int
y: int
class WAYMONITOR(pyd.BaseModel):
id: int
name: str
pos: WAYMONITOR_POINT
size: WAYMONITOR_SIZE
class WAYMONITORS:
def __init__(self) -> None:
_result = subprocess.run(["kscreen-doctor", "-j"], check=True, capture_output=True, text=True)
self._jobj = json.loads(_result.stdout)
logger.debug(f"All Outputs {len(self._jobj['outputs'])}")
self.active_outputs = [o for o in self._jobj["outputs"] if o["enabled"]]
logger.debug(f"Active Outputs {len(self.active_outputs)}")
self.monitors: list[WAYMONITOR] = []
self.monitor_ids = {}
self.monitor_names = {}
_max_x = 0
_max_y = 0
for o in self.active_outputs:
logger.info(
f"id: {o['id']} pos: {o['pos']['x']}, {o['pos']['y']} size: {o['size']['width']} x {o['size']['height']}"
)
# TODO: Copy over everything
monitor = WAYMONITOR(
id=int(o.get("id", -1)),
name=str(o.get("name", "")),
pos=WAYMONITOR_POINT(x=int(o["pos"]["x"]), y=int(o["pos"]["y"])),
size=WAYMONITOR_SIZE(width=int(o["size"]["width"]), height=int(o["size"]["height"])),
)
_max_x = max(_max_x, monitor.size.width + monitor.pos.x)
_max_y = max(_max_y, monitor.size.height + monitor.pos.y)
self.monitor_ids[o["id"]] = monitor
self.monitor_names[o["name"]] = monitor
self.monitors.append(monitor)
self.maximum_x = _max_x
self.maximum_y = _max_y
def pointInMonitor(self, x: int, y: int, monitor_id: int | str) -> bool:
if isinstance(monitor_id, int):
monitor = self.monitor_ids[monitor_id]
elif isinstance(monitor_id, str):
monitor = self.monitor_names[monitor_id]
else:
raise ValueError("id parameter to WAYMONITORS.pointInMonitor must be int or str")
# Return if x and y are in the ranges
return (
x >= monitor.pos.x
and x < monitor.pos.x + monitor.size.width - 1
and y > monitor.pos.y
and y < monitor.pos.y + monitor.size.height - 1
)
def absolute2relative(self, x: int, y: int) -> tuple[int, tuple[int, int]]:
for m in self.monitors:
if self.pointInMonitor(x, y, m.id):
return (m.id, (x - m.pos.x, y - m.pos.y))
return (-1, (-9999, -9999))
# def moveMouseAway():
# moveMouse(-9999, -9999)
# moveMouse(-9999, -9999)
# moveMouse(-9999, -9999)
# moveMouse(-9999, -9999)
# def kd_getMonitorLayoutInfo():
# result = subprocess.run(["kscreen-doctor", "-j"], check=True, capture_output=True, text=True)
# jobj = json.loads(result.stdout)
# logger.info(f"All Outputs {len(jobj['outputs'])}")
# active_outputs = [o for o in jobj["outputs"] if o["enabled"]]
# logger.info(f"Active Outputs {len(active_outputs)}")
# all_geo_pos = []
# for o in active_outputs:
# logger.info(
# f"id: {o['id']} pos: {o['pos']['x']}, {o['pos']['y']} size: {o['size']['width']} x {o['size']['height']}"
# )
# monitor = {}
# monitor["id"] = o["id"]
# monitor["name"] = o["name"]
# monitor["pos"] = o["pos"]
# monitor["size"] = o["size"]
# all_geo_pos.append(monitor)
# # for output in jobj["outputs"]:
# pass
# def capActiveWindow():
# with tempfile.TemporaryDirectory() as tdir:
# img_file = Path(tdir).joinpath("cap_image.png")
# subprocess.run(
# ["spectacle", "-b", "-n", "-a", "-o", str(img_file.resolve())],
# check=True,
# )
# if not img_file.exists():
# raise FileNotFoundError("captured image does not exist", img_file)
# img = cv2.imread(str(img_file.resolve()), cv2.IMREAD_UNCHANGED)
# if img is None:
# raise Exception("Unable to load image", img_file)
# alpha = img[:, :, 3]
# _, thresh = cv2.threshold(alpha, 230, 255, cv2.THRESH_BINARY)
# coords = cv2.findNonZero(thresh)
# x, y, w, h = cv2.boundingRect(coords)
# logger.debug(f"Bounding box: (x={x}, y={y}), size: {w}x{h}")
# cropped = img[y : y + h, x : x + w]
# return cropped
# def cv_find_black_border(img_in) -> tuple[int, int]:
# # grap a greyscale sliver at the top
# num_rows = 55
# num_cols = 35
# ignore_cols = 20
# test_strip = cv2.cvtColor(img_in[0:num_rows, ignore_cols:-ignore_cols], cv2.COLOR_BGR2GRAY)
# test_mean = np.mean(test_strip, axis=1)
# y_black_start = np.where(test_mean < 4)[0][0]
# logger.debug(f"{y_black_start=}")
# test_strip = cv2.cvtColor(img_in[y_black_start + 1 : -y_black_start, 0:num_cols], cv2.COLOR_BGR2GRAY)
# test_mean = np.mean(test_strip, axis=0)
# x_black_start = np.where(test_mean < 4)[0][0]
# logger.debug(f"{x_black_start=}")
# return (x_black_start, y_black_start)
# def cv_find_right_black(img, num_columns: int = 20) -> bool:
# test_strip = cv2.cvtColor(img[:, -num_columns:], cv2.COLOR_BGR2GRAY)
# _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY)
# return not np.any(thresh)
# def cv_find_left_black(img, num_columns: int = 20) -> bool:
# test_strip = cv2.cvtColor(img[:, :num_columns], cv2.COLOR_BGR2GRAY)
# _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY)
# return not np.any(thresh)
# def cv_find_bottom_black(img, num_rows: int = 20) -> bool:
# test_strip = cv2.cvtColor(img[-num_rows:, :], cv2.COLOR_BGR2GRAY)
# _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY)
# return not np.any(thresh)
# def cv_find_top_black(img, num_rows: int = 20) -> bool:
# test_strip = cv2.cvtColor(img[:num_rows, :], cv2.COLOR_BGR2GRAY)
# _, thresh = cv2.threshold(test_strip, 10, 255, cv2.THRESH_BINARY)
# return not np.any(thresh)
# def cv_trim_windowframe(img_in):
# # find Y that isnt all black
# y_black_start = 0
# for y in range(img_in.shape[0]):
# if np.any(img_in[y]):
# y_blank_end = y
# break
# logger.debug(f"{y_blank_end=}")
# # find X that isnt all black
# x_blank_end = 0
# for x in range(img_in.shape[1]):
# if np.any(img_in[:, x]):
# x_blank_end = x
# break
# logger.debug(f"{x_blank_end=}")
# # Find bottom end of blank
# logger.info(f"Blank borders are {x_blank_end} wide and {y_blank_end} tall")
# # top_strip = img_in[5:55, 20:-20]
# # for y in range(5,img_in.shape[0]):
# # if not np.any(img_in[y, 20:-20]):
# # top_border = y
# def cv_image_diff(img1, img2):
# if img1.shape != img2.shape:
# return float("inf")
# grey1 = cv2.cvtColor(img1, cv2.COLOR_BGR2GRAY)
# grey2 = cv2.cvtColor(img2, cv2.COLOR_BGR2GRAY)
# # Apparently this is the Mean Squared Error (MES)
# err = np.sum((grey1.astype("float") - grey2.astype("float")) ** 2)
# err /= float(grey1.shape[0] * grey1.shape[0])
# return err
def test5():
dfw = DFWINDOW()
map_image = dfw.getPanoramaMap()
# img = dfw.capWindow()[60:-60,20:-20]
# gg = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# mm = np.mean(gg, axis=1)
# dt = img
# logger.debug(f"{img=}")
# def test4():
# bottom_to_ignore = 120
# save_dir = Path()
# image_name_base = "grid_base_"
# image_name_ext = "png"
# moveMouseAway() # Prob best to move mouse then focus
# focusWindow("dwarfort")
# logger.debug("Window focused and mouse moved away.")
# # Configure the croppin area
# # slam to upper left
# sendKey("w", 30)
# sendKey("a", 30)
# time.sleep(1)
# img = capActiveWindow()
# i_left, i_top = cv_find_black_border(img)
# i_bottom = img.shape[0] - bottom_to_ignore
# i_right = img.shape[1] - i_left
# img = img[i_top:i_bottom, i_left:i_right]
# # go down till the top of the map scrolls off
# while cv_find_top_black(img):
# sendKey("s")
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# sendKey("w")
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# if not cv_find_top_black(img):
# raise Exception("Lost top of map while searching for it")
# steps_down = 0
# while not cv_find_bottom_black(img):
# sendKey("s")
# steps_down += 1
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# steps_vertical = steps_down
# logger.debug(f"Map is about {steps_vertical} steps vertical. Current step is {steps_down}")
# # go right until the left scrolls off
# while cv_find_left_black(img):
# sendKey("d")
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# sendKey("a")
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# if not cv_find_left_black(img):
# raise Exception("Lost left of map while searching for it")
# steps_right = 0
# while not cv_find_right_black(img):
# sendKey("d")
# steps_right += 1
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# steps_horrizontal = steps_right
# logger.debug(f"Map is about {steps_horrizontal} steps horrizontal. Current step is {steps_right}")
# def test3():
# bottom_to_ignore = 120
# save_dir = Path()
# image_name_base = "grid_base_"
# image_name_ext = "png"
# taps_to_reset_x = 30
# taps_to_reset_y = 30
# moveMouseAway() # Prob best to move mouse then focus
# focusWindow("dwarfort")
# logger.debug("Window focused and mouse moved away.")
# # Configure the croppin area
# # slam to upper left
# sendKey("w", taps_to_reset_x)
# sendKey("a", taps_to_reset_y)
# time.sleep(1)
# img = capActiveWindow()
# i_left, i_top = cv_find_black_border(img)
# i_bottom = img.shape[0] - bottom_to_ignore
# i_right = img.shape[1] - i_left
# # Measure size of map
# img = img[i_top:i_bottom, i_left:i_right]
# # Ensure we can see the left edge of the map
# if not (cv_find_left_black(img) and cv_find_top_black(img)):
# raise Exception("Something is wrong, we should be looking past the upper left edge of map")
# steps_right = 0
# while not cv_find_right_black(img):
# sendKey("d", 2)
# steps_right += 2
# time.sleep(1)
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# logger.debug(f"Total of {steps_right} steps to right border")
# # Reset to top, and count down
# sendKey("w", 5)
# time.sleep(1)
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# if not (cv_find_right_black(img) and cv_find_top_black(img)):
# raise Exception("Something is wrong, we should be looking past the upper right edge of map")
# steps_down = 0
# while not cv_find_bottom_black(img):
# sendKey("s", 2)
# steps_down += 2
# time.sleep(1)
# img = capActiveWindow()[i_top:i_bottom, i_left:i_right]
# logger.debug(f"Total of {steps_down} steps to bottom border")
# # Now move back to grid 0,0
# # cur_grid = [steps_right, steps_down]
# # tgt_grid = (0,0)
# # while cur_grid != tgt_grid:
# # if cur_grid[0] > tgt_grid[0] and cur_grid[0] > 0:
# # sendKey("a")
# # cur_grid[0] = cur_grid[0] - 1
# # elif cur_grid[0] < tgt_grid[0] and cur_grid[0] < steps_
# def test2():
# bottom_to_ignore = 120
# save_dir = Path()
# image_name_base = "grid_base_"
# image_name_ext = "png"
# # find size and shape of df
# window_pos, window_geo = getWindowGeo("dwarfort", True)
# logger.debug(f"Pos: {window_pos} Geo: {window_geo}")
# moveMouseAway() # Prob best to move mouse then focus
# focusWindow("dwarfort")
# logger.debug("Window focused and mouse moved away.")
# time.sleep(1)
# taps_to_reset_x = 18
# taps_to_reset_y = 18
# logger.debug(f"Resetting map view to upper left. {taps_to_reset_y}x'w' and {taps_to_reset_x}x'a'")
# sendKey("w", taps_to_reset_x)
# sendKey("a", taps_to_reset_y)
# time.sleep(1)
# first_image = capActiveWindow()
# logger.debug(f"Took screen shot. {first_image.shape[1]} x {first_image.shape[0]}")
# # skip the next capture after this
# skip_capture = True
# cv_find_black_border(first_image)
# i_left, i_top = cv_find_black_border(first_image)
# i_bottom = first_image.shape[0] - bottom_to_ignore
# i_right = first_image.shape[1] - i_left
# logger.debug(f"Local Box: ({i_left},{i_top}) -- ({i_right},{i_bottom})")
# taps_per_grid_x = 4
# last_x = -1
# for x in range(8):
# logger.debug(f"Processing grid piece {x}, (?)")
# if not skip_capture:
# old_img = img # type: ignore # noqa: F821
# img = capActiveWindow()
# logger.debug(f"Took screen shot. {img.shape[1]} x {img.shape[0]}")
# # Compare to the old image to see if we even moved.
# mes = cv_image_diff(old_img, img)
# logger.debug(f"{mes=}")
# if mes < 5:
# break
# else:
# skip_capture = False
# img = first_image
# crop = img[i_top:i_bottom, i_left:i_right]
# found_left_margin = cv_find_left_black(crop[15:-15])
# found_right_margin = cv_find_right_black(crop[15:-15])
# found_top_margin = cv_find_top_black(crop[:, 15:-15])
# found_bottom_margin = cv_find_bottom_black(crop[:, 15:-15])
# logger.debug(f"{found_left_margin=} {found_right_margin=} {found_top_margin=} {found_bottom_margin=}")
# savedfile = save_dir.joinpath(f"{image_name_base}{x}.{image_name_ext}")
# cv2.imwrite(str(savedfile), crop)
# logger.info(f"Saved grid piece {str(savedfile)}")
# sendKey("d", taps_per_grid_x)
# logger.debug(f"Tapped {taps_per_grid_x} 'd' to move right")
# last_x = x
# time.sleep(1)
# def test1():
# time.sleep(0.1)
# window_pos, window_geo = getWindowGeo("dwarfort", True)
# logger.info(f"Pos: {window_pos} Geo: {window_geo}")
# logger.info(f"{kd_getDesktopForWindow('dwarfort')}")
# # time.sleep(1)
# # kd_focusWindow("dwarfort")
# # subprocess.run(["ydotool", "mousemove", "-a", "0", "0"], check=True)
# # subprocess.run(["ydotool", "mousemove", str(window_pos[0]), str(window_pos[1])], check=True)
# waym = WAYMONITORS()
# logger.debug(f"1: {waym.pointInMonitor(10, 10, 1)}")
# logger.debug(f"2: {waym.pointInMonitor(10, 10, 2)}")
# m_id, rpos = waym.absolute2relative(1239, 1238)
# logger.debug(f"10,10: #{m_id} {rpos[0]}, {rpos[1]}")
# if 1 == 0:
# focusWindow("dwarfort")
# time.sleep(1)
# moveMouseAway()
# # f"{window_pos[0]},{window_pos[1]} {window_geo[0]}x{window_geo[1]}",
# img = capActiveWindow()
# else:
# img = cv2.imread("./test_img.png", cv2.IMREAD_UNCHANGED)
# alpha = img[:, :, 3]
# _, thresh = cv2.threshold(alpha, 230, 255, cv2.THRESH_BINARY)
# coords = cv2.findNonZero(thresh)
# x, y, w, h = cv2.boundingRect(coords)
# logger.debug(f"Bounding box: (x={x}, y={y}), size: {w}x{h}")
# img = img[y : y + h, x : x + w]
# left_x, top_y = cv_find_black_border(img)
# right_x = left_x
# bottom_y = 120
# clean_section = img[top_y:-bottom_y, left_x:-right_x]
# logger.debug(f"{clean_section.shape[1]} x {clean_section.shape[0]}")
# # subprocess.run(["xdg-open", "./test_img.png"], check=True)
# # kd_getMonitorLayoutInfo()
# # for i in range(3):
# # # tap("w")
# # tap("d")
# # time.sleep(1)
# # kd_movemouse(-99999, -99999)
# # kd_movemouse(-99999, -99999)
# # kd_movemouse(-99999, -99999)
# # kd_movemouse(10, 800)
# # kd_movemouse(99999, 0)
# # kd_movemouse(10, 0)
# # kd_movemouse(-999, -999)
# # kd_movemouse(200, 200)
# # for i in range(2):
# # kd_movemouse(100, 100)
# # time.sleep(0.1)
# sys.exit()
# # filename = os.path.join(TILES_DIR, f"tile_{i:03d}.png")
if __name__ == "__main__":
setup_logging(level="DEBUG", enqueue=False, console_show_time=False, console_tracebacks=True)
test5()

157
src/df_tools/mylogging.py Normal file
View File

@@ -0,0 +1,157 @@
import datetime
import logging as _logging
from pathlib import Path
from loguru import logger
from rich.console import Console
from rich.logging import RichHandler
"""
Setup logging to use loguru for logging, with rich for pretty console output.
Intercepts standard logging calls.
"""
console = Console()
# "<green>{time:MM-DD HH:mm:ss.SSS}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
def setup_logging(
level: int | str = _logging.INFO,
console_level: int | str | None = None,
log_file: str | Path | None = None,
format_console: str | None = "{message}",
format_logs: str | None = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
console_locals: bool = False,
enqueue: bool = True,
console_tracebacks: bool = False,
console_markup: bool = True,
console_omit_repeated_times: bool = False,
console_show_time: bool = True,
console_show_path: bool = False,
loguru_tracebacks: bool = False,
loguru_locals: bool = False,
loguru_rotation: str | int | datetime.time | datetime.timedelta | None = "10 MB",
loguru_retention: str | int | datetime.timedelta | None = "10 days",
loguru_compression: str = "gz", # "gz", "bz2", "xz", "lzma", "tar", "tar.gz", "tar.bz2", "tar.xz", "zip"
loguru_watch_files: bool = False,
loguru_delay_file_creation: bool = False,
) -> None:
"""Setup logging with loguru and rich.
Disclosure:
Most of this code is adapted from online examples, loguru documentation, and Gemini's
suggestions. I am not an expert in logging, loguru, or rich.
"""
# Intercept standard logging
class InterceptHandler(_logging.Handler):
def emit(self, record: _logging.LogRecord):
try:
level = logger.level(record.levelname).name
except ValueError:
level = record.levelno # type: ignore
frame, depth = _logging.currentframe(), 2
while frame and frame.f_code.co_filename == _logging.__file__:
frame = frame.f_back # type: ignore
depth += 1
logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
logger.remove()
_logging.basicConfig(handlers=[InterceptHandler()], level=0, force=True)
# Add console handler
# Hanlder for my console output
handler_console = RichHandler(
console=console,
level=console_level if console_level is not None else level,
show_level=True,
show_path=console_show_path,
show_time=console_show_time,
rich_tracebacks=console_tracebacks,
tracebacks_show_locals=console_locals,
markup=console_markup,
omit_repeated_times=console_omit_repeated_times,
)
# handler_console.setFormatter(_logging.Formatter(format_console))
logger.add(
handler_console,
format=str(format_console),
level=console_level if console_level is not None else level,
backtrace=console_tracebacks,
diagnose=console_locals,
enqueue=enqueue,
)
# Add log file handler
if log_file:
logger.add(
log_file,
level=level,
format=str(format_logs),
rotation=loguru_rotation,
retention=loguru_retention,
compression=loguru_compression,
enqueue=enqueue,
backtrace=loguru_tracebacks,
diagnose=loguru_locals,
watch=loguru_watch_files,
delay=loguru_delay_file_creation,
)
# def setup_logging(
# level: Union[int, str] = _logging.INFO,
# log_format: str = "<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | "
# "<level>{level: <8}</level> | "
# "<cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - "
# "<level>{message}</level>",
# date_format: str = "YYYY-MM-DD HH:mm:ss.SSS",
# log_file: str | None = None,
# file_log_level: Union[int, str] = _logging.DEBUG,
# file_log_format: str = "{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {name}:{function}:{line} - {message}",
# file_date_format: str = "YYYY-MM-DD HH:mm:ss.SSS",
# ) -> None:
# class InterceptHandler(_logging.Handler):
# def emit(self, record: _logging.LogRecord) -> None:
# try:
# level = logger.level(record.levelname).name
# except ValueError:
# level = record.levelno
# frame, depth = _logging.currentframe(), 2
# while frame.f_code.co_filename == _logging.__file__:
# frame = frame.f_back # type: ignore[union-attr]
# depth += 1
# logger.opt(depth=depth, exception=record.exc_info).log(level, record.getMessage())
# _logging.root.handlers = [InterceptHandler()]
# _logging.root.setLevel(level)
# rich_handler = RichHandler(
# rich_tracebacks=True,
# tracebacks_show_locals=True,
# markup=True,
# log_time_format=date_format,
# )
# rich_handler.setLevel(level)
# rich_handler.setFormatter(_logging.Formatter(log_format))
# _logging.getLogger().addHandler(rich_handler)
# if log_file:
# logger.add(
# log_file,
# level=file_log_level,
# format=file_log_format,
# datefmt=file_date_format,
# rotation="10 MB",
# retention="10 days",
# compression="zip",
# )

238
src/df_tools/waytools.py Normal file
View File

@@ -0,0 +1,238 @@
import math
import re
import subprocess
import tempfile
import time
from pathlib import Path
import cv2
from loguru import logger
__all__ = ["sendKey", "getWindowGeo", "focusWindow", "moveMouse", "capActiveWindow"]
_re_window_position = re.compile(r"^\s*Position:\s*([\d\.]+)\s*,\s*([\d\.]+)")
_re_window_geometry = re.compile(r"^\s*Geometry:\s*([\d\.]+)\s*x\s*([\d\.]+)")
### Begin LLM generated
# ydotool / uinput keycode mapping
KEY_TO_CODE = {
# Letters
"a": 30,
"b": 48,
"c": 46,
"d": 32,
"e": 18,
"f": 33,
"g": 34,
"h": 35,
"i": 23,
"j": 36,
"k": 37,
"l": 38,
"m": 50,
"n": 49,
"o": 24,
"p": 25,
"q": 16,
"r": 19,
"s": 31,
"t": 20,
"u": 22,
"v": 47,
"w": 17,
"x": 45,
"y": 21,
"z": 44,
# Numbers (Row)
"1": 2,
"2": 3,
"3": 4,
"4": 5,
"5": 6,
"6": 7,
"7": 8,
"8": 9,
"9": 10,
"0": 11,
# Control Keys
"esc": 1,
"backspace": 14,
"tab": 15,
"enter": 28,
"space": 57,
"capslock": 58,
"menu": 139,
"delete": 111,
"insert": 110,
# Modifiers
"l_ctrl": 29,
"r_ctrl": 97,
"l_shift": 42,
"r_shift": 54,
"l_alt": 56,
"r_alt": 100,
"super": 125, # Windows/Meta key
"meta": 125,
# Navigation
"up": 103,
"down": 108,
"left": 105,
"right": 106,
"pageup": 104,
"pagedown": 109,
"home": 102,
"end": 107,
# Punctuation
"minus": 12,
"equal": 13,
"leftbrace": 26,
"rightbrace": 27,
"semicolon": 39,
"apostrophe": 40,
"grave": 41, # `
"backslash": 43,
"comma": 51,
"dot": 52,
"slash": 53,
}
CODE_TO_KEY = {v: k for k, v in KEY_TO_CODE.items()}
### End LLM generated
def sendKey(
thekey: str | int,
count: int = 1,
modifier: str | int | list[str | int] | None = None,
cycle_delay: float = 0.1,
sub_cycle_delay: float = 0.05,
custom_lookup: dict[str, int] | None = None,
):
# Include a custom lookup if provided
if custom_lookup is not None and isinstance(custom_lookup, dict):
_KEY_TO_CODE = KEY_TO_CODE | custom_lookup
else:
_KEY_TO_CODE = KEY_TO_CODE
# Handle keycode for main key
if isinstance(thekey, str):
keycode = _KEY_TO_CODE[thekey.lower()]
elif isinstance(thekey, int):
keycode = thekey
else:
raise ValueError("sendKey's 'thekey' must be str or int")
pass
# We MUST 'release' any modifiers we 'press'
usedmods: list[int] = []
try:
if modifier is not None:
pre_modlist = modifier if isinstance(modifier, list) else [modifier]
modlist = [mkc if isinstance(mkc, int) else _KEY_TO_CODE[mkc.lower()] for mkc in pre_modlist]
for mod in modlist:
subprocess.run(["ydotool", "key", f"{mod}:1"], check=True)
usedmods.append(mod)
time.sleep(sub_cycle_delay)
for _ in range(count):
subprocess.run(["ydotool", "key", f"{keycode}:1", f"{keycode}:0"], check=True)
time.sleep(cycle_delay)
finally:
# Release everything we pressed in reverse orderr
for mod in reversed(usedmods):
try:
subprocess.run(["ydotool", "key", f"{mod}:0"], check=True)
time.sleep(sub_cycle_delay)
except Exception:
logger.critical(f"Modifier {mod} is stuck down!")
def getWindowGeo(query: str, exception_on_missing: bool = True) -> tuple[tuple[int, int], tuple[int, int]]:
try:
result = subprocess.run(
["kdotool", "search", query, "getwindowgeometry"], check=True, capture_output=True, text=True
)
except subprocess.CalledProcessError as e:
logger.error(f"Error: Could not find window '{query}'", e)
return ((-999, -999), (-999, -999))
geo_x = 0
geo_y = 0
pos_x = 0
pos_y = 0
got_pos = False
got_geo = False
for line in result.stdout.split("\n"):
if m := _re_window_geometry.match(line):
geo_x = math.floor(float(m.group(1)))
geo_y = math.floor(float(m.group(2)))
got_geo = True
elif m := _re_window_position.match(line):
pos_x = math.floor(float(m.group(1)))
pos_y = math.floor(float(m.group(2)))
got_pos = True
if not (got_geo and got_pos):
if exception_on_missing:
raise ValueError("Incomplete window information", query, got_pos, got_geo)
else:
logger.warning(f"getwindowgeometry returned partial information. {got_geo=} {got_pos=}")
return ((pos_x, pos_y), (geo_x, geo_y))
def focusWindow(query: str) -> bool:
try:
subprocess.run(["kdotool", "search", query, "windowactivate"], check=True)
return True
except subprocess.CalledProcessError:
logger.error(f"Could not find window '{query}'")
return False
def moveMouse(x: int, y: int) -> bool:
try:
subprocess.run(["ydotool", "mousemove", "-x", str(x), "-y", str(y)], check=True)
return True
except subprocess.CalledProcessError as e:
logger.error(f"Error while using ydotool to move mouse ({x},{y}). {e}")
return False
# TODO: Add getMonitorLayoutInfo
def capActiveWindow_cv2(trim_transparent_border: bool = False):
_alpha_threshold = 230
with tempfile.TemporaryDirectory() as tdir:
img_file = Path(tdir).joinpath("cap_image.png")
# TODO: Handle different capture software
capture_cmd = ["spectacle", "-b", "-n", "-a", "-o", str(img_file.resolve())]
subprocess.run(capture_cmd, check=True)
if not img_file.exists():
raise FileNotFoundError("Temporary file of captured image does not exist", img_file)
img = cv2.imread(str(img_file.resolve()), cv2.IMREAD_UNCHANGED)
if img is None:
raise Exception("Unable to load image", img_file)
if trim_transparent_border:
alpha = img[:, :, 3]
_, thresh = cv2.threshold(alpha, _alpha_threshold, 255, cv2.THRESH_BINARY)
x, y, w, h = cv2.boundingRect(cv2.findNonZero(thresh))
logger.trace(f"Bounding box: (x={x}, y={y}), size: {w}x{h}")
return img[y : y + h, x : x + w]
return img
def capActiveWindow(use_opencv: bool = True, trim_transparent_border: bool = False):
if use_opencv:
return capActiveWindow_cv2(trim_transparent_border=trim_transparent_border)
raise NotImplementedError("opencv is the only graphics backend implemented currently")