Merge branch 'main' into fix/transparency-luminance-scaling
Lint & Format (JS/TS) / lint-format (pull_request) Successful in 12s
Python / lint-format (pull_request) Successful in 23s
Python / test (pull_request) Successful in 51s
Lint & Format (Rust) / lint-format (pull_request) Successful in 1m47s

This commit is contained in:
2026-05-24 18:53:39 +02:00
18 changed files with 931 additions and 71 deletions
+82 -41
View File
@@ -1,41 +1,82 @@
// pragma Singleton pragma Singleton
//
// import Quickshell import Quickshell
// import QtQuick import Quickshell.Io
// import QtQuick
// Singleton {
// id: root Singleton {
// id: root
// function start(extraArgs = []): void {
// needsStart = true; readonly property alias elapsed: props.elapsed
// startArgs = extraArgs; property bool needsPause
// checkProc.running = true; property bool needsStart
// } property bool needsStop
// readonly property alias paused: props.paused
// PersistentProperties { readonly property alias running: props.running
// id: props property list<string> startArgs
//
// property real elapsed: 0 function start(extraArgs = []): void {
// property bool paused: false needsStart = true;
// property bool running: false startArgs = extraArgs;
// checkProc.running = true;
// reloadableId: "recorder" }
// }
// function stop(): void {
// Process { needsStop = true;
// id: checkProc checkProc.running = true;
// }
// command: ["pidof", "gpu-screen-recorder"]
// running: true function togglePause(): void {
// needsPause = true;
// onExited: code => { checkProc.running = true;
// props.running = code === 0; }
//
// if (code === 0) { PersistentProperties {
// if (root.needsStop) { id: props
// Quickshell.execDetached(["zshell-cli"]);
// } property real elapsed: 0
// } property bool paused: false
// } property bool running: false
// }
// } reloadableId: "recorder"
}
Process {
id: checkProc
command: ["pidof", "gpu-screen-recorder"]
running: true
onExited: code => {
props.running = code === 0;
if (code === 0) {
if (root.needsStop) {
Quickshell.execDetached(["zshell-cli", "record", "record"]);
props.running = false;
props.paused = false;
} else if (root.needsPause) {
Quickshell.execDetached(["zshell-cli", "record", "record", "-p"]);
props.paused = !props.paused;
}
} else if (root.needsStart) {
Quickshell.execDetached(["zshell-cli", "record", "record", ...root.startArgs]);
props.running = true;
props.paused = false;
props.elapsed = 0;
}
root.needsStart = false;
root.needsStop = false;
root.needsPause = false;
}
}
Connections {
function onSecondsChanged(): void {
props.elapsed++;
}
target: Time // qmllint disable incompatible-type
}
}
@@ -0,0 +1,290 @@
pragma ComponentBehavior: Bound
import Quickshell
import QtQuick
import QtQuick.Layouts
import qs.Components
import qs.Config
import qs.Helpers
CustomRect {
id: root
required property var props
required property PersistentProperties visibilities
Layout.fillWidth: true
color: DynamicColors.tPalette.m3surfaceContainer
implicitHeight: layout.implicitHeight + layout.anchors.margins * 2
radius: Appearance.rounding.smallest
ColumnLayout {
id: layout
anchors.fill: parent
anchors.margins: Appearance.padding.large
spacing: Appearance.spacing.normal
RowLayout {
spacing: Appearance.spacing.normal
z: 1
CustomRect {
color: Recorder.running ? DynamicColors.palette.m3secondary : DynamicColors.palette.m3secondaryContainer
implicitHeight: {
const h = icon.implicitHeight + Appearance.padding.smaller * 2;
return h - (h % 2);
}
implicitWidth: implicitHeight
radius: Appearance.rounding.full
MaterialIcon {
id: icon
anchors.centerIn: parent
anchors.horizontalCenterOffset: -0.5
anchors.verticalCenterOffset: 1.5
color: Recorder.running ? DynamicColors.palette.m3onSecondary : DynamicColors.palette.m3onSecondaryContainer
font.pointSize: Appearance.font.size.large
text: "screen_record"
}
}
ColumnLayout {
Layout.fillWidth: true
spacing: 0
CustomText {
Layout.fillWidth: true
elide: Text.ElideRight
font.pointSize: Appearance.font.size.normal
text: qsTr("Screen Recorder")
}
CustomText {
Layout.fillWidth: true
color: DynamicColors.palette.m3onSurfaceVariant
elide: Text.ElideRight
font.pointSize: Appearance.font.size.small
text: Recorder.paused ? qsTr("Recording paused") : Recorder.running ? qsTr("Recording running") : qsTr("Recording off")
}
}
CustomSplitButton {
active: menuItems.find(m => root.props.recordingMode === m.icon + m.text) ?? menuItems[0]
disabled: Recorder.running
menuItems: [
MenuItem {
activeText: qsTr("Fullscreen")
icon: "fullscreen"
text: qsTr("Record fullscreen")
onClicked: Recorder.start()
},
MenuItem {
activeText: qsTr("Region")
icon: "screenshot_region"
text: qsTr("Record region")
onClicked: Recorder.start(["-r"])
},
MenuItem {
activeText: qsTr("Fullscreen")
icon: "select_to_speak"
text: qsTr("Record fullscreen with sound")
onClicked: Recorder.start(["-s"])
},
MenuItem {
activeText: qsTr("Region")
icon: "volume_up"
text: qsTr("Record region with sound")
onClicked: Recorder.start(["-s", "-r"])
}
]
menu.onItemSelected: item => root.props.recordingMode = item.icon + item.text
}
}
Loader {
id: listOrControls
property bool running: Recorder.running
Layout.fillWidth: true
Layout.preferredHeight: implicitHeight
asynchronous: true
sourceComponent: running ? recordingControls : recordingList
Behavior on Layout.preferredHeight {
id: locHeightAnim
enabled: false
Anim {
}
}
Behavior on running {
SequentialAnimation {
ParallelAnimation {
Anim {
duration: Appearance.anim.durations.small
easing: Appearance.anim.curves.standardAccel
property: "scale"
target: listOrControls
to: 0.7
}
Anim {
duration: Appearance.anim.durations.small
easing: Appearance.anim.curves.standardAccel
property: "opacity"
target: listOrControls
to: 0
}
}
PropertyAction {
property: "enabled"
target: locHeightAnim
value: true
}
PropertyAction {
}
PropertyAction {
property: "enabled"
target: locHeightAnim
value: false
}
ParallelAnimation {
Anim {
duration: Appearance.anim.durations.small
easing: Appearance.anim.curves.standardDecel
property: "scale"
target: listOrControls
to: 1
}
Anim {
duration: Appearance.anim.durations.small
easing: Appearance.anim.curves.standardDecel
property: "opacity"
target: listOrControls
to: 1
}
}
}
}
}
}
Component {
id: recordingList
RecordingList {
props: root.props
visibilities: root.visibilities
}
}
Component {
id: recordingControls
RowLayout {
spacing: Appearance.spacing.normal
CustomRect {
color: Recorder.paused ? DynamicColors.palette.m3tertiary : DynamicColors.palette.m3error
implicitHeight: recText.implicitHeight + Appearance.padding.smaller * 2
implicitWidth: recText.implicitWidth + Appearance.padding.normal * 2
radius: Appearance.rounding.full
Behavior on implicitWidth {
Anim {
}
}
SequentialAnimation on opacity {
alwaysRunToEnd: true
loops: Animation.Infinite
running: !Recorder.paused
Anim {
duration: Appearance.anim.durations.large
easing: Appearance.anim.curves.emphasizedAccel
from: 1
to: 0
}
Anim {
duration: Appearance.anim.durations.extraLarge
easing: Appearance.anim.curves.emphasizedDecel
from: 0
to: 1
}
}
CustomText {
id: recText
anchors.centerIn: parent
animate: true
color: Recorder.paused ? DynamicColors.palette.m3onTertiary : DynamicColors.palette.m3onError
font.family: Appearance.font.family.mono
text: Recorder.paused ? "PAUSED" : "REC"
}
}
CustomText {
font.pointSize: Appearance.font.size.normal
text: {
const elapsed = Recorder.elapsed;
const hours = Math.floor(elapsed / 3600);
const mins = Math.floor((elapsed % 3600) / 60);
const secs = Math.floor(elapsed % 60).toString().padStart(2, "0");
let time;
if (hours > 0)
time = `${hours}:${mins.toString().padStart(2, "0")}:${secs}`;
else
time = `${mins}:${secs}`;
return qsTr("Recording for %1").arg(time);
}
}
Item {
Layout.fillWidth: true
}
IconButton {
checked: Recorder.paused
font.pointSize: Appearance.font.size.large
icon: Recorder.paused ? "play_arrow" : "pause"
label.animate: true
toggle: true
type: IconButton.Tonal
onClicked: {
Recorder.togglePause();
internalChecked = Recorder.paused;
}
}
IconButton {
font.pointSize: Appearance.font.size.large
icon: "stop"
inactiveColour: DynamicColors.palette.m3error
inactiveOnColour: DynamicColors.palette.m3onError
onClicked: Recorder.stop()
}
}
}
}
@@ -0,0 +1,226 @@
pragma ComponentBehavior: Bound
import QtQuick
import QtQuick.Layouts
import Quickshell
import Quickshell.Widgets
import ZShell.Models
import qs.Components
import qs.Helpers
import qs.Paths
import qs.Config
ColumnLayout {
id: root
required property var props
required property PersistentProperties visibilities
spacing: 0
WrapperMouseArea {
Layout.fillWidth: true
cursorShape: Qt.PointingHandCursor
onClicked: root.props.recordingListExpanded = !root.props.recordingListExpanded
RowLayout {
spacing: Appearance.spacing.smaller
MaterialIcon {
Layout.alignment: Qt.AlignVCenter
font.pointSize: Appearance.font.size.large
text: "list"
}
CustomText {
Layout.alignment: Qt.AlignVCenter
Layout.fillWidth: true
font.pointSize: Appearance.font.size.normal
text: qsTr("Recordings")
}
IconButton {
icon: root.props.recordingListExpanded ? "unfold_less" : "unfold_more"
label.animate: true
type: IconButton.Text
onClicked: root.props.recordingListExpanded = !root.props.recordingListExpanded
}
}
}
CustomListView {
id: list
Layout.fillWidth: true
Layout.rightMargin: -Appearance.spacing.small
clip: true
implicitHeight: (Appearance.font.size.larger + Appearance.padding.small) * (root.props.recordingListExpanded ? 10 : 3)
CustomScrollBar.vertical: CustomScrollBar {
flickable: list
}
add: Transition {
Anim {
from: 0
property: "opacity"
to: 1
}
Anim {
from: 0.5
property: "scale"
to: 1
}
}
delegate: RowLayout {
id: recording
property string baseName
required property FileSystemEntry modelData
anchors.left: list.contentItem.left
anchors.right: list.contentItem.right
anchors.rightMargin: Appearance.spacing.small
spacing: Appearance.spacing.small / 2
Component.onCompleted: baseName = modelData.baseName
CustomText {
Layout.fillWidth: true
Layout.rightMargin: Appearance.spacing.small / 2
color: DynamicColors.palette.m3onSurfaceVariant
elide: Text.ElideRight
text: {
const time = recording.baseName;
const matches = time.match(/^recording_(\d{4})(\d{2})(\d{2})_(\d{2})-(\d{2})-(\d{2})/);
if (!matches)
return time;
const date = new Date(...matches.slice(1));
date.setMonth(date.getMonth() - 1);
return qsTr("Recording at %1").arg(Qt.formatDateTime(date, Qt.locale()));
}
}
IconButton {
icon: "play_arrow"
type: IconButton.Text
onClicked: {
root.visibilities.sidebar = false;
Quickshell.execDetached(["app2unit", "--", ...Config.general.apps.playback, recording.modelData.path]);
}
}
IconButton {
icon: "folder"
type: IconButton.Text
onClicked: {
root.visibilities.sidebar = false;
Quickshell.execDetached(["app2unit", "--", ...Config.general.apps.explorer, recording.modelData.path]);
}
}
}
displaced: Transition {
Anim {
properties: "opacity,scale"
to: 1
}
Anim {
property: "y"
}
}
Behavior on implicitHeight {
Anim {
}
}
model: FileSystemModel {
nameFilters: ["recording_*.mp4"]
path: Paths.recsdir
sortReverse: true
}
remove: Transition {
Anim {
property: "opacity"
to: 0
}
Anim {
property: "scale"
to: 0.5
}
}
Loader {
active: opacity > 0
anchors.centerIn: parent
asynchronous: true
opacity: list.count === 0 ? 1 : 0
Behavior on opacity {
Anim {
}
}
sourceComponent: ColumnLayout {
spacing: Appearance.spacing.small
MaterialIcon {
Layout.alignment: Qt.AlignHCenter
Layout.preferredHeight: root.props.recordingListExpanded ? implicitHeight : 0
color: DynamicColors.palette.m3outline
font.pointSize: Appearance.font.size.extraLarge
opacity: root.props.recordingListExpanded ? 1 : 0
scale: root.props.recordingListExpanded ? 1 : 0
text: "scan_delete"
Behavior on Layout.preferredHeight {
Anim {
}
}
Behavior on opacity {
Anim {
}
}
Behavior on scale {
Anim {
}
}
}
RowLayout {
spacing: Appearance.spacing.smaller
MaterialIcon {
Layout.alignment: Qt.AlignHCenter
Layout.preferredWidth: !root.props.recordingListExpanded ? implicitWidth : 0
color: DynamicColors.palette.m3outline
opacity: !root.props.recordingListExpanded ? 1 : 0
scale: !root.props.recordingListExpanded ? 1 : 0
text: "scan_delete"
Behavior on Layout.preferredWidth {
Anim {
}
}
Behavior on opacity {
Anim {
}
}
Behavior on scale {
Anim {
}
}
}
CustomText {
color: DynamicColors.palette.m3outline
text: qsTr("No recordings found")
}
}
}
}
}
}
@@ -1,13 +1,14 @@
import qs.Modules.Notifications.Sidebar.Utils.Cards import Quickshell
import qs.Config
import QtQuick import QtQuick
import QtQuick.Layouts import QtQuick.Layouts
import qs.Modules.Notifications.Sidebar.Utils.Cards
import qs.Config
Item { Item {
id: root id: root
required property Item popouts required property Item popouts
required property var props required property PersistentProperties props
required property var visibilities required property var visibilities
implicitHeight: layout.implicitHeight implicitHeight: layout.implicitHeight
@@ -22,6 +23,12 @@ Item {
IdleInhibit { IdleInhibit {
} }
Record {
props: root.props
visibilities: root.visibilities
z: 1
}
Toggles { Toggles {
popouts: root.popouts popouts: root.popouts
visibilities: root.visibilities visibilities: root.visibilities
+1
View File
@@ -9,6 +9,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"typer", "typer",
"pillow", "pillow",
"jinja2",
"materialyoucolor" "materialyoucolor"
] ]
+2 -1
View File
@@ -1,6 +1,6 @@
from __future__ import annotations from __future__ import annotations
import typer import typer
from zshell.subcommands import shell, scheme, screenshot, wallpaper from zshell.subcommands import shell, scheme, screenshot, wallpaper, record
app = typer.Typer() app = typer.Typer()
@@ -8,6 +8,7 @@ app.add_typer(shell.app, name="shell")
app.add_typer(scheme.app, name="scheme") app.add_typer(scheme.app, name="scheme")
app.add_typer(screenshot.app, name="screenshot") app.add_typer(screenshot.app, name="screenshot")
app.add_typer(wallpaper.app, name="wallpaper") app.add_typer(wallpaper.app, name="wallpaper")
app.add_typer(record.app, name="record")
# app.add_typer(preset.app, name="preset") # app.add_typer(preset.app, name="preset")
+214
View File
@@ -0,0 +1,214 @@
import os
import json
import subprocess
import time
from pathlib import Path
from typing import Optional
import typer
app = typer.Typer()
RECORDER = "gpu-screen-recorder"
HOME = str(os.getenv("HOME", str(Path.home())))
CONFIG = Path(HOME) / ".config/zshell/config.json"
STATE_DIR = Path(HOME) / ".local/state/zshell/record"
TEMP_RECORDING = STATE_DIR / "recording.mp4"
REPLAY_RECORDING = STATE_DIR / "replay.mp4"
NOTIF_ID_FILE = STATE_DIR / "notifid.txt"
RECORDINGS_DIR = os.getenv("ZSHELL_RECORDINGS_DIR",
str(Path(HOME) / "Videos/Recordings"))
def _read_extra_args() -> list[str]:
try:
if CONFIG.is_file():
data = json.loads(CONFIG.read_text())
return data.get("record", {}).get("extraArgs", [])
except Exception:
pass
return []
def _is_recording() -> bool:
return subprocess.run(["pidof", RECORDER], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0
def _notify(summary: str, body: str = "", actions: list = None, timeout: int = 5000) -> Optional[int]:
args = ["notify-send", summary, body, "-t", str(timeout), "-p"]
if actions:
for action in actions:
args.extend(["-A", action])
try:
proc = subprocess.run(args, capture_output=True, text=True)
return int(proc.stdout.strip()) if proc.stdout.strip().isdigit() else None
except Exception:
return None
def _close_notification(notif_id: int):
subprocess.run(["notify-send", "--close", str(notif_id)],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def _get_monitors() -> list[dict]:
try:
res = subprocess.run(["hyprctl", "monitors", "-j"],
capture_output=True, text=True)
return json.loads(res.stdout)
except Exception:
return []
def _focused_monitor_name() -> Optional[str]:
for m in _get_monitors():
if m.get("focused"):
return m["name"]
return None
def _monitors_intersecting_region(x: int, y: int, w: int, h: int) -> list[dict]:
region = (x, y, x + w, y + h)
intersecting = []
for m in _get_monitors():
mx, my, mw, mh = m["x"], m["y"], m["width"], m["height"]
if not (region[2] <= mx or region[0] >= mx + mw or region[3] <= my or region[1] >= my + mh):
intersecting.append(m)
return intersecting
def _highest_refresh(monitors: list[dict]) -> float:
return max((m["refreshRate"] for m in monitors), default=60.0)
def _slurp_region() -> Optional[str]:
try:
return subprocess.check_output(["slurp", "-f", "%wx%h+%x+%y"], text=True).strip()
except subprocess.CalledProcessError:
return None
def _parse_geometry(geometry: str) -> Optional[tuple[int, int, int, int]]:
import re
match = re.match(r"(\d+)x(\d+)\+(\d+)\+(\d+)", geometry)
if match:
return int(match.group(3)), int(match.group(4)), int(match.group(1)), int(match.group(2))
return None
def start_recording(region: Optional[str], sound: bool):
STATE_DIR.mkdir(parents=True, exist_ok=True)
cmd = [RECORDER]
extra_args = _read_extra_args()
if region:
if region.lower() == "slurp" or not region:
geometry = _slurp_region()
if not geometry:
typer.echo("Region selection cancelled.")
raise typer.Abort()
else:
geometry = region
parsed = _parse_geometry(geometry)
if not parsed:
typer.echo("Invalid geometry format.")
raise typer.Abort()
x, y, w, h = parsed
monitors = _monitors_intersecting_region(x, y, w, h)
framerate = _highest_refresh(monitors)
cmd.extend(["-w", "region", "-region", geometry, "-f", str(int(framerate))])
else:
monitor_name = _focused_monitor_name()
if not monitor_name:
typer.echo("No focused monitor found.")
raise typer.Abort()
monitors = _get_monitors()
mon = next((m for m in monitors if m["name"] == monitor_name), None)
rate = int(mon["refreshRate"]) if mon else 60
cmd.extend(["-w", monitor_name, "-f", str(rate)])
if sound:
cmd.extend(["-a", "default_output"])
cmd.extend(extra_args)
cmd.extend(["-o", str(TEMP_RECORDING)])
subprocess.Popen(cmd, start_new_session=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
notif_id = _notify("Recording started", f"Saving to {TEMP_RECORDING}")
if notif_id is not None:
NOTIF_ID_FILE.write_text(str(notif_id))
time.sleep(1)
if not _is_recording():
_notify("Recording failed",
"Check gpu-screen-recorder output.", timeout=5000)
raise typer.Exit(code=1)
def stop_recording(clipboard: bool):
subprocess.run(["pkill", "-f", RECORDER],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
for _ in range(50):
if not _is_recording():
break
time.sleep(0.1)
dest_dir = Path(RECORDINGS_DIR)
dest_dir.mkdir(parents=True, exist_ok=True)
timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
final_path = dest_dir / f"recording_{timestamp}.mp4"
if TEMP_RECORDING.exists():
TEMP_RECORDING.rename(final_path)
if NOTIF_ID_FILE.is_file():
try:
_close_notification(int(NOTIF_ID_FILE.read_text().strip()))
except Exception:
pass
NOTIF_ID_FILE.unlink()
if clipboard:
subprocess.run(["wl-copy", "--type", "text/uri-list", f"file://{final_path}"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_notify("Recording stopped", f"Saved to {final_path}", timeout=5000)
def toggle_pause():
subprocess.run(["pkill", "-USR2", "-f", RECORDER],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
typer.echo("Toggled pause.")
@app.command()
def record(
region: Optional[str] = typer.Option(
None, "--region", "-r",
help="Record a region. Use 'slurp' (or omit value) to select interactively, or give 'WxH+X+Y'.",
),
sound: bool = typer.Option(
False, "--sound", "-s", help="Record audio from default output."),
pause: bool = typer.Option(
False, "--pause", "-p", help="Toggle pause/resume."),
clipboard: bool = typer.Option(
False, "--clipboard", "-c", help="Copy the final recording path to clipboard."),
):
"""Start or stop a screen recording with gpu-screen-recorder."""
if pause:
toggle_pause()
raise typer.Exit()
if _is_recording():
stop_recording(clipboard)
else:
start_recording(region, sound)
+45 -8
View File
@@ -1,4 +1,8 @@
import subprocess import subprocess
import sys
import time
import click
import typer import typer
args = ["qs", "-c", "zshell"] args = ["qs", "-c", "zshell"]
@@ -8,35 +12,68 @@ app = typer.Typer()
@app.command() @app.command()
def kill(): def kill():
subprocess.run(args + ["kill"], check=True) result = subprocess.run(args + ["kill"], capture_output=True)
if result.returncode != 0:
raise click.ClickException("No running instance to kill.")
sys.stderr.write(result.stderr.decode())
def start_instance(no_daemon: bool = False) -> None:
result = subprocess.run(args + ["-n"] + ([] if no_daemon else ["-d"]), capture_output=True)
stdout = result.stdout.decode().strip()
if stdout:
if "already running" in stdout.lower():
raise click.ClickException(stdout)
if result.returncode != 0:
stderr = result.stderr.decode().strip()
raise click.ClickException(stderr)
@app.command() @app.command()
def start(no_daemon: bool = False): def start(no_daemon: bool = False):
subprocess.run(args + ["-n"] + ([] if no_daemon else ["-d"]), check=True) start_instance(no_daemon)
@app.command() @app.command()
def restart(no_daemon: bool = False): def restart(no_daemon: bool = False):
subprocess.run(args + ["kill"], check=False) subprocess.run(args + ["kill"], capture_output=True)
subprocess.run(args + ["-n"] + ([] if no_daemon else ["-d"]), check=True) deadline = time.monotonic() + 2.5
while time.monotonic() < deadline:
result = subprocess.run(args + ["kill"], capture_output=True)
if result.returncode == 255:
break
time.sleep(0.25)
start_instance(no_daemon=no_daemon)
@app.command() @app.command()
def show(): def show():
subprocess.run(args + ["ipc"] + ["show"], check=True) result = subprocess.run(args + ["ipc"] + ["show"], capture_output=True)
if result.returncode != 0:
raise click.ClickException(result.stderr.decode().strip())
sys.stderr.write(result.stderr.decode())
@app.command() @app.command()
def log(): def log():
subprocess.run(args + ["log"], check=True) result = subprocess.run(args + ["log"], capture_output=True)
if result.returncode != 0:
raise click.ClickException(result.stderr.decode().strip())
sys.stdout.write(result.stdout.decode())
sys.stderr.write(result.stderr.decode())
@app.command() @app.command()
def lock(): def lock():
subprocess.run(args + ["ipc"] + ["call"] + ["lock"] + ["lock"], check=True) result = subprocess.run(args + ["ipc"] + ["call"] + ["lock"] + ["lock"], capture_output=True)
if result.returncode != 0:
raise click.ClickException(result.stderr.decode().strip())
sys.stderr.write(result.stderr.decode())
@app.command() @app.command()
def call(target: str, method: str, method_args: list[str] = typer.Argument(None)): def call(target: str, method: str, method_args: list[str] = typer.Argument(None)):
subprocess.run(args + ["ipc"] + ["call"] + [target] + [method] + (method_args or []), check=True) result = subprocess.run(args + ["ipc"] + ["call"] + [target] + [method] + (method_args or []), capture_output=True)
if result.returncode != 0:
raise click.ClickException(result.stderr.decode().strip())
sys.stderr.write(result.stderr.decode())
+61 -18
View File
@@ -1,13 +1,15 @@
from __future__ import annotations from __future__ import annotations
from subprocess import CompletedProcess
from unittest.mock import patch, call from unittest.mock import patch, call
from typer.testing import CliRunner
from zshell.subcommands.shell import app from zshell.subcommands.shell import app
runner = CliRunner()
def invoke(*args: str) -> None:
from typer.testing import CliRunner
runner = CliRunner() def invoke(*args: str):
result = runner.invoke(app, args) result = runner.invoke(app, args)
if result.exit_code != 0: if result.exit_code != 0:
raise RuntimeError(result.output) raise RuntimeError(result.output)
@@ -16,72 +18,113 @@ def invoke(*args: str) -> None:
class TestKill: class TestKill:
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_kill_runs_qs_kill(self, mock_run): def test_kill_runs_qs_kill_success(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"Killed abc\n")
invoke("kill") invoke("kill")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "kill"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "kill"], capture_output=True)
@patch("zshell.subcommands.shell.subprocess.run")
def test_kill_no_instance_errors(self, mock_run):
mock_run.return_value = CompletedProcess([], 255, b"", b"No running instances\n")
result = runner.invoke(app, ["kill"])
assert result.exit_code != 0
assert "No running instance to kill" in result.output
class TestStart: class TestStart:
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_start_default_daemon(self, mock_run): def test_start_default_daemon(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"Launching config\n")
invoke("start") invoke("start")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n", "-d"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n", "-d"], capture_output=True)
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_start_no_daemon(self, mock_run): def test_start_no_daemon(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"Launching config\n")
invoke("start", "--no-daemon") invoke("start", "--no-daemon")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n"], capture_output=True)
@patch("zshell.subcommands.shell.subprocess.run")
def test_start_already_running_errors(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"An instance of this configuration is already running.\n", b"")
result = runner.invoke(app, ["start"])
assert result.exit_code != 0
assert "already running" in result.output
@patch("zshell.subcommands.shell.subprocess.run")
def test_start_other_failure_errors(self, mock_run):
mock_run.return_value = CompletedProcess([], 1, b"", b"Config error\n")
result = runner.invoke(app, ["start"])
assert result.exit_code != 0
assert "Config error" in result.output
class TestShow: class TestShow:
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_show_runs_ipc_show(self, mock_run): def test_show_runs_ipc_show(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"target visibilities\n")
invoke("show") invoke("show")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "show"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "show"], capture_output=True)
class TestLog: class TestLog:
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_log_runs_qs_log(self, mock_run): def test_log_runs_qs_log(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"log output\n", b"")
invoke("log") invoke("log")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "log"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "log"], capture_output=True)
class TestLock: class TestLock:
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_lock_runs_ipc_call_lock(self, mock_run): def test_lock_runs_ipc_call_lock(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"")
invoke("lock") invoke("lock")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "lock", "lock"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "lock", "lock"], capture_output=True)
class TestCall: class TestCall:
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_call_no_args(self, mock_run): def test_call_no_args(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"")
invoke("call", "target", "method") invoke("call", "target", "method")
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "target", "method"], check=True) mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "target", "method"], capture_output=True)
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_call_with_args(self, mock_run): def test_call_with_args(self, mock_run):
mock_run.return_value = CompletedProcess([], 0, b"", b"")
invoke("call", "target", "method", "arg1", "arg2") invoke("call", "target", "method", "arg1", "arg2")
mock_run.assert_called_once_with( mock_run.assert_called_once_with(
["qs", "-c", "zshell", "ipc", "call", "target", "method", "arg1", "arg2"], ["qs", "-c", "zshell", "ipc", "call", "target", "method", "arg1", "arg2"],
check=True, capture_output=True,
) )
class TestRestart: class TestRestart:
@patch("zshell.subcommands.shell.start_instance")
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_restart_kills_then_starts_daemon(self, mock_run): def test_restart_kills_then_starts(self, mock_run, mock_start):
mock_run.side_effect = [
CompletedProcess([], 0, b"", b"Killed abc\n"), # first kill (captured)
CompletedProcess([], 255, b"", b""), # poll → no instance
]
invoke("restart") invoke("restart")
assert mock_run.call_args_list == [ assert mock_run.call_args_list == [
call(["qs", "-c", "zshell", "kill"], check=False), call(["qs", "-c", "zshell", "kill"], capture_output=True),
call(["qs", "-c", "zshell", "-n", "-d"], check=True), call(["qs", "-c", "zshell", "kill"], capture_output=True),
] ]
mock_start.assert_called_once_with(no_daemon=False)
@patch("zshell.subcommands.shell.start_instance")
@patch("zshell.subcommands.shell.subprocess.run") @patch("zshell.subcommands.shell.subprocess.run")
def test_restart_no_daemon(self, mock_run): def test_restart_no_daemon(self, mock_run, mock_start):
mock_run.side_effect = [
CompletedProcess([], 0, b"", b"Killed abc\n"),
CompletedProcess([], 255, b"", b""),
]
invoke("restart", "--no-daemon") invoke("restart", "--no-daemon")
assert mock_run.call_args_list == [ assert mock_run.call_args_list == [
call(["qs", "-c", "zshell", "kill"], check=False), call(["qs", "-c", "zshell", "kill"], capture_output=True),
call(["qs", "-c", "zshell", "-n"], check=True), call(["qs", "-c", "zshell", "kill"], capture_output=True),
] ]
mock_start.assert_called_once_with(no_daemon=True)