mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-12 15:50:01 -08:00
158 lines
6.0 KiB
Python
158 lines
6.0 KiB
Python
import logging
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from viu_media.cli.service.registry import MediaRegistryService
|
|
from viu_media.cli.service.registry.models import DownloadStatus
|
|
from viu_media.core.config.model import AppConfig
|
|
from viu_media.core.constants import APP_CACHE_DIR
|
|
from viu_media.libs.media_api.base import BaseApiClient
|
|
from viu_media.libs.media_api.types import MediaItem, Notification
|
|
|
|
try:
|
|
from plyer import notification as plyer_notification
|
|
|
|
PLYER_AVAILABLE = True
|
|
except ImportError: # pragma: no cover - optional dependency
|
|
plyer_notification = None # type: ignore[assignment]
|
|
PLYER_AVAILABLE = False
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
NOTIFICATION_ICONS_CACHE_DIR = APP_CACHE_DIR / "notification_icons"
|
|
|
|
|
|
class NotificationService:
|
|
def __init__(
|
|
self,
|
|
app_config: AppConfig,
|
|
media_api: BaseApiClient,
|
|
registry_service: MediaRegistryService,
|
|
):
|
|
self.media_api = media_api
|
|
self.app_config = app_config
|
|
self.registry = registry_service
|
|
|
|
def _mark_seen(self, notification_id: int, media_id: int, episode: str | None):
|
|
if self.registry and episode:
|
|
try:
|
|
self.registry.update_media_index_entry(
|
|
media_id, last_notified_episode=str(episode)
|
|
)
|
|
except Exception:
|
|
logger.debug("Failed to update last_notified_episode in registry")
|
|
|
|
def check_and_display_notifications(self):
|
|
if not PLYER_AVAILABLE:
|
|
logger.warning("plyer not installed. Cannot display desktop notifications.")
|
|
return
|
|
|
|
if not self.media_api.is_authenticated():
|
|
logger.info("Not authenticated, skipping notification check.")
|
|
return
|
|
|
|
logger.info("Checking for new notifications...")
|
|
notifications = self.media_api.get_notifications()
|
|
|
|
if not notifications:
|
|
logger.info("No new notifications found.")
|
|
return
|
|
|
|
# Filter out notifications already seen in this session or older than registry marker
|
|
filtered: list[Notification] = []
|
|
for n in notifications:
|
|
if self._is_seen_in_registry(n.media.id, n.episode):
|
|
continue
|
|
filtered.append(n)
|
|
|
|
if not filtered:
|
|
logger.info("No unseen notifications found.")
|
|
return
|
|
|
|
for notif in filtered:
|
|
if self.app_config.worker.auto_download_new_episode:
|
|
if not self.registry.get_media_record(notif.media.id):
|
|
self.registry.get_or_create_record(notif.media)
|
|
self.registry.update_episode_download_status(
|
|
media_id=notif.media.id,
|
|
episode_number=str(notif.episode),
|
|
status=DownloadStatus.QUEUED,
|
|
)
|
|
title = notif.media.title.english or notif.media.title.romaji
|
|
message = f"Episode {notif.episode} of {title} has aired!"
|
|
|
|
# Try to include an image (cover large/extra_large) if available
|
|
app_icon: Optional[str] = None
|
|
try:
|
|
icon_path = self._get_or_fetch_icon(notif.media)
|
|
app_icon = str(icon_path) if icon_path else None
|
|
except Exception:
|
|
app_icon = None
|
|
|
|
try:
|
|
# Guard: only call if available
|
|
if not PLYER_AVAILABLE or plyer_notification is None:
|
|
raise RuntimeError("Notification backend unavailable")
|
|
# Assert for type checkers and runtime safety
|
|
assert plyer_notification is not None
|
|
plyer_notification.notify( # type: ignore
|
|
title="Viu: New Episode",
|
|
message=message,
|
|
app_name="Viu",
|
|
app_icon=app_icon, # plyer supports file paths or URLs depending on platform
|
|
timeout=self.app_config.general.desktop_notification_duration,
|
|
)
|
|
logger.info(f"Displayed notification: {message}")
|
|
self._mark_seen(
|
|
notif.id,
|
|
notif.media.id,
|
|
str(notif.episode) if notif.episode is not None else None,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to display notification: {e}")
|
|
|
|
def _is_seen_in_registry(self, media_id: int, episode: Optional[int]) -> bool:
|
|
if episode is None:
|
|
return False
|
|
try:
|
|
entry = self.registry.get_media_index_entry(media_id)
|
|
if not entry or not entry.last_notified_episode:
|
|
return False
|
|
# Compare numerically
|
|
try:
|
|
last_ep = float(entry.last_notified_episode)
|
|
return float(episode) <= last_ep
|
|
except Exception:
|
|
return False
|
|
except Exception:
|
|
return False
|
|
|
|
def _get_or_fetch_icon(self, media_item: MediaItem) -> Optional[Path]:
|
|
"""Fetch and cache a small cover image for system notifications."""
|
|
try:
|
|
cover = media_item.cover_image
|
|
url = None
|
|
if cover:
|
|
url = cover.extra_large or cover.large or cover.medium
|
|
if not url:
|
|
return None
|
|
|
|
cache_dir = NOTIFICATION_ICONS_CACHE_DIR
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
icon_path = cache_dir / f"{media_item.id}.png"
|
|
if icon_path.exists() and icon_path.stat().st_size > 0:
|
|
return icon_path
|
|
|
|
# Directly download the image bytes without resizing
|
|
with httpx.Client(follow_redirects=True, timeout=20) as client:
|
|
resp = client.get(url)
|
|
resp.raise_for_status()
|
|
data = resp.content
|
|
if data:
|
|
icon_path.write_bytes(data)
|
|
return icon_path
|
|
except Exception as e:
|
|
logger.debug(f"Could not fetch icon for media {media_item.id}: {e}")
|
|
return None
|