Merge branch 'master' into fix/nix-flake

This commit is contained in:
Benexl
2025-07-27 23:29:24 +03:00
committed by GitHub
25 changed files with 907 additions and 1105 deletions

86
.gitignore vendored
View File

@@ -1,25 +1,17 @@
# mine
*.mp4
*.mp3
*.ass
vids
data/
.project/
fastanime.ini
crashdump.txt
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*.py[codz]
*$py.class
anixstream.ini
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
bin/
downloads/
eggs/
.eggs/
lib/
@@ -38,7 +30,7 @@ MANIFEST
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
# *.spec
*.spec
# Installer logs
pip-log.txt
@@ -54,7 +46,7 @@ htmlcov/
nosetests.xml
coverage.xml
*.cover
*.py,cover
*.py.cover
.hypothesis/
.pytest_cache/
cover/
@@ -102,23 +94,36 @@ ipython_config.py
# install all needed dependencies.
#Pipfile.lock
# UV
# Similar to Pipfile.lock, it is generally recommended to include uv.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
#uv.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
#poetry.toml
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
# pdm recommends including project-wide configuration in pdm.toml, but excluding .pdm-python.
# https://pdm-project.org/en/latest/usage/project/#working-with-version-control
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/latest/usage/project/#working-with-version-control
.pdm.toml
#pdm.toml
.pdm-python
.pdm-build/
# pixi
# Similar to Pipfile.lock, it is generally recommended to include pixi.lock in version control.
#pixi.lock
# Pixi creates a virtual environment in the .pixi directory, just like venv module creates one
# in the .venv directory. It is recommended not to include this directory in version control.
.pixi
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
@@ -131,6 +136,7 @@ celerybeat.pid
# Environments
.env
.envrc
.venv
env/
venv/
@@ -167,13 +173,39 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
app/anixstream.ini
app/settings.json
app/user_data.json
app/View/SearchScreen/.search_screen.py.un~
app/View/SearchScreen/search_screen.py~
app/user_data.json
.buildozer
result
#.idea/
# Abstra
# Abstra is an AI-powered process automation framework.
# Ignore directories containing user credentials, local state, and settings.
# Learn more at https://abstra.io/docs
.abstra/
# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/
# Ruff stuff:
.ruff_cache/
# PyPI configuration file
.pypirc
# Cursor
# Cursor is an AI-powered code editor. `.cursorignore` specifies files/directories to
# exclude from AI features like autocomplete and code analysis. Recommended for sensitive data
# refer to https://docs.cursor.com/context/ignore-files
.cursorignore
.cursorindexingignore
# Marimo
marimo/_static/
marimo/_lsp/
__marimo__/
# custom
repomix-output.xml
.project/

View File

@@ -21,7 +21,6 @@ if TYPE_CHECKING:
class Options(TypedDict):
no_config: bool | None
trace: bool | None
log_to_file: bool | None
dev: bool | None
log: bool | None
rich_traceback: bool | None
@@ -53,7 +52,6 @@ commands = {
)
@click.option("--dev", is_flag=True, help="Controls Whether the app is in dev mode")
@click.option("--log", is_flag=True, help="Controls Whether to log")
@click.option("--log-to-file", is_flag=True, help="Controls Whether to log to a file")
@click.option(
"--rich-traceback",
is_flag=True,

View File

@@ -1,6 +1,7 @@
from typing import TYPE_CHECKING
import click
from fastanime.cli.service.player.service import PlayerService
from ...core.config import AppConfig
from ...core.exceptions import FastAnimeError
@@ -130,10 +131,9 @@ def stream_anime(
from rich.progress import Progress
from ...libs.player.params import PlayerParams
from ...libs.player.player import create_player
from ...libs.provider.anime.params import EpisodeStreamsParams
player = create_player(config)
player_service = PlayerService(config, provider)
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
@@ -163,7 +163,7 @@ def stream_anime(
progress.add_task("Fetching servers", total=None)
servers = {server.name: server for server in streams}
servers_names = list(servers.keys())
if config.stream.server in servers_names:
if config.stream.server.value in servers_names:
server = servers[config.stream.server.value]
else:
server_name = selector.choose("Select Server", servers_names)
@@ -177,35 +177,14 @@ def stream_anime(
)
print(f"[green bold]Now Streaming:[/] {anime.title} Episode: {episode}")
# Check if IPC player should be used
if config.mpv.use_ipc:
# Get available episodes for current translation type
available_episodes = getattr(anime.episodes, config.stream.translation_type, [])
# Use IPC player with episode navigation capabilities
player.play(
PlayerParams(
url=stream_link,
title=f"{anime.title}; Episode {episode}",
subtitles=[sub.url for sub in server.subtitles],
headers=server.headers,
# IPC-specific parameters for episode navigation
anime_provider=provider,
current_anime=anime,
available_episodes=available_episodes,
current_episode=episode,
current_anime_id=anime.id,
current_anime_title=anime.title,
current_translation_type=config.stream.translation_type,
)
)
else:
# Use regular player
player.play(
PlayerParams(
url=stream_link,
title=f"{anime.title}; Episode {episode}",
subtitles=[sub.url for sub in server.subtitles],
headers=server.headers,
)
)
player_service.play(
PlayerParams(
url=stream_link,
title=f"{anime.title}; Episode {episode}",
query=anime_title,
episode=episode,
subtitles=[sub.url for sub in server.subtitles],
headers=server.headers,
),
anime,
)

View File

@@ -35,7 +35,7 @@ def episodes(ctx: Context, state: State) -> State | InternalDirective:
chosen_episode, start_time = ctx.watch_history.get_episode(media_item)
if not chosen_episode or ctx.switch.show_episodes_menu:
choices = [*sorted(available_episodes, key=float), "Back"]
choices = [*available_episodes, "Back"]
preview_command = None
if ctx.config.general.preview != "none":

View File

@@ -58,7 +58,7 @@ def provider_search(ctx: Context, state: State) -> State | InternalDirective:
media_title.lower(),
),
)
feedback.info("Auto-selecting best match: {best_match_title}")
feedback.info(f"Auto-selecting best match: {best_match_title}")
selected_provider_anime = provider_results_map[best_match_title]
else:
choices = list(provider_results_map.keys())

View File

@@ -52,8 +52,8 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
server_map: Dict[str, Server] = {s.name: s for s in all_servers}
selected_server: Server | None = None
preferred_server = config.stream.server.value.lower()
if preferred_server == "top":
preferred_server = config.stream.server.value
if preferred_server == "TOP":
selected_server = all_servers[0]
feedback.info(f"Auto-selecting top server: {selected_server.name}")
elif preferred_server in server_map:
@@ -76,54 +76,30 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
final_title = (
media_item.streaming_episodes[episode_number].title
if media_item.streaming_episodes.get(episode_number)
else f"{media_item.title.english} - Ep {episode_number}"
else f"{media_item.title.english}; Episode {episode_number}"
)
feedback.info(f"[bold green]Launching player for:[/] {final_title}")
# TODO: Refine implementation mpv ipc player
# Check if IPC player should be used and if we have the required data
if (
config.mpv.use_ipc
and state.provider.anime
and provider_anime
and episode_number
):
# Get available episodes for current translation type
available_episodes = getattr(
provider_anime.episodes, config.stream.translation_type, []
)
# Create player params with IPC dependencies for episode navigation
player_result = ctx.player.play(
PlayerParams(
url=stream_link_obj.link,
title=final_title,
subtitles=[sub.url for sub in selected_server.subtitles],
headers=selected_server.headers,
start_time=state.provider.start_time,
# IPC-specific parameters for episode navigation
anime_provider=provider,
current_anime=provider_anime,
available_episodes=available_episodes,
current_episode=episode_number,
current_anime_id=provider_anime.id,
current_anime_title=provider_anime.title,
current_translation_type=config.stream.translation_type,
)
)
else:
# Use regular player without IPC features
player_result = ctx.player.play(
PlayerParams(
url=stream_link_obj.link,
title=final_title,
subtitles=[sub.url for sub in selected_server.subtitles],
headers=selected_server.headers,
start_time=state.provider.start_time,
)
)
if not state.media_api.media_item or not state.provider.anime:
return InternalDirective.BACKX3
player_result = ctx.player.play(
PlayerParams(
url=stream_link_obj.link,
title=final_title,
query=(
state.media_api.media_item.title.romaji
or state.media_api.media_item.title.english
),
episode=episode_number,
subtitles=[sub.url for sub in selected_server.subtitles],
headers=selected_server.headers,
start_time=state.provider.start_time,
),
state.provider.anime,
state.media_api.media_item,
)
if media_item and episode_number:
ctx.watch_history.track(media_item, episode_number, player_result)
ctx.watch_history.track(media_item, player_result)
return State(
menu_name=MenuName.PLAYER_CONTROLS,

View File

@@ -12,11 +12,11 @@ from .state import InternalDirective, MenuName, State
if TYPE_CHECKING:
from ...libs.media_api.base import BaseApiClient
from ...libs.player.base import BasePlayer
from ...libs.provider.anime.base import BaseAnimeProvider
from ...libs.selectors.base import BaseSelector
from ..service.auth import AuthService
from ..service.feedback import FeedbackService
from ..service.player import PlayerService
from ..service.registry import MediaRegistryService
from ..service.session import SessionsService
from ..service.watch_history import WatchHistoryService
@@ -72,7 +72,6 @@ class Context:
switch: Switch = field(default_factory=Switch)
_provider: Optional["BaseAnimeProvider"] = None
_selector: Optional["BaseSelector"] = None
_player: Optional["BasePlayer"] = None
_media_api: Optional["BaseApiClient"] = None
_feedback: Optional["FeedbackService"] = None
@@ -80,6 +79,7 @@ class Context:
_watch_history: Optional["WatchHistoryService"] = None
_session: Optional["SessionsService"] = None
_auth: Optional["AuthService"] = None
_player: Optional["PlayerService"] = None
@property
def provider(self) -> "BaseAnimeProvider":
@@ -118,11 +118,11 @@ class Context:
return self._media_api
@property
def player(self) -> "BasePlayer":
def player(self) -> "PlayerService":
if not self._player:
from ...libs.player.player import create_player
from ..service.player import PlayerService
self._player = create_player(self.config)
self._player = PlayerService(self.config, self.provider)
return self._player
@property

View File

@@ -0,0 +1,3 @@
from .service import PlayerService
__all__ = ["PlayerService"]

View File

@@ -0,0 +1,33 @@
from abc import ABC, abstractmethod
from typing import Optional
from .....core.config import StreamConfig
from .....libs.media_api.types import MediaItem
from .....libs.player.base import BasePlayer
from .....libs.player.params import PlayerParams
from .....libs.player.types import PlayerResult
from .....libs.provider.anime.base import BaseAnimeProvider
from .....libs.provider.anime.types import Anime
class BaseIPCPlayer(ABC):
"""
Abstract Base Class defining the contract for all media players with ipc control.
"""
def __init__(self, stream_config: StreamConfig):
self.stream_config = stream_config
@abstractmethod
def play(
self,
player: BasePlayer,
player_params: PlayerParams,
provider: BaseAnimeProvider,
anime: Anime,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
"""
Plays the given media URL.
"""
pass

View File

@@ -0,0 +1,639 @@
"""
IPC-based MPV Player implementation for FastAnime.
This provides advanced features like episode navigation, quality switching, and auto-next.
"""
import json
import logging
import socket
import subprocess
import tempfile
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from queue import Empty, Queue
from typing import Any, Callable, Dict, List, Literal, Optional
from .....core.config.model import StreamConfig
from .....core.exceptions import FastAnimeError
from .....core.utils import formatter
from .....libs.media_api.types import MediaItem
from .....libs.player.base import BasePlayer
from .....libs.player.params import PlayerParams
from .....libs.player.types import PlayerResult
from .....libs.provider.anime.base import BaseAnimeProvider
from .....libs.provider.anime.params import EpisodeStreamsParams
from .....libs.provider.anime.types import Anime, ProviderServer, Server
from .base import BaseIPCPlayer
logger = logging.getLogger(__name__)
class MPVIPCError(FastAnimeError):
"""Exception raised for MPV IPC communication errors."""
pass
class MPVIPCClient:
"""Client for communicating with MPV via IPC socket with a dedicated reader thread."""
def __init__(self, socket_path: str):
self.socket_path = socket_path
self.socket: Optional[socket.socket] = None
self._request_id_counter = 0
self._lock = threading.Lock()
self._reader_thread: Optional[threading.Thread] = None
self._stop_event = threading.Event()
self._message_buffer = b""
self._event_queue: Queue = Queue()
self._response_dict: Dict[int, Any] = {}
self._response_events: Dict[int, threading.Event] = {}
def connect(self, timeout: float = 5.0) -> None:
"""Connect to MPV IPC socket and start the reader thread."""
start_time = time.time()
while time.time() - start_time < timeout:
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(self.socket_path)
logger.info(f"Connected to MPV IPC socket at {self.socket_path}")
self._start_reader_thread()
return
except (ConnectionRefusedError, FileNotFoundError, OSError):
time.sleep(0.2)
raise MPVIPCError(f"Failed to connect to MPV IPC socket at {self.socket_path}")
def disconnect(self) -> None:
"""Disconnect from MPV IPC socket and stop the reader thread."""
self._stop_event.set()
if self._reader_thread and self._reader_thread.is_alive():
self._reader_thread.join(timeout=2.0)
if self.socket:
try:
self.socket.close()
except Exception:
pass
self.socket = None
def _start_reader_thread(self):
"""Starts the background thread to read messages from the socket."""
self._stop_event.clear()
self._reader_thread = threading.Thread(target=self._read_loop, daemon=True)
self._reader_thread.start()
def _read_loop(self):
"""Continuously reads data from the socket and processes messages."""
while not self._stop_event.is_set():
try:
if not self.socket:
break
# A blocking recv is efficient as the thread will sleep until data is available.
data = self.socket.recv(4096)
if not data:
logger.info("MPV IPC socket closed.")
# Put a special event to signal the main loop that MPV has shut down.
self._event_queue.put({"event": "shutdown"})
break
self._message_buffer += data
self._process_buffer()
except (socket.timeout, BlockingIOError):
continue
except Exception as e:
if not self._stop_event.is_set():
logger.error(f"Error in IPC read loop: {e}")
break
def _process_buffer(self):
"""Processes the internal buffer to extract full JSON messages."""
while b"\n" in self._message_buffer:
message_data, self._message_buffer = self._message_buffer.split(b"\n", 1)
if not message_data:
continue
try:
message = json.loads(message_data.decode("utf-8"))
# Responses have a 'request_id' and 'error' field, events do not.
if "request_id" in message and "error" in message:
req_id = message["request_id"]
with self._lock:
self._response_dict[req_id] = message
if req_id in self._response_events:
self._response_events[req_id].set()
else: # It's an event
self._event_queue.put(message)
except (json.JSONDecodeError, UnicodeDecodeError) as e:
logger.warning(
f"Failed to decode MPV message: {message_data[:100]}... Error: {e}"
)
def get_event(self, block: bool = True, timeout: Optional[float] = None) -> Any:
"""Retrieves an event from the event queue."""
try:
return self._event_queue.get(block=block, timeout=timeout)
except Empty:
return None
def send_command(self, command: List[Any], timeout: float = 5.0) -> Dict[str, Any]:
"""Send a command and wait for a specific response."""
if not self.socket:
raise MPVIPCError("Not connected to MPV")
with self._lock:
self._request_id_counter += 1
request_id = self._request_id_counter
request = {"command": command, "request_id": request_id}
response_event = threading.Event()
self._response_events[request_id] = response_event
try:
message = json.dumps(request) + "\n"
self.socket.sendall(message.encode("utf-8"))
if response_event.wait(timeout=timeout):
with self._lock:
return self._response_dict.pop(request_id, {})
else:
raise MPVIPCError(f"Timeout waiting for response to command: {command}")
finally:
with self._lock:
self._response_events.pop(request_id, None)
@dataclass
class PlayerState:
"""Represents the dynamic state of the media player."""
stream_config: StreamConfig
query: str
episode: str
servers: Dict[ProviderServer, Server] = field(default_factory=dict)
server_name: Optional[ProviderServer] = None
media_item: Optional[MediaItem] = None
stop_time_secs: float = 0
total_time_secs: float = 0
@property
def episode_title(self) -> str:
if self.media_item:
if (
self.media_item.streaming_episodes
and self.episode in self.media_item.streaming_episodes
):
return (
self.media_item.streaming_episodes[self.episode].title
or f"Episode {self.episode}"
)
return f"{self.media_item.title.english or self.media_item.title.romaji} - Episode {self.episode}"
if server := self.server:
return server.episode_title or f"Episode {self.episode}"
return f"Episode {self.episode}"
@property
def server(self) -> Optional[Server]:
if not self.servers:
logger.warning("Attempt to access server when servers are unavailable.")
return None
server_name = self.stream_config.server
if server_name not in self.servers:
if self.server_name and self.server_name in self.servers:
server_name = self.server_name
else:
server_name = list(self.servers.keys())[0]
self.server_name = server_name
return self.servers.get(server_name)
@property
def stream_url(self) -> Optional[str]:
if server := self.server:
# Simple quality selection for now
return server.links[0].link
return None
@property
def stream_subtitles(self) -> List[str]:
return [sub.url for sub in self.server.subtitles] if self.server else []
@property
def stream_headers(self) -> Dict[str, str]:
return self.server.headers if self.server else {}
@property
def stop_time(self) -> Optional[str]:
return (
formatter.format_time(self.stop_time_secs)
if self.stop_time_secs > 0
else None
)
@property
def total_time(self) -> Optional[str]:
return (
formatter.format_time(self.total_time_secs)
if self.total_time_secs > 0
else None
)
def reset(self):
self.stop_time_secs = 0
self.total_time_secs = 0
class MpvIPCPlayer(BaseIPCPlayer):
"""MPV Player implementation using IPC for advanced features."""
stream_config: StreamConfig
mpv_process: subprocess.Popen
ipc_client: MPVIPCClient
player_state: PlayerState
player_fetching: bool = False
player_first_run: bool = True
event_handlers: Dict[str, List[Callable]] = {}
property_observers: Dict[str, List[Callable]] = {}
key_bindings: Dict[str, Callable] = {}
message_handlers: Dict[str, Callable] = {}
provider: BaseAnimeProvider
anime: Anime
media_item: Optional[MediaItem]
def __init__(self, stream_config: StreamConfig):
super().__init__(stream_config)
self.socket_path: Optional[str] = None
self._fetch_thread: Optional[threading.Thread] = None
self._fetch_result_queue: Queue = Queue()
def play(
self, player, player_params, provider, anime, media_item=None
) -> PlayerResult:
self.provider = provider
self.anime = anime
self.media_item = media_item
self.player_state = PlayerState(
self.stream_config,
player_params.query,
player_params.episode,
media_item=media_item,
)
return self._play_with_ipc(player, player_params)
def _play_with_ipc(self, player: BasePlayer, params: PlayerParams) -> PlayerResult:
"""Play media using MPV IPC."""
try:
self._start_mpv_process(player, params)
self._connect_ipc()
self._setup_event_handling()
self._setup_key_bindings()
self._setup_message_handlers()
self._wait_for_playback()
return PlayerResult(
episode=self.player_state.episode,
stop_time=self.player_state.stop_time,
total_time=self.player_state.total_time,
)
except MPVIPCError as e:
logger.warning(
f"IPC connection failed: {e}. Falling back to non-IPC playback."
)
if (
input("Failed to play with IPC. Continue without it? (Y/n): ").lower()
!= "n"
):
return player.play(params)
else:
return PlayerResult(
episode=params.episode, stop_time=None, total_time=None
)
finally:
self._cleanup()
def _start_mpv_process(self, player: BasePlayer, params: PlayerParams) -> None:
"""Start MPV process with IPC enabled."""
temp_dir = Path(tempfile.gettempdir())
self.socket_path = str(temp_dir / f"mpv_ipc_{time.time()}.sock")
self.mpv_process = player.play_with_ipc(params, self.socket_path)
time.sleep(1.0)
def _connect_ipc(self):
if not self.socket_path:
raise MPVIPCError("Socket path not set")
self.ipc_client = MPVIPCClient(self.socket_path)
self.ipc_client.connect()
def _setup_event_handling(self):
if not self.ipc_client:
return
self.ipc_client.send_command(["request_log_messages", "info"])
self.ipc_client.send_command(["observe_property", 1, "time-pos"])
self.ipc_client.send_command(["observe_property", 2, "duration"])
self.ipc_client.send_command(["observe_property", 3, "percent-pos"])
self.ipc_client.send_command(["observe_property", 4, "filename"])
def _bind_key(self, key, command, description):
if not self.ipc_client:
return
try:
response = self.ipc_client.send_command(["keybind", key, command])
if response.get("error") != "success":
logger.warning(f"Failed to bind key {key}: {response.get('error')}")
self._show_text(f"Error binding '{description}' key", duration=3000)
except Exception as e:
logger.error(f"Exception binding key {key}: {e}")
def _setup_key_bindings(self):
key_bindings = {
"shift+n": ("script-message fastanime-next-episode", "Next Episode"),
"shift+p": (
"script-message fastanime-previous-episode",
"Previous Episode",
),
"shift+a": (
"script-message fastanime-toggle-auto-next",
"Toggle Auto-Next",
),
"shift+t": (
"script-message fastanime-toggle-translation",
"Toggle Translation",
),
"shift+r": ("script-message fastanime-reload-episode", "Reload Episode"),
}
for key, (command, description) in key_bindings.items():
self._bind_key(key, command, description)
self._show_text(
"FastAnime IPC: Shift+N=Next, Shift+P=Prev, Shift+R=Reload", 3000
)
def _setup_message_handlers(self):
self.message_handlers.update(
{
"fastanime-next-episode": self._next_episode,
"fastanime-previous-episode": self._previous_episode,
"fastanime-reload-episode": self._reload_episode,
"fastanime-toggle-auto-next": self._toggle_auto_next,
"fastanime-toggle-translation": self._toggle_translation_type,
"select-episode": self._handle_select_episode,
"select-server": self._handle_select_server,
"select-quality": self._handle_select_quality,
}
)
def _wait_for_playback(self):
"""A non-blocking loop that checks for MPV process exit and processes events."""
if not self.ipc_client:
return
should_stop = False
try:
while not should_stop:
if self.mpv_process and self.mpv_process.poll() is not None:
logger.info("MPV process has exited.")
break
while True:
message = self.ipc_client.get_event(block=False)
if message is None:
break
if message.get("event") == "shutdown":
should_stop = True
break
self._handle_mpv_message(message)
try:
fetch_result = self._fetch_result_queue.get(block=False)
self._handle_fetch_result(fetch_result)
except Empty:
pass
if should_stop:
break
time.sleep(0.05)
except KeyboardInterrupt:
logger.info("Playback interrupted by user")
def _handle_mpv_message(self, message: Dict[str, Any]):
event = message.get("event")
if event == "property-change":
self._handle_property_change(message)
elif event == "client-message":
self._handle_client_message(message)
elif event == "file-loaded":
self._configure_player()
elif event:
logger.debug(f"MPV event: {event}")
def _handle_property_change(self, message: Dict[str, Any]):
name = message.get("name")
data = message.get("data")
if name == "time-pos" and isinstance(data, (int, float)):
self.player_state.stop_time_secs = data
elif name == "duration" and isinstance(data, (int, float)):
self.player_state.total_time_secs = data
elif name == "percent-pos" and isinstance(data, (int, float)):
if (
self.stream_config.auto_next
and data >= self.stream_config.episode_complete_at
and not self.player_fetching
):
self._auto_next_episode()
def _handle_client_message(self, message: Dict[str, Any]):
args = message.get("args", [])
if args:
handler_name = args[0]
handler_args = args[1:]
handler = self.message_handlers.get(handler_name)
if handler:
try:
handler(*handler_args)
except Exception as e:
logger.error(f"Error in message handler for '{handler_name}': {e}")
def _cleanup(self):
if self.ipc_client:
self.ipc_client.disconnect()
if self.mpv_process:
try:
self.mpv_process.terminate()
self.mpv_process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.mpv_process.kill()
if self.socket_path and Path(self.socket_path).exists():
Path(self.socket_path).unlink(missing_ok=True)
def _get_episode(
self,
episode_type: Literal["next", "previous", "reload", "custom"],
ep_no: Optional[str] = None,
):
if self.player_fetching:
self._show_text("Player is busy. Please wait.")
return
self.player_fetching = True
self._show_text(f"Fetching {episode_type} episode...")
self._fetch_thread = threading.Thread(
target=self._fetch_episode_task, args=(episode_type, ep_no), daemon=True
)
self._fetch_thread.start()
def _fetch_episode_task(
self,
episode_type: Literal["next", "previous", "reload", "custom"],
ep_no: Optional[str] = None,
):
"""This function runs in a background thread to fetch episode streams."""
try:
available_episodes = getattr(
self.anime.episodes, self.stream_config.translation_type
)
if not available_episodes:
raise ValueError(
f"No {self.stream_config.translation_type} episodes available."
)
current_index = available_episodes.index(self.player_state.episode)
if episode_type == "next":
if current_index >= len(available_episodes) - 1:
raise ValueError("Already at the last episode.")
target_episode = available_episodes[current_index + 1]
elif episode_type == "previous":
if current_index <= 0:
raise ValueError("Already at first episode")
target_episode = available_episodes[current_index - 1]
elif episode_type == "reload":
target_episode = self.player_state.episode
elif episode_type == "custom":
if not ep_no or ep_no not in available_episodes:
raise ValueError(
f"Invalid episode. Available: {', '.join(available_episodes)}"
)
target_episode = ep_no
else:
return
stream_params = EpisodeStreamsParams(
anime_id=self.anime.id,
query=self.player_state.query,
episode=target_episode,
translation_type=self.stream_config.translation_type,
)
# This is the blocking network call, now safely in a thread
episode_streams = list(self.provider.episode_streams(stream_params) or [])
if not episode_streams:
raise ValueError(f"No streams found for episode {target_episode}")
result = {
"type": "success",
"target_episode": target_episode,
"servers": {ProviderServer(s.name): s for s in episode_streams},
}
self._fetch_result_queue.put(result)
except Exception as e:
logger.error(f"Episode fetch task failed: {e}")
self._fetch_result_queue.put({"type": "error", "message": str(e)})
def _handle_fetch_result(self, result: Dict[str, Any]):
"""Handles the result from the background fetch thread in the main thread."""
self.player_fetching = False
if result["type"] == "success":
self.player_state.episode = result["target_episode"]
self.player_state.servers = result["servers"]
self.player_state.reset()
self._show_text(f"Fetched {self.player_state.episode_title}")
self._load_current_stream()
else:
self._show_text(f"Error: {result['message']}")
def _next_episode(self):
self._get_episode("next")
def _previous_episode(self):
self._get_episode("previous")
def _reload_episode(self):
self._get_episode("reload")
def _auto_next_episode(self):
if self.stream_config.auto_next:
self._next_episode()
def _load_current_stream(self):
if self.ipc_client and self.player_state and self.player_state.stream_url:
self.ipc_client.send_command(["loadfile", self.player_state.stream_url])
def _show_text(self, text: str, duration: int = 2000):
if self.ipc_client:
self.ipc_client.send_command(["show-text", text, str(duration)])
def _configure_player(self):
if not self.ipc_client or self.player_first_run:
self.player_first_run = False
return
self.ipc_client.send_command(["seek", 0, "absolute"])
self.ipc_client.send_command(
["set_property", "title", self.player_state.episode_title]
)
self._add_episode_subtitles()
def _add_episode_subtitles(self):
if not self.ipc_client or not self.player_state.stream_subtitles:
return
time.sleep(0.5)
for i, sub_url in enumerate(self.player_state.stream_subtitles):
flag = "select" if i == 0 else "auto"
self.ipc_client.send_command(["sub-add", sub_url, flag])
def _toggle_auto_next(self):
self.stream_config.auto_next = not self.stream_config.auto_next
self._show_text(
f"Auto-next {'enabled' if self.stream_config.auto_next else 'disabled'}"
)
def _toggle_translation_type(self):
new_type = "sub" if self.stream_config.translation_type == "dub" else "dub"
self._show_text(f"Switching to {new_type}...")
self.stream_config.translation_type = new_type
self._reload_episode()
def _handle_select_episode(self, episode: Optional[str] = None):
if episode:
self._get_episode("custom", episode)
def _handle_select_server(self, server: Optional[str] = None):
if not server or not self.player_state:
return
try:
provider_server = ProviderServer(server)
if provider_server in self.player_state.servers:
self.player_state.server_name = provider_server
self._reload_episode()
else:
self._show_text(f"Server '{server}' not available.")
except ValueError:
available_servers = ", ".join(
[s.value for s in self.player_state.servers.keys()]
)
self._show_text(
f"Invalid server name: {server}. Available: {available_servers}"
)
def _handle_select_quality(self, quality: Optional[str] = None):
self._show_text("Quality switching is not yet implemented.")

View File

@@ -0,0 +1,42 @@
from typing import Optional
from ....core.config import AppConfig
from ....core.exceptions import FastAnimeError
from ....libs.media_api.types import MediaItem
from ....libs.player.base import BasePlayer
from ....libs.player.params import PlayerParams
from ....libs.player.player import create_player
from ....libs.player.types import PlayerResult
from ....libs.provider.anime.base import BaseAnimeProvider
from ....libs.provider.anime.types import Anime
class PlayerService:
app_config: AppConfig
provider: BaseAnimeProvider
player: BasePlayer
def __init__(self, app_config: AppConfig, provider: BaseAnimeProvider):
self.app_config = app_config
self.provider = provider
self.player = create_player(app_config)
def play(
self, params: PlayerParams, anime: Anime, media_item: Optional[MediaItem] = None
) -> PlayerResult:
if self.app_config.stream.use_ipc:
return self._play_with_ipc(params, anime, media_item)
else:
return self.player.play(params)
def _play_with_ipc(
self, params: PlayerParams, anime: Anime, media_item: Optional[MediaItem] = None
) -> PlayerResult:
if self.app_config.stream.player == "mpv":
from .ipc.mpv import MpvIPCPlayer
return MpvIPCPlayer(self.app_config.stream).play(
self.player, params, self.provider, anime, media_item
)
else:
raise FastAnimeError("Not implemented")

View File

@@ -22,7 +22,10 @@ class WatchHistoryService:
self.media_registry = media_registry
self.media_api = media_api
def track(self, media_item: MediaItem, episode: str, player_result: PlayerResult):
def track(self, media_item: MediaItem, player_result: PlayerResult):
logger.info(
f"Updating watch history for {media_item.title.english} ({media_item.id}) with Episode={player_result.episode}; Stop Time={player_result.stop_time}; Total Duration={player_result.total_time}"
)
status = None
self.media_registry.update_media_index_entry(
media_id=media_item.id,
@@ -30,7 +33,7 @@ class WatchHistoryService:
media_item=media_item,
last_watch_position=player_result.stop_time,
total_duration=player_result.total_time,
progress=episode,
progress=player_result.episode,
status=status,
)
@@ -38,7 +41,7 @@ class WatchHistoryService:
self.media_api.update_list_entry(
UpdateUserMediaListEntryParams(
media_id=media_item.id,
progress=episode,
progress=player_result.episode,
status=status,
)
)

View File

@@ -1,15 +1,18 @@
from ..constants import APP_DATA_DIR, DEFAULTS_DIR, USER_VIDEOS_DIR
from ..constants import APP_DATA_DIR, DEFAULTS_DIR, PLATFORM, USER_VIDEOS_DIR
from ..utils import detect
# GeneralConfig
GENERAL_PYGMENT_STYLE = "github-dark"
GENERAL_API_CLIENT = "anilist"
GENERAL_PREFERRED_TRACKER = "local"
GENERAL_PROVIDER = "allanime"
GENERAL_SELECTOR = "default"
GENERAL_SELECTOR = lambda: "fzf" if detect.has_fzf() else "default"
GENERAL_AUTO_SELECT_ANIME_RESULT = True
GENERAL_ICONS = False
GENERAL_PREVIEW = "none"
GENERAL_IMAGE_RENDERER = "chafa"
GENERAL_ICONS = True
GENERAL_PREVIEW = lambda: "full" if detect.is_running_kitty_terminal() else "none"
GENERAL_IMAGE_RENDERER = (
lambda: "icat" if detect.is_running_kitty_terminal() else "chafa"
)
GENERAL_MANGA_VIEWER = "feh"
GENERAL_CHECK_FOR_UPDATES = True
GENERAL_CACHE_REQUESTS = True
@@ -32,6 +35,9 @@ STREAM_YTDLP_FORMAT = "best[height<=1080]/bestvideo[height<=1080]+bestaudio/best
STREAM_FORCE_FORWARD_TRACKING = True
STREAM_DEFAULT_MEDIA_LIST_TRACKING = "prompt"
STREAM_SUB_LANG = "eng"
STREAM_USE_IPC = (
lambda: True if PLATFORM != "win32" and not detect.is_running_in_termux() else False
)
# ServiceConfig
SERVICE_ENABLED = False
@@ -58,9 +64,6 @@ ROFI_THEME_PREVIEW = _ROFI_THEMES_DIR / "preview.rasi"
# MpvConfig
MPV_ARGS = ""
MPV_PRE_ARGS = ""
MPV_DISABLE_POPEN = True
MPV_USE_PYTHON_MPV = False
MPV_USE_IPC = False
# VlcConfig
VLC_ARGS = ""

View File

@@ -56,6 +56,7 @@ STREAM_DEFAULT_MEDIA_LIST_TRACKING = (
"Default behavior for tracking progress on AniList."
)
STREAM_SUB_LANG = "Preferred language code for subtitles (e.g., 'en', 'es')."
STREAM_USE_IPC = "Use IPC communication with the player for advanced features like episode navigation."
# ServiceConfig
SERVICE_ENABLED = "Whether the background service should be enabled by default."
@@ -87,13 +88,6 @@ ROFI_THEME_INPUT = "Path to the Rofi theme file for user input prompts."
# MpvConfig
MPV_ARGS = "Comma-separated arguments to pass to the MPV player."
MPV_PRE_ARGS = "Comma-separated arguments to prepend before the MPV command."
MPV_DISABLE_POPEN = (
"Disable using subprocess.Popen for MPV, which can be unstable on some systems."
)
MPV_USE_PYTHON_MPV = "Use the python-mpv library for enhanced player control."
MPV_USE_IPC = (
"Use IPC communication with MPV for advanced features like episode navigation."
)
# VlcConfig
VLC_ARGS = "Comma-separated arguments to pass to the Vlc player."

View File

@@ -1,4 +1,3 @@
import os
from pathlib import Path
from typing import Literal
@@ -30,7 +29,7 @@ class GeneralConfig(BaseModel):
description=desc.GENERAL_PROVIDER,
)
selector: Literal["default", "fzf", "rofi"] = Field(
default=defaults.GENERAL_SELECTOR,
default_factory=defaults.GENERAL_SELECTOR,
description=desc.GENERAL_SELECTOR,
)
auto_select_anime_result: bool = Field(
@@ -39,13 +38,11 @@ class GeneralConfig(BaseModel):
)
icons: bool = Field(default=defaults.GENERAL_ICONS, description=desc.GENERAL_ICONS)
preview: Literal["full", "text", "image", "none"] = Field(
default=defaults.GENERAL_PREVIEW,
default_factory=defaults.GENERAL_PREVIEW,
description=desc.GENERAL_PREVIEW,
)
image_renderer: Literal["icat", "chafa", "imgcat"] = Field(
default="icat"
if os.environ.get("KITTY_WINDOW_ID")
else defaults.GENERAL_IMAGE_RENDERER,
default_factory=defaults.GENERAL_IMAGE_RENDERER,
description=desc.GENERAL_IMAGE_RENDERER,
)
manga_viewer: Literal["feh", "icat"] = Field(
@@ -137,6 +134,11 @@ class StreamConfig(BaseModel):
description=desc.STREAM_SUB_LANG,
)
use_ipc: bool = Field(
default_factory=defaults.STREAM_USE_IPC,
description=desc.STREAM_USE_IPC,
)
class ServiceConfig(BaseModel):
"""Configuration for the background download service."""
@@ -265,18 +267,6 @@ class MpvConfig(OtherConfig):
default=defaults.MPV_PRE_ARGS,
description=desc.MPV_PRE_ARGS,
)
disable_popen: bool = Field(
default=defaults.MPV_DISABLE_POPEN,
description=desc.MPV_DISABLE_POPEN,
)
use_python_mpv: bool = Field(
default=defaults.MPV_USE_PYTHON_MPV,
description=desc.MPV_USE_PYTHON_MPV,
)
use_ipc: bool = Field(
default=defaults.MPV_USE_IPC,
description=desc.MPV_USE_IPC,
)
class VlcConfig(OtherConfig):

View File

@@ -4,7 +4,6 @@ from importlib import metadata, resources
from pathlib import Path
PLATFORM = sys.platform
PROJECT_NAME = "FASTANIME"
PROJECT_NAME_LOWER = "fastanime"
APP_NAME = os.environ.get(f"{PROJECT_NAME}_APP_NAME", PROJECT_NAME.lower())

View File

@@ -1,4 +1,5 @@
import os
import shutil
import sys
@@ -16,3 +17,11 @@ def is_running_in_termux():
return True
return False
def is_running_kitty_terminal() -> bool:
return True if os.environ.get("KITTY_WINDOW_ID") else False
def has_fzf() -> bool:
return True if shutil.which("fzf") else False

View File

@@ -1,6 +1,6 @@
import re
from datetime import datetime
from typing import List, Optional, Dict, Union
from typing import Dict, List, Optional, Union
from ...libs.media_api.types import AiringSchedule
@@ -66,6 +66,14 @@ def format_date(dt: Optional[datetime], format_str: str = "%A, %d %B %Y") -> str
return dt.strftime(format_str)
def format_time(duration_in_secs: float) -> str:
"""Format duration in seconds to HH:MM:SS format."""
h = int(duration_in_secs // 3600)
m = int((duration_in_secs % 3600) // 60)
s = int(duration_in_secs % 60)
return f"{h:02d}:{m:02d}:{s:02d}"
def _htmlentity_transform(entity_with_semicolon):
import contextlib
import html.entities

View File

@@ -1,5 +1,7 @@
import subprocess
from abc import ABC, abstractmethod
from ...core.config import StreamConfig
from .params import PlayerParams
from .types import PlayerResult
@@ -9,9 +11,16 @@ class BasePlayer(ABC):
Abstract Base Class defining the contract for all media players.
"""
def __init__(self, config: StreamConfig):
self.stream_config = config
@abstractmethod
def play(self, params: PlayerParams) -> PlayerResult:
"""
Plays the given media URL.
"""
pass
@abstractmethod
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""Stream using IPC player for enhanced features."""

View File

@@ -1,910 +0,0 @@
"""
IPC-based MPV Player implementation for FastAnime.
This provides advanced features like episode navigation, quality switching, and auto-next.
Usage:
To enable IPC player, set `use_ipc = true` in the MPV config section.
Key bindings:
- Shift+N: Next episode
- Shift+P: Previous episode
- Shift+R: Reload current episode
- Shift+T: Toggle translation type (sub/dub)
- Shift+A: Toggle auto-next (placeholder)
Script messages (can be sent via MPV console with 'script-message'):
- select-episode <episode_number>: Jump to specific episode
- select-server <server_name>: Switch server for current episode
- select-quality <quality>: Switch quality (360, 480, 720, 1080)
Requirements:
- MPV executable in PATH
- Unix domain socket support (Linux/macOS)
"""
import json
import logging
import random
import socket
import subprocess
import tempfile
import time
from pathlib import Path
from typing import Any, Callable, Dict, List, Literal, Optional, Union
from ....core.config import MpvConfig
from ....core.exceptions import FastAnimeError
from ....core.utils import detect
from ....libs.provider.anime.base import BaseAnimeProvider
from ....libs.provider.anime.params import EpisodeStreamsParams
from ....libs.provider.anime.types import Server
from ..base import BasePlayer
from ..params import PlayerParams
from ..types import PlayerResult
logger = logging.getLogger(__name__)
def format_time(duration_in_secs: float) -> str:
"""Format duration in seconds to HH:MM:SS format."""
h = int(duration_in_secs // 3600)
m = int((duration_in_secs % 3600) // 60)
s = int(duration_in_secs % 60)
return f"{h:02d}:{m:02d}:{s:02d}"
class MPVIPCError(FastAnimeError):
"""Exception raised for MPV IPC communication errors."""
pass
class MPVIPCClient:
"""Client for communicating with MPV via IPC socket."""
def __init__(self, socket_path: str):
self.socket_path = socket_path
self.socket: Optional[socket.socket] = None
self._request_id = 0
def connect(self, timeout: float = 5.0) -> None:
"""Connect to MPV IPC socket."""
start_time = time.time()
last_exception = None
while time.time() - start_time < timeout:
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.settimeout(2.0) # Set socket timeout
self.socket.connect(self.socket_path)
logger.info(f"Connected to MPV IPC socket at {self.socket_path}")
return
except (ConnectionRefusedError, FileNotFoundError, OSError) as e:
last_exception = e
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
time.sleep(0.2) # Wait a bit longer between attempts
continue
error_msg = f"Failed to connect to MPV IPC socket at {self.socket_path}"
if last_exception:
error_msg += f": {last_exception}"
raise MPVIPCError(error_msg)
def disconnect(self) -> None:
"""Disconnect from MPV IPC socket."""
if self.socket:
try:
self.socket.close()
except:
pass
self.socket = None
def send_command(
self, command: List[Union[str, int, float, bool, None]]
) -> Dict[str, Any]:
"""Send a command to MPV and return the response."""
if not self.socket:
raise MPVIPCError("Not connected to MPV")
self._request_id += 1
request = {"command": command, "request_id": self._request_id}
message = json.dumps(request) + "\n"
try:
self.socket.send(message.encode())
# Read response - MPV sends one JSON object per line
response_data = b""
while True:
chunk = self.socket.recv(1024)
if not chunk:
break
response_data += chunk
if b"\n" in response_data:
break
response_text = response_data.decode().strip()
if response_text:
# Handle multiple JSON objects on separate lines
lines = response_text.split("\n")
for line in lines:
line = line.strip()
if line:
try:
response = json.loads(line)
# Return the response that matches our request ID
if response.get("request_id") == self._request_id:
return response
except json.JSONDecodeError:
continue
# If no matching response found, return the first valid JSON
for line in lines:
line = line.strip()
if line:
try:
return json.loads(line)
except json.JSONDecodeError:
continue
return {}
except Exception as e:
raise MPVIPCError(f"Failed to send command: {e}")
def get_property(self, property_name: str) -> Union[str, bool, int, float, None]:
"""Get a property value from MPV."""
response = self.send_command(["get_property", property_name])
if response.get("error") == "success":
return response.get("data")
return None
def set_property(
self, property_name: str, value: Union[str, bool, int, float, None]
) -> bool:
"""Set a property value in MPV."""
response = self.send_command(["set_property", property_name, value])
return response.get("error") == "success"
def observe_property(self, property_name: str, enable: bool = True) -> bool:
"""Observe a property for changes."""
command = "observe_property" if enable else "unobserve_property"
response = self.send_command([command, self._request_id, property_name])
return response.get("error") == "success"
class MpvIPCPlayer(BasePlayer):
"""MPV Player implementation using IPC for advanced features."""
def __init__(self, config: MpvConfig):
self.config = config
self.ipc_client: Optional[MPVIPCClient] = None
self.mpv_process: Optional[subprocess.Popen] = None
self.socket_path: Optional[str] = None
# Player state
self.last_stop_time: str = "0"
self.last_total_time: str = "0"
self.last_stop_time_secs: float = 0
self.last_total_time_secs: float = 0
self.current_media_title: str = ""
self.player_fetching: bool = False
# Runtime state - injected from outside
self.anime_provider: Optional["BaseAnimeProvider"] = None
self.current_anime: Optional[Any] = None
self.available_episodes: List[str] = []
self.current_episode: Optional[str] = None
self.current_anime_id: Optional[str] = None
self.current_anime_title: Optional[str] = None
self.current_translation_type: str = "sub"
self.current_server: Optional["Server"] = None
self.subtitles: List[Dict[str, str]] = []
# Event handlers
self.event_handlers: Dict[str, List[Callable]] = {}
self.property_observers: Dict[str, List[Callable]] = {}
self.key_bindings: Dict[str, Callable] = {}
self.message_handlers: Dict[str, Callable] = {}
def play(self, params: PlayerParams) -> PlayerResult:
"""Play media using MPV with IPC."""
if detect.is_running_in_termux():
raise FastAnimeError("IPC player not supported on termux")
return self._play_with_ipc(params)
def _play_with_ipc(self, params: PlayerParams) -> PlayerResult:
"""Play media using MPV IPC."""
# Set up runtime dependencies from params if provided
if params.anime_provider and params.current_anime:
self.anime_provider = params.anime_provider
self.current_anime = params.current_anime
self.available_episodes = params.available_episodes or []
self.current_episode = params.current_episode or ""
self.current_anime_id = params.current_anime_id or ""
self.current_anime_title = params.current_anime_title or ""
self.current_translation_type = params.current_translation_type or "sub"
try:
self._setup_ipc_socket()
self._start_mpv_process(params)
self._connect_ipc()
self._setup_event_handling()
self._setup_key_bindings()
self._setup_message_handlers()
self._configure_player(params)
# Wait for playback to complete
self._wait_for_playback()
return PlayerResult(
stop_time=self.last_stop_time, total_time=self.last_total_time
)
finally:
self._cleanup()
def _setup_ipc_socket(self) -> None:
"""Create a temporary IPC socket path."""
temp_dir = Path(tempfile.gettempdir())
self.socket_path = str(temp_dir / f"mpv_ipc_{time.time()}.sock")
def _start_mpv_process(self, params: PlayerParams) -> None:
"""Start MPV process with IPC enabled."""
mpv_args = [
"mpv",
f"--input-ipc-server={self.socket_path}",
"--idle=yes",
"--force-window=yes",
params.url,
]
# Add custom MPV arguments
mpv_args.extend(self._create_mpv_cli_options(params))
# Add pre-args if configured
pre_args = self.config.pre_args.split(",") if self.config.pre_args else []
logger.info(f"Starting MPV with IPC socket: {self.socket_path}")
self.mpv_process = subprocess.Popen(
pre_args + mpv_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Give MPV a moment to start and create the socket
time.sleep(1.0)
def _connect_ipc(self) -> None:
"""Connect to MPV IPC socket."""
if not self.socket_path:
raise MPVIPCError("Socket path not set")
self.ipc_client = MPVIPCClient(self.socket_path)
self.ipc_client.connect()
def _setup_event_handling(self) -> None:
"""Setup event handlers for MPV events."""
if not self.ipc_client:
return
# Request events we care about
try:
self.ipc_client.send_command(["request_log_messages", "info"])
except Exception as e:
logger.warning(f"Failed to request log messages: {e}")
# Observe properties we care about
try:
self.ipc_client.observe_property("time-pos")
self.ipc_client.observe_property("time-remaining")
self.ipc_client.observe_property("duration")
self.ipc_client.observe_property("filename")
except Exception as e:
logger.warning(f"Failed to observe properties: {e}")
def _setup_key_bindings(self) -> None:
"""Setup custom key bindings."""
if not self.ipc_client:
return
# Define key bindings using individual keybind commands
key_bindings = {
"shift+n": "script-message fastanime-next-episode",
"shift+p": "script-message fastanime-previous-episode",
"shift+a": "script-message fastanime-toggle-auto-next",
"shift+t": "script-message fastanime-toggle-translation",
"shift+r": "script-message fastanime-reload-episode",
}
# Register key bindings with MPV using keybind command
for key, command in key_bindings.items():
try:
response = self.ipc_client.send_command(["keybind", key, command])
logger.info(f"Key binding result for {key}: {response}")
except Exception as e:
logger.warning(f"Failed to bind key {key}: {e}")
# Also show a message to indicate keys are ready
try:
self.ipc_client.send_command(
[
"show-text",
"FastAnime IPC: Shift+N=Next, Shift+P=Prev, Shift+R=Reload, Shift+T=Toggle",
"3000",
]
)
except Exception as e:
logger.warning(f"Failed to show key binding message: {e}")
def _setup_message_handlers(self) -> None:
"""Setup script message handlers."""
self.message_handlers.update(
{
"select-episode": self._handle_select_episode,
"select-server": self._handle_select_server,
"select-quality": self._handle_select_quality,
"fastanime-next-episode": lambda: self._next_episode(),
"fastanime-previous-episode": lambda: self._previous_episode(),
"fastanime-toggle-auto-next": lambda: self._toggle_auto_next(),
"fastanime-toggle-translation": lambda: self._toggle_translation_type(),
"fastanime-reload-episode": lambda: self._reload_episode(),
}
)
def _configure_player(self, params: PlayerParams) -> None:
"""Configure MPV player with parameters."""
if not self.ipc_client:
return
# Set title
if params.title:
try:
self.ipc_client.set_property("title", params.title)
self.current_media_title = params.title
except MPVIPCError as e:
logger.warning(f"Failed to set title: {e}")
# Set start time
if params.start_time:
try:
self.ipc_client.set_property("start", params.start_time)
except MPVIPCError as e:
logger.warning(f"Failed to set start time: {e}")
# Add subtitles
if params.subtitles:
for i, subtitle_path in enumerate(params.subtitles):
flag = "select" if i == 0 else "auto"
try:
self.ipc_client.send_command(["sub-add", subtitle_path, flag])
except MPVIPCError as e:
logger.warning(f"Failed to add subtitle {subtitle_path}: {e}")
# Add any episode-specific subtitles
try:
self._add_episode_subtitles()
except Exception as e:
logger.warning(f"Failed to add episode subtitles: {e}")
# Set HTTP headers (only if not empty)
if params.headers:
header_str = ",".join([f"{k}:{v}" for k, v in params.headers.items()])
if header_str.strip(): # Only set if we have actual headers
try:
self.ipc_client.set_property("http-header-fields", header_str)
except MPVIPCError as e:
logger.warning(f"Failed to set HTTP headers: {e}")
def _wait_for_playback(self) -> None:
"""Wait for playback to complete while handling events."""
if not self.ipc_client:
return
try:
while True:
# Check if MPV process is still running
if self.mpv_process and self.mpv_process.poll() is not None:
break
# Handle property changes and events
self._handle_events()
time.sleep(0.1)
except KeyboardInterrupt:
logger.info("Playback interrupted by user")
def _handle_events(self) -> None:
"""Handle MPV events and property changes."""
if not self.ipc_client or not self.ipc_client.socket:
return
try:
# Check for incoming messages (non-blocking)
self.ipc_client.socket.settimeout(0.01)
try:
data = self.ipc_client.socket.recv(4096) # Increased buffer size
if data:
message_text = data.decode().strip()
if message_text:
# Handle multiple JSON objects on separate lines
lines = message_text.split("\n")
for line in lines:
line = line.strip()
if line:
try:
message = json.loads(line)
self._handle_mpv_message(message)
except json.JSONDecodeError as e:
logger.debug(
f"Failed to parse JSON: {line[:100]} - {e}"
)
continue
except socket.timeout:
pass
except Exception as e:
logger.debug(f"Socket read error: {e}")
pass
finally:
self.ipc_client.socket.settimeout(None)
# Periodically update time properties (less frequently to avoid spam)
if random.randint(1, 50) == 1: # Only update occasionally
# Get current time position (with error handling)
try:
time_pos = self.ipc_client.get_property("time-pos")
if isinstance(time_pos, float):
self.last_stop_time = format_time(time_pos)
self.last_stop_time_secs = time_pos
except (MPVIPCError, Exception):
pass
# Get duration (with error handling)
try:
duration = self.ipc_client.get_property("duration")
if isinstance(duration, float):
self.last_total_time = format_time(duration)
self.last_total_time_secs = duration
except (MPVIPCError, Exception):
pass
# Get time remaining for auto-next (with error handling)
try:
time_remaining = self.ipc_client.get_property("time-remaining")
if (
isinstance(time_remaining, float)
and time_remaining < 1
and not self.player_fetching
):
self._auto_next_episode()
except (MPVIPCError, Exception):
pass
except MPVIPCError:
# IPC communication failed, probably because MPV closed
pass
def _handle_mpv_message(self, message: Dict[str, Any]) -> None:
"""Handle incoming messages from MPV."""
logger.debug(f"Received MPV message: {message}")
if message.get("event") == "client-message":
# Handle script messages
args = message.get("args", [])
if args and len(args) > 0:
message_name = args[0]
message_args = args[1:] if len(args) > 1 else []
logger.info(
f"Handling script message: {message_name} with args: {message_args}"
)
handler = self.message_handlers.get(message_name)
if handler:
try:
if message_args:
handler(*message_args)
else:
handler()
except Exception as e:
logger.error(f"Error handling message {message_name}: {e}")
else:
logger.warning(f"No handler found for message: {message_name}")
elif message.get("event") == "file-loaded":
# File loaded event - add subtitles
logger.info("File loaded, adding episode subtitles")
self._add_episode_subtitles()
elif message.get("event") == "property-change":
# Handle property changes
property_name = message.get("name")
if property_name == "time-remaining":
value = message.get("data")
if value is not None and value < 1 and not self.player_fetching:
self._auto_next_episode()
elif message.get("event"):
# Log other events for debugging
logger.debug(f"MPV event: {message.get('event')}")
# Handle responses to our commands
elif message.get("request_id"):
logger.debug(f"Command response: {message}")
def _cleanup(self) -> None:
"""Clean up resources."""
if self.ipc_client:
self.ipc_client.disconnect()
if self.mpv_process:
try:
self.mpv_process.terminate()
self.mpv_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.mpv_process.kill()
# Remove socket file
if self.socket_path and Path(self.socket_path).exists():
try:
Path(self.socket_path).unlink()
except:
pass
def _create_mpv_cli_options(self, params: PlayerParams) -> List[str]:
"""Create MPV CLI options from parameters."""
mpv_args = []
if params.headers:
header_str = ",".join([f"{k}:{v}" for k, v in params.headers.items()])
mpv_args.append(f"--http-header-fields={header_str}")
if params.subtitles:
for sub in params.subtitles:
mpv_args.append(f"--sub-file={sub}")
if params.start_time:
mpv_args.append(f"--start={params.start_time}")
if params.title:
mpv_args.append(f"--title={params.title}")
if self.config.args:
mpv_args.extend(self.config.args.split(","))
return mpv_args
# Episode navigation methods (similar to original implementation)
def _get_episode(
self,
episode_type: Literal["next", "previous", "reload", "custom"],
ep_no: Optional[str] = None,
server: str = "top",
) -> Optional[str]:
"""Get episode stream URL for navigation."""
if (
not self.anime_provider
or not self.current_anime
or not self.current_episode
):
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "Episode navigation not available"]
)
return None
# Show status message
if self.ipc_client:
self.ipc_client.send_command(
["show-text", f"Fetching {episode_type} episode..."]
)
# Reset timing info for new episode
self.last_stop_time = "0"
self.last_total_time = "0"
self.last_stop_time_secs = 0
self.last_total_time_secs = 0
# Determine target episode
try:
current_index = self.available_episodes.index(self.current_episode)
if episode_type == "next":
if current_index >= len(self.available_episodes) - 1:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "Already at last episode"]
)
return None
target_episode = self.available_episodes[current_index + 1]
elif episode_type == "previous":
if current_index <= 0:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "Already at first episode"]
)
return None
target_episode = self.available_episodes[current_index - 1]
elif episode_type == "reload":
target_episode = self.current_episode
elif episode_type == "custom":
if not ep_no or ep_no not in self.available_episodes:
if self.ipc_client:
self.ipc_client.send_command(
[
"show-text",
f"Invalid episode. Available: {', '.join(self.available_episodes)}",
]
)
return None
target_episode = ep_no
except ValueError:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "Current episode not found in available episodes"]
)
return None
# Get streams for the target episode
try:
# Validate required fields
if not self.current_anime_id:
if self.ipc_client:
self.ipc_client.send_command(["show-text", "Missing anime ID"])
return None
# Cast translation type to proper literal
translation_type: Literal["sub", "dub"] = (
"sub" if self.current_translation_type == "sub" else "dub"
)
stream_params = EpisodeStreamsParams(
anime_id=self.current_anime_id,
query=self.current_anime_title or "",
episode=target_episode,
translation_type=translation_type,
)
episode_streams = self.anime_provider.episode_streams(stream_params)
if not episode_streams:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "No streams found for episode"]
)
return None
# Select server (top or specific)
if server == "top":
selected_server = next(episode_streams, None)
else:
# Find specific server
selected_server = None
for stream_server in episode_streams:
if stream_server.name.lower() == server.lower():
selected_server = stream_server
break
if not selected_server:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", f"Server '{server}' not found"]
)
return None
if not selected_server:
if self.ipc_client:
self.ipc_client.send_command(["show-text", "No server available"])
return None
# Get stream link - prefer highest quality
if not selected_server.links:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "No stream links available"]
)
return None
# Sort by quality and get the best one
sorted_links = sorted(
selected_server.links, key=lambda x: int(x.quality), reverse=True
)
stream_link = sorted_links[0].link
# Update current state
self.current_episode = target_episode
self.current_server = selected_server
self.current_media_title = (
selected_server.episode_title or f"Episode {target_episode}"
)
self.subtitles = [
{"url": sub.url, "language": sub.language or "unknown"}
for sub in selected_server.subtitles
]
return stream_link
except Exception as e:
logger.error(f"Error fetching episode {target_episode}: {e}")
if self.ipc_client:
self.ipc_client.send_command(
["show-text", f"Error fetching episode: {str(e)}"]
)
return None
def _next_episode(self) -> None:
"""Navigate to next episode."""
url = self._get_episode("next")
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
def _previous_episode(self) -> None:
"""Navigate to previous episode."""
url = self._get_episode("previous")
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
def _reload_episode(self) -> None:
"""Reload current episode."""
url = self._get_episode("reload")
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
def _toggle_auto_next(self) -> None:
"""Toggle auto-next feature."""
# This would be controlled by config, but for now just show message
if self.ipc_client:
self.ipc_client.send_command(
["show-text", "Auto-next feature toggle not implemented"]
)
def _toggle_translation_type(self) -> None:
"""Toggle between sub and dub."""
if not self.anime_provider:
return
new_type = "sub" if self.current_translation_type == "dub" else "dub"
if self.ipc_client:
self.ipc_client.send_command(["show-text", f"Switching to {new_type}..."])
# Try to reload current episode with new translation type
old_type = self.current_translation_type
self.current_translation_type = new_type
url = self._get_episode("reload")
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
self.ipc_client.send_command(["show-text", f"Switched to {new_type}"])
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
else:
# Revert if failed
self.current_translation_type = old_type
if self.ipc_client:
self.ipc_client.send_command(
["show-text", f"Failed to switch to {new_type}"]
)
def _auto_next_episode(self) -> None:
"""Automatically play next episode."""
if not self.player_fetching:
logger.info("Auto fetching next episode")
self.player_fetching = True
url = self._get_episode("next")
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
# Message handlers
def _handle_select_episode(self, episode: Optional[str] = None) -> None:
"""Handle episode selection message."""
if not episode:
if self.ipc_client:
self.ipc_client.send_command(["show-text", "No episode was selected"])
return
url = self._get_episode("custom", episode)
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
def _handle_select_server(self, server: Optional[str] = None) -> None:
"""Handle server selection message."""
if not server:
if self.ipc_client:
self.ipc_client.send_command(["show-text", "No server was selected"])
return
url = self._get_episode("reload", server=server)
if url and self.ipc_client:
self.ipc_client.send_command(["loadfile", url])
self.ipc_client.set_property("title", self.current_media_title)
# Add subtitles after a short delay to ensure file is loaded
time.sleep(0.5)
self._add_episode_subtitles()
def _handle_select_quality(self, quality: Optional[str] = None) -> None:
"""Handle quality selection message."""
if not quality or not self.current_server:
if self.ipc_client:
self.ipc_client.send_command(["show-text", "No quality was selected"])
return
# Find link with matching quality
matching_link = None
for link in self.current_server.links:
if link.quality == quality:
matching_link = link
break
if matching_link:
if self.ipc_client:
self.ipc_client.send_command(
["show-text", f"Switching to {quality}p quality..."]
)
self.ipc_client.send_command(["loadfile", matching_link.link])
else:
available_qualities = [link.quality for link in self.current_server.links]
if self.ipc_client:
self.ipc_client.send_command(
[
"show-text",
f"Quality {quality}p not available. Available: {', '.join(available_qualities)}",
]
)
def show_text(self, text: str, duration: int = 2000) -> None:
"""Show text on MPV OSD."""
if self.ipc_client:
self.ipc_client.send_command(["show-text", text, str(duration)])
def _add_episode_subtitles(self) -> None:
"""Add episode-specific subtitles after loading new episode."""
if not self.ipc_client or not self.subtitles:
return
for i, subtitle in enumerate(self.subtitles):
flag = "select" if i == 0 else "auto"
try:
self.ipc_client.send_command(
[
"sub-add",
subtitle["url"],
flag,
None,
subtitle.get("language", "unknown"),
]
)
except Exception as e:
logger.warning(f"Failed to add subtitle: {e}")
# Factory function for creating IPC player
def create_ipc_player(config: MpvConfig) -> MpvIPCPlayer:
"""Create an IPC-based MPV player instance."""
return MpvIPCPlayer(config)

View File

@@ -63,7 +63,7 @@ class MpvPlayer(BasePlayer):
subprocess.run(args)
return PlayerResult()
return PlayerResult(params.episode)
def _play_on_desktop(self, params) -> PlayerResult:
if not self.executable:
@@ -73,10 +73,6 @@ class MpvPlayer(BasePlayer):
return self._stream_on_desktop_with_webtorrent_cli(params)
elif params.syncplay:
return self._stream_on_desktop_with_syncplay(params)
elif self.config.use_ipc:
return self._stream_on_desktop_with_ipc(params)
elif self.config.use_python_mpv:
return self._stream_on_desktop_with_python_mpv(params)
else:
return self._stream_on_desktop_with_subprocess(params)
@@ -104,17 +100,31 @@ class MpvPlayer(BasePlayer):
stop_time = match.group(1)
total_time = match.group(2)
break
return PlayerResult(total_time=total_time, stop_time=stop_time)
return PlayerResult(
episode=params.episode, total_time=total_time, stop_time=stop_time
)
def _stream_on_desktop_with_python_mpv(self, params: PlayerParams) -> PlayerResult:
return PlayerResult()
def _stream_on_desktop_with_ipc(self, params: PlayerParams) -> PlayerResult:
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""Stream using IPC player for enhanced features."""
from .ipc import MpvIPCPlayer
mpv_args = [
self.executable,
f"--input-ipc-server={socket_path}",
"--idle=yes",
"--force-window=yes",
params.url,
]
ipc_player = MpvIPCPlayer(self.config)
return ipc_player.play(params)
# Add custom MPV arguments
mpv_args.extend(self._create_mpv_cli_options(params))
# Add pre-args if configured
pre_args = self.config.pre_args.split(",") if self.config.pre_args else []
logger.info(f"Starting MPV with IPC socket: {socket_path}")
process = subprocess.Popen(pre_args + mpv_args)
return process
def _stream_on_desktop_with_webtorrent_cli(
self, params: PlayerParams
@@ -131,7 +141,7 @@ class MpvPlayer(BasePlayer):
args.extend(mpv_args)
subprocess.run(args)
return PlayerResult()
return PlayerResult(params.episode)
# TODO: Get people with real friends to do this lol
def _stream_on_desktop_with_syncplay(self, params: PlayerParams) -> PlayerResult:
@@ -146,7 +156,7 @@ class MpvPlayer(BasePlayer):
args.extend(mpv_args)
subprocess.run(args)
return PlayerResult()
return PlayerResult(params.episode)
def _create_mpv_cli_options(self, params: PlayerParams) -> list[str]:
mpv_args = []
@@ -175,5 +185,5 @@ if __name__ == "__main__":
print(APP_ASCII_ART)
url = input("Enter the url you would like to stream: ")
mpv = MpvPlayer(MpvConfig())
player_result = mpv.play(PlayerParams(url=url, title=""))
player_result = mpv.play(PlayerParams(episode="", query="", url=url, title=""))
print(player_result)

View File

@@ -1,25 +1,17 @@
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Literal, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..provider.anime.base import BaseAnimeProvider
from ..provider.anime.types import Anime
pass
@dataclass(frozen=True)
class PlayerParams:
url: str
title: str
query: str
episode: str
syncplay: bool = False
subtitles: list[str] | None = None
headers: dict[str, str] | None = None
start_time: str | None = None
# IPC player specific parameters for episode navigation
anime_provider: Optional["BaseAnimeProvider"] = None
current_anime: Optional["Anime"] = None
available_episodes: Optional[List[str]] = None
current_episode: Optional[str] = None
current_anime_id: Optional[str] = None
current_anime_title: Optional[str] = None
current_translation_type: Optional[Literal["sub", "dub"]] = None

View File

@@ -3,14 +3,6 @@ from dataclasses import dataclass
@dataclass(frozen=True)
class PlayerResult:
"""
Represents the result of a completed playback session.
Attributes:
stop_time: The timestamp where playback stopped (e.g., "00:15:30").
total_time: The total duration of the media (e.g., "00:23:45").
"""
episode: str | None = None
episode: str
stop_time: str | None = None
total_time: str | None = None

View File

@@ -50,9 +50,9 @@ def map_to_anime_result(response: Response) -> Anime:
id=anime["_id"],
title=anime["name"],
episodes=AnimeEpisodes(
sub=anime["availableEpisodesDetail"]["sub"],
dub=anime["availableEpisodesDetail"]["dub"],
raw=anime["availableEpisodesDetail"]["raw"],
sub=sorted(anime["availableEpisodesDetail"]["sub"], key=float),
dub=sorted(anime["availableEpisodesDetail"]["dub"], key=float),
raw=sorted(anime["availableEpisodesDetail"]["raw"], key=float),
),
type=anime.get("__typename"),
)

View File

@@ -59,6 +59,7 @@ def map_to_anime_result(
) -> Anime:
episodes_info = []
episodes = []
anime["data"] = sorted(anime["data"], key=lambda k: float(k["episode"]))
for ep_info in anime["data"]:
episodes.append(str(ep_info["episode"]))
episodes_info.append(