fix(mpv-ipc-player): deadlock from subprocess.PIPE being filled up causing mpv to wait resulting in the player freezing

This commit is contained in:
Benexl
2025-07-27 23:21:51 +03:00
parent 9edeeb5ca4
commit 316832e771
2 changed files with 109 additions and 70 deletions

View File

@@ -91,9 +91,12 @@ class MPVIPCClient:
try:
if not self.socket:
break
# A blocking recv is efficient as the thread will sleep until data is available.
data = self.socket.recv(4096)
if not data:
logger.info("MPV IPC socket closed.")
# Put a special event to signal the main loop that MPV has shut down.
self._event_queue.put({"event": "shutdown"})
break
self._message_buffer += data
@@ -114,13 +117,14 @@ class MPVIPCClient:
try:
message = json.loads(message_data.decode("utf-8"))
if "request_id" in message:
# Responses have a 'request_id' and 'error' field, events do not.
if "request_id" in message and "error" in message:
req_id = message["request_id"]
with self._lock:
self._response_dict[req_id] = message
if req_id in self._response_events:
self._response_events[req_id].set()
else:
else: # It's an event
self._event_queue.put(message)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(
@@ -247,24 +251,25 @@ class MpvIPCPlayer(BaseIPCPlayer):
"""MPV Player implementation using IPC for advanced features."""
stream_config: StreamConfig
mpv_process: subprocess.Popen
ipc_client: MPVIPCClient
player_state: PlayerState
player_fetching: bool = False
player_first_run: bool = True
event_handlers: Dict[str, List[Callable]] = {}
property_observers: Dict[str, List[Callable]] = {}
key_bindings: Dict[str, Callable] = {}
message_handlers: Dict[str, Callable] = {}
provider: BaseAnimeProvider
anime: Anime
media_item: Optional[MediaItem]
def __init__(self, stream_config: StreamConfig):
super().__init__(stream_config)
self.socket_path: Optional[str] = None
self._fetch_thread: Optional[threading.Thread] = None
self._fetch_result_queue: Queue = Queue()
def play(
self, player, player_params, provider, anime, media_item=None
) -> PlayerResult:
@@ -289,31 +294,34 @@ class MpvIPCPlayer(BaseIPCPlayer):
self._setup_key_bindings()
self._setup_message_handlers()
self._wait_for_playback()
return PlayerResult(
episode=self.player_state.episode,
stop_time=self.player_state.stop_time,
total_time=self.player_state.total_time,
)
except MPVIPCError as e:
logger.warning(
f"IPC connection failed: {e}. Falling back to non-IPC playback."
)
self._cleanup()
choice = input(
"FAILED TO PLAY WITH IPC WOULD YOU LIKE TO RESUME PLAYBACK WITHOUT IT(Y/n): "
)
if choice != "n":
if (
input("Failed to play with IPC. Continue without it? (Y/n): ").lower()
!= "n"
):
return player.play(params)
else:
return PlayerResult(
episode=params.episode, stop_time=None, total_time=None
)
finally:
self._cleanup()
return PlayerResult(
episode=self.player_state.episode,
stop_time=self.player_state.stop_time,
total_time=self.player_state.total_time,
)
def _start_mpv_process(self, player: BasePlayer, params: PlayerParams) -> None:
"""Start MPV process with IPC enabled."""
temp_dir = Path(tempfile.gettempdir())
self.socket_path = str(temp_dir / f"mpv_ipc_{time.time()}.sock")
self.mpv_process = player.play_with_ipc(params, self.socket_path)
time.sleep(1.0) # Give MPV time to start and create the socket
time.sleep(1.0)
def _connect_ipc(self):
if not self.socket_path:
@@ -380,13 +388,38 @@ class MpvIPCPlayer(BaseIPCPlayer):
)
def _wait_for_playback(self):
"""A non-blocking loop that checks for MPV process exit and processes events."""
if not self.ipc_client:
return
should_stop = False
try:
while self.mpv_process and self.mpv_process.poll() is None:
message = self.ipc_client.get_event(block=True, timeout=0.1)
if message:
while not should_stop:
if self.mpv_process and self.mpv_process.poll() is not None:
logger.info("MPV process has exited.")
break
while True:
message = self.ipc_client.get_event(block=False)
if message is None:
break
if message.get("event") == "shutdown":
should_stop = True
break
self._handle_mpv_message(message)
try:
fetch_result = self._fetch_result_queue.get(block=False)
self._handle_fetch_result(fetch_result)
except Empty:
pass
if should_stop:
break
time.sleep(0.05)
except KeyboardInterrupt:
logger.info("Playback interrupted by user")
@@ -410,7 +443,8 @@ class MpvIPCPlayer(BaseIPCPlayer):
self.player_state.total_time_secs = data
elif name == "percent-pos" and isinstance(data, (int, float)):
if (
data >= self.stream_config.episode_complete_at
self.stream_config.auto_next
and data >= self.stream_config.episode_complete_at
and not self.player_fetching
):
self._auto_next_episode()
@@ -451,39 +485,46 @@ class MpvIPCPlayer(BaseIPCPlayer):
self.player_fetching = True
self._show_text(f"Fetching {episode_type} episode...")
self._fetch_thread = threading.Thread(
target=self._fetch_episode_task, args=(episode_type, ep_no), daemon=True
)
self._fetch_thread.start()
def _fetch_episode_task(
self,
episode_type: Literal["next", "previous", "reload", "custom"],
ep_no: Optional[str] = None,
):
"""This function runs in a background thread to fetch episode streams."""
try:
available_episodes = getattr(
self.anime.episodes, self.stream_config.translation_type
)
if not available_episodes:
self._show_text(
raise ValueError(
f"No {self.stream_config.translation_type} episodes available."
)
return
current_index = available_episodes.index(self.player_state.episode)
if episode_type == "next":
if current_index >= len(available_episodes) - 1:
self._show_text("Already at the last episode.")
return
raise ValueError("Already at the last episode.")
target_episode = available_episodes[current_index + 1]
elif episode_type == "previous":
if current_index <= 0:
self._show_text("Already at first episode")
return
raise ValueError("Already at first episode")
target_episode = available_episodes[current_index - 1]
elif episode_type == "reload":
target_episode = self.player_state.episode
elif episode_type == "custom":
if not ep_no or ep_no not in available_episodes:
self._show_text(
raise ValueError(
f"Invalid episode. Available: {', '.join(available_episodes)}"
)
return
target_episode = ep_no
else:
return # Should not happen
return
stream_params = EpisodeStreamsParams(
anime_id=self.anime.id,
@@ -491,38 +532,42 @@ class MpvIPCPlayer(BaseIPCPlayer):
episode=target_episode,
translation_type=self.stream_config.translation_type,
)
episode_streams = self.provider.episode_streams(stream_params)
# This is the blocking network call, now safely in a thread
episode_streams = list(self.provider.episode_streams(stream_params) or [])
if not episode_streams:
self._show_text(f"No streams found for episode {target_episode}")
return
raise ValueError(f"No streams found for episode {target_episode}")
self.player_state.servers = {
ProviderServer(server.name): server for server in episode_streams
result = {
"type": "success",
"target_episode": target_episode,
"servers": {ProviderServer(s.name): s for s in episode_streams},
}
self.player_state.episode = target_episode
self.player_state.reset()
self._show_text(
f"Fetched {self.player_state.episode_title or 'Episode ' + target_episode}"
)
self._fetch_result_queue.put(result)
except ValueError:
self._show_text("Current episode not found in list.")
except Exception as e:
self._show_text(f"Error: {e}")
finally:
self.player_fetching = False
logger.error(f"Episode fetch task failed: {e}")
self._fetch_result_queue.put({"type": "error", "message": str(e)})
def _handle_fetch_result(self, result: Dict[str, Any]):
"""Handles the result from the background fetch thread in the main thread."""
self.player_fetching = False
if result["type"] == "success":
self.player_state.episode = result["target_episode"]
self.player_state.servers = result["servers"]
self.player_state.reset()
self._show_text(f"Fetched {self.player_state.episode_title}")
self._load_current_stream()
else:
self._show_text(f"Error: {result['message']}")
def _next_episode(self):
self._get_episode("next")
self._load_current_stream()
def _previous_episode(self):
self._get_episode("previous")
self._load_current_stream()
def _reload_episode(self):
self._get_episode("reload")
self._load_current_stream()
def _auto_next_episode(self):
if self.stream_config.auto_next:
@@ -551,7 +596,7 @@ class MpvIPCPlayer(BaseIPCPlayer):
if not self.ipc_client or not self.player_state.stream_subtitles:
return
time.sleep(0.5) # Allow file to load
time.sleep(0.5)
for i, sub_url in enumerate(self.player_state.stream_subtitles):
flag = "select" if i == 0 else "auto"
self.ipc_client.send_command(["sub-add", sub_url, flag])
@@ -565,21 +610,12 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _toggle_translation_type(self):
new_type = "sub" if self.stream_config.translation_type == "dub" else "dub"
self._show_text(f"Switching to {new_type}...")
old_type = self.stream_config.translation_type
self.stream_config.translation_type = new_type
self._get_episode("reload")
if self.player_state.stream_url:
self._load_current_stream()
self._show_text(f"Switched to {new_type}")
else:
self.stream_config.translation_type = old_type
self._show_text(f"Failed to switch. Reverting to {old_type}")
self._reload_episode()
def _handle_select_episode(self, episode: Optional[str] = None):
if episode:
self._get_episode("custom", episode)
self._load_current_stream()
def _handle_select_server(self, server: Optional[str] = None):
if not server or not self.player_state:
@@ -592,8 +628,11 @@ class MpvIPCPlayer(BaseIPCPlayer):
else:
self._show_text(f"Server '{server}' not available.")
except ValueError:
available_servers = ", ".join(
[s.value for s in self.player_state.servers.keys()]
)
self._show_text(
f"Invalid server name: {server}. Servers are {', '.join([server.value for server in self.player_state.servers])}"
f"Invalid server name: {server}. Available: {available_servers}"
)
def _handle_select_quality(self, quality: Optional[str] = None):

View File

@@ -63,7 +63,7 @@ class MpvPlayer(BasePlayer):
subprocess.run(args)
return PlayerResult()
return PlayerResult(params.episode)
def _play_on_desktop(self, params) -> PlayerResult:
if not self.executable:
@@ -100,7 +100,9 @@ class MpvPlayer(BasePlayer):
stop_time = match.group(1)
total_time = match.group(2)
break
return PlayerResult(total_time=total_time, stop_time=stop_time)
return PlayerResult(
episode=params.episode, total_time=total_time, stop_time=stop_time
)
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""Stream using IPC player for enhanced features."""
@@ -120,9 +122,7 @@ class MpvPlayer(BasePlayer):
logger.info(f"Starting MPV with IPC socket: {socket_path}")
process = subprocess.Popen(
pre_args + mpv_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
process = subprocess.Popen(pre_args + mpv_args)
return process
@@ -141,7 +141,7 @@ class MpvPlayer(BasePlayer):
args.extend(mpv_args)
subprocess.run(args)
return PlayerResult()
return PlayerResult(params.episode)
# TODO: Get people with real friends to do this lol
def _stream_on_desktop_with_syncplay(self, params: PlayerParams) -> PlayerResult:
@@ -156,7 +156,7 @@ class MpvPlayer(BasePlayer):
args.extend(mpv_args)
subprocess.run(args)
return PlayerResult()
return PlayerResult(params.episode)
def _create_mpv_cli_options(self, params: PlayerParams) -> list[str]:
mpv_args = []
@@ -185,5 +185,5 @@ if __name__ == "__main__":
print(APP_ASCII_ART)
url = input("Enter the url you would like to stream: ")
mpv = MpvPlayer(MpvConfig())
player_result = mpv.play(PlayerParams(url=url, title=""))
player_result = mpv.play(PlayerParams(episode="", query="", url=url, title=""))
print(player_result)