feat: scaffhold worker command with gpt 5, it is actually pretty good lol

This commit is contained in:
Benexl
2025-08-08 18:57:08 +03:00
parent 55a7c7facf
commit ae62adf233
15 changed files with 774 additions and 99 deletions

View File

@@ -1,62 +1,218 @@
import click
from fastanime.core.config import AppConfig
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaItem,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
@click.command(help="Queue episodes for the background worker to download.")
# Search/Filter options (mirrors 'fastanime anilist download')
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@click.option("--season", type=click.Choice([s.value for s in MediaSeason]))
@click.option(
"--title", "-t", required=True, multiple=True, help="Anime title to queue."
"--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option(
"--episode-range", "-r", required=True, help="Range of episodes (e.g., '1-10')."
"--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort]))
@click.option(
"--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option(
"--media-format",
"-f",
multiple=True,
type=click.Choice([f.value for f in MediaFormat]),
)
@click.option("--media-type", type=click.Choice([t.value for t in MediaType]))
@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear]))
@click.option("--popularity-greater", type=click.IntRange(min=0))
@click.option("--popularity-lesser", type=click.IntRange(min=0))
@click.option("--score-greater", type=click.IntRange(min=0, max=100))
@click.option("--score-lesser", type=click.IntRange(min=0, max=100))
@click.option("--start-date-greater", type=int)
@click.option("--start-date-lesser", type=int)
@click.option("--end-date-greater", type=int)
@click.option("--end-date-lesser", type=int)
@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None)
# Queue-specific options
@click.option(
"--episode-range",
"-r",
required=True,
help="Range of episodes to queue (e.g., '1-10', '5', '8:12').",
)
@click.option(
"--yes",
"-Y",
is_flag=True,
help="Automatically queue from all found anime without prompting for selection.",
)
@click.pass_obj
def queue(config: AppConfig, title: tuple, episode_range: str):
def queue(config: AppConfig, **options):
"""
Searches for an anime and adds the specified episodes to the download queue.
The background worker must be running for the downloads to start.
Search AniList with filters, select one or more anime (or use --yes),
and queue the specified episode range for background download.
The background worker should be running to process the queue.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
feedback = FeedbackService(config)
selector = create_selector(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
for anime_title in title:
try:
feedback.info(f"Searching for '{anime_title}'...")
search_result = media_api.search_media(
MediaSearchParams(query=anime_title, per_page=1)
)
try:
# Build search params mirroring anilist download
sort_val = options.get("sort")
status_val = options.get("status")
status_not_val = options.get("status_not")
genres_val = options.get("genres")
genres_not_val = options.get("genres_not")
tags_val = options.get("tags")
tags_not_val = options.get("tags_not")
media_format_val = options.get("media_format")
media_type_val = options.get("media_type")
season_val = options.get("season")
year_val = options.get("year")
if not search_result or not search_result.media:
feedback.warning(f"Could not find '{anime_title}' on AniList.")
search_params = MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page"),
sort=MediaSort(sort_val) if sort_val else None,
status_in=[MediaStatus(s) for s in status_val] if status_val else None,
status_not_in=[MediaStatus(s) for s in status_not_val]
if status_not_val
else None,
genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None,
genre_not_in=[MediaGenre(g) for g in genres_not_val]
if genres_not_val
else None,
tag_in=[MediaTag(t) for t in tags_val] if tags_val else None,
tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None,
format_in=[MediaFormat(f) for f in media_format_val]
if media_format_val
else None,
type=MediaType(media_type_val) if media_type_val else None,
season=MediaSeason(season_val) if season_val else None,
seasonYear=int(year_val) if year_val else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
with Progress() as progress:
progress.add_task("Searching AniList...", total=None)
search_result = media_api.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria.")
if options.get("yes"):
anime_to_queue = search_result.media
else:
choice_map: dict[str, MediaItem] = {
(item.title.english or item.title.romaji or f"ID: {item.id}"): item
for item in search_result.media
}
preview_command = None
if config.general.preview != "none":
from ..utils.preview import create_preview_context # type: ignore
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(choice_map.values()),
list(choice_map.keys()),
config,
)
selected_titles = selector.choose_multiple(
"Select anime to queue",
list(choice_map.keys()),
preview=preview_command,
)
else:
selected_titles = selector.choose_multiple(
"Select anime to queue", list(choice_map.keys())
)
if not selected_titles:
feedback.warning("No anime selected. Nothing queued.")
return
anime_to_queue = [choice_map[title] for title in selected_titles]
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(
f"No episode information for '{media_item.title.english}', skipping."
)
continue
media_item = search_result.media[0]
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
episodes_to_queue = list(
parse_episode_range(episode_range, available_episodes)
)
try:
episodes_to_queue = list(
parse_episode_range(episode_range_str, available_episodes)
)
if not episodes_to_queue:
feedback.warning(
f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'."
)
continue
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
feedback.success(
f"Successfully queued {queued_count} episodes for '{media_item.title.english}'."
)
total_queued += queued_count
feedback.success(
f"Queued {queued_count} episodes for '{media_item.title.english}'."
)
except (ValueError, IndexError) as e:
feedback.error(
f"Invalid episode range for '{media_item.title.english}': {e}"
)
except FastAnimeError as e:
feedback.error(f"Failed to queue '{anime_title}'", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))
feedback.success(
f"Done. Total of {total_queued} episode(s) queued across all selections."
)
except FastAnimeError as e:
feedback.error("Queue command failed", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,3 @@
from .cmd import queue
__all__ = ["queue"]

View File

@@ -0,0 +1,26 @@
import click
from ...utils.lazyloader import LazyGroup
commands = {
"add": "add.add",
"list": "list.list_cmd",
"resume": "resume.resume",
"clear": "clear.clear_cmd",
}
@click.group(
cls=LazyGroup,
name="queue",
root="fastanime.cli.commands.queue.commands",
invoke_without_command=False,
help="Manage the download queue (add, list, resume, clear).",
short_help="Manage the download queue.",
lazy_subcommands=commands,
)
@click.pass_context
def queue(ctx: click.Context):
"""Queue management root command."""
# No-op root; subcommands are lazy-loaded
pass

View File

@@ -0,0 +1,213 @@
import click
from fastanime.core.config import AppConfig
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaItem,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
@click.command(name="add", help="Add episodes to the background download queue.")
# Search/Filter options (mirrors 'fastanime anilist download')
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@click.option("--season", type=click.Choice([s.value for s in MediaSeason]))
@click.option(
"--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option(
"--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort]))
@click.option(
"--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option(
"--media-format",
"-f",
multiple=True,
type=click.Choice([f.value for f in MediaFormat]),
)
@click.option("--media-type", type=click.Choice([t.value for t in MediaType]))
@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear]))
@click.option("--popularity-greater", type=click.IntRange(min=0))
@click.option("--popularity-lesser", type=click.IntRange(min=0))
@click.option("--score-greater", type=click.IntRange(min=0, max=100))
@click.option("--score-lesser", type=click.IntRange(min=0, max=100))
@click.option("--start-date-greater", type=int)
@click.option("--start-date-lesser", type=int)
@click.option("--end-date-greater", type=int)
@click.option("--end-date-lesser", type=int)
@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None)
# Queue-specific options
@click.option(
"--episode-range",
"-r",
required=True,
help="Range of episodes to queue (e.g., '1-10', '5', '8:12').",
)
@click.option(
"--yes",
"-Y",
is_flag=True,
help="Queue for all found anime without prompting for selection.",
)
@click.pass_obj
def add(config: AppConfig, **options):
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
feedback = FeedbackService(config)
selector = create_selector(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
try:
# Build search params mirroring anilist download
sort_val = options.get("sort")
status_val = options.get("status")
status_not_val = options.get("status_not")
genres_val = options.get("genres")
genres_not_val = options.get("genres_not")
tags_val = options.get("tags")
tags_not_val = options.get("tags_not")
media_format_val = options.get("media_format")
media_type_val = options.get("media_type")
season_val = options.get("season")
year_val = options.get("year")
search_params = MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page"),
sort=MediaSort(sort_val) if sort_val else None,
status_in=[MediaStatus(s) for s in status_val] if status_val else None,
status_not_in=[MediaStatus(s) for s in status_not_val]
if status_not_val
else None,
genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None,
genre_not_in=[MediaGenre(g) for g in genres_not_val]
if genres_not_val
else None,
tag_in=[MediaTag(t) for t in tags_val] if tags_val else None,
tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None,
format_in=[MediaFormat(f) for f in media_format_val]
if media_format_val
else None,
type=MediaType(media_type_val) if media_type_val else None,
season=MediaSeason(season_val) if season_val else None,
seasonYear=int(year_val) if year_val else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
with Progress() as progress:
progress.add_task("Searching AniList...", total=None)
search_result = media_api.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria.")
if options.get("yes"):
anime_to_queue = search_result.media
else:
choice_map: dict[str, MediaItem] = {
(item.title.english or item.title.romaji or f"ID: {item.id}"): item
for item in search_result.media
}
preview_command = None
if config.general.preview != "none":
from ...utils.preview import create_preview_context # type: ignore
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(choice_map.values()),
list(choice_map.keys()),
config,
)
selected_titles = selector.choose_multiple(
"Select anime to queue",
list(choice_map.keys()),
preview=preview_command,
)
else:
selected_titles = selector.choose_multiple(
"Select anime to queue", list(choice_map.keys())
)
if not selected_titles:
feedback.warning("No anime selected. Nothing queued.")
return
anime_to_queue = [choice_map[title] for title in selected_titles]
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(
f"No episode information for '{media_item.title.english}', skipping."
)
continue
try:
episodes_to_queue = list(
parse_episode_range(episode_range_str, available_episodes)
)
if not episodes_to_queue:
feedback.warning(
f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'."
)
continue
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
total_queued += queued_count
feedback.success(
f"Queued {queued_count} episodes for '{media_item.title.english}'."
)
except (ValueError, IndexError) as e:
feedback.error(
f"Invalid episode range for '{media_item.title.english}': {e}"
)
feedback.success(
f"Done. Total of {total_queued} episode(s) queued across all selections."
)
except FastAnimeError as e:
feedback.error("Queue add failed", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,30 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="clear", help="Clear queued items from the registry (QUEUED -> NOT_DOWNLOADED).")
@click.option("--force", is_flag=True, help="Do not prompt for confirmation.")
@click.pass_obj
def clear_cmd(config: AppConfig, force: bool):
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
if not force and not click.confirm("This will clear all queued items. Continue?"):
feedback.info("Aborted.")
return
cleared = 0
queued = registry.get_episodes_by_download_status(DownloadStatus.QUEUED)
for media_id, ep in queued:
ok = registry.update_episode_download_status(
media_id=media_id,
episode_number=ep,
status=DownloadStatus.NOT_DOWNLOADED,
)
if ok:
cleared += 1
feedback.success(f"Cleared {cleared} queued episode(s).")

View File

@@ -0,0 +1,45 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="list", help="List items in the download queue and their statuses.")
@click.option("--status", type=click.Choice(["queued", "downloading", "completed", "failed", "paused"]))
@click.pass_obj
def list_cmd(config: AppConfig, status: str | None):
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
from fastanime.cli.service.feedback import FeedbackService
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
status_map = {
"queued": DownloadStatus.QUEUED,
"downloading": DownloadStatus.DOWNLOADING,
"completed": DownloadStatus.COMPLETED,
"failed": DownloadStatus.FAILED,
"paused": DownloadStatus.PAUSED,
}
if status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
feedback.info(f"{len(episodes)} episode(s) with status {status}.")
for media_id, ep in episodes:
feedback.info(f"- media:{media_id} episode:{ep}")
else:
from rich.table import Table
from rich.console import Console
stats = registry.get_download_statistics()
table = Table(title="Queue Status")
table.add_column("Metric")
table.add_column("Value")
table.add_row("Queued", str(stats.get("queued", 0)))
table.add_row("Downloading", str(stats.get("downloading", 0)))
table.add_row("Completed", str(stats.get("downloaded", 0)))
table.add_row("Failed", str(stats.get("failed", 0)))
table.add_row("Paused", str(stats.get("paused", 0)))
console = Console()
console.print(table)

View File

@@ -0,0 +1,22 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="resume", help="Submit any queued or in-progress downloads to the worker.")
@click.pass_obj
def resume(config: AppConfig):
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
feedback = FeedbackService(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
download_service.start()
download_service.resume_unfinished_downloads()
feedback.success("Submitted queued downloads to background worker.")

View File

@@ -16,6 +16,7 @@ def worker(config: AppConfig):
from fastanime.cli.service.notification.service import NotificationService
from fastanime.cli.service.registry.service import MediaRegistryService
from fastanime.cli.service.worker.service import BackgroundWorkerService
from fastanime.cli.service.auth import AuthService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
@@ -26,6 +27,13 @@ def worker(config: AppConfig):
# Instantiate services
media_api = create_api_client(config.general.media_api, config)
# Authenticate if credentials exist (enables notifications)
auth = AuthService(config.general.media_api)
if profile := auth.get_auth():
try:
media_api.authenticate(profile.token)
except Exception:
pass
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)

View File

@@ -36,6 +36,8 @@ class DownloadService:
self.media_api = media_api_service
self.provider = provider_service
self.downloader = create_downloader(config.downloads)
# Track in-flight downloads to avoid duplicate queueing
self._inflight: set[tuple[int, str]] = set()
# Worker is kept for potential future background commands
self._worker = ManagedBackgroundWorker(
@@ -56,18 +58,25 @@ class DownloadService:
self._worker.shutdown(wait=False)
def add_to_queue(self, media_item: MediaItem, episode_number: str) -> bool:
"""Adds a download job to the ASYNCHRONOUS queue."""
"""Mark an episode as queued in the registry (no immediate download)."""
logger.info(
f"Queueing background download for '{media_item.title.english}' Episode {episode_number}"
f"Queueing episode '{episode_number}' for '{media_item.title.english}' (registry only)"
)
self.registry.get_or_create_record(media_item)
updated = self.registry.update_episode_download_status(
return self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.QUEUED,
)
if not updated:
def _submit_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Submit a download task to the worker if not already in-flight."""
key = (media_item.id, str(episode_number))
if key in self._inflight:
return False
if not self._worker.is_running():
self._worker.start()
self._inflight.add(key)
self._worker.submit_function(
self._execute_download_job, media_item, episode_number
)
@@ -108,9 +117,11 @@ class DownloadService:
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
if (media_id, str(episode_number)) in self._inflight:
continue
record = self.registry.get_media_record(media_id)
if record and record.media_item:
self.add_to_queue(record.media_item, episode_number)
self._submit_download(record.media_item, episode_number)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
@@ -244,3 +255,9 @@ class DownloadService:
status=DownloadStatus.FAILED,
error_message=str(e),
)
finally:
# Remove from in-flight tracking regardless of outcome
try:
self._inflight.discard((media_item.id, str(episode_number)))
except Exception:
pass

View File

@@ -1,41 +1,58 @@
import json
import logging
from typing import Set
from pathlib import Path
from typing import Optional, Set
import httpx
from fastanime.core.constants import APP_CACHE_DIR
from fastanime.libs.media_api.base import BaseApiClient
from fastanime.libs.media_api.types import MediaItem
# Note: Previously used image resizing; now we download icons directly without resizing.
try:
import plyer
from plyer import notification as plyer_notification
PLYER_AVAILABLE = True
except ImportError:
except ImportError: # pragma: no cover - optional dependency
plyer_notification = None # type: ignore[assignment]
PLYER_AVAILABLE = False
logger = logging.getLogger(__name__)
SEEN_NOTIFICATIONS_CACHE = APP_CACHE_DIR / "seen_notifications.json"
class NotificationService:
def __init__(self, media_api: BaseApiClient):
def __init__(self, media_api: BaseApiClient, registry_service=None):
self.media_api = media_api
self.registry = registry_service # optional; used for seen tracking
self._seen_ids: Set[int] = self._load_seen_ids()
def _load_seen_ids(self) -> Set[int]:
if not SEEN_NOTIFICATIONS_CACHE.exists():
return set()
# Prefer MediaRegistry storage via index.last_notified_episode markers
try:
with open(SEEN_NOTIFICATIONS_CACHE, "r") as f:
return set(json.load(f))
except (json.JSONDecodeError, IOError):
if not self.registry:
return set()
seen: Set[int] = set()
for record in self.registry.get_all_media_records():
index_entry = self.registry.get_media_index_entry(
record.media_item.id
)
# last_notified_episode stored per media; we cant reconstruct notif IDs,
# so keep an in-memory set per session (fresh on start). Return empty.
# Future: persist a mapping media_id->max_created_at for durability.
return seen
except Exception:
return set()
def _save_seen_ids(self):
try:
with open(SEEN_NOTIFICATIONS_CACHE, "w") as f:
json.dump(list(self._seen_ids), f)
except IOError:
logger.error("Failed to save seen notifications cache.")
def _mark_seen(self, notification_id: int, media_id: int, episode: str | None):
self._seen_ids.add(notification_id)
# Also update registrys last_notified_episode for the media
if self.registry and episode:
try:
self.registry.update_media_index_entry(
media_id, last_notified_episode=str(episode)
)
except Exception:
logger.debug("Failed to update last_notified_episode in registry")
def check_and_display_notifications(self):
if not PLYER_AVAILABLE:
@@ -53,26 +70,93 @@ class NotificationService:
logger.info("No new notifications found.")
return
new_notifications = [n for n in notifications if n.id not in self._seen_ids]
# Filter out notifications already seen in this session or older than registry marker
filtered = []
for n in notifications:
if n.id in self._seen_ids:
continue
if self._is_seen_in_registry(n.media.id, n.episode):
continue
filtered.append(n)
if not new_notifications:
if not filtered:
logger.info("No unseen notifications found.")
return
for notif in new_notifications:
for notif in filtered:
title = notif.media.title.english or notif.media.title.romaji
message = f"Episode {notif.episode} of {title} has aired!"
# Try to include an image (cover large/extra_large) if available
app_icon: Optional[str] = None
try:
plyer.notification.notify(
icon_path = self._get_or_fetch_icon(notif.media)
app_icon = str(icon_path) if icon_path else None
except Exception:
app_icon = None
try:
# Guard: only call if available
if not PLYER_AVAILABLE or plyer_notification is None:
raise RuntimeError("Notification backend unavailable")
# Assert for type checkers and runtime safety
assert plyer_notification is not None
plyer_notification.notify(
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon, # plyer supports file paths or URLs depending on platform
timeout=20,
)
logger.info(f"Displayed notification: {message}")
self._seen_ids.add(notif.id)
self._mark_seen(
notif.id,
notif.media.id,
str(notif.episode) if notif.episode is not None else None,
)
except Exception as e:
logger.error(f"Failed to display notification: {e}")
self._save_seen_ids()
def _is_seen_in_registry(self, media_id: int, episode: Optional[int]) -> bool:
if not self.registry or episode is None:
return False
try:
entry = self.registry.get_media_index_entry(media_id)
if not entry or not entry.last_notified_episode:
return False
# Compare numerically
try:
last_ep = float(entry.last_notified_episode)
return float(episode) <= last_ep
except Exception:
return str(episode) <= entry.last_notified_episode
except Exception:
return False
def _get_or_fetch_icon(self, media_item: MediaItem) -> Optional[Path]:
"""Fetch and cache a small cover image for system notifications."""
try:
cover = media_item.cover_image
url = None
if cover:
url = cover.extra_large or cover.large or cover.medium
if not url:
return None
cache_dir = APP_CACHE_DIR / "notification_icons"
cache_dir.mkdir(parents=True, exist_ok=True)
icon_path = cache_dir / f"{media_item.id}.png"
if icon_path.exists() and icon_path.stat().st_size > 0:
return icon_path
# Directly download the image bytes without resizing
with httpx.Client(follow_redirects=True, timeout=20) as client:
resp = client.get(url)
resp.raise_for_status()
data = resp.content
if data:
icon_path.write_bytes(data)
return icon_path
except Exception as e:
logger.debug(f"Could not fetch icon for media {media_item.id}: {e}")
return None

View File

@@ -550,13 +550,12 @@ class MediaRegistryService:
break
if not episode_record:
if not file_path:
logger.error(f"File path required for new episode {episode_number}")
return False
# Allow creation without file_path for queued/in-progress states.
# Only require file_path once the episode is marked COMPLETED.
episode_record = MediaEpisode(
episode_number=episode_number,
file_path=file_path,
download_status=status,
file_path=file_path,
)
record.media_episodes.append(episode_record)
@@ -564,6 +563,12 @@ class MediaRegistryService:
episode_record.download_status = status
if file_path:
episode_record.file_path = file_path
elif status.name == "COMPLETED" and not episode_record.file_path:
logger.warning(
"Completed status set without file_path for media %s episode %s",
media_id,
episode_number,
)
if file_size is not None:
episode_record.file_size = file_size
if quality:

View File

@@ -1,5 +1,8 @@
import logging
import signal
import threading
import time
from typing import Optional
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.notification.service import NotificationService
@@ -18,44 +21,96 @@ class BackgroundWorkerService:
self.config = config
self.notification_service = notification_service
self.download_service = download_service
self.running = True
self._stop_event = threading.Event()
self._signals_installed = False
def run(self):
logger.info("Background worker started.")
last_notification_check = 0
last_download_check = 0
def _install_signal_handlers(self):
"""Install SIGINT/SIGTERM handlers to allow graceful shutdown when run in foreground."""
if self._signals_installed:
return
notification_interval_sec = self.config.notification_check_interval * 60
download_interval_sec = self.config.download_check_interval * 60
self.download_service.start()
try:
while self.running:
current_time = time.time()
# Check for notifications
if current_time - last_notification_check > notification_interval_sec:
try:
self.notification_service.check_and_display_notifications()
except Exception as e:
logger.error(f"Error during notification check: {e}")
last_notification_check = current_time
# Process download queue
if current_time - last_download_check > download_interval_sec:
try:
self.download_service.resume_unfinished_downloads()
except Exception as e:
logger.error(f"Error during download queue processing: {e}")
last_download_check = current_time
# Sleep for a short interval to prevent high CPU usage
time.sleep(30) # Sleep for 30 seconds before next check cycle
except KeyboardInterrupt:
logger.info("Background worker stopped by user.")
def _handler(signum, frame): # noqa: ARG001 (signature fixed by signal)
logger.info("Received signal %s, shutting down background worker...", signum)
self.stop()
try:
signal.signal(signal.SIGINT, _handler)
signal.signal(signal.SIGTERM, _handler)
self._signals_installed = True
except Exception:
# Signal handling may fail in non-main threads or certain environments
logger.debug("Signal handlers not installed (non-main thread or unsupported environment).")
def run(self):
"""Run the background loop until stopped.
Responsibilities:
- Periodically check AniList notifications (if authenticated & plyer available)
- Periodically resume/process unfinished downloads
- Keep CPU usage low using an event-based wait
- Gracefully terminate on KeyboardInterrupt/SIGTERM
"""
logger.info("Background worker starting...")
# Convert configured minutes to seconds
notification_interval_sec = max(60, self.config.notification_check_interval * 60)
download_interval_sec = max(60, self.config.download_check_interval * 60)
# Start download worker and attempt resuming pending jobs once at startup
self.download_service.start()
# Schedule the very first execution immediately
next_notification_ts: Optional[float] = 0.0
next_download_ts: Optional[float] = 0.0
# Install signal handlers if possible
self._install_signal_handlers()
try:
while not self._stop_event.is_set():
now = time.time()
# Check for notifications
if next_notification_ts is not None and now >= next_notification_ts:
try:
self.notification_service.check_and_display_notifications()
except Exception:
logger.exception("Error during notification check")
finally:
next_notification_ts = now + notification_interval_sec
# Process download queue
if next_download_ts is not None and now >= next_download_ts:
try:
self.download_service.resume_unfinished_downloads()
except Exception:
logger.exception("Error during download queue processing")
finally:
next_download_ts = now + download_interval_sec
# Determine how long to wait until the next scheduled task
next_events = [t for t in (next_notification_ts, next_download_ts) if t is not None]
if next_events:
time_until_next = max(0.0, min(next_events) - time.time())
else:
time_until_next = 30.0
# Cap wait to react reasonably fast to stop requests
wait_time = min(time_until_next, 30.0)
self._stop_event.wait(timeout=wait_time)
except KeyboardInterrupt:
logger.info("Background worker interrupted by user. Stopping...")
self.stop()
finally:
# Ensure we always stop the download worker
try:
self.download_service.stop()
except Exception:
logger.exception("Failed to stop download service cleanly")
logger.info("Background worker stopped.")
def stop(self):
self.running = False
logger.info("Background worker shutting down.")
if not self._stop_event.is_set():
logger.info("Background worker shutting down...")
self._stop_event.set()

View File

@@ -23,7 +23,10 @@ standard = [
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
]
notifications = ["plyer>=2.1.0"]
notifications = [
"dbus-python>=1.4.0",
"plyer>=2.1.0",
]
mpv = [
"mpv>=1.0.7",
]

8
uv.lock generated
View File

@@ -74,6 +74,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "dbus-python"
version = "1.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ff/24/63118050c7dd7be04b1ccd60eab53fef00abe844442e1b6dec92dae505d6/dbus-python-1.4.0.tar.gz", hash = "sha256:991666e498f60dbf3e49b8b7678f5559b8a65034fdf61aae62cdecdb7d89c770", size = 232490, upload-time = "2025-03-13T19:57:54.212Z" }
[[package]]
name = "distlib"
version = "0.4.0"
@@ -122,6 +128,7 @@ mpv = [
{ name = "mpv" },
]
notifications = [
{ name = "dbus-python" },
{ name = "plyer" },
]
standard = [
@@ -150,6 +157,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.7" },
{ name = "dbus-python", marker = "extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },