Compare commits

...

16 Commits

Author SHA1 Message Date
Benexl 41d7f1702c chore: init feature branch 2025-10-26 23:13:29 +03:00
Benexl 1fea1335c6 chore: move to feature branch 2025-10-26 23:10:05 +03:00
Benexl 8b664fae36 chore: move to feature branch 2025-10-26 23:09:53 +03:00
Benexl 19a85511b4 chore: move to feature branch 2025-10-26 23:09:42 +03:00
Benexl 205299108b fix(media-api-debug-utils): pyright errors 2025-10-26 23:05:31 +03:00
Benexl 7670bdd2f3 fix(jikan-media-api-mapper): pyright errors 2025-10-26 23:03:05 +03:00
Benexl cd3f7f7fb8 fix(anilist-media-api-mapper): pyright errors 2025-10-26 22:58:12 +03:00
Benexl 5be03ed5b8 fix(core-concurrency-utils): pyright errors 2025-10-26 22:56:17 +03:00
Benexl 6581179336 fix(yt-dlp-downloader): pyright errors 2025-10-26 22:53:56 +03:00
Benexl 2bb674f4a0 fix(cli-image-utils): pyright errors 2025-10-26 22:49:32 +03:00
Benexl 642e77f601 fix(config-editor): pyright errors 2025-10-26 22:37:57 +03:00
Benexl a5e99122f5 fix(registry-cmds): pyright errors 2025-10-26 21:30:10 +03:00
Benexl 39bd7bed61 chore: update deps 2025-10-26 20:18:08 +03:00
Benexl 869072633b chore: create .python-version 2025-10-26 20:17:47 +03:00
Benexl cbd788a573 chore: bump python version for pyright 2025-10-26 20:13:49 +03:00
Benexl 11fe54b146 chore: update lock file 2025-10-26 19:17:48 +03:00
23 changed files with 177 additions and 190 deletions

.python-version Normal file (1 line changed)
View File

@@ -0,0 +1 @@
3.11

View File

@@ -1,5 +1,5 @@
{
"venvPath": ".",
"venv": ".venv",
"pythonVersion": "3.11"
"pythonVersion": "3.12"
}

uv.lock generated (8 lines changed)
View File

@@ -3441,15 +3441,15 @@ wheels = [
[[package]]
name = "pyright"
version = "1.1.406"
version = "1.1.407"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nodeenv" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f7/16/6b4fbdd1fef59a0292cbb99f790b44983e390321eccbc5921b4d161da5d1/pyright-1.1.406.tar.gz", hash = "sha256:c4872bc58c9643dac09e8a2e74d472c62036910b3bd37a32813989ef7576ea2c", size = 4113151, upload-time = "2025-10-02T01:04:45.488Z" }
sdist = { url = "https://files.pythonhosted.org/packages/a6/1b/0aa08ee42948b61745ac5b5b5ccaec4669e8884b53d31c8ec20b2fcd6b6f/pyright-1.1.407.tar.gz", hash = "sha256:099674dba5c10489832d4a4b2d302636152a9a42d317986c38474c76fe562262", size = 4122872, upload-time = "2025-10-24T23:17:15.145Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/a2/e309afbb459f50507103793aaef85ca4348b66814c86bc73908bdeb66d12/pyright-1.1.406-py3-none-any.whl", hash = "sha256:1d81fb43c2407bf566e97e57abb01c811973fdb21b2df8df59f870f688bdca71", size = 5980982, upload-time = "2025-10-02T01:04:43.137Z" },
{ url = "https://files.pythonhosted.org/packages/dc/93/b69052907d032b00c40cb656d21438ec00b3a471733de137a3f65a49a0a0/pyright-1.1.407-py3-none-any.whl", hash = "sha256:6dd419f54fcc13f03b52285796d65e639786373f433e243f8b94cf93a7444d21", size = 5997008, upload-time = "2025-10-24T23:17:13.159Z" },
]
[[package]]
@@ -3751,7 +3751,7 @@ wheels = [
[[package]]
name = "viu-media"
version = "3.2.7"
version = "3.2.8"
source = { editable = "." }
dependencies = [
{ name = "click" },

View File

@@ -113,6 +113,7 @@ def _create_tar_backup(
api: str,
):
"""Create a tar-based backup."""
# TODO: Add support for bz2/xz compression if needed
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:
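
The TODO above can be satisfied by mapping a compression-format choice onto tarfile's built-in mode strings ("w", "w:gz", "w:bz2", "w:xz"). A minimal sketch, with a hypothetical compression parameter and helper name:

import tarfile
from pathlib import Path

def open_backup_tar(output_path: Path, compression: str | None = None) -> tarfile.TarFile:
    # compression is expected to be None, "gz", "bz2" or "xz";
    # tarfile accepts these directly as "w:<fmt>" mode strings.
    mode = f"w:{compression}" if compression else "w"
    return tarfile.open(output_path, mode)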

View File

@@ -5,6 +5,7 @@ Registry restore command - restore registry from backup files
import json
import shutil
import tarfile
import zipfile
from datetime import datetime
from pathlib import Path
@@ -25,6 +26,11 @@ from ....service.registry.service import MediaRegistryService
is_flag=True,
help="Create backup of current registry before restoring",
)
@click.option(
"--backup-current-tar-compression-fmt",
type=click.Choice(["gz", "bz2", "xz"], case_sensitive=False),
help="The compression format to use for the current registry backup (if enabled)",
)
@click.option("--verify", is_flag=True, help="Verify backup integrity before restoring")
@click.option(
"--api",
@@ -38,6 +44,7 @@ def restore(
backup_file: Path,
force: bool,
backup_current: bool,
backup_current_compression_fmt: str,
verify: bool,
api: str,
):
@@ -61,7 +68,7 @@ def restore(
"Verification Failed",
"Backup file appears to be corrupted or invalid",
)
raise click.Abort()
return
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
@@ -77,7 +84,13 @@ def restore(
# Create backup of current registry if requested
if backup_current and registry_exists:
_backup_current_registry(registry_service, api, feedback)
_backup_current_registry(
registry_service,
api,
feedback,
backup_format=backup_format,
compression_fmt=backup_current_compression_fmt,
)
# Show restore summary
_show_restore_summary(backup_file, backup_format, feedback)
@@ -110,7 +123,13 @@ def restore(
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
suffixes = "".join(backup_file.suffixes).lower()
if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes:
if (
".tar" in suffixes
or ".gz" in suffixes
or ".tgz" in suffixes
or ".bz2" in suffixes
or ".xz" in suffixes
):
return "tar"
elif ".zip" in suffixes:
return "zip"
@@ -122,25 +141,38 @@ def _verify_backup(
) -> bool:
"""Verify backup file integrity."""
try:
metadata = {}
has_registry = has_index = has_metadata = False
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
names = tar.getnames()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
for name in names:
if name == "registry/":
has_registry = True
continue
if name == "index/":
has_index = True
continue
if name == "backup_metadata.json":
has_metadata = True
continue
if has_metadata:
metadata_member = tar.getmember("backup_metadata.json")
if metadata_file := tar.extractfile(metadata_member):
metadata = json.load(metadata_file)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, "r") as zip_file:
names = zip_file.namelist()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
for name in names:
if name == "registry/":
has_registry = True
continue
if name == "index/":
has_index = True
continue
if name == "backup_metadata.json":
has_metadata = True
continue
if has_metadata:
with zip_file.open("backup_metadata.json") as metadata_file:
metadata = json.load(metadata_file)
@@ -163,27 +195,42 @@ def _verify_backup(
def _check_registry_exists(registry_service: MediaRegistryService) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get("total_media", 0) > 0
except Exception:
return False
# TODO: Improve this check to be more robust
return registry_service.media_registry_dir.exists() and any(
registry_service.media_registry_dir.iterdir()
)
def _backup_current_registry(
registry_service: MediaRegistryService, api: str, feedback: FeedbackService
registry_service: MediaRegistryService,
api: str,
feedback: FeedbackService,
backup_format: str,
compression_fmt: str,
):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
if backup_format == "tar":
from .backup import _create_tar_backup
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
try:
_create_tar_backup(
registry_service, backup_path, True, False, feedback, api
)
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning(
"Backup Warning", f"Failed to backup current registry: {e}"
)
else:
from .backup import _create_zip_backup
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.zip")
_create_zip_backup(registry_service, backup_path, True, feedback, api)
def _show_restore_summary(
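
For reference, the extended suffix check above means any tar-related or compression suffix now resolves to the tar restore path. A compact, self-contained restatement of that detection logic (the "unknown" fallback is an assumption; the real function's fallback is not shown in this hunk):

from pathlib import Path

def detect_backup_format(backup_file: Path) -> str:
    # Mirrors the detection above: any tar/compression suffix -> "tar", zip -> "zip"
    suffixes = "".join(backup_file.suffixes).lower()
    if any(ext in suffixes for ext in (".tar", ".gz", ".tgz", ".bz2", ".xz")):
        return "tar"
    if ".zip" in suffixes:
        return "zip"
    return "unknown"

assert detect_backup_format(Path("viu_registry.tar.xz")) == "tar"
assert detect_backup_format(Path("viu_registry.tar.bz2")) == "tar"
assert detect_backup_format(Path("viu_registry.zip")) == "zip"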

View File

@@ -2,6 +2,7 @@ import textwrap
from pathlib import Path
from typing import Any, Literal, get_args, get_origin
# TODO: should we maintain a separate dependency for InquirerPy or write our own simple prompt system?
from InquirerPy import inquirer
from InquirerPy.validator import NumberValidator
from pydantic import BaseModel
@@ -28,7 +29,7 @@ class InteractiveConfigEditor:
if not isinstance(section_model, BaseModel):
continue
if not inquirer.confirm(
if not inquirer.confirm( # pyright: ignore[reportPrivateImportUsage]
message=f"Configure '{section_name.title()}' settings?",
default=True,
).execute():
@@ -83,14 +84,14 @@ class InteractiveConfigEditor:
# Boolean fields
if field_type is bool:
return inquirer.confirm(
return inquirer.confirm( # pyright: ignore[reportPrivateImportUsage]
message=message, default=current_value, long_instruction=help_text
)
# Literal (Choice) fields
if hasattr(field_type, "__origin__") and get_origin(field_type) is Literal:
choices = list(get_args(field_type))
return inquirer.select(
return inquirer.select( # pyright: ignore[reportPrivateImportUsage]
message=message,
choices=choices,
default=current_value,
@@ -99,7 +100,7 @@ class InteractiveConfigEditor:
# Numeric fields
if field_type is int:
return inquirer.number(
return inquirer.number( # pyright: ignore[reportPrivateImportUsage]
message=message,
default=int(current_value),
long_instruction=help_text,
@@ -110,7 +111,7 @@ class InteractiveConfigEditor:
validate=NumberValidator(),
)
if field_type is float:
return inquirer.number(
return inquirer.number( # pyright: ignore[reportPrivateImportUsage]
message=message,
default=float(current_value),
float_allowed=True,
@@ -120,7 +121,7 @@ class InteractiveConfigEditor:
# Path fields
if field_type is Path:
# Use text prompt for paths to allow '~' expansion, as FilePathPrompt can be tricky
return inquirer.text(
return inquirer.text( # pyright: ignore[reportPrivateImportUsage]
message=message, default=str(current_value), long_instruction=help_text
)
@@ -128,13 +129,13 @@ class InteractiveConfigEditor:
if field_type is str:
# Check for 'examples' to provide choices
if hasattr(field_info, "examples") and field_info.examples:
return inquirer.fuzzy(
return inquirer.fuzzy( # pyright: ignore[reportPrivateImportUsage]
message=message,
choices=field_info.examples,
default=str(current_value),
long_instruction=help_text,
)
return inquirer.text(
return inquirer.text( # pyright: ignore[reportPrivateImportUsage]
message=message, default=str(current_value), long_instruction=help_text
)
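
The select prompts above are driven by typing introspection of Literal fields via get_origin/get_args. A standalone illustration of that mechanism (the example type is hypothetical):

from typing import Literal, get_args, get_origin

Theme = Literal["light", "dark"]  # hypothetical config field type

# Literal types expose their allowed values through get_args(),
# which is what supplies the choices for the select prompt.
assert get_origin(Theme) is Literal
assert get_args(Theme) == ("light", "dark")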

View File

@@ -3,6 +3,8 @@ import shutil
import subprocess
from pathlib import Path
from typing import Optional
from viu_media.core.exceptions import DependencyNotFoundError
import importlib.util
import click
import httpx
@@ -43,67 +45,74 @@ def resize_image_from_url(
"""
from io import BytesIO
from PIL import Image
if importlib.util.find_spec("PIL"):
from PIL import Image # pyright: ignore[reportMissingImports]
if not return_bytes and output_path is None:
raise ValueError("output_path must be provided if return_bytes is False.")
if not return_bytes and output_path is None:
raise ValueError("output_path must be provided if return_bytes is False.")
try:
# Use the provided synchronous client
response = client.get(url)
response.raise_for_status() # Raise an exception for bad status codes
try:
# Use the provided synchronous client
response = client.get(url)
response.raise_for_status() # Raise an exception for bad status codes
image_bytes = response.content
image_stream = BytesIO(image_bytes)
img = Image.open(image_stream)
image_bytes = response.content
image_stream = BytesIO(image_bytes)
img = Image.open(image_stream)
if maintain_aspect_ratio:
img_copy = img.copy()
img_copy.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
resized_img = img_copy
else:
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
if return_bytes:
# Determine the output format. Default to JPEG if original is unknown or problematic.
# Handle RGBA to RGB conversion for JPEG output.
output_format = (
img.format if img.format in ["JPEG", "PNG", "WEBP"] else "JPEG"
)
if output_format == "JPEG":
if resized_img.mode in ("RGBA", "P"):
resized_img = resized_img.convert("RGB")
byte_arr = BytesIO()
resized_img.save(byte_arr, format=output_format)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and returned as bytes ({output_format} format)."
)
return byte_arr.getvalue()
else:
# Ensure the directory exists before saving
if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
resized_img.save(output_path)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and saved as '{output_path}'"
if maintain_aspect_ratio:
img_copy = img.copy()
img_copy.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
resized_img = img_copy
else:
resized_img = img.resize(
(new_width, new_height), Image.Resampling.LANCZOS
)
return None
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting {url}: {e}")
return None
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
if return_bytes:
# Determine the output format. Default to JPEG if original is unknown or problematic.
# Handle RGBA to RGB conversion for JPEG output.
output_format = (
img.format if img.format in ["JPEG", "PNG", "WEBP"] else "JPEG"
)
if output_format == "JPEG":
if resized_img.mode in ("RGBA", "P"):
resized_img = resized_img.convert("RGB")
byte_arr = BytesIO()
resized_img.save(byte_arr, format=output_format)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and returned as bytes ({output_format} format)."
)
return byte_arr.getvalue()
else:
# Ensure the directory exists before saving
if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
resized_img.save(output_path)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and saved as '{output_path}'"
)
return None
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting {url}: {e}")
return None
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
)
return None
except ValueError as e:
logger.error(f"Configuration error: {e}")
return None
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return None
else:
raise DependencyNotFoundError(
"Pillow library is required for image processing. Please install it via 'uv pip install Pillow'."
)
return None
except ValueError as e:
logger.error(f"Configuration error: {e}")
return None
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return None
def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str]:
@@ -123,17 +132,12 @@ def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str
If capture is False, prints directly to the terminal and returns None.
Returns None on any failure.
"""
# --- Common subprocess arguments ---
subprocess_kwargs = {
"check": False, # We will handle errors manually
"capture_output": capture,
"text": capture, # Decode stdout/stderr as text if capturing
}
# --- Try icat (Kitty terminal) first ---
if icat_executable := shutil.which("icat"):
process = subprocess.run(
[icat_executable, "--align", "left", url], **subprocess_kwargs
[icat_executable, "--align", "left", url],
capture_output=capture,
text=capture,
)
if process.returncode == 0:
return process.stdout if capture else None
@@ -148,11 +152,11 @@ def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str
response.raise_for_status()
img_bytes = response.content
# Add stdin input to the subprocess arguments
subprocess_kwargs["input"] = img_bytes
process = subprocess.run(
[chafa_executable, f"--size={size}", "-"], **subprocess_kwargs
[chafa_executable, f"--size={size}", "-"],
capture_output=capture,
text=capture,
input=img_bytes,
)
if process.returncode == 0:
return process.stdout if capture else None
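
The Pillow handling above uses importlib.util.find_spec to detect the optional dependency before importing it. A minimal, self-contained sketch of that pattern, with a stand-in exception class (the project imports its own DependencyNotFoundError from viu_media.core.exceptions):

import importlib.util

class DependencyNotFoundError(RuntimeError):
    """Raised when an optional runtime dependency is missing."""

def require_pillow() -> None:
    # find_spec() returns None when the package cannot be imported,
    # so we can fail early with an actionable message instead of an ImportError.
    if importlib.util.find_spec("PIL") is None:
        raise DependencyNotFoundError(
            "Pillow library is required for image processing. Please install it via 'uv pip install Pillow'."
        )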

View File

@@ -130,10 +130,11 @@ class YtDLPDownloader(BaseDownloader):
}
)
with yt_dlp.YoutubeDL(opts) as ydl:
# TODO: Confirm these type issues
with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore
info = ydl.extract_info(params.url, download=True)
if info:
_video_path = info["requested_downloads"][0]["filepath"]
_video_path = info["requested_downloads"][0]["filepath"] # type: ignore
if _video_path.endswith(".unknown_video"):
print("Normalizing path...")
_vid_path = _video_path.replace(".unknown_video", ".mp4")

View File

@@ -219,7 +219,7 @@ class BackgroundWorker(ABC):
else:
# Wait for tasks to complete with timeout
try:
self._executor.shutdown(wait=True, timeout=timeout)
self._executor.shutdown(wait=True)
except TimeoutError:
logger.warning(
f"Worker {self.name} shutdown timed out, forcing cancellation"

View File

@@ -1,22 +0,0 @@
from httpx import get
ANISKIP_ENDPOINT = "https://api.aniskip.com/v1/skip-times"
# TODO: Finish own implementation of aniskip script
class AniSkip:
@classmethod
def get_skip_times(
cls, mal_id: int, episode_number: float | int, types=["op", "ed"]
):
url = f"{ANISKIP_ENDPOINT}/{mal_id}/{episode_number}?types=op&types=ed"
response = get(url)
print(response.text)
return response.json()
if __name__ == "__main__":
mal_id = input("Mal id: ")
episode_number = input("episode_number: ")
skip_times = AniSkip.get_skip_times(int(mal_id), float(episode_number))
print(skip_times)

View File

@@ -1,3 +0,0 @@
from .api import connect
__all__ = ["connect"]

View File

@@ -1,13 +0,0 @@
import time
from pypresence import Presence
def connect(show, episode, switch):
presence = Presence(client_id="1292070065583165512")
presence.connect()
if not switch.is_set():
presence.update(details=show, state="Watching episode " + episode)
time.sleep(10)
else:
presence.close()

View File

@@ -33,6 +33,7 @@ from ..types import (
Studio,
UserListItem,
UserMediaListStatus,
MediaType,
UserProfile,
)
from .types import (
@@ -539,7 +540,7 @@ def _to_generic_media_item_from_notification_partial(
title=_to_generic_media_title(data["title"]),
cover_image=_to_generic_media_image(data["coverImage"]),
# Provide default/empty values for fields not in notification payload
type="ANIME",
type=MediaType.ANIME,
status=MediaStatus.RELEASING, # Assume releasing for airing notifications
format=None,
description=None,

View File

@@ -6,6 +6,7 @@ from ..types import (
MediaImage,
MediaItem,
MediaSearchResult,
MediaStatus,
MediaTitle,
PageInfo,
Studio,
@@ -17,9 +18,9 @@ if TYPE_CHECKING:
# Jikan uses specific strings for status, we can map them to our generic enum.
JIKAN_STATUS_MAP = {
"Finished Airing": "FINISHED",
"Currently Airing": "RELEASING",
"Not yet aired": "NOT_YET_RELEASED",
"Finished Airing": MediaStatus.FINISHED,
"Currently Airing": MediaStatus.RELEASING,
"Not yet aired": MediaStatus.NOT_YET_RELEASED,
}
@@ -42,7 +43,11 @@ def _to_generic_title(jikan_titles: list[dict]) -> MediaTitle:
elif type_ == "Japanese":
native = title_
return MediaTitle(romaji=romaji, english=english, native=native)
return MediaTitle(
romaji=romaji,
english=english or romaji or native or "NOT AVAILABLE",
native=native,
)
def _to_generic_image(jikan_images: dict) -> MediaImage:
@@ -69,7 +74,7 @@ def _to_generic_media_item(data: dict) -> MediaItem:
id_mal=data["mal_id"],
title=_to_generic_title(data.get("titles", [])),
cover_image=_to_generic_image(data.get("images", {})),
status=JIKAN_STATUS_MAP.get(data.get("status", ""), None),
status=JIKAN_STATUS_MAP.get(data.get("status", ""), MediaStatus.UNKNOWN),
episodes=data.get("episodes"),
duration=data.get("duration"),
average_score=score,
@@ -81,7 +86,7 @@ def _to_generic_media_item(data: dict) -> MediaItem:
Studio(id=s["mal_id"], name=s["name"]) for s in data.get("studios", [])
],
# Jikan doesn't provide streaming episodes
streaming_episodes=[],
streaming_episodes={},
# Jikan doesn't provide user list status in its search results.
user_status=None,
)
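
The status mapping now returns enum members and falls back to MediaStatus.UNKNOWN (added in the types diff below) instead of None when Jikan reports an unrecognized status. A small self-contained illustration of that fallback, using a trimmed stand-in for the project's enum:

from enum import Enum

class MediaStatus(Enum):  # stand-in with only the members needed here
    FINISHED = "FINISHED"
    RELEASING = "RELEASING"
    UNKNOWN = "UNKNOWN"

STATUS_MAP = {
    "Finished Airing": MediaStatus.FINISHED,
    "Currently Airing": MediaStatus.RELEASING,
}

# Unmapped or missing statuses degrade to UNKNOWN rather than None,
# so downstream code always receives a MediaStatus member.
assert STATUS_MAP.get("Currently Airing", MediaStatus.UNKNOWN) is MediaStatus.RELEASING
assert STATUS_MAP.get("Some new status", MediaStatus.UNKNOWN) is MediaStatus.UNKNOWN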

View File

@@ -15,6 +15,7 @@ class MediaStatus(Enum):
NOT_YET_RELEASED = "NOT_YET_RELEASED"
CANCELLED = "CANCELLED"
HIATUS = "HIATUS"
UNKNOWN = "UNKNOWN"
class MediaType(Enum):

View File

@@ -125,47 +125,10 @@ def test_media_api(api_client: BaseApiClient):
print()
# Test 5: Get Characters
print("5. Testing Character Information...")
try:
characters = api_client.get_characters_of(
MediaCharactersParams(id=selected_anime.id)
)
if characters and characters.get("data"):
char_data = characters["data"]["Page"]["media"][0]["characters"]["nodes"]
if char_data:
print(f" Found {len(char_data)} characters:")
for char in char_data[:3]: # Show first 3
name = char["name"]["full"] or char["name"]["first"]
print(f" - {name}")
else:
print(" No character data found")
else:
print(" No characters found")
except Exception as e:
print(f" Error: {e}")
print()
# TODO: Recreate this test
# Test 6: Get Airing Schedule
print("6. Testing Airing Schedule...")
try:
schedule = api_client.get_airing_schedule_for(
MediaAiringScheduleParams(id=selected_anime.id)
)
if schedule and schedule.get("data"):
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"][
"nodes"
]
if schedule_data:
print(f" Found {len(schedule_data)} upcoming episodes:")
for ep in schedule_data[:3]: # Show first 3
print(f" - Episode {ep['episode']}")
else:
print(" No upcoming episodes")
else:
print(" No airing schedule found")
except Exception as e:
print(f" Error: {e}")
print()
# TODO: Recreate this test
# Test 7: User Media List (if authenticated)
if api_client.is_authenticated():