feat: make the session more performant by lazyloading the context

This commit is contained in:
Benexl
2025-07-26 17:05:52 +03:00
parent 159136cfb1
commit 494104ee19
12 changed files with 173 additions and 113 deletions

View File

@@ -7,7 +7,6 @@ commands = {
# "trending": "trending.trending",
# "recent": "recent.recent",
"search": "search.search",
"download": "download.download",
# "downloads": "downloads.downloads",
"auth": "auth.auth",
"stats": "stats.stats",

View File

@@ -19,7 +19,7 @@ MenuAction = Callable[[], State | InternalDirective]
def downloads(ctx: Context, state: State) -> State | InternalDirective:
"""Downloads menu showing locally stored media from registry."""
icons = ctx.config.general.icons
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
options: Dict[str, MenuAction] = {
@@ -90,13 +90,13 @@ def _create_local_media_list_action(
"""Create action for searching local media with sorting and optional status filter."""
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
search_params = MediaSearchParams(sort=sort, status=status)
loading_message = "Searching local media registry"
result = None
with feedback.progress(loading_message):
result = ctx.service.media_registry.search_for_media(search_params)
result = ctx.media_registry.search_for_media(search_params)
if result and result.media:
return State(
@@ -120,12 +120,12 @@ def _create_local_random_media_list(ctx: Context, state: State) -> MenuAction:
"""Create action for getting random local media."""
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
loading_message = "Getting random local media"
with feedback.progress(loading_message):
# Get all records and pick random ones
all_records = list(ctx.service.media_registry.get_all_media_records())
all_records = list(ctx.media_registry.get_all_media_records())
if not all_records:
feedback.info("No media found in local registry")
@@ -136,7 +136,7 @@ def _create_local_random_media_list(ctx: Context, state: State) -> MenuAction:
random_ids = [record.media_item.id for record in random_records]
search_params = MediaSearchParams(id_in=random_ids)
result = ctx.service.media_registry.search_for_media(search_params)
result = ctx.media_registry.search_for_media(search_params)
if result and result.media:
return State(
@@ -160,7 +160,7 @@ def _create_local_search_media_list(ctx: Context, state: State) -> MenuAction:
"""Create action for searching local media by query."""
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
query = ctx.selector.ask("Search Local Anime")
if not query:
@@ -171,7 +171,7 @@ def _create_local_search_media_list(ctx: Context, state: State) -> MenuAction:
loading_message = "Searching local media registry"
result = None
with feedback.progress(loading_message):
result = ctx.service.media_registry.search_for_media(search_params)
result = ctx.media_registry.search_for_media(search_params)
if result and result.media:
return State(
@@ -197,12 +197,12 @@ def _create_local_status_action(
"""Create action for getting local media by user status."""
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
loading_message = f"Getting {status.value} media from local registry"
result = None
with feedback.progress(loading_message):
result = ctx.service.media_registry.get_media_by_status(status)
result = ctx.media_registry.get_media_by_status(status)
if result and result.media:
return State(
@@ -225,7 +225,7 @@ def _create_local_recent_media_action(ctx: Context, state: State) -> MenuAction:
"""Create action for getting recently watched local media."""
def action():
result = ctx.service.media_registry.get_recently_watched()
result = ctx.media_registry.get_recently_watched()
if result and result.media:
return State(
menu_name=MenuName.RESULTS,
@@ -237,7 +237,7 @@ def _create_local_recent_media_action(ctx: Context, state: State) -> MenuAction:
),
)
else:
ctx.service.feedback.info(
ctx.feedback.info(
"No recently watched media found in local registry"
)
return InternalDirective.RELOAD

View File

@@ -22,7 +22,7 @@ SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.template.sh").read_text(
@session.menu
def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
"""Dynamic search menu that provides real-time search results."""
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
# Ensure cache directory exists

View File

@@ -9,7 +9,7 @@ def episodes(ctx: Context, state: State) -> State | InternalDirective:
the logic for continuing from watch history or manual selection.
"""
config = ctx.config
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
provider_anime = state.provider.anime

View File

@@ -18,7 +18,7 @@ MenuAction = Callable[[], State | InternalDirective]
@session.menu
def main(ctx: Context, state: State) -> State | InternalDirective:
icons = ctx.config.general.icons
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
options: Dict[str, MenuAction] = {
@@ -84,7 +84,7 @@ def _create_media_list_action(
ctx: Context, state: State, sort: MediaSort, status: MediaStatus | None = None
) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
search_params = MediaSearchParams(sort=sort, status=status)
loading_message = "Fetching media list"
@@ -111,7 +111,7 @@ def _create_media_list_action(
def _create_random_media_list(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
search_params = MediaSearchParams(id_in=random.sample(range(1, 15000), k=50))
loading_message = "Fetching media list"
@@ -138,7 +138,7 @@ def _create_random_media_list(ctx: Context, state: State) -> MenuAction:
def _create_search_media_list(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
query = ctx.selector.ask("Search for Anime")
if not query:
@@ -174,7 +174,7 @@ def _create_user_list_action(
"""A factory to create menu actions for fetching user lists, handling authentication."""
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
if not ctx.media_api.is_authenticated():
feedback.error("You haven't logged in")
return InternalDirective.MAIN
@@ -205,7 +205,7 @@ def _create_user_list_action(
def _create_recent_media_action(ctx: Context, state: State) -> MenuAction:
def action():
result = ctx.service.media_registry.get_recently_watched()
result = ctx.media_registry.get_recently_watched()
if result:
return State(
menu_name=MenuName.RESULTS,

View File

@@ -19,7 +19,7 @@ MenuAction = Callable[[], State | InternalDirective]
@session.menu
def media_actions(ctx: Context, state: State) -> State | InternalDirective:
feedback = ctx.service.feedback
feedback = ctx.feedback
icons = ctx.config.general.icons
@@ -64,7 +64,7 @@ def _stream(ctx: Context, state: State) -> MenuAction:
def _watch_trailer(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
@@ -87,7 +87,7 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
def _manage_user_media_list(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
@@ -104,7 +104,7 @@ def _manage_user_media_list(ctx: Context, state: State) -> MenuAction:
)
if status:
# local
ctx.service.media_registry.update_media_index_entry(
ctx.media_registry.update_media_index_entry(
media_id=media_item.id,
media_item=media_item,
status=UserMediaListStatus(status),
@@ -122,7 +122,7 @@ def _manage_user_media_list(ctx: Context, state: State) -> MenuAction:
def _score_anime(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
@@ -137,7 +137,7 @@ def _score_anime(ctx: Context, state: State) -> MenuAction:
if not 0.0 <= score <= 10.0:
raise ValueError("Score out of range.")
# local
ctx.service.media_registry.update_media_index_entry(
ctx.media_registry.update_media_index_entry(
media_id=media_item.id, media_item=media_item, score=score
)
# remote
@@ -401,7 +401,7 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
def _view_recommendations(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
@@ -450,7 +450,7 @@ def _view_recommendations(ctx: Context, state: State) -> MenuAction:
def _view_relations(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
@@ -499,7 +499,7 @@ def _view_relations(ctx: Context, state: State) -> MenuAction:
def _view_characters(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
@@ -586,7 +586,7 @@ def _view_characters(ctx: Context, state: State) -> MenuAction:
def _view_airing_schedule(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:

View File

@@ -8,7 +8,7 @@ MenuAction = Callable[[], Union[State, InternalDirective]]
@session.menu
def player_controls(ctx: Context, state: State) -> Union[State, InternalDirective]:
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
config = ctx.config
@@ -80,7 +80,7 @@ def player_controls(ctx: Context, state: State) -> Union[State, InternalDirectiv
def _next_episode(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
config = ctx.config
@@ -131,7 +131,7 @@ def _replay(ctx: Context, state: State) -> MenuAction:
def _change_server(ctx: Context, state: State) -> MenuAction:
def action():
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
selector = ctx.selector

View File

@@ -10,7 +10,7 @@ from ...state import InternalDirective, MenuName, ProviderState, State
@session.menu
def provider_search(ctx: Context, state: State) -> State | InternalDirective:
feedback = ctx.service.feedback
feedback = ctx.feedback
media_item = state.media_api.media_item
if not media_item:
feedback.error("No AniList anime to search for", "Please select an anime first")

View File

@@ -9,7 +9,7 @@ from ...state import InternalDirective, MediaApiState, MenuName, State
@session.menu
def results(ctx: Context, state: State) -> State | InternalDirective:
feedback = ctx.service.feedback
feedback = ctx.feedback
feedback.clear_console()
search_result = state.media_api.search_result
@@ -127,7 +127,7 @@ def _format_title(ctx: Context, media_item: MediaItem) -> str:
def _handle_pagination(
ctx: Context, state: State, page_delta: int
) -> State | InternalDirective:
feedback = ctx.service.feedback
feedback = ctx.feedback
search_params = state.media_api.search_params

View File

@@ -9,7 +9,7 @@ from ...state import InternalDirective, MenuName, State
@session.menu
def servers(ctx: Context, state: State) -> State | InternalDirective:
feedback = ctx.service.feedback
feedback = ctx.feedback
config = ctx.config
provider = ctx.provider
@@ -89,7 +89,7 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
)
)
if media_item and episode_number:
ctx.service.watch_history.track(media_item, episode_number, player_result)
ctx.watch_history.track(media_item, episode_number, player_result)
return State(
menu_name=MenuName.PLAYER_CONTROLS,

View File

@@ -2,47 +2,132 @@ import importlib.util
import logging
import os
from dataclasses import dataclass
from typing import Callable, List, Optional, Union
from typing import TYPE_CHECKING, Callable, List, Optional, Union
import click
from ...core.config import AppConfig
from ...core.constants import APP_DIR, USER_CONFIG_PATH
from ...libs.media_api.base import BaseApiClient
from ...libs.player.base import BasePlayer
from ...libs.provider.anime.base import BaseAnimeProvider
from ...libs.selectors.base import BaseSelector
from ..service.auth import AuthService
from ..service.feedback import FeedbackService
from ..service.registry import MediaRegistryService
from ..service.session import SessionsService
from ..service.watch_history import WatchHistoryService
from .state import InternalDirective, MenuName, State
if TYPE_CHECKING:
from ...libs.media_api.base import BaseApiClient
from ...libs.player.base import BasePlayer
from ...libs.provider.anime.base import BaseAnimeProvider
from ...libs.selectors.base import BaseSelector
from ..service.auth import AuthService
from ..service.feedback import FeedbackService
from ..service.registry import MediaRegistryService
from ..service.session import SessionsService
from ..service.watch_history import WatchHistoryService
logger = logging.getLogger(__name__)
# A type alias for the signature all menu functions must follow.
MENUS_DIR = APP_DIR / "cli" / "interactive" / "menu"
@dataclass(frozen=True)
class Services:
feedback: FeedbackService
media_registry: MediaRegistryService
watch_history: WatchHistoryService
session: SessionsService
auth: AuthService
@dataclass(frozen=True)
@dataclass
class Context:
config: AppConfig
provider: BaseAnimeProvider
selector: BaseSelector
player: BasePlayer
media_api: BaseApiClient
service: Services
config: "AppConfig"
_provider: Optional["BaseAnimeProvider"] = None
_selector: Optional["BaseSelector"] = None
_player: Optional["BasePlayer"] = None
_media_api: Optional["BaseApiClient"] = None
_feedback: Optional["FeedbackService"] = None
_media_registry: Optional["MediaRegistryService"] = None
_watch_history: Optional["WatchHistoryService"] = None
_session: Optional["SessionsService"] = None
_auth: Optional["AuthService"] = None
@property
def provider(self) -> "BaseAnimeProvider":
if not self._provider:
from ...libs.provider.anime.provider import create_provider
self._provider = create_provider(self.config.general.provider)
return self._provider
@property
def selector(self) -> "BaseSelector":
if not self._selector:
from ...libs.selectors.selector import create_selector
self._selector = create_selector(self.config)
return self._selector
@property
def media_api(self) -> "BaseApiClient":
if not self._media_api:
from ...libs.media_api.api import create_api_client
self._media_api = create_api_client(
self.config.general.media_api, self.config
)
if auth_profile := self.auth.get_auth():
p = self._media_api.authenticate(auth_profile.token)
if p:
logger.debug(f"Authenticated as {p.name}")
else:
logger.warning(f"Failed to authenticate with {auth_profile.token}")
else:
logger.debug("Not authenticated")
return self._media_api
@property
def player(self) -> "BasePlayer":
if not self._player:
from ...libs.player.player import create_player
self._player = create_player(self.config)
return self._player
@property
def feedback(self) -> "FeedbackService":
if not self._feedback:
from ..service.feedback.service import FeedbackService
self._feedback = FeedbackService()
return self._feedback
@property
def media_registry(self) -> "MediaRegistryService":
if not self._media_registry:
from ..service.registry.service import MediaRegistryService
self._media_registry = MediaRegistryService(
self.config.general.media_api, self.config.media_registry
)
return self._media_registry
@property
def watch_history(self) -> "WatchHistoryService":
if not self._watch_history:
from ..service.watch_history.service import WatchHistoryService
self._watch_history = WatchHistoryService(
self.config, self.media_registry, self._media_api
)
return self._watch_history
@property
def session(self) -> "SessionsService":
if not self._session:
from ..service.session.service import SessionsService
self._session = SessionsService(self.config.sessions)
return self._session
@property
def auth(self) -> "AuthService":
if not self._auth:
from ..service.auth.service import AuthService
self._auth = AuthService(self.config.general.media_api)
return self._auth
MenuFunction = Callable[[Context, State], Union[State, InternalDirective]]
@@ -60,43 +145,7 @@ class Session:
_menus: dict[MenuName, Menu] = {}
def _load_context(self, config: AppConfig):
"""Initializes all shared service based on the provided configuration."""
from ...libs.media_api.api import create_api_client
from ...libs.player import create_player
from ...libs.provider.anime.provider import create_provider
from ...libs.selectors import create_selector
media_registry = MediaRegistryService(
media_api=config.general.media_api, config=config.media_registry
)
auth = AuthService(config.general.media_api)
services = Services(
feedback=FeedbackService(config.general.icons),
media_registry=media_registry,
watch_history=WatchHistoryService(config, media_registry),
session=SessionsService(config.sessions),
auth=auth,
)
media_api = create_api_client(config.general.media_api, config)
if auth_profile := auth.get_auth():
p = media_api.authenticate(auth_profile.token)
if p:
logger.debug(f"Authenticated as {p.name}")
else:
logger.warning(f"Failed to authenticate with {auth_profile.token}")
else:
logger.debug("Not authenticated")
self._context = Context(
config=config,
provider=create_provider(config.general.provider),
selector=create_selector(config),
player=create_player(config),
media_api=media_api,
service=services,
)
self._context = Context(config)
logger.info("Application context reloaded.")
def _edit_config(self):
@@ -116,10 +165,7 @@ class Session:
):
self._load_context(config)
if resume:
if (
history
:= self._context.service.session.get_most_recent_session_history()
):
if history := self._context.session.get_default_session_history():
self._history = history
else:
logger.warning("Failed to continue from history. No sessions found")
@@ -132,12 +178,12 @@ class Session:
try:
self._run_main_loop()
except Exception:
self._context.service.session.create_crash_backup(self._history)
self._context.session.create_crash_backup(self._history)
raise
finally:
# Clean up preview workers when session ends
self._cleanup_preview_workers()
self._context.service.session.save_session(self._history)
self._context.session.save_session(self._history)
def _cleanup_preview_workers(self):
"""Clean up preview workers when session ends."""

View File

@@ -16,17 +16,32 @@ class SessionsService:
self.dir = config.dir
self._ensure_sessions_directory()
def save_session(self, history: List[State], name: Optional[str] = None):
session = Session(history=history)
def save_session(
self, history: List[State], name: Optional[str] = None, default=True
):
if default:
name = "default"
session = Session(history=history, name=name)
else:
session = Session(history=history)
self._save_session(session)
def create_crash_backup(self, history: List[State]):
self._save_session(Session(history=history, is_from_crash=True))
def create_crash_backup(self, history: List[State], default=True):
if default:
self._save_session(
Session(history=history, name="crash", is_from_crash=True)
)
else:
self._save_session(Session(history=history, is_from_crash=True))
def get_session_history(self, session_name: str) -> Optional[List[State]]:
if session := self._load_session(session_name):
return session.history
def get_default_session_history(self) -> Optional[List[State]]:
if history := self.get_session_history("default"):
return history
def get_most_recent_session_history(self) -> Optional[List[State]]:
session_name: Optional[str] = None
latest_timestamp: Optional[datetime] = None