mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-12 15:50:01 -08:00
Compare commits
16 Commits
v3.2.8
...
feature/ma
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
41d7f1702c | ||
|
|
1fea1335c6 | ||
|
|
8b664fae36 | ||
|
|
19a85511b4 | ||
|
|
205299108b | ||
|
|
7670bdd2f3 | ||
|
|
cd3f7f7fb8 | ||
|
|
5be03ed5b8 | ||
|
|
6581179336 | ||
|
|
2bb674f4a0 | ||
|
|
642e77f601 | ||
|
|
a5e99122f5 | ||
|
|
39bd7bed61 | ||
|
|
869072633b | ||
|
|
cbd788a573 | ||
|
|
11fe54b146 |
1
.python-version
Normal file
1
.python-version
Normal file
@@ -0,0 +1 @@
|
||||
3.11
|
||||
@@ -1,5 +1,5 @@
|
||||
{
|
||||
"venvPath": ".",
|
||||
"venv": ".venv",
|
||||
"pythonVersion": "3.11"
|
||||
"pythonVersion": "3.12"
|
||||
}
|
||||
|
||||
8
uv.lock
generated
8
uv.lock
generated
@@ -3441,15 +3441,15 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "pyright"
|
||||
version = "1.1.406"
|
||||
version = "1.1.407"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "nodeenv" },
|
||||
{ name = "typing-extensions" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/f7/16/6b4fbdd1fef59a0292cbb99f790b44983e390321eccbc5921b4d161da5d1/pyright-1.1.406.tar.gz", hash = "sha256:c4872bc58c9643dac09e8a2e74d472c62036910b3bd37a32813989ef7576ea2c", size = 4113151, upload-time = "2025-10-02T01:04:45.488Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/a6/1b/0aa08ee42948b61745ac5b5b5ccaec4669e8884b53d31c8ec20b2fcd6b6f/pyright-1.1.407.tar.gz", hash = "sha256:099674dba5c10489832d4a4b2d302636152a9a42d317986c38474c76fe562262", size = 4122872, upload-time = "2025-10-24T23:17:15.145Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/f6/a2/e309afbb459f50507103793aaef85ca4348b66814c86bc73908bdeb66d12/pyright-1.1.406-py3-none-any.whl", hash = "sha256:1d81fb43c2407bf566e97e57abb01c811973fdb21b2df8df59f870f688bdca71", size = 5980982, upload-time = "2025-10-02T01:04:43.137Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/dc/93/b69052907d032b00c40cb656d21438ec00b3a471733de137a3f65a49a0a0/pyright-1.1.407-py3-none-any.whl", hash = "sha256:6dd419f54fcc13f03b52285796d65e639786373f433e243f8b94cf93a7444d21", size = 5997008, upload-time = "2025-10-24T23:17:13.159Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3751,7 +3751,7 @@ wheels = [
|
||||
|
||||
[[package]]
|
||||
name = "viu-media"
|
||||
version = "3.2.7"
|
||||
version = "3.2.8"
|
||||
source = { editable = "." }
|
||||
dependencies = [
|
||||
{ name = "click" },
|
||||
|
||||
@@ -113,6 +113,7 @@ def _create_tar_backup(
|
||||
api: str,
|
||||
):
|
||||
"""Create a tar-based backup."""
|
||||
# TODO: Add support for bz2/xz compression if needed
|
||||
mode = "w:gz" if compress else "w"
|
||||
|
||||
with tarfile.open(output_path, mode) as tar:
|
||||
|
||||
@@ -5,6 +5,7 @@ Registry restore command - restore registry from backup files
|
||||
import json
|
||||
import shutil
|
||||
import tarfile
|
||||
import zipfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
@@ -25,6 +26,11 @@ from ....service.registry.service import MediaRegistryService
|
||||
is_flag=True,
|
||||
help="Create backup of current registry before restoring",
|
||||
)
|
||||
@click.option(
|
||||
"--backup-current-tar-compression-fmt",
|
||||
type=click.Choice(["gz", "bz2", "xz"], case_sensitive=False),
|
||||
help="The compression format to use for the current registry backup (if enabled)",
|
||||
)
|
||||
@click.option("--verify", is_flag=True, help="Verify backup integrity before restoring")
|
||||
@click.option(
|
||||
"--api",
|
||||
@@ -38,6 +44,7 @@ def restore(
|
||||
backup_file: Path,
|
||||
force: bool,
|
||||
backup_current: bool,
|
||||
backup_current_compression_fmt: str,
|
||||
verify: bool,
|
||||
api: str,
|
||||
):
|
||||
@@ -61,7 +68,7 @@ def restore(
|
||||
"Verification Failed",
|
||||
"Backup file appears to be corrupted or invalid",
|
||||
)
|
||||
raise click.Abort()
|
||||
return
|
||||
feedback.success("Verification", "Backup file integrity verified")
|
||||
|
||||
# Check if current registry exists
|
||||
@@ -77,7 +84,13 @@ def restore(
|
||||
|
||||
# Create backup of current registry if requested
|
||||
if backup_current and registry_exists:
|
||||
_backup_current_registry(registry_service, api, feedback)
|
||||
_backup_current_registry(
|
||||
registry_service,
|
||||
api,
|
||||
feedback,
|
||||
backup_format=backup_format,
|
||||
compression_fmt=backup_current_compression_fmt,
|
||||
)
|
||||
|
||||
# Show restore summary
|
||||
_show_restore_summary(backup_file, backup_format, feedback)
|
||||
@@ -110,7 +123,13 @@ def restore(
|
||||
def _detect_backup_format(backup_file: Path) -> str:
|
||||
"""Detect backup file format."""
|
||||
suffixes = "".join(backup_file.suffixes).lower()
|
||||
if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes:
|
||||
if (
|
||||
".tar" in suffixes
|
||||
or ".gz" in suffixes
|
||||
or ".tgz" in suffixes
|
||||
or ".bz2" in suffixes
|
||||
or ".xz" in suffixes
|
||||
):
|
||||
return "tar"
|
||||
elif ".zip" in suffixes:
|
||||
return "zip"
|
||||
@@ -122,25 +141,38 @@ def _verify_backup(
|
||||
) -> bool:
|
||||
"""Verify backup file integrity."""
|
||||
try:
|
||||
metadata = {}
|
||||
has_registry = has_index = has_metadata = False
|
||||
if format_type == "tar":
|
||||
with tarfile.open(backup_file, "r:*") as tar:
|
||||
names = tar.getnames()
|
||||
has_registry = any("registry/" in name for name in names)
|
||||
has_index = any("index/" in name for name in names)
|
||||
has_metadata = "backup_metadata.json" in names
|
||||
for name in names:
|
||||
if name == "registry/":
|
||||
has_registry = True
|
||||
continue
|
||||
if name == "index/":
|
||||
has_index = True
|
||||
continue
|
||||
if name == "backup_metadata.json":
|
||||
has_metadata = True
|
||||
continue
|
||||
if has_metadata:
|
||||
metadata_member = tar.getmember("backup_metadata.json")
|
||||
if metadata_file := tar.extractfile(metadata_member):
|
||||
metadata = json.load(metadata_file)
|
||||
else: # zip
|
||||
import zipfile
|
||||
|
||||
with zipfile.ZipFile(backup_file, "r") as zip_file:
|
||||
names = zip_file.namelist()
|
||||
has_registry = any("registry/" in name for name in names)
|
||||
has_index = any("index/" in name for name in names)
|
||||
has_metadata = "backup_metadata.json" in names
|
||||
for name in names:
|
||||
if name == "registry/":
|
||||
has_registry = True
|
||||
continue
|
||||
if name == "index/":
|
||||
has_index = True
|
||||
continue
|
||||
if name == "backup_metadata.json":
|
||||
has_metadata = True
|
||||
continue
|
||||
if has_metadata:
|
||||
with zip_file.open("backup_metadata.json") as metadata_file:
|
||||
metadata = json.load(metadata_file)
|
||||
@@ -163,27 +195,42 @@ def _verify_backup(
|
||||
|
||||
def _check_registry_exists(registry_service: MediaRegistryService) -> bool:
|
||||
"""Check if a registry already exists."""
|
||||
try:
|
||||
stats = registry_service.get_registry_stats()
|
||||
return stats.get("total_media", 0) > 0
|
||||
except Exception:
|
||||
return False
|
||||
# TODO: Improve this check to be more robust
|
||||
return registry_service.media_registry_dir.exists() and any(
|
||||
registry_service.media_registry_dir.iterdir()
|
||||
)
|
||||
|
||||
|
||||
def _backup_current_registry(
|
||||
registry_service: MediaRegistryService, api: str, feedback: FeedbackService
|
||||
registry_service: MediaRegistryService,
|
||||
api: str,
|
||||
feedback: FeedbackService,
|
||||
backup_format: str,
|
||||
compression_fmt: str,
|
||||
):
|
||||
"""Create backup of current registry before restoring."""
|
||||
from .backup import _create_tar_backup
|
||||
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
|
||||
if backup_format == "tar":
|
||||
from .backup import _create_tar_backup
|
||||
|
||||
try:
|
||||
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
|
||||
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
|
||||
except Exception as e:
|
||||
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
|
||||
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
|
||||
|
||||
try:
|
||||
_create_tar_backup(
|
||||
registry_service, backup_path, True, False, feedback, api
|
||||
)
|
||||
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
|
||||
except Exception as e:
|
||||
feedback.warning(
|
||||
"Backup Warning", f"Failed to backup current registry: {e}"
|
||||
)
|
||||
else:
|
||||
from .backup import _create_zip_backup
|
||||
|
||||
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.zip")
|
||||
|
||||
_create_zip_backup(registry_service, backup_path, True, feedback, api)
|
||||
|
||||
|
||||
def _show_restore_summary(
|
||||
|
||||
@@ -2,6 +2,7 @@ import textwrap
|
||||
from pathlib import Path
|
||||
from typing import Any, Literal, get_args, get_origin
|
||||
|
||||
# TODO: should we maintain a separate dependency for InquirerPy or write our own simple prompt system?
|
||||
from InquirerPy import inquirer
|
||||
from InquirerPy.validator import NumberValidator
|
||||
from pydantic import BaseModel
|
||||
@@ -28,7 +29,7 @@ class InteractiveConfigEditor:
|
||||
if not isinstance(section_model, BaseModel):
|
||||
continue
|
||||
|
||||
if not inquirer.confirm(
|
||||
if not inquirer.confirm( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=f"Configure '{section_name.title()}' settings?",
|
||||
default=True,
|
||||
).execute():
|
||||
@@ -83,14 +84,14 @@ class InteractiveConfigEditor:
|
||||
|
||||
# Boolean fields
|
||||
if field_type is bool:
|
||||
return inquirer.confirm(
|
||||
return inquirer.confirm( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message, default=current_value, long_instruction=help_text
|
||||
)
|
||||
|
||||
# Literal (Choice) fields
|
||||
if hasattr(field_type, "__origin__") and get_origin(field_type) is Literal:
|
||||
choices = list(get_args(field_type))
|
||||
return inquirer.select(
|
||||
return inquirer.select( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message,
|
||||
choices=choices,
|
||||
default=current_value,
|
||||
@@ -99,7 +100,7 @@ class InteractiveConfigEditor:
|
||||
|
||||
# Numeric fields
|
||||
if field_type is int:
|
||||
return inquirer.number(
|
||||
return inquirer.number( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message,
|
||||
default=int(current_value),
|
||||
long_instruction=help_text,
|
||||
@@ -110,7 +111,7 @@ class InteractiveConfigEditor:
|
||||
validate=NumberValidator(),
|
||||
)
|
||||
if field_type is float:
|
||||
return inquirer.number(
|
||||
return inquirer.number( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message,
|
||||
default=float(current_value),
|
||||
float_allowed=True,
|
||||
@@ -120,7 +121,7 @@ class InteractiveConfigEditor:
|
||||
# Path fields
|
||||
if field_type is Path:
|
||||
# Use text prompt for paths to allow '~' expansion, as FilePathPrompt can be tricky
|
||||
return inquirer.text(
|
||||
return inquirer.text( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message, default=str(current_value), long_instruction=help_text
|
||||
)
|
||||
|
||||
@@ -128,13 +129,13 @@ class InteractiveConfigEditor:
|
||||
if field_type is str:
|
||||
# Check for 'examples' to provide choices
|
||||
if hasattr(field_info, "examples") and field_info.examples:
|
||||
return inquirer.fuzzy(
|
||||
return inquirer.fuzzy( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message,
|
||||
choices=field_info.examples,
|
||||
default=str(current_value),
|
||||
long_instruction=help_text,
|
||||
)
|
||||
return inquirer.text(
|
||||
return inquirer.text( # pyright: ignore[reportPrivateImportUsage]
|
||||
message=message, default=str(current_value), long_instruction=help_text
|
||||
)
|
||||
|
||||
|
||||
@@ -3,6 +3,8 @@ import shutil
|
||||
import subprocess
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
from viu_media.core.exceptions import DependencyNotFoundError
|
||||
import importlib.util
|
||||
|
||||
import click
|
||||
import httpx
|
||||
@@ -43,67 +45,74 @@ def resize_image_from_url(
|
||||
"""
|
||||
from io import BytesIO
|
||||
|
||||
from PIL import Image
|
||||
if importlib.util.find_spec("PIL"):
|
||||
from PIL import Image # pyright: ignore[reportMissingImports]
|
||||
|
||||
if not return_bytes and output_path is None:
|
||||
raise ValueError("output_path must be provided if return_bytes is False.")
|
||||
if not return_bytes and output_path is None:
|
||||
raise ValueError("output_path must be provided if return_bytes is False.")
|
||||
|
||||
try:
|
||||
# Use the provided synchronous client
|
||||
response = client.get(url)
|
||||
response.raise_for_status() # Raise an exception for bad status codes
|
||||
try:
|
||||
# Use the provided synchronous client
|
||||
response = client.get(url)
|
||||
response.raise_for_status() # Raise an exception for bad status codes
|
||||
|
||||
image_bytes = response.content
|
||||
image_stream = BytesIO(image_bytes)
|
||||
img = Image.open(image_stream)
|
||||
image_bytes = response.content
|
||||
image_stream = BytesIO(image_bytes)
|
||||
img = Image.open(image_stream)
|
||||
|
||||
if maintain_aspect_ratio:
|
||||
img_copy = img.copy()
|
||||
img_copy.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
resized_img = img_copy
|
||||
else:
|
||||
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
|
||||
if return_bytes:
|
||||
# Determine the output format. Default to JPEG if original is unknown or problematic.
|
||||
# Handle RGBA to RGB conversion for JPEG output.
|
||||
output_format = (
|
||||
img.format if img.format in ["JPEG", "PNG", "WEBP"] else "JPEG"
|
||||
)
|
||||
if output_format == "JPEG":
|
||||
if resized_img.mode in ("RGBA", "P"):
|
||||
resized_img = resized_img.convert("RGB")
|
||||
|
||||
byte_arr = BytesIO()
|
||||
resized_img.save(byte_arr, format=output_format)
|
||||
logger.info(
|
||||
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and returned as bytes ({output_format} format)."
|
||||
)
|
||||
return byte_arr.getvalue()
|
||||
else:
|
||||
# Ensure the directory exists before saving
|
||||
if output_path:
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
resized_img.save(output_path)
|
||||
logger.info(
|
||||
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and saved as '{output_path}'"
|
||||
if maintain_aspect_ratio:
|
||||
img_copy = img.copy()
|
||||
img_copy.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
|
||||
resized_img = img_copy
|
||||
else:
|
||||
resized_img = img.resize(
|
||||
(new_width, new_height), Image.Resampling.LANCZOS
|
||||
)
|
||||
return None
|
||||
|
||||
except httpx.RequestError as e:
|
||||
logger.error(f"An error occurred while requesting {url}: {e}")
|
||||
return None
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.error(
|
||||
f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
|
||||
if return_bytes:
|
||||
# Determine the output format. Default to JPEG if original is unknown or problematic.
|
||||
# Handle RGBA to RGB conversion for JPEG output.
|
||||
output_format = (
|
||||
img.format if img.format in ["JPEG", "PNG", "WEBP"] else "JPEG"
|
||||
)
|
||||
if output_format == "JPEG":
|
||||
if resized_img.mode in ("RGBA", "P"):
|
||||
resized_img = resized_img.convert("RGB")
|
||||
|
||||
byte_arr = BytesIO()
|
||||
resized_img.save(byte_arr, format=output_format)
|
||||
logger.info(
|
||||
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and returned as bytes ({output_format} format)."
|
||||
)
|
||||
return byte_arr.getvalue()
|
||||
else:
|
||||
# Ensure the directory exists before saving
|
||||
if output_path:
|
||||
output_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
resized_img.save(output_path)
|
||||
logger.info(
|
||||
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and saved as '{output_path}'"
|
||||
)
|
||||
return None
|
||||
|
||||
except httpx.RequestError as e:
|
||||
logger.error(f"An error occurred while requesting {url}: {e}")
|
||||
return None
|
||||
except httpx.HTTPStatusError as e:
|
||||
logger.error(
|
||||
f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
|
||||
)
|
||||
return None
|
||||
except ValueError as e:
|
||||
logger.error(f"Configuration error: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred: {e}")
|
||||
return None
|
||||
else:
|
||||
raise DependencyNotFoundError(
|
||||
"Pillow library is required for image processing. Please install it via 'uv pip install Pillow'."
|
||||
)
|
||||
return None
|
||||
except ValueError as e:
|
||||
logger.error(f"Configuration error: {e}")
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"An unexpected error occurred: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str]:
|
||||
@@ -123,17 +132,12 @@ def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str
|
||||
If capture is False, prints directly to the terminal and returns None.
|
||||
Returns None on any failure.
|
||||
"""
|
||||
# --- Common subprocess arguments ---
|
||||
subprocess_kwargs = {
|
||||
"check": False, # We will handle errors manually
|
||||
"capture_output": capture,
|
||||
"text": capture, # Decode stdout/stderr as text if capturing
|
||||
}
|
||||
|
||||
# --- Try icat (Kitty terminal) first ---
|
||||
if icat_executable := shutil.which("icat"):
|
||||
process = subprocess.run(
|
||||
[icat_executable, "--align", "left", url], **subprocess_kwargs
|
||||
[icat_executable, "--align", "left", url],
|
||||
capture_output=capture,
|
||||
text=capture,
|
||||
)
|
||||
if process.returncode == 0:
|
||||
return process.stdout if capture else None
|
||||
@@ -148,11 +152,11 @@ def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str
|
||||
response.raise_for_status()
|
||||
img_bytes = response.content
|
||||
|
||||
# Add stdin input to the subprocess arguments
|
||||
subprocess_kwargs["input"] = img_bytes
|
||||
|
||||
process = subprocess.run(
|
||||
[chafa_executable, f"--size={size}", "-"], **subprocess_kwargs
|
||||
[chafa_executable, f"--size={size}", "-"],
|
||||
capture_output=capture,
|
||||
text=capture,
|
||||
input=img_bytes,
|
||||
)
|
||||
if process.returncode == 0:
|
||||
return process.stdout if capture else None
|
||||
|
||||
@@ -130,10 +130,11 @@ class YtDLPDownloader(BaseDownloader):
|
||||
}
|
||||
)
|
||||
|
||||
with yt_dlp.YoutubeDL(opts) as ydl:
|
||||
# TODO: Confirm this type issues
|
||||
with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore
|
||||
info = ydl.extract_info(params.url, download=True)
|
||||
if info:
|
||||
_video_path = info["requested_downloads"][0]["filepath"]
|
||||
_video_path = info["requested_downloads"][0]["filepath"] # type: ignore
|
||||
if _video_path.endswith(".unknown_video"):
|
||||
print("Normalizing path...")
|
||||
_vid_path = _video_path.replace(".unknown_video", ".mp4")
|
||||
|
||||
@@ -219,7 +219,7 @@ class BackgroundWorker(ABC):
|
||||
else:
|
||||
# Wait for tasks to complete with timeout
|
||||
try:
|
||||
self._executor.shutdown(wait=True, timeout=timeout)
|
||||
self._executor.shutdown(wait=True)
|
||||
except TimeoutError:
|
||||
logger.warning(
|
||||
f"Worker {self.name} shutdown timed out, forcing cancellation"
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
from httpx import get
|
||||
|
||||
ANISKIP_ENDPOINT = "https://api.aniskip.com/v1/skip-times"
|
||||
|
||||
|
||||
# TODO: Finish own implementation of aniskip script
|
||||
class AniSkip:
|
||||
@classmethod
|
||||
def get_skip_times(
|
||||
cls, mal_id: int, episode_number: float | int, types=["op", "ed"]
|
||||
):
|
||||
url = f"{ANISKIP_ENDPOINT}/{mal_id}/{episode_number}?types=op&types=ed"
|
||||
response = get(url)
|
||||
print(response.text)
|
||||
return response.json()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
mal_id = input("Mal id: ")
|
||||
episode_number = input("episode_number: ")
|
||||
skip_times = AniSkip.get_skip_times(int(mal_id), float(episode_number))
|
||||
print(skip_times)
|
||||
@@ -1,3 +0,0 @@
|
||||
from .api import connect
|
||||
|
||||
__all__ = ["connect"]
|
||||
@@ -1,13 +0,0 @@
|
||||
import time
|
||||
|
||||
from pypresence import Presence
|
||||
|
||||
|
||||
def connect(show, episode, switch):
|
||||
presence = Presence(client_id="1292070065583165512")
|
||||
presence.connect()
|
||||
if not switch.is_set():
|
||||
presence.update(details=show, state="Watching episode " + episode)
|
||||
time.sleep(10)
|
||||
else:
|
||||
presence.close()
|
||||
@@ -33,6 +33,7 @@ from ..types import (
|
||||
Studio,
|
||||
UserListItem,
|
||||
UserMediaListStatus,
|
||||
MediaType,
|
||||
UserProfile,
|
||||
)
|
||||
from .types import (
|
||||
@@ -539,7 +540,7 @@ def _to_generic_media_item_from_notification_partial(
|
||||
title=_to_generic_media_title(data["title"]),
|
||||
cover_image=_to_generic_media_image(data["coverImage"]),
|
||||
# Provide default/empty values for fields not in notification payload
|
||||
type="ANIME",
|
||||
type=MediaType.ANIME,
|
||||
status=MediaStatus.RELEASING, # Assume releasing for airing notifications
|
||||
format=None,
|
||||
description=None,
|
||||
|
||||
@@ -6,6 +6,7 @@ from ..types import (
|
||||
MediaImage,
|
||||
MediaItem,
|
||||
MediaSearchResult,
|
||||
MediaStatus,
|
||||
MediaTitle,
|
||||
PageInfo,
|
||||
Studio,
|
||||
@@ -17,9 +18,9 @@ if TYPE_CHECKING:
|
||||
|
||||
# Jikan uses specific strings for status, we can map them to our generic enum.
|
||||
JIKAN_STATUS_MAP = {
|
||||
"Finished Airing": "FINISHED",
|
||||
"Currently Airing": "RELEASING",
|
||||
"Not yet aired": "NOT_YET_RELEASED",
|
||||
"Finished Airing": MediaStatus.FINISHED,
|
||||
"Currently Airing": MediaStatus.RELEASING,
|
||||
"Not yet aired": MediaStatus.NOT_YET_RELEASED,
|
||||
}
|
||||
|
||||
|
||||
@@ -42,7 +43,11 @@ def _to_generic_title(jikan_titles: list[dict]) -> MediaTitle:
|
||||
elif type_ == "Japanese":
|
||||
native = title_
|
||||
|
||||
return MediaTitle(romaji=romaji, english=english, native=native)
|
||||
return MediaTitle(
|
||||
romaji=romaji,
|
||||
english=english or romaji or native or "NOT AVAILABLE",
|
||||
native=native,
|
||||
)
|
||||
|
||||
|
||||
def _to_generic_image(jikan_images: dict) -> MediaImage:
|
||||
@@ -69,7 +74,7 @@ def _to_generic_media_item(data: dict) -> MediaItem:
|
||||
id_mal=data["mal_id"],
|
||||
title=_to_generic_title(data.get("titles", [])),
|
||||
cover_image=_to_generic_image(data.get("images", {})),
|
||||
status=JIKAN_STATUS_MAP.get(data.get("status", ""), None),
|
||||
status=JIKAN_STATUS_MAP.get(data.get("status", ""), MediaStatus.UNKNOWN),
|
||||
episodes=data.get("episodes"),
|
||||
duration=data.get("duration"),
|
||||
average_score=score,
|
||||
@@ -81,7 +86,7 @@ def _to_generic_media_item(data: dict) -> MediaItem:
|
||||
Studio(id=s["mal_id"], name=s["name"]) for s in data.get("studios", [])
|
||||
],
|
||||
# Jikan doesn't provide streaming episodes
|
||||
streaming_episodes=[],
|
||||
streaming_episodes={},
|
||||
# Jikan doesn't provide user list status in its search results.
|
||||
user_status=None,
|
||||
)
|
||||
|
||||
@@ -15,6 +15,7 @@ class MediaStatus(Enum):
|
||||
NOT_YET_RELEASED = "NOT_YET_RELEASED"
|
||||
CANCELLED = "CANCELLED"
|
||||
HIATUS = "HIATUS"
|
||||
UNKNOWN = "UNKNOWN"
|
||||
|
||||
|
||||
class MediaType(Enum):
|
||||
|
||||
@@ -125,47 +125,10 @@ def test_media_api(api_client: BaseApiClient):
|
||||
print()
|
||||
|
||||
# Test 5: Get Characters
|
||||
print("5. Testing Character Information...")
|
||||
try:
|
||||
characters = api_client.get_characters_of(
|
||||
MediaCharactersParams(id=selected_anime.id)
|
||||
)
|
||||
if characters and characters.get("data"):
|
||||
char_data = characters["data"]["Page"]["media"][0]["characters"]["nodes"]
|
||||
if char_data:
|
||||
print(f" Found {len(char_data)} characters:")
|
||||
for char in char_data[:3]: # Show first 3
|
||||
name = char["name"]["full"] or char["name"]["first"]
|
||||
print(f" - {name}")
|
||||
else:
|
||||
print(" No character data found")
|
||||
else:
|
||||
print(" No characters found")
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
print()
|
||||
# TODO: Recreate this test
|
||||
|
||||
# Test 6: Get Airing Schedule
|
||||
print("6. Testing Airing Schedule...")
|
||||
try:
|
||||
schedule = api_client.get_airing_schedule_for(
|
||||
MediaAiringScheduleParams(id=selected_anime.id)
|
||||
)
|
||||
if schedule and schedule.get("data"):
|
||||
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"][
|
||||
"nodes"
|
||||
]
|
||||
if schedule_data:
|
||||
print(f" Found {len(schedule_data)} upcoming episodes:")
|
||||
for ep in schedule_data[:3]: # Show first 3
|
||||
print(f" - Episode {ep['episode']}")
|
||||
else:
|
||||
print(" No upcoming episodes")
|
||||
else:
|
||||
print(" No airing schedule found")
|
||||
except Exception as e:
|
||||
print(f" Error: {e}")
|
||||
print()
|
||||
# TODO: Recreate this test
|
||||
|
||||
# Test 7: User Media List (if authenticated)
|
||||
if api_client.is_authenticated():
|
||||
|
||||
Reference in New Issue
Block a user