feat(worker-service): draft

This commit is contained in:
Benexl
2025-07-29 02:02:32 +03:00
parent ee52b945ea
commit 25812b6562
9 changed files with 284 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
# values in {NAME} syntax are provided by python using .replace()
#
[Unit]
Description=FastAnime Background Worker
After=network-online.target
[Service]
Type=simple
# Ensure you have the full path to your fastanime executable
# Use `which fastanime` to find it
ExecStart={EXECUTABLE} worker --log
Restart=always
RestartSec=30
[Install]
WantedBy=default.target

View File

@@ -36,6 +36,8 @@ commands = {
"download": "download.download",
"update": "update.update",
"registry": "registry.registry",
"worker": "worker.worker",
"queue": "queue.queue",
}

View File

@@ -0,0 +1,51 @@
import click
from fastanime.core.config import AppConfig
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.core.exceptions import FastAnimeError
@click.command(help="Queue episodes for the background worker to download.")
@click.option("--title", "-t", required=True, multiple=True, help="Anime title to queue.")
@click.option("--episode-range", "-r", required=True, help="Range of episodes (e.g., '1-10').")
@click.pass_obj
def queue(config: AppConfig, title: tuple, episode_range: str):
"""
Searches for an anime and adds the specified episodes to the download queue.
The background worker must be running for the downloads to start.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.cli.service.registry import MediaRegistryService
feedback = FeedbackService(config.general.icons)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
for anime_title in title:
try:
feedback.info(f"Searching for '{anime_title}'...")
search_result = media_api.search_media(MediaSearchParams(query=anime_title, per_page=1))
if not search_result or not search_result.media:
feedback.warning(f"Could not find '{anime_title}' on AniList.")
continue
media_item = search_result.media[0]
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
episodes_to_queue = list(parse_episode_range(episode_range, available_episodes))
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
feedback.success(f"Successfully queued {queued_count} episodes for '{media_item.title.english}'.")
except FastAnimeError as e:
feedback.error(f"Failed to queue '{anime_title}'", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,39 @@
import click
from fastanime.core.config import AppConfig
@click.command(help="Run the background worker for notifications and downloads.")
@click.pass_obj
def worker(config: AppConfig):
"""
Starts the long-running background worker process.
This process will periodically check for AniList notifications and
process any queued downloads. It's recommended to run this in the
background (e.g., 'fastanime worker &') or as a system service.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.notification.service import NotificationService
from fastanime.cli.service.registry.service import MediaRegistryService
from fastanime.cli.service.worker.service import BackgroundWorkerService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
feedback = FeedbackService(config.general.icons)
if not config.worker.enabled:
feedback.warning("Worker is disabled in the configuration. Exiting.")
return
# Instantiate services
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
notification_service = NotificationService(media_api)
download_service = DownloadService(config, registry, media_api, provider)
worker_service = BackgroundWorkerService(
config.worker, notification_service, download_service
)
feedback.info("Starting background worker...", "Press Ctrl+C to stop.")
worker_service.run()

View File

@@ -0,0 +1,80 @@
import json
import logging
from pathlib import Path
from typing import Set
from fastanime.core.constants import APP_CACHE_DIR
from fastanime.libs.media_api.base import BaseApiClient
from fastanime.libs.media_api.types import Notification
try:
import plyer
PLYER_AVAILABLE = True
except ImportError:
PLYER_AVAILABLE = False
logger = logging.getLogger(__name__)
SEEN_NOTIFICATIONS_CACHE = APP_CACHE_DIR / "seen_notifications.json"
class NotificationService:
def __init__(self, media_api: BaseApiClient):
self.media_api = media_api
self._seen_ids: Set[int] = self._load_seen_ids()
def _load_seen_ids(self) -> Set[int]:
if not SEEN_NOTIFICATIONS_CACHE.exists():
return set()
try:
with open(SEEN_NOTIFICATIONS_CACHE, "r") as f:
return set(json.load(f))
except (json.JSONDecodeError, IOError):
return set()
def _save_seen_ids(self):
try:
with open(SEEN_NOTIFICATIONS_CACHE, "w") as f:
json.dump(list(self._seen_ids), f)
except IOError:
logger.error("Failed to save seen notifications cache.")
def check_and_display_notifications(self):
if not PLYER_AVAILABLE:
logger.warning("plyer not installed. Cannot display desktop notifications.")
return
if not self.media_api.is_authenticated():
logger.info("Not authenticated, skipping notification check.")
return
logger.info("Checking for new notifications...")
notifications = self.media_api.get_notifications()
if not notifications:
logger.info("No new notifications found.")
return
new_notifications = [n for n in notifications if n.id not in self._seen_ids]
if not new_notifications:
logger.info("No unseen notifications found.")
return
for notif in new_notifications:
title = notif.media.title.english or notif.media.title.romaji
message = f"Episode {notif.episode} of {title} has aired!"
try:
plyer.notification.notify(
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
timeout=20,
)
logger.info(f"Displayed notification: {message}")
self._seen_ids.add(notif.id)
except Exception as e:
logger.error(f"Failed to display notification: {e}")
self._save_seen_ids()

View File

@@ -0,0 +1,61 @@
import logging
import time
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.notification.service import NotificationService
from fastanime.core.config.model import WorkerConfig
logger = logging.getLogger(__name__)
class BackgroundWorkerService:
def __init__(
self,
config: WorkerConfig,
notification_service: NotificationService,
download_service: DownloadService,
):
self.config = config
self.notification_service = notification_service
self.download_service = download_service
self.running = True
def run(self):
logger.info("Background worker started.")
last_notification_check = 0
last_download_check = 0
notification_interval_sec = self.config.notification_check_interval * 60
download_interval_sec = self.config.download_check_interval * 60
self.download_service.start()
try:
while self.running:
current_time = time.time()
# Check for notifications
if current_time - last_notification_check > notification_interval_sec:
try:
self.notification_service.check_and_display_notifications()
except Exception as e:
logger.error(f"Error during notification check: {e}")
last_notification_check = current_time
# Process download queue
if current_time - last_download_check > download_interval_sec:
try:
self.download_service.resume_unfinished_downloads()
except Exception as e:
logger.error(f"Error during download queue processing: {e}")
last_download_check = current_time
# Sleep for a short interval to prevent high CPU usage
time.sleep(30) # Sleep for 30 seconds before next check cycle
except KeyboardInterrupt:
logger.info("Background worker stopped by user.")
self.stop()
def stop(self):
self.running = False
logger.info("Background worker shutting down.")

View File

@@ -39,6 +39,11 @@ STREAM_USE_IPC = (
lambda: True if PLATFORM != "win32" and not detect.is_running_in_termux() else False
)
# WorkerConfig
WORKER_ENABLED = True
WORKER_NOTIFICATION_CHECK_INTERVAL = 15 # minutes
WORKER_DOWNLOAD_CHECK_INTERVAL = 5 # minutes
# FzfConfig
FZF_OPTS = DEFAULTS_DIR / "fzf-opts"
FZF_HEADER_COLOR = "95,135,175"

View File

@@ -58,6 +58,13 @@ STREAM_DEFAULT_MEDIA_LIST_TRACKING = (
STREAM_SUB_LANG = "Preferred language code for subtitles (e.g., 'en', 'es')."
STREAM_USE_IPC = "Use IPC communication with the player for advanced features like episode navigation."
# WorkerConfig
APP_WORKER = "Configuration for the background worker service."
WORKER_ENABLED = "Enable the background worker for notifications and queued downloads."
WORKER_NOTIFICATION_CHECK_INTERVAL = (
"How often to check for new AniList notifications (in minutes)."
)
WORKER_DOWNLOAD_CHECK_INTERVAL = "How often to process the download queue (in minutes)."
# FzfConfig
FZF_HEADER_COLOR = "RGB color for the main TUI header."

View File

@@ -144,6 +144,25 @@ class OtherConfig(BaseModel):
pass
class WorkerConfig(OtherConfig):
"""Configuration for the background worker service."""
enabled: bool = Field(
default=True,
description="Enable the background worker for notifications and queued downloads.",
)
notification_check_interval: int = Field(
default=15, # in minutes
ge=1,
description="How often to check for new AniList notifications (in minutes).",
)
download_check_interval: int = Field(
default=5, # in minutes
ge=1,
description="How often to process the download queue (in minutes).",
)
class SessionsConfig(OtherConfig):
dir: Path = Field(
default_factory=lambda: defaults.SESSIONS_DIR,
@@ -382,3 +401,7 @@ class AppConfig(BaseModel):
sessions: SessionsConfig = Field(
default_factory=SessionsConfig, description=desc.APP_SESSIONS
)
worker: WorkerConfig = Field(
default_factory=WorkerConfig,
description="Configuration for the background worker service.",
)