hotfix(cli): replace raw subprocess tracebacks with styled error messages and fix restart race condition #96
@@ -1,4 +1,8 @@
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import click
|
||||
import typer
|
||||
|
||||
args = ["qs", "-c", "zshell"]
|
||||
@@ -8,35 +12,68 @@ app = typer.Typer()
|
||||
|
||||
@app.command()
|
||||
def kill():
|
||||
subprocess.run(args + ["kill"], check=True)
|
||||
result = subprocess.run(args + ["kill"], capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise click.ClickException("No running instance to kill.")
|
||||
sys.stderr.write(result.stderr.decode())
|
||||
|
||||
|
||||
def start_instance(no_daemon: bool = False) -> None:
|
||||
result = subprocess.run(args + ["-n"] + ([] if no_daemon else ["-d"]), capture_output=True)
|
||||
stdout = result.stdout.decode().strip()
|
||||
|
zach marked this conversation as resolved
Outdated
|
||||
if stdout:
|
||||
if "already running" in stdout.lower():
|
||||
raise click.ClickException(stdout)
|
||||
if result.returncode != 0:
|
||||
stderr = result.stderr.decode().strip()
|
||||
raise click.ClickException(stderr)
|
||||
|
||||
|
||||
@app.command()
|
||||
def start(no_daemon: bool = False):
|
||||
subprocess.run(args + ["-n"] + ([] if no_daemon else ["-d"]), check=True)
|
||||
start_instance(no_daemon)
|
||||
|
||||
|
AramJonghu marked this conversation as resolved
Outdated
zach
commented
This seems ugly. Personally think we should just set a sleep timer of ~5 seconds, then run code. This seems ugly. Personally think we should just set a sleep timer of ~5 seconds, then run code.
AramJonghu
commented
Is that not an messier implementation? A user would want a restart to be quick (quicker than running the kill and start command separately). 2.5 or higher stock sleep timer would make little sense. time.monotonic() could be used to hide the ugliness, but same logic/idea. Is that not an messier implementation? A user would want a restart to be quick (quicker than running the kill and start command separately). 2.5 or higher stock sleep timer would make little sense.
time.monotonic() could be used to hide the ugliness, but same logic/idea.
zach
commented
Running a loop would keep a CPU thread busy until it's able to launch the shell, while a sleep timer wouldn't. I don't really see much benefit in looping upwards of 50 times just to make sure we can run the start command exactly when it's available. Running a loop would keep a CPU thread busy until it's able to launch the shell, while a sleep timer wouldn't. I don't really see much benefit in looping upwards of 50 times just to make sure we can run the start command exactly when it's available.
AramJonghu
commented
Is that not okay for a rare command as restart? We could change the loop to check every 250ms instead, so it is 10 checks instead of 50. Is that not okay for a rare command as restart?
We could change the loop to check every 250ms instead, so it is 10 checks instead of 50.
zach
commented
That's a good middle ground I think, agreed. That's a good middle ground I think, agreed.
AramJonghu
commented
Changed it. Changed it.
|
||||
|
||||
@app.command()
|
||||
def restart(no_daemon: bool = False):
|
||||
subprocess.run(args + ["kill"], check=False)
|
||||
subprocess.run(args + ["-n"] + ([] if no_daemon else ["-d"]), check=True)
|
||||
subprocess.run(args + ["kill"], capture_output=True)
|
||||
deadline = time.monotonic() + 2.5
|
||||
|
zach marked this conversation as resolved
Outdated
zach
commented
Here we can just call Here we can just call `start()`, right? Not sure if `Typer` allows calling command functions.
AramJonghu
commented
It was to have separate tests. Simpler in my eyes (at least at time of writing). I can simplify it by having a It was to have separate tests. Simpler in my eyes (at least at time of writing). I can simplify it by having a `start_instance()` that start uses outright, and is reused by restart. This could satisfy testing and reuse of code, and perhaps future code changes.
|
||||
while time.monotonic() < deadline:
|
||||
result = subprocess.run(args + ["kill"], capture_output=True)
|
||||
if result.returncode == 255:
|
||||
break
|
||||
time.sleep(0.25)
|
||||
start_instance(no_daemon=no_daemon)
|
||||
|
||||
|
||||
@app.command()
|
||||
def show():
|
||||
subprocess.run(args + ["ipc"] + ["show"], check=True)
|
||||
result = subprocess.run(args + ["ipc"] + ["show"], capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise click.ClickException(result.stderr.decode().strip())
|
||||
sys.stderr.write(result.stderr.decode())
|
||||
|
||||
|
||||
@app.command()
|
||||
def log():
|
||||
subprocess.run(args + ["log"], check=True)
|
||||
result = subprocess.run(args + ["log"], capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise click.ClickException(result.stderr.decode().strip())
|
||||
sys.stdout.write(result.stdout.decode())
|
||||
sys.stderr.write(result.stderr.decode())
|
||||
|
||||
|
||||
@app.command()
|
||||
def lock():
|
||||
subprocess.run(args + ["ipc"] + ["call"] + ["lock"] + ["lock"], check=True)
|
||||
result = subprocess.run(args + ["ipc"] + ["call"] + ["lock"] + ["lock"], capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise click.ClickException(result.stderr.decode().strip())
|
||||
sys.stderr.write(result.stderr.decode())
|
||||
|
||||
|
||||
@app.command()
|
||||
def call(target: str, method: str, method_args: list[str] = typer.Argument(None)):
|
||||
subprocess.run(args + ["ipc"] + ["call"] + [target] + [method] + (method_args or []), check=True)
|
||||
result = subprocess.run(args + ["ipc"] + ["call"] + [target] + [method] + (method_args or []), capture_output=True)
|
||||
if result.returncode != 0:
|
||||
raise click.ClickException(result.stderr.decode().strip())
|
||||
sys.stderr.write(result.stderr.decode())
|
||||
|
||||
@@ -1,13 +1,15 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from subprocess import CompletedProcess
|
||||
from unittest.mock import patch, call
|
||||
|
||||
from typer.testing import CliRunner
|
||||
from zshell.subcommands.shell import app
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
def invoke(*args: str) -> None:
|
||||
from typer.testing import CliRunner
|
||||
|
||||
runner = CliRunner()
|
||||
def invoke(*args: str):
|
||||
result = runner.invoke(app, args)
|
||||
if result.exit_code != 0:
|
||||
raise RuntimeError(result.output)
|
||||
@@ -16,72 +18,113 @@ def invoke(*args: str) -> None:
|
||||
|
||||
class TestKill:
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_kill_runs_qs_kill(self, mock_run):
|
||||
def test_kill_runs_qs_kill_success(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"Killed abc\n")
|
||||
invoke("kill")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "kill"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "kill"], capture_output=True)
|
||||
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_kill_no_instance_errors(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 255, b"", b"No running instances\n")
|
||||
result = runner.invoke(app, ["kill"])
|
||||
assert result.exit_code != 0
|
||||
assert "No running instance to kill" in result.output
|
||||
|
||||
|
||||
class TestStart:
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_start_default_daemon(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"Launching config\n")
|
||||
invoke("start")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n", "-d"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n", "-d"], capture_output=True)
|
||||
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_start_no_daemon(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"Launching config\n")
|
||||
invoke("start", "--no-daemon")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "-n"], capture_output=True)
|
||||
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_start_already_running_errors(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"An instance of this configuration is already running.\n", b"")
|
||||
result = runner.invoke(app, ["start"])
|
||||
assert result.exit_code != 0
|
||||
assert "already running" in result.output
|
||||
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_start_other_failure_errors(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 1, b"", b"Config error\n")
|
||||
result = runner.invoke(app, ["start"])
|
||||
assert result.exit_code != 0
|
||||
assert "Config error" in result.output
|
||||
|
||||
|
||||
class TestShow:
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_show_runs_ipc_show(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"target visibilities\n")
|
||||
invoke("show")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "show"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "show"], capture_output=True)
|
||||
|
||||
|
||||
class TestLog:
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_log_runs_qs_log(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"log output\n", b"")
|
||||
invoke("log")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "log"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "log"], capture_output=True)
|
||||
|
||||
|
||||
class TestLock:
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_lock_runs_ipc_call_lock(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"")
|
||||
invoke("lock")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "lock", "lock"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "lock", "lock"], capture_output=True)
|
||||
|
||||
|
||||
class TestCall:
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_call_no_args(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"")
|
||||
invoke("call", "target", "method")
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "target", "method"], check=True)
|
||||
mock_run.assert_called_once_with(["qs", "-c", "zshell", "ipc", "call", "target", "method"], capture_output=True)
|
||||
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_call_with_args(self, mock_run):
|
||||
mock_run.return_value = CompletedProcess([], 0, b"", b"")
|
||||
invoke("call", "target", "method", "arg1", "arg2")
|
||||
mock_run.assert_called_once_with(
|
||||
["qs", "-c", "zshell", "ipc", "call", "target", "method", "arg1", "arg2"],
|
||||
check=True,
|
||||
capture_output=True,
|
||||
)
|
||||
|
||||
|
||||
class TestRestart:
|
||||
@patch("zshell.subcommands.shell.start_instance")
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_restart_kills_then_starts_daemon(self, mock_run):
|
||||
def test_restart_kills_then_starts(self, mock_run, mock_start):
|
||||
mock_run.side_effect = [
|
||||
CompletedProcess([], 0, b"", b"Killed abc\n"), # first kill (captured)
|
||||
CompletedProcess([], 255, b"", b""), # poll → no instance
|
||||
]
|
||||
invoke("restart")
|
||||
assert mock_run.call_args_list == [
|
||||
call(["qs", "-c", "zshell", "kill"], check=False),
|
||||
call(["qs", "-c", "zshell", "-n", "-d"], check=True),
|
||||
call(["qs", "-c", "zshell", "kill"], capture_output=True),
|
||||
call(["qs", "-c", "zshell", "kill"], capture_output=True),
|
||||
]
|
||||
mock_start.assert_called_once_with(no_daemon=False)
|
||||
|
||||
@patch("zshell.subcommands.shell.start_instance")
|
||||
@patch("zshell.subcommands.shell.subprocess.run")
|
||||
def test_restart_no_daemon(self, mock_run):
|
||||
def test_restart_no_daemon(self, mock_run, mock_start):
|
||||
mock_run.side_effect = [
|
||||
CompletedProcess([], 0, b"", b"Killed abc\n"),
|
||||
CompletedProcess([], 255, b"", b""),
|
||||
]
|
||||
invoke("restart", "--no-daemon")
|
||||
assert mock_run.call_args_list == [
|
||||
call(["qs", "-c", "zshell", "kill"], check=False),
|
||||
call(["qs", "-c", "zshell", "-n"], check=True),
|
||||
call(["qs", "-c", "zshell", "kill"], capture_output=True),
|
||||
call(["qs", "-c", "zshell", "kill"], capture_output=True),
|
||||
]
|
||||
mock_start.assert_called_once_with(no_daemon=True)
|
||||
|
||||
The start command should not need to run
qs -c zshell ipc call, IPC calls are used to trigger states and functions within the QML code. The-nflag already checks if there's a running instance of the shell.Was not aware of this. Remaining functions use ipc, which is why I did not bat an eye. Resolving now.
Change occurred since I wanted an error block such as in the screenshot. But with that info, ipc is indeed not needed in the code.