Compare commits

...

42 Commits

Author SHA1 Message Date
Benexl
26f6ccc91e feat(aniskip): scaffhold with gemini 2025-08-12 12:59:51 +03:00
Benexl
114fba27fd fix(hianime-provider): use a more specific css selector 2025-08-12 12:31:03 +03:00
Benexl
8b123cdae2 fix(hianime-provider): first split by ? 2025-08-12 12:30:31 +03:00
Benexl
b91129c4a7 feat(hianime-provider): update domain to hianime.to 2025-08-12 12:29:46 +03:00
Benexl
de0c656bc6 feat(hianime-provider): scaffhold with gemini using aniwatch repo by ghoshRitesh12 2025-08-12 12:21:35 +03:00
Benexl
de774a58d2 feat(anilist-downloads-command): create it 2025-08-12 11:40:20 +03:00
Benexl
ee25cbba10 fix(normalizer): convert media api title to lower 2025-08-12 10:57:15 +03:00
Benexl
278a771f64 Merge pull request #119 from Benexl/minor-fixes
fix: add back missing yt-dlp's "external_downloader" logic
2025-08-12 10:37:50 +03:00
Type-Delta
0d8c287e2f Merge branch 'master' of https://github.com/Benexl/FastAnime into minor-fixes 2025-08-12 14:35:52 +07:00
Type-Delta
74308dfdc5 fix: add back missing yt-dlp's "external_downloader" logic 2025-08-12 14:35:23 +07:00
Benexl
7ca1b8572e fix(rofi-previews): ensure the cover image is used if episode image unavailable 2025-08-12 02:26:19 +03:00
Benexl
54aed9e5a0 feat(config-command): use title case for the desktop entry title 2025-08-12 02:25:25 +03:00
Benexl
4511d14e8b feat(previews): implement rofi image preview logic 2025-08-12 02:11:27 +03:00
Benexl
bff684e8cb refactor(config-command): rename desktop entry option to generate-desktop-entry 2025-08-12 02:11:04 +03:00
Benexl
cfc83450c8 feat(config): include possible values for pygment style 2025-08-12 01:07:06 +03:00
Benexl
04a6a425b7 feat(config): show possible types and possible values in config comments 2025-08-12 00:55:51 +03:00
Benexl
088d232bfd feat(normalizer): add user normalizer json 2025-08-12 00:39:43 +03:00
Benexl
03fd8c0bf8 feat(selectors): complete contracts 2025-08-11 23:57:34 +03:00
Benexl
17f1744025 feat(watch-history): intelligently switch to repeating if was initially completed 2025-08-11 23:38:24 +03:00
Benexl
9a5f3d46be feat(download-service): notify on completed episode download if plyer available 2025-08-11 23:37:50 +03:00
Benexl
66eb854da5 feat: the worker command lol 2025-08-11 22:42:13 +03:00
Benexl
ae62adf233 feat: scaffhold worker command with gpt 5, it is actually pretty good lol 2025-08-11 22:42:13 +03:00
Benexl
55a7c7facf Merge pull request #117 from Benexl/minor-fixes
fix: anilist auth failed to open link on Windows
2025-08-08 11:17:49 +03:00
Type-Delta
2340c34d02 fix: anilist auth failed to open link on Windows
Switches to using the standard web browser module for opening the authentication URL, providing clearer feedback to the user about browser launch success or failure.
2025-08-08 12:28:23 +07:00
Benexl
40b29ba6e5 Merge pull request #115 from Benexl/minor-fixes
fix: anilist auth example
2025-08-07 13:14:48 +03:00
Type-Delta
5dc768f7e8 fix: anilist auth example 2025-08-07 15:19:38 +07:00
Benexl
b343bfb645 docs: enhance docstrings across player modules for clarity and completeness 2025-08-05 12:11:02 +03:00
Benexl
37773265ce Merge pull request #111 from khachbe/bugfix_formatter
Use int() cast instead of is_integer() for renumbered_val
2025-08-03 15:19:03 +03:00
khachbe
70ef1bf633 Merge branch 'master' into bugfix_formatter 2025-08-03 13:58:54 +02:00
khachbe
bee97acd35 Update formatter.py 2025-08-03 13:58:12 +02:00
Benexl
fb61fd17f1 fix(preview-script): quote $value to prevent interpretation of special characters 2025-08-02 14:01:30 +03:00
Benexl
98fff7d00f chore: bump lockfile 2025-08-02 14:00:47 +03:00
Benexl
3cc9ae50b6 Merge pull request #112 from khachbe/bugfix_typedict
Add list to AllAnimeEpisodeStreams
2025-08-02 13:46:37 +03:00
khachbe
26f7de172a add list to AllAnimeEpisodeStreams 2025-08-01 11:46:05 +02:00
khachbe
673b6280e4 Use int() cast instead of is_integer() for renumbered_val 2025-08-01 11:20:57 +02:00
Benexl
7943dcc3db Merge pull request #110 from theobori/fix/ruff-check-errors
Fixed the `ruff check` command line errors
2025-07-30 22:38:25 +03:00
Théo Bori
49ee1f9bbd Fixed the ruff check command line errors 2025-07-30 18:38:15 +02:00
Benexl
fd80149e74 Merge pull request #109 from iMithrellas/preview-scaling
Add optional --scale-up flag for icat image previews
2025-07-30 00:49:09 +03:00
iMithrellas
7c11616bea fix(fzf-preview): Make --scale-up flag opt in instead of opt out 2025-07-29 23:20:56 +02:00
iMithrellas
b9130018ca Merge branch 'master' into preview-scaling 2025-07-29 21:59:06 +02:00
iMithrellas
70ade13017 feat(fzf-preview): make --scale-up flag a config option 2025-07-29 21:52:48 +02:00
iMithrellas
071c0daf4f feat(fzf-preview): add --scale-up flag so that previews fill the available space 2025-07-29 21:52:48 +02:00
79 changed files with 2753 additions and 379 deletions

1
.gitignore vendored
View File

@@ -210,3 +210,4 @@ repomix-output.xml
.project/
result
.direnv
fastanime/libs/provider/anime/hianime/extractors/js/node_modules

View File

@@ -44,18 +44,19 @@ fzf_preview() {
if [ "$IMAGE_RENDERER" = "icat" ] && [ -z "$GHOSTTY_BIN_DIR" ]; then
if command -v kitten >/dev/null 2>&1; then
kitten icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
kitten icat --clear --transfer-mode=memory --unicode-placeholder{SCALE_UP} --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
elif command -v icat >/dev/null 2>&1; then
icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
icat --clear --transfer-mode=memory --unicode-placeholder{SCALE_UP} --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
else
kitty icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
kitty icat --clear --transfer-mode=memory --unicode-placeholder{SCALE_UP} --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
fi
elif [ -n "$GHOSTTY_BIN_DIR" ]; then
dim=$((FZF_PREVIEW_COLUMNS - 1))x${FZF_PREVIEW_LINES}
if command -v kitten >/dev/null 2>&1; then
kitten icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
kitten icat --clear --transfer-mode=memory --unicode-placeholder{SCALE_UP} --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
elif command -v icat >/dev/null 2>&1; then
icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
icat --clear --transfer-mode=memory --unicode-placeholder{SCALE_UP} --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
else
chafa -s "$dim" "$file"
fi
@@ -91,7 +92,7 @@ print_kv() {
# If the text is too long to fit, just add a single space for separation.
if [ "$padding_len" -lt 1 ]; then
padding_len=1
value=$(echo $value| fold -s -w "$((WIDTH - key_len - 3))")
value=$(echo "$value"| fold -s -w "$((WIDTH - key_len - 3))")
printf "{C_KEY}%s:{RESET}%*s%s\\n" "$key" "$padding_len" "" " $value"
else
printf "{C_KEY}%s:{RESET}%*s%s\\n" "$key" "$padding_len" "" " $value"

View File

@@ -1 +1,3 @@
from .cmd import anilist
__all__ = ["anilist"]

View File

@@ -8,6 +8,7 @@ commands = {
# "recent": "recent.recent",
"search": "search.search",
"download": "download.download",
"downloads": "downloads.downloads",
"auth": "auth.auth",
"stats": "stats.stats",
"notifications": "notifications.notifications",

View File

@@ -1,4 +1,5 @@
import click
import webbrowser
from .....core.config.model import AppConfig
@@ -41,9 +42,14 @@ def auth(config: AppConfig, status: bool, logout: bool):
return
api_client = create_api_client("anilist", config)
# TODO: stop the printing of opening browser session to stderr
click.launch(ANILIST_AUTH)
feedback.info("Your browser has been opened to obtain an AniList token.")
open_success = webbrowser.open(ANILIST_AUTH, new=2)
if open_success:
feedback.info("Your browser has been opened to obtain an AniList token.")
feedback.info(f"or you can visit the site manually [magenta][link={ANILIST_AUTH}]here[/link][/magenta].")
else:
feedback.warning(
f"Failed to open the browser. Please visit the site manually [magenta][link={ANILIST_AUTH}]here[/link][/magenta]."
)
feedback.info(
"After authorizing, copy the token from the address bar and paste it below."
)

View File

@@ -0,0 +1,211 @@
import json
from typing import TYPE_CHECKING
import click
from .....core.config import AppConfig
from .....libs.media_api.params import MediaSearchParams
from .....libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaSort,
UserMediaListStatus,
)
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
@click.command(help="Search through the local media registry")
@click.argument("query", required=False)
@click.option(
"--status",
type=click.Choice(
[s.value for s in UserMediaListStatus],
case_sensitive=False,
),
help="Filter by watch status",
)
@click.option(
"--genre", multiple=True, help="Filter by genre (can be used multiple times)"
)
@click.option(
"--format",
type=click.Choice(
[
f.value
for f in MediaFormat
if f not in [MediaFormat.MANGA, MediaFormat.NOVEL, MediaFormat.ONE_SHOT]
],
case_sensitive=False,
),
help="Filter by format",
)
@click.option("--year", type=int, help="Filter by release year")
@click.option("--min-score", type=float, help="Minimum average score (0.0 - 10.0)")
@click.option("--max-score", type=float, help="Maximum average score (0.0 - 10.0)")
@click.option(
"--sort",
type=click.Choice(
["title", "score", "popularity", "year", "episodes", "updated"],
case_sensitive=False,
),
default="title",
help="Sort results by field",
)
@click.option("--limit", type=int, default=20, help="Maximum number of results to show")
@click.option(
"--json", "output_json", is_flag=True, help="Output results in JSON format"
)
@click.option(
"--api",
default="anilist",
type=click.Choice(["anilist"], case_sensitive=False),
help="Media API registry to search",
)
@click.pass_obj
def downloads(
config: AppConfig,
query: str | None,
status: str | None,
genre: tuple[str, ...],
format: str | None,
year: int | None,
min_score: float | None,
max_score: float | None,
sort: str,
limit: int,
output_json: bool,
api: str,
):
"""
Search through your local media registry.
You can search by title and filter by various criteria like status,
genre, format, year, and score range.
"""
feedback = FeedbackService(config)
if not has_user_input(click.get_current_context()):
from ....interactive.session import session
from ....interactive.state import MediaApiState, MenuName, State
# Create initial state with search results
initial_state = [State(menu_name=MenuName.DOWNLOADS)]
session.load_menus_from_folder("media")
session.run(config, history=initial_state)
registry_service = MediaRegistryService(api, config.media_registry)
search_params = _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
)
with feedback.progress("Searching local registry..."):
result = registry_service.search_for_media(search_params)
if not result or not result.media:
feedback.info("No Results", "No media found matching your criteria")
return
if output_json:
print(json.dumps(result.model_dump(mode="json"), indent=2))
return
from ....interactive.session import session
from ....interactive.state import MediaApiState, MenuName, State
feedback.info(
f"Found {len(result.media)} anime matching your search. Launching interactive mode..."
)
# Create initial state with search results
initial_state = [
State(menu_name=MenuName.DOWNLOADS),
State(
menu_name=MenuName.RESULTS,
media_api=MediaApiState(
search_result={
media_item.id: media_item for media_item in result.media
},
search_params=search_params,
page_info=result.page_info,
),
),
]
session.load_menus_from_folder("media")
session.run(config, history=initial_state)
def _build_search_params(
query: str | None,
status: str | None,
genre: tuple[str, ...],
format_str: str | None,
year: int | None,
min_score: float | None,
max_score: float | None,
sort: str,
limit: int,
) -> MediaSearchParams:
"""Build MediaSearchParams from command options for local filtering."""
sort_map = {
"title": MediaSort.TITLE_ROMAJI,
"score": MediaSort.SCORE_DESC,
"popularity": MediaSort.POPULARITY_DESC,
"year": MediaSort.START_DATE_DESC,
"episodes": MediaSort.EPISODES_DESC,
"updated": MediaSort.UPDATED_AT_DESC,
}
# Safely convert strings to enums
format_enum = next(
(f for f in MediaFormat if f.value.lower() == (format_str or "").lower()), None
)
genre_enums = [
g for g_str in genre for g in MediaGenre if g.value.lower() == g_str.lower()
]
# Note: Local search handles status separately as it's part of the index, not MediaItem
return MediaSearchParams(
query=query,
per_page=limit,
sort=[sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI)],
averageScore_greater=int(min_score * 10) if min_score is not None else None,
averageScore_lesser=int(max_score * 10) if max_score is not None else None,
genre_in=genre_enums or None,
format_in=[format_enum] if format_enum else None,
seasonYear=year,
)
def has_user_input(ctx: click.Context) -> bool:
"""
Checks if any command-line options or arguments were provided by the user
by comparing the given values to their default values.
This handles all parameter types including flags, multiple options,
and arguments with no default.
"""
import sys
if len(sys.argv) > 3:
return True
else:
return False
for param in ctx.command.params:
# Get the value for the parameter from the context.
# This will be the user-provided value or the default.
value = ctx.params.get(param.name)
# We need to explicitly check if a value was provided by the user.
# The simplest way to do this is to compare it to its default.
if value != param.default:
# If the value is different from the default, the user
# must have provided it.
return True
# If the loop completes without finding any non-default values,
# then no user input was given.
return False

View File

@@ -19,15 +19,11 @@ def stats(config: "AppConfig"):
from .....libs.media_api.api import create_api_client
from ....service.auth import AuthService
from ....service.feedback import FeedbackService
from ....service.registry import MediaRegistryService
console = Console()
feedback = FeedbackService(config)
auth = AuthService(config.general.media_api)
registry_service = MediaRegistryService(
config.general.media_api, config.media_registry
)
media_api_client = create_api_client(config.general.media_api, config)

View File

@@ -95,7 +95,7 @@ search = """
main = """
\b
\b\bExamples:
# ---- search ----
# ---- search ----
\b
# Basic search by title
fastanime anilist search -t "Attack on Titan"
@@ -147,16 +147,16 @@ main = """
\b
# ---- login ----
\b
# To sign in just run
fastanime anilist login
# To sign in just run
fastanime anilist auth
\b
# To view your login status
fastanime anilist login --status
# To check your login status
fastanime anilist auth --status
\b
# To erase login data
fastanime anilist login --erase
# To log out and erase credentials
fastanime anilist auth --logout
\b
# ---- notifier ----
# ---- notifier ----
\b
# basic form
fastanime anilist notifier

View File

@@ -20,10 +20,13 @@ from ...core.config import AppConfig
fastanime config --path
\b
# print desktop entry info
fastanime config --desktop-entry
fastanime config --generate-desktop-entry
\b
# update your config without opening an editor
fastanime --icons --fzf --preview config --update
fastanime --icons --selector fzf --preview full config --update
\b
# interactively define your config
fastanime config --interactive
\b
# view the current contents of your config
fastanime config --view
@@ -40,9 +43,9 @@ from ...core.config import AppConfig
is_flag=True,
)
@click.option(
"--desktop-entry",
"--generate-desktop-entry",
"-d",
help="Configure the desktop entry of fastanime",
help="Generate the desktop entry of fastanime",
is_flag=True,
)
@click.option(
@@ -59,7 +62,13 @@ from ...core.config import AppConfig
)
@click.pass_obj
def config(
user_config: AppConfig, path, view, view_json, desktop_entry, update, interactive
user_config: AppConfig,
path,
view,
view_json,
generate_desktop_entry,
update,
interactive,
):
from ...core.constants import USER_CONFIG
from ..config.editor import InteractiveConfigEditor
@@ -85,7 +94,7 @@ def config(
import json
print(json.dumps(user_config.model_dump(mode="json")))
elif desktop_entry:
elif generate_desktop_entry:
_generate_desktop_entry()
elif interactive:
editor = InteractiveConfigEditor(current_config=user_config)
@@ -123,9 +132,9 @@ def _generate_desktop_entry():
EXECUTABLE = shutil.which("fastanime")
if EXECUTABLE:
cmds = f"{EXECUTABLE} --rofi anilist"
cmds = f"{EXECUTABLE} --selector rofi anilist"
else:
cmds = f"{sys.executable} -m fastanime --rofi anilist"
cmds = f"{sys.executable} -m fastanime --selector rofi anilist"
# TODO: Get funs of the other platforms to complete this lol
if PLATFORM == "win32":
@@ -140,7 +149,7 @@ def _generate_desktop_entry():
desktop_entry = dedent(
f"""
[Desktop Entry]
Name={PROJECT_NAME}
Name={PROJECT_NAME.title()}
Type=Application
version={__version__}
Path={Path().home()}

View File

@@ -204,7 +204,6 @@ def download_anime(
anime_title: str,
episode: str,
):
from ...core.downloader import DownloadParams, create_downloader
from ...libs.provider.anime.params import EpisodeStreamsParams

View File

@@ -1,62 +1,218 @@
import click
from fastanime.core.config import AppConfig
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaItem,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
@click.command(help="Queue episodes for the background worker to download.")
# Search/Filter options (mirrors 'fastanime anilist download')
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@click.option("--season", type=click.Choice([s.value for s in MediaSeason]))
@click.option(
"--title", "-t", required=True, multiple=True, help="Anime title to queue."
"--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option(
"--episode-range", "-r", required=True, help="Range of episodes (e.g., '1-10')."
"--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort]))
@click.option(
"--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option(
"--media-format",
"-f",
multiple=True,
type=click.Choice([f.value for f in MediaFormat]),
)
@click.option("--media-type", type=click.Choice([t.value for t in MediaType]))
@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear]))
@click.option("--popularity-greater", type=click.IntRange(min=0))
@click.option("--popularity-lesser", type=click.IntRange(min=0))
@click.option("--score-greater", type=click.IntRange(min=0, max=100))
@click.option("--score-lesser", type=click.IntRange(min=0, max=100))
@click.option("--start-date-greater", type=int)
@click.option("--start-date-lesser", type=int)
@click.option("--end-date-greater", type=int)
@click.option("--end-date-lesser", type=int)
@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None)
# Queue-specific options
@click.option(
"--episode-range",
"-r",
required=True,
help="Range of episodes to queue (e.g., '1-10', '5', '8:12').",
)
@click.option(
"--yes",
"-Y",
is_flag=True,
help="Automatically queue from all found anime without prompting for selection.",
)
@click.pass_obj
def queue(config: AppConfig, title: tuple, episode_range: str):
def queue(config: AppConfig, **options):
"""
Searches for an anime and adds the specified episodes to the download queue.
The background worker must be running for the downloads to start.
Search AniList with filters, select one or more anime (or use --yes),
and queue the specified episode range for background download.
The background worker should be running to process the queue.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
feedback = FeedbackService(config)
selector = create_selector(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
for anime_title in title:
try:
feedback.info(f"Searching for '{anime_title}'...")
search_result = media_api.search_media(
MediaSearchParams(query=anime_title, per_page=1)
)
try:
# Build search params mirroring anilist download
sort_val = options.get("sort")
status_val = options.get("status")
status_not_val = options.get("status_not")
genres_val = options.get("genres")
genres_not_val = options.get("genres_not")
tags_val = options.get("tags")
tags_not_val = options.get("tags_not")
media_format_val = options.get("media_format")
media_type_val = options.get("media_type")
season_val = options.get("season")
year_val = options.get("year")
if not search_result or not search_result.media:
feedback.warning(f"Could not find '{anime_title}' on AniList.")
search_params = MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page"),
sort=MediaSort(sort_val) if sort_val else None,
status_in=[MediaStatus(s) for s in status_val] if status_val else None,
status_not_in=[MediaStatus(s) for s in status_not_val]
if status_not_val
else None,
genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None,
genre_not_in=[MediaGenre(g) for g in genres_not_val]
if genres_not_val
else None,
tag_in=[MediaTag(t) for t in tags_val] if tags_val else None,
tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None,
format_in=[MediaFormat(f) for f in media_format_val]
if media_format_val
else None,
type=MediaType(media_type_val) if media_type_val else None,
season=MediaSeason(season_val) if season_val else None,
seasonYear=int(year_val) if year_val else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
with Progress() as progress:
progress.add_task("Searching AniList...", total=None)
search_result = media_api.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria.")
if options.get("yes"):
anime_to_queue = search_result.media
else:
choice_map: dict[str, MediaItem] = {
(item.title.english or item.title.romaji or f"ID: {item.id}"): item
for item in search_result.media
}
preview_command = None
if config.general.preview != "none":
from ..utils.preview import create_preview_context # type: ignore
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(choice_map.values()),
list(choice_map.keys()),
config,
)
selected_titles = selector.choose_multiple(
"Select anime to queue",
list(choice_map.keys()),
preview=preview_command,
)
else:
selected_titles = selector.choose_multiple(
"Select anime to queue", list(choice_map.keys())
)
if not selected_titles:
feedback.warning("No anime selected. Nothing queued.")
return
anime_to_queue = [choice_map[title] for title in selected_titles]
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(
f"No episode information for '{media_item.title.english}', skipping."
)
continue
media_item = search_result.media[0]
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
episodes_to_queue = list(
parse_episode_range(episode_range, available_episodes)
)
try:
episodes_to_queue = list(
parse_episode_range(episode_range_str, available_episodes)
)
if not episodes_to_queue:
feedback.warning(
f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'."
)
continue
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
feedback.success(
f"Successfully queued {queued_count} episodes for '{media_item.title.english}'."
)
total_queued += queued_count
feedback.success(
f"Queued {queued_count} episodes for '{media_item.title.english}'."
)
except (ValueError, IndexError) as e:
feedback.error(
f"Invalid episode range for '{media_item.title.english}': {e}"
)
except FastAnimeError as e:
feedback.error(f"Failed to queue '{anime_title}'", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))
feedback.success(
f"Done. Total of {total_queued} episode(s) queued across all selections."
)
except FastAnimeError as e:
feedback.error("Queue command failed", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,3 @@
from .cmd import queue
__all__ = ["queue"]

View File

@@ -0,0 +1,26 @@
import click
from ...utils.lazyloader import LazyGroup
commands = {
"add": "add.add",
"list": "list.list_cmd",
"resume": "resume.resume",
"clear": "clear.clear_cmd",
}
@click.group(
cls=LazyGroup,
name="queue",
root="fastanime.cli.commands.queue.commands",
invoke_without_command=False,
help="Manage the download queue (add, list, resume, clear).",
short_help="Manage the download queue.",
lazy_subcommands=commands,
)
@click.pass_context
def queue(ctx: click.Context):
"""Queue management root command."""
# No-op root; subcommands are lazy-loaded
pass

View File

@@ -0,0 +1,217 @@
import click
from fastanime.core.config import AppConfig
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaItem,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
@click.command(name="add", help="Add episodes to the background download queue.")
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@click.option("--season", type=click.Choice([s.value for s in MediaSeason]))
@click.option(
"--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option(
"--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort]))
@click.option(
"--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag])
)
@click.option(
"--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag])
)
@click.option(
"--media-format",
"-f",
multiple=True,
type=click.Choice([f.value for f in MediaFormat]),
)
@click.option("--media-type", type=click.Choice([t.value for t in MediaType]))
@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear]))
@click.option("--popularity-greater", type=click.IntRange(min=0))
@click.option("--popularity-lesser", type=click.IntRange(min=0))
@click.option("--score-greater", type=click.IntRange(min=0, max=100))
@click.option("--score-lesser", type=click.IntRange(min=0, max=100))
@click.option("--start-date-greater", type=int)
@click.option("--start-date-lesser", type=int)
@click.option("--end-date-greater", type=int)
@click.option("--end-date-lesser", type=int)
@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None)
# Queue-specific options
@click.option(
"--episode-range",
"-r",
required=True,
help="Range of episodes to queue (e.g., '1-10', '5', '8:12').",
)
@click.option(
"--yes",
"-Y",
is_flag=True,
help="Queue for all found anime without prompting for selection.",
)
@click.pass_obj
def add(config: AppConfig, **options):
from fastanime.cli.service.download import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
feedback = FeedbackService(config)
selector = create_selector(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
try:
# Build search params mirroring anilist download
sort_val = options.get("sort")
status_val = options.get("status")
status_not_val = options.get("status_not")
genres_val = options.get("genres")
genres_not_val = options.get("genres_not")
tags_val = options.get("tags")
tags_not_val = options.get("tags_not")
media_format_val = options.get("media_format")
media_type_val = options.get("media_type")
season_val = options.get("season")
year_val = options.get("year")
search_params = MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page"),
sort=MediaSort(sort_val) if sort_val else None,
status_in=[MediaStatus(s) for s in status_val] if status_val else None,
status_not_in=[MediaStatus(s) for s in status_not_val]
if status_not_val
else None,
genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None,
genre_not_in=[MediaGenre(g) for g in genres_not_val]
if genres_not_val
else None,
tag_in=[MediaTag(t) for t in tags_val] if tags_val else None,
tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None,
format_in=[MediaFormat(f) for f in media_format_val]
if media_format_val
else None,
type=MediaType(media_type_val) if media_type_val else None,
season=MediaSeason(season_val) if season_val else None,
seasonYear=int(year_val) if year_val else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
with Progress() as progress:
progress.add_task("Searching AniList...", total=None)
search_result = media_api.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria.")
if options.get("yes"):
anime_to_queue = search_result.media
else:
choice_map: dict[str, MediaItem] = {
(item.title.english or item.title.romaji or f"ID: {item.id}"): item
for item in search_result.media
}
preview_command = None
if config.general.preview != "none":
from fastanime.cli.utils.preview import create_preview_context
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(choice_map.values()),
list(choice_map.keys()),
config,
)
selected_titles = selector.choose_multiple(
"Select anime to queue",
list(choice_map.keys()),
preview=preview_command,
)
else:
selected_titles = selector.choose_multiple(
"Select anime to queue", list(choice_map.keys())
)
if not selected_titles:
feedback.warning("No anime selected. Nothing queued.")
return
anime_to_queue = [choice_map[title] for title in selected_titles]
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
# TODO: do a provider search here to determine episodes available maybe, or allow pasing of an episode list probably just change the format for parsing episodes
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(
f"No episode information for '{media_item.title.english}', skipping."
)
continue
try:
episodes_to_queue = list(
parse_episode_range(episode_range_str, available_episodes)
)
if not episodes_to_queue:
feedback.warning(
f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'."
)
continue
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
total_queued += queued_count
feedback.success(
f"Queued {queued_count} episodes for '{media_item.title.english}'."
)
except (ValueError, IndexError) as e:
feedback.error(
f"Invalid episode range for '{media_item.title.english}': {e}"
)
feedback.success(
f"Done. Total of {total_queued} episode(s) queued across all selections."
)
except FastAnimeError as e:
feedback.error("Queue add failed", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,30 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="clear", help="Clear queued items from the registry (QUEUED -> NOT_DOWNLOADED).")
@click.option("--force", is_flag=True, help="Do not prompt for confirmation.")
@click.pass_obj
def clear_cmd(config: AppConfig, force: bool):
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
if not force and not click.confirm("This will clear all queued items. Continue?"):
feedback.info("Aborted.")
return
cleared = 0
queued = registry.get_episodes_by_download_status(DownloadStatus.QUEUED)
for media_id, ep in queued:
ok = registry.update_episode_download_status(
media_id=media_id,
episode_number=ep,
status=DownloadStatus.NOT_DOWNLOADED,
)
if ok:
cleared += 1
feedback.success(f"Cleared {cleared} queued episode(s).")

View File

@@ -0,0 +1,60 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="list", help="List items in the download queue and their statuses.")
@click.option(
"--status",
type=click.Choice(["queued", "downloading", "completed", "failed", "paused"]),
)
@click.option("--detailed", is_flag=True)
@click.pass_obj
def list_cmd(config: AppConfig, status: str | None, detailed: bool | None):
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
status_map = {
"queued": DownloadStatus.QUEUED,
"downloading": DownloadStatus.DOWNLOADING,
"completed": DownloadStatus.COMPLETED,
"failed": DownloadStatus.FAILED,
"paused": DownloadStatus.PAUSED,
}
# TODO: improve this by modifying the download_status function or create new function
if detailed and status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
feedback.info(f"{len(episodes)} episode(s) with status {status}.")
for media_id, ep in episodes:
record = registry.get_media_record(media_id)
if record:
feedback.info(f"{record.media_item.title.english} episode {ep}")
return
if status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
feedback.info(f"{len(episodes)} episode(s) with status {status}.")
for media_id, ep in episodes:
feedback.info(f"- media:{media_id} episode:{ep}")
else:
from rich.console import Console
from rich.table import Table
stats = registry.get_download_statistics()
table = Table(title="Queue Status")
table.add_column("Metric")
table.add_column("Value")
table.add_row("Queued", str(stats.get("queued", 0)))
table.add_row("Downloading", str(stats.get("downloading", 0)))
table.add_row("Completed", str(stats.get("downloaded", 0)))
table.add_row("Failed", str(stats.get("failed", 0)))
table.add_row("Paused", str(stats.get("paused", 0)))
console = Console()
console.print(table)

View File

@@ -0,0 +1,22 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="resume", help="Submit any queued or in-progress downloads to the worker.")
@click.pass_obj
def resume(config: AppConfig):
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
feedback = FeedbackService(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
download_service.start()
download_service.resume_unfinished_downloads()
feedback.success("Submitted queued downloads to background worker.")

View File

@@ -197,7 +197,6 @@ def _find_old_format_entries(registry_service: MediaRegistryService) -> list:
old_format = []
index = registry_service._load_index()
current_version = index.version
if index.version != REGISTRY_VERSION:
old_format.append(
{

View File

@@ -208,7 +208,9 @@ def _create_breakdown_table(
total = sum(data.values())
# Determine sorting method
sort_key = lambda item: item[0] if sort_by_key else item[1]
def sort_key(item):
return item[0] if sort_by_key else item[1]
sorted_data = sorted(data.items(), key=sort_key, reverse=reverse_sort)
# Apply limit if specified

View File

@@ -11,6 +11,7 @@ def worker(config: AppConfig):
process any queued downloads. It's recommended to run this in the
background (e.g., 'fastanime worker &') or as a system service.
"""
from fastanime.cli.service.auth import AuthService
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.notification.service import NotificationService
@@ -26,10 +27,17 @@ def worker(config: AppConfig):
# Instantiate services
media_api = create_api_client(config.general.media_api, config)
# Authenticate if credentials exist (enables notifications)
auth = AuthService(config.general.media_api)
if profile := auth.get_auth():
try:
media_api.authenticate(profile.token)
except Exception:
pass
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
notification_service = NotificationService(media_api)
notification_service = NotificationService(config, media_api, registry)
download_service = DownloadService(config, registry, media_api, provider)
worker_service = BackgroundWorkerService(
config.worker, notification_service, download_service

View File

@@ -1,6 +1,9 @@
import textwrap
from enum import Enum
from pathlib import Path
from typing import Any, Literal, get_args, get_origin
from pydantic.fields import FieldInfo
from ...core.config import AppConfig
from ...core.constants import APP_ASCII_ART, DISCORD_INVITE, PROJECT_NAME, REPO_HOME
@@ -11,7 +14,7 @@ config_asci = "\n".join(
)
CONFIG_HEADER = f"""
# ==============================================================================
#
#
{config_asci}
#
# ==============================================================================
@@ -37,26 +40,16 @@ CONFIG_FOOTER = f"""
def generate_config_ini_from_app_model(app_model: AppConfig) -> str:
"""Generate a configuration file content from a Pydantic model."""
model_schema = AppConfig.model_json_schema(mode="serialization")
app_model_dict = app_model.model_dump()
config_ini_content = [CONFIG_HEADER]
for section_name, section_dict in app_model_dict.items():
section_ref = model_schema["properties"][section_name].get("$ref")
if not section_ref:
continue
section_class_name = section_ref.split("/")[-1]
section_schema = model_schema["$defs"][section_class_name]
section_comment = section_schema.get("description", "")
for section_name, section_model in app_model:
section_comment = section_model.model_config.get("title", "")
config_ini_content.append(f"\n#\n# {section_comment}\n#")
config_ini_content.append(f"[{section_name}]")
for field_name, field_value in section_dict.items():
field_properties = section_schema.get("properties", {}).get(field_name, {})
description = field_properties.get("description", "")
for field_name, field_info in section_model.model_fields.items():
description = field_info.description or ""
if description:
wrapped_comment = textwrap.fill(
description,
@@ -66,6 +59,17 @@ def generate_config_ini_from_app_model(app_model: AppConfig) -> str:
)
config_ini_content.append(f"\n{wrapped_comment}")
field_type_comment = _get_field_type_comment(field_info)
if field_type_comment:
wrapped_comment = textwrap.fill(
field_type_comment,
width=78,
initial_indent="# ",
subsequent_indent="# ",
)
config_ini_content.append(wrapped_comment)
field_value = getattr(section_model, field_name)
if isinstance(field_value, bool):
value_str = str(field_value).lower()
elif isinstance(field_value, Path):
@@ -81,3 +85,70 @@ def generate_config_ini_from_app_model(app_model: AppConfig) -> str:
config_ini_content.extend(["\n", CONFIG_FOOTER])
return "\n".join(config_ini_content)
def _get_field_type_comment(field_info: FieldInfo) -> str:
"""Generate a comment with type information for a field."""
field_type = field_info.annotation
# Handle Literal and Enum types
possible_values = []
if field_type is not None:
if isinstance(field_type, type) and issubclass(field_type, Enum):
possible_values = [member.value for member in field_type]
elif hasattr(field_type, "__origin__") and get_origin(field_type) is Literal:
args = get_args(field_type)
if args:
possible_values = list(args)
if possible_values:
return f"Possible values: [ {', '.join(map(str, possible_values))} ]"
# Handle basic types and numeric ranges
type_name = _get_type_name(field_type)
range_info = _get_range_info(field_info)
if range_info:
return f"Type: {type_name} ({range_info})"
elif type_name:
return f"Type: {type_name}"
return ""
def _get_type_name(field_type: Any) -> str:
"""Get a user-friendly name for a field's type."""
if field_type is str:
return "string"
if field_type is int:
return "integer"
if field_type is float:
return "float"
if field_type is bool:
return "boolean"
if field_type is Path:
return "path"
return ""
def _get_range_info(field_info: FieldInfo) -> str:
"""Get a string describing the numeric range of a field."""
constraints = {}
if hasattr(field_info, "metadata") and field_info.metadata:
for constraint in field_info.metadata:
constraint_type = type(constraint).__name__
if constraint_type == "Ge" and hasattr(constraint, "ge"):
constraints["min"] = constraint.ge
elif constraint_type == "Le" and hasattr(constraint, "le"):
constraints["max"] = constraint.le
elif constraint_type == "Gt" and hasattr(constraint, "gt"):
constraints["min"] = constraint.gt + 1
elif constraint_type == "Lt" and hasattr(constraint, "lt"):
constraints["max"] = constraint.lt - 1
if constraints:
min_val = constraints.get("min", "N/A")
max_val = constraints.get("max", "N/A")
return f"Range: {min_val}-{max_val}"
return ""

View File

@@ -94,6 +94,9 @@ def media_actions(ctx: Context, state: State) -> State | InternalDirective:
f"{'🔘 ' if icons else ''}Toggle Translation Type (Current: {ctx.config.stream.translation_type.upper()})": _toggle_config_state(
ctx, state, "TRANSLATION_TYPE"
),
f"{'🔘 ' if icons else ''}Toggle Auto Skip (Current: {ctx.config.stream.auto_skip})": _toggle_config_state(
ctx, state, "AUTO_SKIP"
),
f"{'🔙 ' if icons else ''}Back to Results": lambda: InternalDirective.BACK,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
@@ -317,7 +320,11 @@ def _toggle_config_state(
ctx: Context,
state: State,
config_state: Literal[
"AUTO_ANIME", "AUTO_EPISODE", "CONTINUE_FROM_HISTORY", "TRANSLATION_TYPE"
"AUTO_ANIME",
"AUTO_EPISODE",
"CONTINUE_FROM_HISTORY",
"TRANSLATION_TYPE",
"AUTO_SKIP",
],
) -> MenuAction:
def action():
@@ -336,6 +343,8 @@ def _toggle_config_state(
ctx.config.stream.translation_type = (
"sub" if ctx.config.stream.translation_type == "dub" else "dub"
)
case "AUTO_SKIP":
ctx.config.stream.auto_skip = not ctx.config.stream.auto_skip
return InternalDirective.RELOAD
return action

View File

@@ -197,8 +197,6 @@ def _next_episode(ctx: Context, state: State) -> MenuAction:
feedback = ctx.feedback
config = ctx.config
media_item = state.media_api.media_item
current_episode_num = state.provider.episode
@@ -248,8 +246,6 @@ def _previous_episode(ctx: Context, state: State) -> MenuAction:
feedback = ctx.feedback
config = ctx.config
media_item = state.media_api.media_item
current_episode_num = state.provider.episode

View File

@@ -7,7 +7,7 @@ from ...state import InternalDirective, MenuName, ProviderState, State
@session.menu
def provider_search(ctx: Context, state: State) -> State | InternalDirective:
from .....core.utils.fuzzy import fuzz
from .....core.utils.normalizer import normalize_title
from .....core.utils.normalizer import normalize_title, update_user_normalizer_json
feedback = ctx.feedback
media_item = state.media_api.media_item
@@ -71,6 +71,12 @@ def provider_search(ctx: Context, state: State) -> State | InternalDirective:
if not chosen_title or chosen_title == "Back":
return InternalDirective.BACK
if selector.confirm(
f"Would you like to update your local normalizer json with: {chosen_title} for {media_title}"
):
update_user_normalizer_json(
chosen_title, media_title, config.general.provider.value
)
selected_provider_anime = provider_results_map[chosen_title]
with feedback.progress(

View File

@@ -0,0 +1,3 @@
from .service import DownloadService
__all__ = ["DownloadService"]

View File

@@ -1,7 +1,9 @@
import logging
from pathlib import Path
from typing import TYPE_CHECKING, List
from ....core.config.model import AppConfig
from ....core.constants import APP_CACHE_DIR
from ....core.downloader import DownloadParams, create_downloader
from ....core.utils.concurrency import ManagedBackgroundWorker, thread_manager
from ....core.utils.fuzzy import fuzz
@@ -21,6 +23,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
NOTIFICATION_ICONS_CACHE_DIR = APP_CACHE_DIR / "notification_icons"
class DownloadService:
@@ -31,13 +34,14 @@ class DownloadService:
media_api_service: "BaseApiClient",
provider_service: "BaseAnimeProvider",
):
self.config = config
self.app_config = config
self.registry = registry_service
self.media_api = media_api_service
self.provider = provider_service
self.downloader = create_downloader(config.downloads)
# Track in-flight downloads to avoid duplicate queueing
self._inflight: set[tuple[int, str]] = set()
# Worker is kept for potential future background commands
self._worker = ManagedBackgroundWorker(
max_workers=config.downloads.max_concurrent_downloads,
name="DownloadWorker",
@@ -56,18 +60,25 @@ class DownloadService:
self._worker.shutdown(wait=False)
def add_to_queue(self, media_item: MediaItem, episode_number: str) -> bool:
"""Adds a download job to the ASYNCHRONOUS queue."""
"""Mark an episode as queued in the registry (no immediate download)."""
logger.info(
f"Queueing background download for '{media_item.title.english}' Episode {episode_number}"
f"Queueing episode '{episode_number}' for '{media_item.title.english}' (registry only)"
)
self.registry.get_or_create_record(media_item)
updated = self.registry.update_episode_download_status(
return self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.QUEUED,
)
if not updated:
def _submit_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Submit a download task to the worker if not already in-flight."""
key = (media_item.id, str(episode_number))
if key in self._inflight:
return False
if not self._worker.is_running():
self._worker.start()
self._inflight.add(key)
self._worker.submit_function(
self._execute_download_job, media_item, episode_number
)
@@ -92,6 +103,7 @@ class DownloadService:
def resume_unfinished_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
# TODO: make the checking of unfinished downloads more efficient probably by modifying the registry to be aware of what actually changed and load that instead
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.QUEUED
)
@@ -108,9 +120,54 @@ class DownloadService:
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
if (media_id, str(episode_number)) in self._inflight:
continue
record = self.registry.get_media_record(media_id)
if record and record.media_item:
self.add_to_queue(record.media_item, episode_number)
self._submit_download(record.media_item, episode_number)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
)
def retry_failed_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
# TODO: may need to improve this
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.FAILED
)
unfinished_jobs = queued_jobs
if not unfinished_jobs:
logger.info("No unfinished downloads found.")
return
logger.info(
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
if (media_id, str(episode_number)) in self._inflight:
continue
record = self.registry.get_media_record(media_id)
if record and record.media_item:
for episode in record.media_episodes:
if episode_number != episode.episode_number:
continue
if (
episode.download_attempts
<= self.app_config.downloads.max_retry_attempts
):
logger.info(
f"Retrying {episode_number} of {record.media_item.title.english}"
)
self._submit_download(record.media_item, episode_number)
else:
logger.info(
f"Max attempts reached for {episode_number} of {record.media_item.title.english}"
)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
@@ -130,12 +187,17 @@ class DownloadService:
# 1. Search the provider to get the provider-specific ID
provider_search_results = self.provider.search(
SearchParams(query=media_title)
SearchParams(
query=normalize_title(
media_title, self.app_config.general.provider.value, True
),
translation_type=self.app_config.stream.translation_type,
)
)
if not provider_search_results or not provider_search_results.results:
raise ValueError(
f"Could not find '{media_title}' on provider '{self.config.general.provider.value}'"
f"Could not find '{media_title}' on provider '{self.app_config.general.provider.value}'"
)
# 2. Find the best match using fuzzy logic (like auto-select)
@@ -146,7 +208,7 @@ class DownloadService:
provider_results_map.keys(),
key=lambda p_title: fuzz.ratio(
normalize_title(
p_title, self.config.general.provider.value
p_title, self.app_config.general.provider.value
).lower(),
media_title.lower(),
),
@@ -168,7 +230,7 @@ class DownloadService:
anime_id=provider_anime.id,
query=media_title,
episode=episode_number,
translation_type=self.config.stream.translation_type,
translation_type=self.app_config.stream.translation_type,
)
)
if not streams_iterator:
@@ -178,11 +240,11 @@ class DownloadService:
if not server or not server.links:
raise ValueError(f"No stream links found for Episode {episode_number}")
if server.name != self.config.downloads.server.value:
if server.name != self.app_config.downloads.server.value:
while True:
try:
_server = next(streams_iterator)
if _server.name == self.config.downloads.server.value:
if _server.name == self.app_config.downloads.server.value:
server = _server
break
except StopIteration:
@@ -202,9 +264,9 @@ class DownloadService:
silent=False,
headers=server.headers,
subtitles=[sub.url for sub in server.subtitles],
merge=self.config.downloads.merge_subtitles,
clean=self.config.downloads.cleanup_after_merge,
no_check_certificate=self.config.downloads.no_check_certificate,
merge=self.app_config.downloads.merge_subtitles,
clean=self.app_config.downloads.cleanup_after_merge,
no_check_certificate=self.app_config.downloads.no_check_certificate,
)
result = self.downloader.download(download_params)
@@ -223,19 +285,49 @@ class DownloadService:
file_path=result.merged_path or result.video_path,
file_size=file_size,
quality=stream_link.quality,
provider_name=self.config.general.provider.value,
provider_name=self.app_config.general.provider.value,
server_name=server.name,
subtitle_paths=result.subtitle_paths,
)
logger.info(
f"Successfully downloaded Episode {episode_number} of '{media_title}'"
)
message = f"Successfully downloaded Episode {episode_number} of '{media_title}'"
try:
from plyer import notification
icon_path = self._get_or_fetch_icon(media_item)
app_icon = str(icon_path) if icon_path else None
notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon,
timeout=20,
)
except:
pass
logger.info(message)
else:
raise ValueError(result.error_message or "Unknown download error")
except Exception as e:
message = f"Download failed for '{media_item.title.english}' Ep {episode_number}: {e}"
try:
from plyer import notification
icon_path = self._get_or_fetch_icon(media_item)
app_icon = str(icon_path) if icon_path else None
notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon,
timeout=20,
)
except:
pass
logger.error(
f"Download failed for '{media_item.title.english}' Ep {episode_number}: {e}",
message,
exc_info=True,
)
self.registry.update_episode_download_status(
@@ -244,3 +336,39 @@ class DownloadService:
status=DownloadStatus.FAILED,
error_message=str(e),
)
finally:
# Remove from in-flight tracking regardless of outcome
try:
self._inflight.discard((media_item.id, str(episode_number)))
except Exception:
pass
def _get_or_fetch_icon(self, media_item: MediaItem) -> Path | None:
"""Fetch and cache a small cover image for system notifications."""
import httpx
try:
cover = media_item.cover_image
url = None
if cover:
url = cover.extra_large or cover.large or cover.medium
if not url:
return None
cache_dir = NOTIFICATION_ICONS_CACHE_DIR
cache_dir.mkdir(parents=True, exist_ok=True)
icon_path = cache_dir / f"{media_item.id}.png"
if icon_path.exists() and icon_path.stat().st_size > 0:
return icon_path
# Directly download the image bytes without resizing
with httpx.Client(follow_redirects=True, timeout=20) as client:
resp = client.get(url)
resp.raise_for_status()
data = resp.content
if data:
icon_path.write_bytes(data)
return icon_path
except Exception as e:
logger.debug(f"Could not fetch icon for media {media_item.id}: {e}")
return None

View File

@@ -1,41 +1,47 @@
import json
import logging
from typing import Set
from pathlib import Path
from typing import Optional
import httpx
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
from fastanime.core.config.model import AppConfig
from fastanime.core.constants import APP_CACHE_DIR
from fastanime.libs.media_api.base import BaseApiClient
from fastanime.libs.media_api.types import MediaItem, Notification
try:
import plyer
from plyer import notification as plyer_notification
PLYER_AVAILABLE = True
except ImportError:
except ImportError: # pragma: no cover - optional dependency
plyer_notification = None # type: ignore[assignment]
PLYER_AVAILABLE = False
logger = logging.getLogger(__name__)
SEEN_NOTIFICATIONS_CACHE = APP_CACHE_DIR / "seen_notifications.json"
NOTIFICATION_ICONS_CACHE_DIR = APP_CACHE_DIR / "notification_icons"
class NotificationService:
def __init__(self, media_api: BaseApiClient):
def __init__(
self,
app_config: AppConfig,
media_api: BaseApiClient,
registry_service: MediaRegistryService,
):
self.media_api = media_api
self._seen_ids: Set[int] = self._load_seen_ids()
self.app_config = app_config
self.registry = registry_service
def _load_seen_ids(self) -> Set[int]:
if not SEEN_NOTIFICATIONS_CACHE.exists():
return set()
try:
with open(SEEN_NOTIFICATIONS_CACHE, "r") as f:
return set(json.load(f))
except (json.JSONDecodeError, IOError):
return set()
def _save_seen_ids(self):
try:
with open(SEEN_NOTIFICATIONS_CACHE, "w") as f:
json.dump(list(self._seen_ids), f)
except IOError:
logger.error("Failed to save seen notifications cache.")
def _mark_seen(self, notification_id: int, media_id: int, episode: str | None):
if self.registry and episode:
try:
self.registry.update_media_index_entry(
media_id, last_notified_episode=str(episode)
)
except Exception:
logger.debug("Failed to update last_notified_episode in registry")
def check_and_display_notifications(self):
if not PLYER_AVAILABLE:
@@ -53,26 +59,99 @@ class NotificationService:
logger.info("No new notifications found.")
return
new_notifications = [n for n in notifications if n.id not in self._seen_ids]
# Filter out notifications already seen in this session or older than registry marker
filtered: list[Notification] = []
for n in notifications:
if self._is_seen_in_registry(n.media.id, n.episode):
continue
filtered.append(n)
if not new_notifications:
if not filtered:
logger.info("No unseen notifications found.")
return
for notif in new_notifications:
for notif in filtered:
if self.app_config.worker.auto_download_new_episode:
if not self.registry.get_media_record(notif.media.id):
self.registry.get_or_create_record(notif.media)
self.registry.update_episode_download_status(
media_id=notif.media.id,
episode_number=str(notif.episode),
status=DownloadStatus.QUEUED,
)
title = notif.media.title.english or notif.media.title.romaji
message = f"Episode {notif.episode} of {title} has aired!"
# Try to include an image (cover large/extra_large) if available
app_icon: Optional[str] = None
try:
plyer.notification.notify(
icon_path = self._get_or_fetch_icon(notif.media)
app_icon = str(icon_path) if icon_path else None
except Exception:
app_icon = None
try:
# Guard: only call if available
if not PLYER_AVAILABLE or plyer_notification is None:
raise RuntimeError("Notification backend unavailable")
# Assert for type checkers and runtime safety
assert plyer_notification is not None
plyer_notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon, # plyer supports file paths or URLs depending on platform
timeout=20,
)
logger.info(f"Displayed notification: {message}")
self._seen_ids.add(notif.id)
self._mark_seen(
notif.id,
notif.media.id,
str(notif.episode) if notif.episode is not None else None,
)
except Exception as e:
logger.error(f"Failed to display notification: {e}")
self._save_seen_ids()
def _is_seen_in_registry(self, media_id: int, episode: Optional[int]) -> bool:
if episode is None:
return False
try:
entry = self.registry.get_media_index_entry(media_id)
if not entry or not entry.last_notified_episode:
return False
# Compare numerically
try:
last_ep = float(entry.last_notified_episode)
return float(episode) <= last_ep
except Exception:
return False
except Exception:
return False
def _get_or_fetch_icon(self, media_item: MediaItem) -> Optional[Path]:
"""Fetch and cache a small cover image for system notifications."""
try:
cover = media_item.cover_image
url = None
if cover:
url = cover.extra_large or cover.large or cover.medium
if not url:
return None
cache_dir = NOTIFICATION_ICONS_CACHE_DIR
cache_dir.mkdir(parents=True, exist_ok=True)
icon_path = cache_dir / f"{media_item.id}.png"
if icon_path.exists() and icon_path.stat().st_size > 0:
return icon_path
# Directly download the image bytes without resizing
with httpx.Client(follow_redirects=True, timeout=20) as client:
resp = client.get(url)
resp.raise_for_status()
data = resp.content
if data:
icon_path.write_bytes(data)
return icon_path
except Exception as e:
logger.debug(f"Could not fetch icon for media {media_item.id}: {e}")
return None

View File

@@ -18,6 +18,7 @@ from typing import Any, Callable, Dict, List, Literal, Optional
from .....core.config.model import StreamConfig
from .....core.exceptions import FastAnimeError
from .....core.utils import formatter
from .....libs.aniskip.api import AniSkip, SkipTimeResult
from .....libs.media_api.types import MediaItem
from .....libs.player.base import BasePlayer
from .....libs.player.params import PlayerParams
@@ -252,6 +253,8 @@ class PlayerState:
class MpvIPCPlayer(BaseIPCPlayer):
"""MPV Player implementation using IPC for advanced features."""
_skip_times: Optional[SkipTimeResult] = None
_skipped_ids: set[str] = set() # To prevent re-skipping the same segment
stream_config: StreamConfig
mpv_process: subprocess.Popen
ipc_client: MPVIPCClient
@@ -283,6 +286,9 @@ class MpvIPCPlayer(BaseIPCPlayer):
registry: Optional[MediaRegistryService] = None,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
self._skip_times = None # Reset on each new play call
self._skipped_ids = set()
self.provider = provider
self.anime = anime
self.media_item = media_item
@@ -441,6 +447,7 @@ class MpvIPCPlayer(BaseIPCPlayer):
elif event == "client-message":
self._handle_client_message(message)
elif event == "file-loaded":
self._fetch_and_load_skip_times()
time.sleep(0.1)
self._configure_player()
elif event:
@@ -451,6 +458,8 @@ class MpvIPCPlayer(BaseIPCPlayer):
data = message.get("data")
if name == "time-pos" and isinstance(data, (int, float)):
self.player_state.stop_time_secs = data
self._check_for_skip(data)
elif name == "duration" and isinstance(data, (int, float)):
self.player_state.total_time_secs = data
elif name == "percent-pos" and isinstance(data, (int, float)):
@@ -698,3 +707,55 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _handle_select_quality(self, quality: Optional[str] = None):
self._show_text("Quality switching is not yet implemented.")
def _check_for_skip(self, current_time: float):
"""Checks if the current playback time falls within a skip interval."""
if (
not self.stream_config.auto_skip
or not self._skip_times
or not self._skip_times.found
):
return
for skip in self._skip_times.results:
if skip.skip_id in self._skipped_ids:
continue
start_time, end_time = skip.interval
# Trigger skip slightly after the start time
if start_time <= current_time < end_time:
logger.info(
f"Skipping {skip.skip_type.upper()} from {start_time} to {end_time}"
)
self._show_text(f"Skipping {skip.skip_type.upper()}...", duration=1500)
self.ipc_client.send_command(["set_property", "time-pos", end_time])
self._skipped_ids.add(skip.skip_id)
break
def _fetch_and_load_skip_times(self):
"""Fetches skip times for the current episode in a background thread."""
if (
not self.stream_config.auto_skip
or not self.media_item
or not self.media_item.id_mal
):
return
try:
episode_num = int(float(self.player_state.episode))
mal_id = self.media_item.id_mal
def task():
self._skip_times = AniSkip.get_skip_times(mal_id, episode_num)
if self._skip_times and self._skip_times.found:
logger.info(
f"Found {len(self._skip_times.results)} skip intervals for Ep {episode_num}"
)
self._show_text("Skip times loaded.", duration=2000)
# Run in a thread to not block playback
threading.Thread(target=task, daemon=True).start()
except (ValueError, TypeError):
logger.warning(
f"Could not parse episode number for Aniskip: {self.player_state.episode}"
)

View File

@@ -40,8 +40,6 @@ class MediaEpisode(BaseModel):
download_attempts: int = 0 # Number of download attempts
last_error: Optional[str] = None # Last error message if failed
model_config = {"arbitrary_types_allowed": True}
class MediaRecord(BaseModel):
media_item: MediaItem

View File

@@ -85,6 +85,13 @@ class MediaRegistryService:
logger.debug("saved registry index")
def get_seen_notifications(self) -> dict[int, str]:
seen = {}
for id, index_entry in self._load_index().media_index.items():
if episode := index_entry.last_notified_episode:
seen[index_entry.media_id] = episode
return seen
def get_media_index_entry(self, media_id: int) -> Optional[MediaRegistryIndexEntry]:
index = self._load_index()
return index.media_index.get(f"{self._media_api}_{media_id}")
@@ -102,7 +109,7 @@ class MediaRegistryService:
record = MediaRecord.model_validate(data)
logger.debug(f"Loaded media record for {media_id}")
# logger.debug(f"Loaded media record for {media_id}")
return record
def get_or_create_index_entry(self, media_id: int) -> MediaRegistryIndexEntry:
@@ -184,6 +191,8 @@ class MediaRegistryService:
else:
if not index_entry.status:
index_entry.status = UserMediaListStatus.WATCHING
elif index_entry.status == UserMediaListStatus.COMPLETED:
index_entry.status = UserMediaListStatus.REPEATING
if last_watch_position:
index_entry.last_watch_position = last_watch_position
@@ -550,13 +559,12 @@ class MediaRegistryService:
break
if not episode_record:
if not file_path:
logger.error(f"File path required for new episode {episode_number}")
return False
# Allow creation without file_path for queued/in-progress states.
# Only require file_path once the episode is marked COMPLETED.
episode_record = MediaEpisode(
episode_number=episode_number,
file_path=file_path,
download_status=status,
file_path=file_path,
)
record.media_episodes.append(episode_record)
@@ -564,6 +572,12 @@ class MediaRegistryService:
episode_record.download_status = status
if file_path:
episode_record.file_path = file_path
elif status.name == "COMPLETED" and not episode_record.file_path:
logger.warning(
"Completed status set without file_path for media %s episode %s",
media_id,
episode_number,
)
if file_size is not None:
episode_record.file_size = file_size
if quality:

View File

@@ -28,6 +28,11 @@ class WatchHistoryService:
)
status = None
if (
media_item.user_status
and media_item.user_status.status == UserMediaListStatus.COMPLETED
):
status = UserMediaListStatus.REPEATING
self.media_registry.update_media_index_entry(
media_id=media_item.id,
watched=True,
@@ -92,7 +97,7 @@ class WatchHistoryService:
start_time = None
try:
current_local_episode = str(int(current_local_episode) + 1)
except:
except Exception:
# incase its a float
pass
else:

View File

@@ -1,5 +1,8 @@
import logging
import signal
import threading
import time
from typing import Optional
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.notification.service import NotificationService
@@ -18,44 +21,124 @@ class BackgroundWorkerService:
self.config = config
self.notification_service = notification_service
self.download_service = download_service
self.running = True
self._stop_event = threading.Event()
self._signals_installed = False
def run(self):
logger.info("Background worker started.")
last_notification_check = 0
last_download_check = 0
def _install_signal_handlers(self):
"""Install SIGINT/SIGTERM handlers to allow graceful shutdown when run in foreground."""
if self._signals_installed:
return
notification_interval_sec = self.config.notification_check_interval * 60
download_interval_sec = self.config.download_check_interval * 60
self.download_service.start()
try:
while self.running:
current_time = time.time()
# Check for notifications
if current_time - last_notification_check > notification_interval_sec:
try:
self.notification_service.check_and_display_notifications()
except Exception as e:
logger.error(f"Error during notification check: {e}")
last_notification_check = current_time
# Process download queue
if current_time - last_download_check > download_interval_sec:
try:
self.download_service.resume_unfinished_downloads()
except Exception as e:
logger.error(f"Error during download queue processing: {e}")
last_download_check = current_time
# Sleep for a short interval to prevent high CPU usage
time.sleep(30) # Sleep for 30 seconds before next check cycle
except KeyboardInterrupt:
logger.info("Background worker stopped by user.")
def _handler(signum, frame): # noqa: ARG001 (signature fixed by signal)
logger.info(
"Received signal %s, shutting down background worker...", signum
)
self.stop()
try:
signal.signal(signal.SIGINT, _handler)
signal.signal(signal.SIGTERM, _handler)
self._signals_installed = True
except Exception:
# Signal handling may fail in non-main threads or certain environments
logger.debug(
"Signal handlers not installed (non-main thread or unsupported environment)."
)
def run(self):
"""Run the background loop until stopped.
Responsibilities:
- Periodically check AniList notifications (if authenticated & plyer available)
- Periodically resume/process unfinished downloads
- Keep CPU usage low using an event-based wait
- Gracefully terminate on KeyboardInterrupt/SIGTERM
"""
logger.info("Background worker starting...")
# Convert configured minutes to seconds
notification_interval_sec = max(
60, self.config.notification_check_interval * 60
)
download_interval_sec = max(60, self.config.download_check_interval * 60)
download_retry_interval_sec = max(
60, self.config.download_check_failed_interval * 60
)
# Start download worker and attempt resuming pending jobs once at startup
self.download_service.start()
# Schedule the very first execution immediately
next_notification_ts: Optional[float] = 0.0
next_download_ts: Optional[float] = 0.0
next_retry_download_ts: Optional[float] = 0.0
# Install signal handlers if possible
self._install_signal_handlers()
try:
while not self._stop_event.is_set():
now = time.time()
# Check for notifications
if next_notification_ts is not None and now >= next_notification_ts:
try:
logger.info("Checking for notifications...")
self.notification_service.check_and_display_notifications()
except Exception:
logger.exception("Error during notification check")
finally:
next_notification_ts = now + notification_interval_sec
# Process download queue
if next_download_ts is not None and now >= next_download_ts:
try:
self.download_service.resume_unfinished_downloads()
except Exception:
logger.exception("Error during download queue processing")
finally:
next_download_ts = now + download_interval_sec
if next_retry_download_ts is not None and now >= next_retry_download_ts:
try:
self.download_service.retry_failed_downloads()
except Exception:
logger.exception(
"Error during failed download queue processing"
)
finally:
next_retry_download_ts = now + download_retry_interval_sec
# Determine how long to wait until the next scheduled task
next_events = [
t
for t in (
next_notification_ts,
next_download_ts,
next_retry_download_ts,
)
if t is not None
]
if next_events:
time_until_next = max(0.0, min(next_events) - time.time())
else:
time_until_next = 30.0
# Cap wait to react reasonably fast to stop requests
wait_time = min(time_until_next, 30.0)
self._stop_event.wait(timeout=wait_time)
except KeyboardInterrupt:
logger.info("Background worker interrupted by user. Stopping...")
self.stop()
finally:
# Ensure we always stop the download worker
try:
self.download_service.stop()
except Exception:
logger.exception("Failed to stop download service cleanly")
logger.info("Background worker stopped.")
def stop(self):
self.running = False
logger.info("Background worker shutting down.")
if not self._stop_event.is_set():
logger.info("Background worker shutting down...")
self._stop_event.set()

View File

@@ -1,7 +1,10 @@
import os
import sys
from rich.traceback import install as rich_install
from ...core.constants import PROJECT_NAME
def custom_exception_hook(exc_type, exc_value, exc_traceback):
print(f"{exc_type.__name__}: {exc_value}")
@@ -16,6 +19,9 @@ def setup_exceptions_handler(
rich_traceback: bool | None,
rich_traceback_theme: str,
):
if dev:
# auto set env
os.environ[f"{PROJECT_NAME}_DEBUG"] = "1"
if trace or dev:
sys.excepthook = default_exception_hook
if rich_traceback:

View File

@@ -1,10 +1,14 @@
import logging
import os
import re
from hashlib import sha256
from typing import Dict, List, Optional
import httpx
from ...core.config import AppConfig
from ...core.constants import APP_CACHE_DIR, PLATFORM, SCRIPTS_DIR
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import (
AiringScheduleResult,
Character,
@@ -14,6 +18,103 @@ from ...libs.media_api.types import (
from . import ansi
from .preview_workers import PreviewWorkerManager
def get_rofi_preview(
media_items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return (
"".join(
[
f"{title}\0icon\x1f{_get_image(item)}\n"
for item, title in zip(media_items, titles)
]
)
+ "Back\nExit"
)
def _get_image(item: MediaItem) -> str:
if not item.cover_image:
return ""
hash_id = sha256(item.title.english.encode("utf-8")).hexdigest()
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
if image_path.exists():
return str(image_path)
if not item.cover_image.large:
return ""
try:
with httpx.stream(
"GET", item.cover_image.large, follow_redirects=True
) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
return str(image_path)
except Exception as e:
logger.error(f"Failed to download image {item.cover_image.large}: {e}")
return ""
def get_rofi_episode_preview(
episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return (
"".join(
[
f"{episode}\0icon\x1f{_get_episode_image(episode, media_item)}\n"
for episode in episodes
]
)
+ "Back\nExit"
)
def _get_episode_image(episode: str, media_item: MediaItem) -> str:
if media_item.streaming_episodes and media_item.streaming_episodes.get(episode):
stream = media_item.streaming_episodes[episode]
image_url = stream.thumbnail
else:
if not media_item.cover_image:
return ""
image_url = media_item.cover_image.large
if not image_url:
return ""
hash_id = sha256(
f"{media_item.title.english}_Episode_{episode}".encode("utf-8")
).hexdigest()
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
if image_path.exists():
return str(image_path)
try:
with httpx.stream("GET", image_url, follow_redirects=True) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
return str(image_path)
except Exception as e:
logger.error(
f"Failed to download image {image_url} for {media_item.title.english}: {e}"
)
return ""
logger = logging.getLogger(__name__)
os.environ["SHELL"] = "bash"
@@ -166,6 +267,9 @@ class PreviewContext:
def get_anime_preview(
items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
if config.general.selector == "rofi":
return get_rofi_preview(items, titles, config)
"""
Generate anime preview script and start background caching.
@@ -213,6 +317,7 @@ def get_anime_preview(
"C_RULE": ansi.get_true_fg(SEPARATOR_COLOR, bold=True),
"RESET": ansi.RESET,
"PREFIX": "",
"SCALE_UP": " --scale-up" if config.general.preview_scale_up else "",
}
for key, value in replacements.items():
@@ -235,6 +340,8 @@ def get_episode_preview(
Returns:
Preview script content for fzf
"""
if config.general.selector == "rofi":
return get_rofi_episode_preview(episodes, media_item, config)
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
@@ -270,6 +377,7 @@ def get_episode_preview(
"C_RULE": ansi.get_true_fg(SEPARATOR_COLOR, bold=True),
"RESET": ansi.RESET,
"PREFIX": f"{media_item.title.english}_Episode_",
"SCALE_UP": " --scale-up" if config.general.preview_scale_up else "",
}
for key, value in replacements.items():
@@ -323,6 +431,7 @@ def get_dynamic_anime_preview(config: AppConfig) -> str:
"C_VALUE": ansi.get_true_fg(HEADER_COLOR, bold=True),
"C_RULE": ansi.get_true_fg(SEPARATOR_COLOR, bold=True),
"RESET": ansi.RESET,
"SCALE_UP": " --scale-up" if config.general.preview_scale_up else "",
}
for key, value in replacements.items():

View File

@@ -117,7 +117,6 @@ class PreviewCacheWorker(ManagedBackgroundWorker):
# Submit info generation task if needed
if config.general.preview in ("full", "text"):
info_path = self.info_cache_dir / hash_id
info_text = self._generate_info_text(media_item, config)
self.submit_function(self._save_info_text, info_text, hash_id)
@@ -434,7 +433,6 @@ class ReviewCacheWorker(ManagedBackgroundWorker):
for choice_str, review in choice_map.items():
hash_id = self._get_cache_hash(choice_str)
info_path = self.reviews_cache_dir / hash_id
preview_content = self._generate_review_preview_content(review, config)
self.submit_function(self._save_preview_content, preview_content, hash_id)
@@ -522,7 +520,6 @@ class CharacterCacheWorker(ManagedBackgroundWorker):
for choice_str, character in choice_map.items():
hash_id = self._get_cache_hash(choice_str)
info_path = self.characters_cache_dir / hash_id
preview_content = self._generate_character_preview_content(
character, config
@@ -645,7 +642,6 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker):
raise RuntimeError("AiringScheduleCacheWorker is not running")
hash_id = self._get_cache_hash(anime_title)
info_path = self.airing_schedule_cache_dir / hash_id
preview_content = self._generate_airing_schedule_preview_content(
anime_title, schedule_result, config

View File

@@ -7,13 +7,28 @@ GENERAL_PREFERRED_SPINNER = "smiley"
GENERAL_API_CLIENT = "anilist"
GENERAL_PREFERRED_TRACKER = "local"
GENERAL_PROVIDER = "allanime"
GENERAL_SELECTOR = lambda: "fzf" if detect.has_fzf() else "default"
def GENERAL_SELECTOR():
return "fzf" if detect.has_fzf() else "default"
GENERAL_AUTO_SELECT_ANIME_RESULT = True
GENERAL_ICONS = True
GENERAL_PREVIEW = lambda: "full" if detect.is_running_kitty_terminal() else "none"
GENERAL_IMAGE_RENDERER = (
lambda: "icat" if detect.is_running_kitty_terminal() else "chafa"
)
def GENERAL_PREVIEW():
return "full" if detect.is_running_kitty_terminal() else "none"
GENERAL_SCALE_PREVIEW = True
GENERAL_SCALE_PREVIEW = False
def GENERAL_IMAGE_RENDERER():
return "icat" if detect.is_running_kitty_terminal() else "chafa"
GENERAL_MANGA_VIEWER = "feh"
GENERAL_CHECK_FOR_UPDATES = True
GENERAL_CACHE_REQUESTS = True
@@ -36,9 +51,11 @@ STREAM_YTDLP_FORMAT = "best[height<=1080]/bestvideo[height<=1080]+bestaudio/best
STREAM_FORCE_FORWARD_TRACKING = True
STREAM_DEFAULT_MEDIA_LIST_TRACKING = "prompt"
STREAM_SUB_LANG = "eng"
STREAM_USE_IPC = (
lambda: True if PLATFORM != "win32" and not detect.is_running_in_termux() else False
)
def STREAM_USE_IPC():
return True if PLATFORM != "win32" and not detect.is_running_in_termux() else False
# WorkerConfig
WORKER_ENABLED = True

View File

@@ -1,5 +1,4 @@
# GeneralConfig
from .defaults import SESSIONS_DIR
GENERAL_PYGMENT_STYLE = "The pygment style to use"
GENERAL_PREFERRED_SPINNER = "The spinner to use"
@@ -14,6 +13,11 @@ GENERAL_AUTO_SELECT_ANIME_RESULT = (
)
GENERAL_ICONS = "Display emoji icons in the user interface."
GENERAL_PREVIEW = "Type of preview to display in selectors."
GENERAL_SCALE_PREVIEW = (
"Whether to scale up images rendered with icat to fill the preview area. "
"When using the 'full' preview type in a landscape window, enabling this may reduce "
"the amount of text information displayed."
)
GENERAL_IMAGE_RENDERER = (
"The command-line tool to use for rendering images in the terminal."
)

View File

@@ -17,7 +17,57 @@ class GeneralConfig(BaseModel):
default=defaults.GENERAL_PREFERRED_TRACKER,
description=desc.GENERAL_PREFERRED_TRACKER,
)
pygment_style: str = Field(
pygment_style: Literal[
"abap",
"algol",
"algol_nu",
"arduino",
"autumn",
"bw",
"borland",
"coffee",
"colorful",
"default",
"dracula",
"emacs",
"friendly_grayscale",
"friendly",
"fruity",
"github-dark",
"gruvbox-dark",
"gruvbox-light",
"igor",
"inkpot",
"lightbulb",
"lilypond",
"lovelace",
"manni",
"material",
"monokai",
"murphy",
"native",
"nord-darker",
"nord",
"one-dark",
"paraiso-dark",
"paraiso-light",
"pastie",
"perldoc",
"rainbow_dash",
"rrt",
"sas",
"solarized-dark",
"solarized-light",
"staroffice",
"stata-dark",
"stata-light",
"tango",
"trac",
"vim",
"vs",
"xcode",
"zenburn",
] = Field(
default=defaults.GENERAL_PYGMENT_STYLE, description=desc.GENERAL_PYGMENT_STYLE
)
preferred_spinner: Literal[
@@ -119,6 +169,11 @@ class GeneralConfig(BaseModel):
default_factory=defaults.GENERAL_PREVIEW,
description=desc.GENERAL_PREVIEW,
)
preview_scale_up: bool = Field(
default=defaults.GENERAL_SCALE_PREVIEW,
description=desc.GENERAL_SCALE_PREVIEW,
)
image_renderer: Literal["icat", "chafa", "imgcat"] = Field(
default_factory=defaults.GENERAL_IMAGE_RENDERER,
description=desc.GENERAL_IMAGE_RENDERER,
@@ -239,6 +294,15 @@ class WorkerConfig(OtherConfig):
ge=1,
description="How often to process the download queue (in minutes).",
)
download_check_failed_interval: int = Field(
default=60, # in minutes
ge=1,
description="How often to process the failed download queue (in minutes).",
)
auto_download_new_episode: bool = Field(
default=True,
description="Whether to automatically download a new episode that has been notified",
)
class SessionsConfig(OtherConfig):
@@ -394,7 +458,7 @@ class DownloadsConfig(OtherConfig):
ge=1,
description=desc.DOWNLOADS_MAX_CONCURRENT,
)
retry_attempts: int = Field(
max_retry_attempts: int = Field(
default=defaults.DOWNLOADS_RETRY_ATTEMPTS,
ge=0,
description=desc.DOWNLOADS_RETRY_ATTEMPTS,

View File

@@ -367,7 +367,7 @@ class DefaultDownloader(BaseDownloader):
try:
# Run ffmpeg - use silent flag to control ffmpeg output, not progress
process = subprocess.run(
subprocess.run(
args,
capture_output=params.silent, # Only suppress ffmpeg output if silent
text=True,

View File

@@ -28,7 +28,6 @@ class DownloadFactory:
elif downloader_name == "auto":
# Auto mode: prefer yt-dlp if available, fallback to default
try:
import yt_dlp
from .yt_dlp import YtDLPDownloader
return YtDLPDownloader(config)

View File

@@ -93,7 +93,7 @@ class YtDLPDownloader(BaseDownloader):
"nocheckcertificate": params.no_check_certificate,
}
opts = opts
if params.force_ffmpeg:
if params.force_ffmpeg or params.hls_use_mpegts or params.hls_use_h264:
opts = opts | {
"external_downloader": {"default": "ffmpeg"},
"external_downloader_args": {
@@ -219,7 +219,7 @@ class YtDLPDownloader(BaseDownloader):
# Run the ffmpeg command
try:
process = subprocess.run(args)
subprocess.run(args)
final_output_path = video_path.parent / merged_filename
if final_output_path.exists():

View File

@@ -1,4 +1,5 @@
import os
import re
import shutil
import sys
@@ -19,6 +20,36 @@ def is_running_in_termux():
return False
def is_bash_script(text: str) -> bool:
# Normalize line endings
text = text.strip()
# Check for shebang at the top
if text.startswith("#!/bin/bash") or text.startswith("#!/usr/bin/env bash"):
return True
# Look for common bash syntax/keywords
bash_keywords = [
r"\becho\b",
r"\bfi\b",
r"\bthen\b",
r"\bfunction\b",
r"\bfor\b",
r"\bwhile\b",
r"\bdone\b",
r"\bcase\b",
r"\besac\b",
r"\$\(",
r"\[\[",
r"\]\]",
r";;",
]
# Score based on matches
matches = sum(bool(re.search(pattern, text)) for pattern in bash_keywords)
return matches >= 2
def is_running_kitty_terminal() -> bool:
return True if os.environ.get("KITTY_WINDOW_ID") else False

View File

@@ -308,7 +308,6 @@ class FileLock:
with self.lock_file_path.open("r") as f:
lines = f.readlines()
if len(lines) >= 2:
locked_pid = int(lines[0].strip())
locked_timestamp = float(lines[1].strip())
current_time = time.time()
if current_time - locked_timestamp > self.stale_timeout:

View File

@@ -243,9 +243,7 @@ def renumber_titles(titles: List[str]) -> Dict[str, Union[int, float, None]]:
offset = round(orig_ep - int_part, 3)
renumbered_val = round(base_val + offset, 3)
renumbered[title] = (
int(renumbered_val) if renumbered_val.is_integer() else renumbered_val
)
renumbered[title] = renumbered_val
# Add back the unnumbered titles with `None`
for t in without_numbers:

View File

@@ -1,59 +1,20 @@
"""
Title normalization utilities for converting between provider and media API titles.
This module provides functions to normalize anime titles between different providers
(allanime, hianime, animepahe) and media APIs (AniList) using the normalizer.json
mapping file located in the assets directory.
The normalizer.json file contains mappings in the following format:
{
"provider_name": {
"provider_title": "media_api_title",
...
},
...
}
Key Features:
- Bidirectional title conversion (provider ↔ media API)
- Caching for performance optimization
- Runtime mapping support for dynamic additions
- Comprehensive error handling and logging
- Type hints for better IDE support
Example Usage:
>>> from fastanime.core.utils.normalizer import (
... provider_title_to_media_api_title,
... media_api_title_to_provider_title
... )
# Convert provider title to media API title
>>> provider_title_to_media_api_title("1P", "allanime")
'one piece'
# Convert media API title to provider title
>>> media_api_title_to_provider_title("one piece", "allanime")
'1P'
# Check available providers
>>> get_available_providers()
['allanime', 'hianime', 'animepahe']
Author: FastAnime Contributors
"""
import json
import logging
from typing import Dict, Optional
from ..constants import ASSETS_DIR
from ..constants import APP_DATA_DIR, ASSETS_DIR
logger = logging.getLogger(__name__)
# Cache for the normalizer data to avoid repeated file reads
_normalizer_cache: Optional[Dict[str, Dict[str, str]]] = None
USER_NORMALIZER_JSON = APP_DATA_DIR / "normalizer.json"
DEFAULT_NORMALIZER_JSON = ASSETS_DIR / "normalizer.json"
# will load one in the config dir if available and merge them
def _load_normalizer_data() -> Dict[str, Dict[str, str]]:
"""
Load the normalizer.json file and cache it.
@@ -70,21 +31,41 @@ def _load_normalizer_data() -> Dict[str, Dict[str, str]]:
if _normalizer_cache is not None:
return _normalizer_cache
normalizer_path = ASSETS_DIR / "normalizer.json"
default_normalizer = {}
user_normalizer = {}
with open(DEFAULT_NORMALIZER_JSON, "r", encoding="utf-8") as f:
default_normalizer: dict = json.load(f)
if USER_NORMALIZER_JSON.exists():
with open(USER_NORMALIZER_JSON, "r", encoding="utf-8") as f:
user_normalizer: dict = json.load(f)
try:
with open(normalizer_path, "r", encoding="utf-8") as f:
_normalizer_cache = json.load(f)
logger.debug("Loaded normalizer data from %s", normalizer_path)
# Type checker now knows _normalizer_cache is not None
assert _normalizer_cache is not None
return _normalizer_cache
except FileNotFoundError:
logger.error("Normalizer file not found at %s", normalizer_path)
raise
except json.JSONDecodeError as e:
logger.error("Invalid JSON in normalizer file: %s", e)
raise
_normalizer_cache = default_normalizer
for key in default_normalizer:
if key in user_normalizer:
_normalizer_cache[key].update(user_normalizer[key])
return _normalizer_cache
def update_user_normalizer_json(
provider_title: str, media_api_title: str, provider_name: str
):
import time
from .file import AtomicWriter
print(
"UPDATING USER NORMALIZER JSON. PLEASE CONTRIBUTE TO THE PROJECT BY OPENING A PR ON GITHUB TO MERGE YOUR NORMALIZER JSON TO MAIN. MAEMOTTE KANSHA SHIMASU :)"
)
print(f"NORMALIZER JSON PATH IS: {USER_NORMALIZER_JSON}")
time.sleep(5)
if not _normalizer_cache:
raise RuntimeError(
"Fatal _normalizer_cache missing this should not be the case : (. Please report"
)
_normalizer_cache[provider_name][provider_title] = media_api_title.lower()
with AtomicWriter(USER_NORMALIZER_JSON) as f:
json.dump(_normalizer_cache, f, indent=2)
def provider_title_to_media_api_title(provider_title: str, provider_name: str) -> str:

View File

@@ -1,22 +1,71 @@
from httpx import get
# fastanime/libs/aniskip/api.py
ANISKIP_ENDPOINT = "https://api.aniskip.com/v1/skip-times"
import logging
from typing import List, Literal, Optional
import httpx
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
ANISKIP_API_URL = "https://api.aniskip.com/v2/skip-times"
class SkipTime(BaseModel):
"""Represents a single skip interval (e.g., an opening or ending)."""
interval: tuple[float, float]
skip_type: Literal["op", "ed"] = Field(alias="skipType")
skip_id: str = Field(alias="skipId")
episode_length: float = Field(alias="episodeLength")
class SkipTimeResult(BaseModel):
"""Represents the full response from the Aniskip API for an episode."""
found: bool
results: List[SkipTime] = Field(default_factory=list)
message: Optional[str] = None
status_code: int = Field(alias="statusCode")
# TODO: Finish own implementation of aniskip script
class AniSkip:
"""A client for fetching opening and ending skip times from the Aniskip API."""
@classmethod
def get_skip_times(
cls, mal_id: int, episode_number: float | int, types=["op", "ed"]
):
url = f"{ANISKIP_ENDPOINT}/{mal_id}/{episode_number}?types=op&types=ed"
response = get(url)
print(response.text)
return response.json()
cls,
mal_id: int,
episode_number: int,
types: List[Literal["op", "ed"]] = ["op", "ed"],
) -> Optional[SkipTimeResult]:
"""
Fetches skip times for a specific anime episode from Aniskip.
Args:
mal_id: The MyAnimeList ID of the anime.
episode_number: The episode number.
types: A list of types to fetch ('op' for opening, 'ed' for ending).
if __name__ == "__main__":
mal_id = input("Mal id: ")
episode_number = input("episode_number: ")
skip_times = AniSkip.get_skip_times(int(mal_id), float(episode_number))
print(skip_times)
Returns:
A SkipTimeResult object if the request is successful, otherwise None.
"""
if not mal_id or not episode_number:
return None
url = f"{ANISKIP_API_URL}/{mal_id}/{episode_number}"
params = [("type", t) for t in types]
try:
with httpx.Client() as client:
response = client.get(url, params=params, timeout=5)
# Aniskip can return 404 for not found, which is a valid response.
if response.status_code not in [200, 404]:
response.raise_for_status()
return SkipTimeResult.model_validate(response.json())
except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as e:
logger.error(
f"Aniskip API request failed for MAL ID {mal_id}, Ep {episode_number}: {e}"
)
return None

View File

@@ -1,3 +1,9 @@
"""
The player package provides abstractions and implementations for media player integration in FastAnime.
This package defines the base player interface, player parameter/result types, and concrete implementations for various media players (e.g., MPV, VLC, Syncplay).
"""
from .player import create_player
__all__ = ["create_player"]

View File

@@ -1,3 +1,9 @@
"""
Defines the abstract base class for all media player integrations in FastAnime.
All concrete player implementations must inherit from BasePlayer and implement its methods.
"""
import subprocess
from abc import ABC, abstractmethod
@@ -8,19 +14,43 @@ from .types import PlayerResult
class BasePlayer(ABC):
"""
Abstract Base Class defining the contract for all media players.
Abstract base class for all media player integrations.
Subclasses must implement the play and play_with_ipc methods to provide playback functionality.
"""
def __init__(self, config: StreamConfig):
"""
Initialize the player with the given stream configuration.
Args:
config: StreamConfig object containing player configuration.
"""
self.stream_config = config
@abstractmethod
def play(self, params: PlayerParams) -> PlayerResult:
"""
Plays the given media URL.
Play the given media URL using the player.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
pass
@abstractmethod
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""Stream using IPC player for enhanced features."""
"""
Play media using IPC (Inter-Process Communication) for enhanced control.
Args:
params: PlayerParams object containing playback parameters.
socket_path: Path to the IPC socket for player control.
Returns:
subprocess.Popen: The running player process.
"""
pass

View File

@@ -1,3 +1,9 @@
"""
MPV player integration for FastAnime.
This module provides the MpvPlayer class, which implements the BasePlayer interface for the MPV media player.
"""
import logging
import re
import shutil
@@ -17,11 +23,32 @@ MPV_AV_TIME_PATTERN = re.compile(r"AV: ([0-9:]*) / ([0-9:]*) \(([0-9]*)%\)")
class MpvPlayer(BasePlayer):
"""
MPV player implementation for FastAnime.
Provides playback functionality using the MPV media player, supporting desktop, mobile, torrents, and syncplay.
"""
def __init__(self, config: MpvConfig):
"""
Initialize the MpvPlayer with the given MPV configuration.
Args:
config: MpvConfig object containing MPV-specific settings.
"""
self.config = config
self.executable = shutil.which("mpv")
def play(self, params):
"""
Play the given media using MPV, handling desktop, mobile, torrent, and syncplay scenarios.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if TORRENT_REGEX.match(params.url) and detect.is_running_in_termux():
raise FastAnimeError("Unable to play torrents on termux")
elif params.syncplay and detect.is_running_in_termux():
@@ -32,6 +59,15 @@ class MpvPlayer(BasePlayer):
return self._play_on_desktop(params)
def _play_on_mobile(self, params) -> PlayerResult:
"""
Play media on a mobile device using Android intents.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if YOUTUBE_REGEX.match(params.url):
args = [
"nohup",
@@ -66,6 +102,15 @@ class MpvPlayer(BasePlayer):
return PlayerResult(params.episode)
def _play_on_desktop(self, params) -> PlayerResult:
"""
Play media on a desktop environment using MPV.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if not self.executable:
raise FastAnimeError("MPV executable not found in PATH.")
@@ -77,6 +122,15 @@ class MpvPlayer(BasePlayer):
return self._stream_on_desktop_with_subprocess(params)
def _stream_on_desktop_with_subprocess(self, params: PlayerParams) -> PlayerResult:
"""
Stream media using MPV via subprocess, capturing playback times.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session, including stop and total time.
"""
mpv_args = [self.executable, params.url]
mpv_args.extend(self._create_mpv_cli_options(params))
@@ -105,7 +159,16 @@ class MpvPlayer(BasePlayer):
)
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""Stream using IPC player for enhanced features."""
"""
Stream using MPV with IPC (Inter-Process Communication) for enhanced features.
Args:
params: PlayerParams object containing playback parameters.
socket_path: Path to the IPC socket for player control.
Returns:
subprocess.Popen: The running MPV process.
"""
mpv_args = [
self.executable,
f"--input-ipc-server={socket_path}",
@@ -129,6 +192,15 @@ class MpvPlayer(BasePlayer):
def _stream_on_desktop_with_webtorrent_cli(
self, params: PlayerParams
) -> PlayerResult:
"""
Stream torrent media using the webtorrent CLI and MPV.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
WEBTORRENT_CLI = shutil.which("webtorrent")
if not WEBTORRENT_CLI:
raise FastAnimeError(
@@ -143,8 +215,16 @@ class MpvPlayer(BasePlayer):
subprocess.run(args)
return PlayerResult(params.episode)
# TODO: Get people with real friends to do this lol
def _stream_on_desktop_with_syncplay(self, params: PlayerParams) -> PlayerResult:
"""
Stream media using Syncplay for synchronized playback with friends.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
SYNCPLAY_EXECUTABLE = shutil.which("syncplay")
if not SYNCPLAY_EXECUTABLE:
raise FastAnimeError(
@@ -159,6 +239,15 @@ class MpvPlayer(BasePlayer):
return PlayerResult(params.episode)
def _create_mpv_cli_options(self, params: PlayerParams) -> list[str]:
"""
Create a list of MPV CLI options based on playback parameters.
Args:
params: PlayerParams object containing playback parameters.
Returns:
list[str]: List of MPV CLI arguments.
"""
mpv_args = []
if params.headers:
header_str = ",".join([f"{k}:{v}" for k, v in params.headers.items()])

View File

@@ -1,3 +1,7 @@
"""
Defines the PlayerParams dataclass, which encapsulates all parameters required to launch a media player session.
"""
from dataclasses import dataclass
from typing import TYPE_CHECKING
@@ -7,6 +11,20 @@ if TYPE_CHECKING:
@dataclass(frozen=True)
class PlayerParams:
"""
Parameters for launching a media player session.
Attributes:
url: The media URL to play.
title: The title to display in the player.
query: The original search query or context.
episode: The episode identifier or label.
syncplay: Whether to enable syncplay (synchronized playback).
subtitles: List of subtitle file paths or URLs.
headers: HTTP headers to include in the request.
start_time: The time offset to start playback from.
"""
url: str
title: str
query: str

View File

@@ -1,3 +1,9 @@
"""
Player factory and registration logic for FastAnime media players.
This module provides a factory for instantiating the correct player implementation based on configuration.
"""
from ...core.config import AppConfig
from .base import BasePlayer
@@ -5,19 +11,24 @@ PLAYERS = ["mpv", "vlc", "syncplay"]
class PlayerFactory:
"""
Factory for creating player instances based on configuration.
"""
@staticmethod
def create(config: AppConfig) -> BasePlayer:
"""
Factory method to create a player instance based on its name.
Create a player instance based on the configured player name.
Args:
config: The full application configuration object.
Returns:
An instance of a class that inherits from BasePlayer.
BasePlayer: An instance of a class that inherits from BasePlayer.
Raises:
ValueError: If the player_name is not supported.
NotImplementedError: If the player is recognized but not yet implemented.
"""
player_name = config.stream.player
@@ -35,4 +46,5 @@ class PlayerFactory:
)
# Alias for convenient player creation
create_player = PlayerFactory.create

View File

@@ -1,3 +1,9 @@
"""
Syncplay integration for FastAnime.
This module provides a procedural function to launch Syncplay with the given media and options.
"""
import shutil
import subprocess
@@ -5,6 +11,19 @@ from .tools import exit_app
def SyncPlayer(url: str, anime_title=None, headers={}, subtitles=[], *args):
"""
Launch Syncplay for synchronized playback with friends.
Args:
url: The media URL to play.
anime_title: Optional title to display in the player.
headers: Optional HTTP headers to pass to the player.
subtitles: Optional list of subtitle dicts with 'url' keys.
*args: Additional arguments (unused).
Returns:
Tuple of ("0", "0") for compatibility.
"""
# TODO: handle m3u8 multi quality streams
#
# check for SyncPlay

View File

@@ -1,8 +1,21 @@
"""
Defines the PlayerResult dataclass, which encapsulates the result of a player session.
"""
from dataclasses import dataclass
@dataclass(frozen=True)
class PlayerResult:
"""
Result of a player session.
Attributes:
episode: The episode identifier or label.
stop_time: The time at which playback stopped.
total_time: The total duration of the media.
"""
episode: str
stop_time: str | None = None
total_time: str | None = None

View File

@@ -1,3 +1,9 @@
"""
VLC player integration for FastAnime.
This module provides the VlcPlayer class, which implements the BasePlayer interface for the VLC media player.
"""
import logging
import shutil
import subprocess
@@ -14,11 +20,32 @@ logger = logging.getLogger(__name__)
class VlcPlayer(BasePlayer):
"""
VLC player implementation for FastAnime.
Provides playback functionality using the VLC media player, supporting desktop, mobile, and torrent scenarios.
"""
def __init__(self, config: VlcConfig):
"""
Initialize the VlcPlayer with the given VLC configuration.
Args:
config: VlcConfig object containing VLC-specific settings.
"""
self.config = config
self.executable = shutil.which("vlc")
def play(self, params: PlayerParams) -> PlayerResult:
"""
Play the given media using VLC, handling desktop, mobile, and torrent scenarios.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if not self.executable:
raise FastAnimeError("VLC executable not found in PATH.")
@@ -27,7 +54,22 @@ class VlcPlayer(BasePlayer):
else:
return self._play_on_desktop(params)
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""
Not implemented for VLC player.
"""
raise NotImplementedError("play_with_ipc is not implemented for VLC player.")
def _play_on_mobile(self, params: PlayerParams) -> PlayerResult:
"""
Play media on a mobile device using Android intents.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if YOUTUBE_REGEX.match(params.url):
args = [
"nohup",
@@ -62,9 +104,18 @@ class VlcPlayer(BasePlayer):
subprocess.run(args)
return PlayerResult()
return PlayerResult(episode=params.episode)
def _play_on_desktop(self, params: PlayerParams) -> PlayerResult:
"""
Play media on a desktop environment using VLC.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if TORRENT_REGEX.search(params.url):
return self._stream_on_desktop_with_webtorrent_cli(params)
@@ -80,11 +131,20 @@ class VlcPlayer(BasePlayer):
args.extend(self.config.args.split(","))
subprocess.run(args, encoding="utf-8")
return PlayerResult()
return PlayerResult(episode=params.episode)
def _stream_on_desktop_with_webtorrent_cli(
self, params: PlayerParams
) -> PlayerResult:
"""
Stream torrent media using the webtorrent CLI and VLC.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
WEBTORRENT_CLI = shutil.which("webtorrent")
if not WEBTORRENT_CLI:
raise FastAnimeError(
@@ -98,7 +158,7 @@ class VlcPlayer(BasePlayer):
args.extend(self.config.args.split(","))
subprocess.run(args)
return PlayerResult()
return PlayerResult(episode=params.episode)
if __name__ == "__main__":
@@ -107,5 +167,5 @@ if __name__ == "__main__":
print(APP_ASCII_ART)
url = input("Enter the url you would like to stream: ")
vlc = VlcPlayer(VlcConfig())
player_result = vlc.play(PlayerParams(url=url, title=""))
player_result = vlc.play(PlayerParams(url=url, title="", query="", episode=""))
print(player_result)

View File

@@ -20,7 +20,6 @@ class FmHlsExtractor(BaseExtractor):
timeout=10,
)
response.raise_for_status()
streams = response.json()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
@@ -50,7 +49,6 @@ class OkExtractor(BaseExtractor):
timeout=10,
)
response.raise_for_status()
streams = response.json()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)

View File

@@ -20,7 +20,6 @@ class SsHlsExtractor(BaseExtractor):
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
streams = response.json()["links"]
return Server(

View File

@@ -19,7 +19,6 @@ class VidMp4Extractor(BaseExtractor):
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
embed_html = response.text.replace(" ", "").replace("\n", "")
response.raise_for_status()
streams = response.json()

View File

@@ -88,10 +88,7 @@ class AllAnimeEpisodeStream(TypedDict):
class AllAnimeEpisodeStreams(TypedDict):
links: [AllAnimeEpisodeStream]
Server = Literal["gogoanime", "dropbox", "wetransfer", "sharepoint"]
links: list[AllAnimeEpisodeStream]
class AllAnimeEpisode(TypedDict):

View File

@@ -51,7 +51,7 @@ def debug_extractor(extractor_function):
f"[AllAnime@Server={args[3].get('sourceName', 'UNKNOWN')}]: {e}"
)
else:
return extractor_function(*args, **kwargs)
return extractor_function(*args)
return _provider_function_wrapper

View File

@@ -0,0 +1,12 @@
# The base domain for HiAnime.
HIANIME_DOMAIN = "hianime.to"
HIANIME_BASE_URL = f"https://{HIANIME_DOMAIN}"
# The endpoint for making AJAX requests (fetching episodes, servers, etc.).
HIANIME_AJAX_URL = f"{HIANIME_BASE_URL}/ajax"
# The base URL for search queries.
SEARCH_URL = f"{HIANIME_BASE_URL}/search"
# The Referer header is crucial for making successful requests to the AJAX endpoints.
AJAX_REFERER_HEADER = f"{HIANIME_BASE_URL}/"

View File

@@ -0,0 +1,30 @@
import logging
from typing import Optional
from ....anime.types import Server
from .megacloud import MegaCloudExtractor
logger = logging.getLogger(__name__)
def extract_server(embed_url: str) -> Optional[Server]:
"""
Acts as a router to select the correct extractor based on the embed URL.
Args:
embed_url: The URL of the video host's embed page.
Returns:
A Server object containing the stream links, or None if extraction fails.
"""
hostname = embed_url.split("/")[2]
if "megacloud" in hostname or "megaplay" in hostname:
return MegaCloudExtractor().extract(embed_url)
# In the future, you could add other extractors here:
# if "streamsb" in hostname:
# return StreamSbExtractor().extract(embed_url)
logger.warning(f"No extractor found for hostname: {hostname}")
return None

View File

@@ -0,0 +1,55 @@
const CryptoJS = require("crypto-js");
/**
* Extracts a secret key from an encrypted string based on an array of index pairs,
* then uses that key to decrypt the rest of the string.
* @param {string} encryptedString - The full encrypted sources string.
* @param {string} varsJson - A JSON string representing an array of [start, length] pairs.
* @returns {string} The decrypted JSON string of video sources.
*/
function getSecretAndDecrypt(encryptedString, varsJson) {
const values = JSON.parse(varsJson);
let secret = "";
let encryptedSource = "";
let encryptedSourceArray = encryptedString.split("");
let currentIndex = 0;
for (const index of values) {
const start = index[0] + currentIndex;
const end = start + index[1];
for (let i = start; i < end; i++) {
secret += encryptedString[i];
encryptedSourceArray[i] = "";
}
currentIndex += index[1];
}
encryptedSource = encryptedSourceArray.join("");
const decrypted = CryptoJS.AES.decrypt(encryptedSource, secret).toString(
CryptoJS.enc.Utf8,
);
return decrypted;
}
// Main execution logic
const args = process.argv.slice(2);
if (args.length < 2) {
console.error(
"Usage: node megacloud_decrypt.js <encryptedString> '<varsJson>'",
);
process.exit(1);
}
const encryptedString = args[0];
const varsJson = args[1];
try {
const result = getSecretAndDecrypt(encryptedString, varsJson);
// The result is already a JSON string of the sources, just print it to stdout.
console.log(result);
} catch (e) {
console.error(e.message);
process.exit(1);
}

View File

@@ -0,0 +1,21 @@
{
"name": "hianime-extractor-helper",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "hianime-extractor-helper",
"version": "1.0.0",
"dependencies": {
"crypto-js": "^4.2.0"
}
},
"node_modules/crypto-js": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.2.0.tgz",
"integrity": "sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==",
"license": "MIT"
}
}
}

View File

@@ -0,0 +1,9 @@
{
"name": "hianime-extractor-helper",
"version": "1.0.0",
"description": "Helper script to decrypt MegaCloud sources for FastAnime.",
"main": "megacloud_decrypt.js",
"dependencies": {
"crypto-js": "^4.2.0"
}
}

View File

@@ -0,0 +1,180 @@
import json
import logging
import re
import subprocess
from pathlib import Path
from typing import List, Optional
import httpx
from ...types import EpisodeStream, Server, Subtitle
from ..types import HiAnimeSource
logger = logging.getLogger(__name__)
# The path to our Node.js decryption script, relative to this file.
DECRYPT_SCRIPT_PATH = Path(__file__).parent / "js" / "megacloud_decrypt.js"
class MegaCloudExtractor:
"""
Extractor for MegaCloud streams.
It works by:
1. Fetching the embed page.
2. Finding the encrypted sources data and the URL to a JavaScript file.
3. Fetching the JavaScript file and using regex to find decryption keys.
4. Calling an external Node.js script to perform the decryption.
5. Parsing the decrypted result to get the final stream URLs.
"""
def _run_node_script(self, encrypted_string: str, vars_json: str) -> Optional[dict]:
"""
Executes the Node.js decryption script as a subprocess.
Args:
encrypted_string: The large encrypted sources string.
vars_json: A JSON string of the array of indexes for key extraction.
Returns:
The decrypted data as a dictionary, or None on failure.
"""
if not DECRYPT_SCRIPT_PATH.exists():
logger.error(
f"Node.js decryption script not found at: {DECRYPT_SCRIPT_PATH}"
)
return None
command = ["node", str(DECRYPT_SCRIPT_PATH), encrypted_string, vars_json]
try:
process = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
cwd=DECRYPT_SCRIPT_PATH.parent, # Run from the 'js' directory
)
return json.loads(process.stdout)
except subprocess.CalledProcessError as e:
logger.error(f"Node.js script failed with error: {e.stderr}")
except json.JSONDecodeError:
logger.error("Failed to parse JSON output from Node.js script.")
except Exception as e:
logger.error(
f"An unexpected error occurred while running Node.js script: {e}"
)
return None
def extract_vars_from_script(self, script_content: str) -> Optional[str]:
"""
Uses regex to find the variable array needed for decryption from the script content.
This pattern is based on the logic from the TypeScript project.
"""
# This regex is a Python adaptation of the one in the TypeScript source.
# It looks for the specific pattern that initializes the decryption keys.
regex = r"case\s*0x[0-9a-f]+:(?![^;]*=partKey)\s*\w+\s*=\s*(\w+)\s*,\s*\w+\s*=\s*(\w+);"
matches = re.findall(regex, script_content)
if not matches:
logger.error("Could not find decryption variables in the script.")
return None
def matching_key(value: str, script: str) -> Optional[str]:
# This nested function replicates the `matchingKey` logic from the TS file.
key_regex = re.compile(f",{value}=((?:0x)?([0-9a-fA-F]+))")
match = key_regex.search(script)
return match.group(1) if match else None
vars_array = []
for match in matches:
try:
key1_hex = matching_key(match[0], script_content)
key2_hex = matching_key(match[1], script_content)
if key1_hex and key2_hex:
vars_array.append([int(key1_hex, 16), int(key2_hex, 16)])
except (ValueError, TypeError):
logger.warning(
f"Could not parse hex values from script for match: {match}"
)
continue
return json.dumps(vars_array) if vars_array else None
def extract(self, embed_url: str) -> Optional[Server]:
"""
Main extraction method.
Args:
embed_url: The URL of the MegaCloud embed page.
Returns:
A Server object containing stream links and subtitles.
"""
try:
with httpx.Client() as client:
# 1. Get the embed page content
embed_response = client.get(
embed_url, headers={"Referer": constants.HIANIME_BASE_URL}
)
embed_response.raise_for_status()
embed_html = embed_response.text
# 2. Find the encrypted sources and the script URL
# The data is usually stored in a script tag as `var sources = [...]`.
sources_match = re.search(r"var sources = ([^;]+);", embed_html)
script_url_match = re.search(
r'src="(/js/player/a/prod/e1-player.min.js\?[^"]+)"', embed_html
)
if not sources_match or not script_url_match:
logger.error("Could not find sources or script URL in embed page.")
return None
encrypted_sources_data = json.loads(sources_match.group(1))
script_url = "https:" + script_url_match.group(1)
encrypted_string = encrypted_sources_data.get("sources")
if not isinstance(encrypted_string, str) or not encrypted_string:
logger.error("Encrypted sources string is missing or invalid.")
return None
# 3. Fetch the script and extract decryption variables
script_response = client.get(script_url)
script_response.raise_for_status()
vars_json = self.extract_vars_from_script(script_response.text)
if not vars_json:
return None
# 4. Decrypt using the Node.js script
decrypted_data = self._run_node_script(encrypted_string, vars_json)
if not decrypted_data or not isinstance(decrypted_data, list):
logger.error("Decryption failed or returned invalid data.")
return None
# 5. Map to generic models
streams = [
EpisodeStream(
link=source["file"], quality="auto", format=source["type"]
)
for source in decrypted_data
]
subtitles = [
Subtitle(url=track["file"], language=track.get("label", "en"))
for track in encrypted_sources_data.get("tracks", [])
if track.get("kind") == "captions"
]
return Server(
name="MegaCloud",
links=streams,
subtitles=subtitles,
headers={"Referer": "https://megacloud.tv/"},
)
except Exception as e:
logger.error(f"MegaCloud extraction failed: {e}", exc_info=True)
return None

View File

@@ -0,0 +1,149 @@
import re
from typing import List, Optional
from ....provider.anime.types import (
Anime,
AnimeEpisodes,
PageInfo,
SearchResult,
SearchResults,
)
from ....provider.scraping.html_parser import (
extract_attributes,
get_element_by_class,
get_elements_by_class,
)
def _parse_episodes(element_html: str) -> AnimeEpisodes:
"""Helper function to parse sub/dub episode counts from an anime item."""
sub_text = get_element_by_class("tick-sub", element_html)
dub_text = get_element_by_class("tick-dub", element_html)
sub_count = 0
dub_count = 0
if sub_text:
match = re.search(r"\d+", sub_text)
if match:
sub_count = int(match.group(0))
if dub_text:
match = re.search(r"\d+", dub_text)
if match:
dub_count = int(match.group(0))
# Generate a list of episode numbers as strings
sub_list = [str(i) for i in range(1, sub_count + 1)]
dub_list = [str(i) for i in range(1, dub_count + 1)]
return AnimeEpisodes(sub=sub_list, dub=dub_list, raw=[])
def map_to_search_results(
anime_elements: List[str], full_html: str
) -> Optional[SearchResults]:
"""
Maps a list of HTML elements from a HiAnime search page to a generic SearchResults object.
Args:
anime_elements: A list of raw HTML strings, each representing an anime (.flw-item).
full_html: The full HTML content of the search page for parsing pagination.
Returns:
A SearchResults object or None if parsing fails.
"""
results = []
for element in anime_elements:
title_element = get_element_by_class("dynamic-name", element)
if not title_element:
continue
attrs = extract_attributes(title_element)
title = title_element.split(">")[1].split("<")[0].strip()
anime_id = attrs.get("href", "").lstrip("/")
poster_element = get_element_by_class("film-poster-img", element)
poster_attrs = extract_attributes(poster_element or "")
results.append(
SearchResult(
id=anime_id,
title=title,
poster=poster_attrs.get("data-src"),
episodes=_parse_episodes(element),
)
)
# Parse pagination to determine total pages
total_pages = 1
# Use a simpler selector that is less prone to parsing issues.
pagination_elements = get_elements_by_class("page-item", full_html)
if pagination_elements:
# Find the last page number from all pagination links
last_page_num = 0
for el in pagination_elements:
attrs = extract_attributes(el)
href = attrs.get("href", "")
if "?page=" in href:
try:
num = int(href.split("?page=")[-1])
if num > last_page_num:
last_page_num = num
except (ValueError, IndexError):
continue
if last_page_num > 0:
total_pages = last_page_num
page_info = PageInfo(total=total_pages)
return SearchResults(page_info=page_info, results=results)
def map_to_anime_result(anime_id_slug: str, episode_list_html: str) -> Optional[Anime]:
"""
Maps the AJAX response for an episode list to a generic Anime object.
Args:
anime_id_slug: The anime's unique ID string (e.g., "steinsgate-3").
episode_list_html: The raw HTML snippet containing the list of episodes.
Returns:
An Anime object containing the episode list, or None.
"""
episodes = get_elements_by_class("ssl-item", episode_list_html)
episode_numbers_sub = []
# Note: HiAnime's episode list doesn't differentiate sub/dub, so we assume all are sub for now.
# The user selects sub/dub when choosing a server later.
for ep_element in episodes:
attrs = extract_attributes(ep_element)
ep_num = attrs.get("data-number")
if ep_num:
episode_numbers_sub.append(ep_num)
# The title isn't in this AJAX response, so we derive a placeholder from the slug.
# The application's state usually carries the real title from the search/list step.
placeholder_title = anime_id_slug.replace("-", " ").title()
return Anime(
id=anime_id_slug,
title=placeholder_title,
episodes=AnimeEpisodes(
sub=episode_numbers_sub,
dub=[], # We don't know dub count from this endpoint
raw=[],
),
)
def map_to_server_id(server_element_html: str) -> Optional[str]:
"""
Extracts the server's unique data-id from its HTML element.
Args:
server_element_html: The raw HTML of a server-item.
Returns:
The server ID string, or None.
"""
attrs = extract_attributes(server_element_html)
return attrs.get("data-id")

View File

@@ -0,0 +1,168 @@
import logging
from typing import Iterator, Optional
from ....provider.anime.base import BaseAnimeProvider
from ....provider.anime.params import AnimeParams, EpisodeStreamsParams, SearchParams
from ....provider.anime.types import Anime, SearchResults, Server
from ....provider.scraping.html_parser import get_elements_by_class
from . import constants, mappers
from .extractors import extract_server
logger = logging.getLogger(__name__)
class HiAnime(BaseAnimeProvider):
"""
Provider for scraping anime data from HiAnime.
This provider implements the search, get, and episode_streams methods
to fetch anime information and video stream URLs from HiAnime's website
and internal AJAX APIs.
"""
HEADERS = {"Referer": constants.HIANIME_BASE_URL}
def search(self, params: SearchParams) -> Optional[SearchResults]:
"""
Searches HiAnime for a given query.
Args:
params: The search parameters containing the query.
Returns:
A SearchResults object containing the found anime, or None.
"""
search_url = f"{constants.SEARCH_URL}?keyword={params.query}"
try:
response = self.client.get(search_url, follow_redirects=True)
response.raise_for_status()
# The search results are rendered in the HTML. We use our HTML parser
# to find all elements with the class '.flw-item', which represent
# individual anime search results.
anime_elements = get_elements_by_class("flw-item", response.text)
if not anime_elements:
return None
# The mapper will convert the raw HTML elements into our generic SearchResults model.
return mappers.map_to_search_results(anime_elements, response.text)
except Exception as e:
logger.error(
f"Failed to perform search on HiAnime for query '{params.query}': {e}"
)
return None
def get(self, params: AnimeParams) -> Optional[Anime]:
"""
Retrieves detailed information and a list of episodes for a specific anime.
Args:
params: The parameters containing the anime ID (slug).
Returns:
An Anime object with a full episode list, or None.
"""
try:
# The numeric ID is the last part of the slug.
clean_id_slug = params.id.split("?")[0]
anime_id_numeric = clean_id_slug.split("-")[-1]
if not anime_id_numeric.isdigit():
raise ValueError("Could not extract numeric ID from anime slug.")
# HiAnime loads episodes via an AJAX request.
episodes_url = (
f"{constants.HIANIME_AJAX_URL}/v2/episode/list/{anime_id_numeric}"
)
response = self.client.get(
episodes_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
response.raise_for_status()
# The response is JSON containing an 'html' key with the episode list.
html_snippet = response.json().get("html", "")
if not html_snippet:
return None
# We pass the original anime ID (slug) and the HTML snippet to the mapper.
return mappers.map_to_anime_result(params.id, html_snippet)
except Exception as e:
logger.error(f"Failed to get anime details for '{params.id}': {e}")
return None
def episode_streams(
self, params: EpisodeStreamsParams
) -> Optional[Iterator[Server]]:
"""
Fetches the actual video stream URLs for a given episode.
This is a multi-step process:
1. Get the list of available servers (e.g., MegaCloud, StreamSB).
2. For each server, get the embed URL.
3. Pass the embed URL to an extractor to get the final stream URL.
Args:
params: The parameters containing the episode ID.
Yields:
A Server object for each available video source.
"""
try:
# The episode ID is in the format 'anime-slug?ep=12345'
episode_id_numeric = params.episode.split("?ep=")[-1]
if not episode_id_numeric.isdigit():
raise ValueError("Could not extract numeric episode ID.")
# 1. Get available servers for the episode.
servers_url = f"{constants.HIANIME_AJAX_URL}/v2/episode/servers?episodeId={episode_id_numeric}"
servers_response = self.client.get(
servers_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
servers_response.raise_for_status()
server_elements = get_elements_by_class(
"server-item", servers_response.json().get("html", "")
)
for server_element in server_elements:
try:
# 2. Extract the server's unique ID.
server_id = mappers.map_to_server_id(server_element)
if not server_id:
continue
# 3. Get the embed URL for this server.
sources_url = f"{constants.HIANIME_AJAX_URL}/v2/episode/sources?id={server_id}"
sources_response = self.client.get(
sources_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
sources_response.raise_for_status()
embed_url = sources_response.json().get("link")
if not embed_url:
continue
# 4. Use an extractor to get the final stream URLs from the embed page.
# The extractor handles the complex, host-specific logic.
server = extract_server(embed_url)
if server:
yield server
except Exception as e:
logger.warning(
f"Failed to process a server for episode '{params.episode}': {e}"
)
continue
except Exception as e:
logger.error(f"Failed to get episode streams for '{params.episode}': {e}")
return None

View File

@@ -0,0 +1,33 @@
from typing import List, Literal, TypedDict
class HiAnimeEpisode(TypedDict):
"""
Represents a single episode entry returned by the
`/ajax/v2/episode/list/{anime_id}` endpoint.
"""
title: str | None
episodeId: str | None
number: int
isFiller: bool
class HiAnimeEpisodeServer(TypedDict):
"""
Represents a single server entry returned by the
`/ajax/v2/episode/servers?episodeId={episode_id}` endpoint.
"""
serverName: str
serverId: int | None
class HiAnimeSource(TypedDict):
"""
Represents the JSON response from the
`/ajax/v2/episode/sources?id={server_id}` endpoint,
which contains the link to the extractor's embed page.
"""
link: str

View File

@@ -12,8 +12,6 @@ PROVIDERS_AVAILABLE = {
"allanime": "provider.AllAnime",
"animepahe": "provider.AnimePahe",
"hianime": "provider.HiAnime",
"nyaa": "provider.Nyaa",
"yugen": "provider.Yugen",
}

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
class ProviderName(Enum):
ALLANIME = "allanime"
ANIMEPAHE = "animepahe"
HIANIME = "hianime"
class ProviderServer(Enum):

View File

@@ -31,6 +31,7 @@ class BaseSelector(ABC):
"""
pass
@abstractmethod
def choose_multiple(
self,
prompt: str,
@@ -50,29 +51,7 @@ class BaseSelector(ABC):
Returns:
A list of the chosen items.
"""
# Default implementation: single selection in a loop
selected = []
remaining_choices = choices.copy()
while remaining_choices:
choice = self.choose(
f"{prompt} (Select multiple, empty to finish)",
remaining_choices + ["[DONE] Finish selection"],
preview=preview,
)
if not choice or choice == "[DONE] Finish selection":
break
selected.append(choice)
remaining_choices.remove(choice)
if not self.confirm(
f"Selected: {', '.join(selected)}. Continue selecting?", default=True
):
break
return selected
pass
@abstractmethod
def confirm(self, prompt: str, *, default: bool = False) -> bool:

View File

@@ -141,3 +141,16 @@ class FzfSelector(BaseSelector):
if result.returncode != 0:
return None
return result.stdout.strip()
if __name__ == "__main__":
config = FzfConfig()
selector = FzfSelector(config)
choice = selector.ask("Hello dev :)")
print(choice)
choice = selector.confirm("Hello dev :)")
print(choice)
choice = selector.choose_multiple("What comes first", ["a", "b"])
print(choice)
choice = selector.choose("What comes first", ["a", "b"])
print(choice)

View File

@@ -20,4 +20,27 @@ class InquirerSelector(BaseSelector):
return Confirm.ask(prompt, default=default)
def ask(self, prompt, *, default=None):
return Prompt.ask(prompt=prompt, default=default or "")
return Prompt.ask(prompt=prompt, default=default or None)
def choose_multiple(
self, prompt: str, choices: list[str], preview: str | None = None
) -> list[str]:
return FuzzyPrompt(
message=prompt,
choices=choices,
height="100%",
multiselect=True,
border=True,
).execute()
if __name__ == "__main__":
selector = InquirerSelector()
choice = selector.ask("Hello dev :)")
print(choice)
choice = selector.confirm("Hello dev :)")
print(choice)
choice = selector.choose_multiple("What comes first", ["a", "b"])
print(choice)
choice = selector.choose("What comes first", ["a", "b"])
print(choice)

View File

@@ -2,6 +2,7 @@ import shutil
import subprocess
from ....core.config import RofiConfig
from ....core.utils import detect
from ..base import BaseSelector
@@ -13,13 +14,52 @@ class RofiSelector(BaseSelector):
raise FileNotFoundError("rofi executable not found in PATH.")
def choose(self, prompt, choices, *, preview=None, header=None):
rofi_input = "\n".join(choices)
if preview and detect.is_bash_script(preview):
preview = None
rofi_input = preview if preview else "\n".join(choices)
args = [
self.executable,
"-no-config",
"-theme",
self.config.theme_preview if preview else self.config.theme_main,
"-p",
prompt,
"-i",
"-dmenu",
]
if preview:
args.append("-show-icons")
result = subprocess.run(
args,
input=rofi_input,
stdout=subprocess.PIPE,
text=True,
)
if result:
choice = result.stdout.strip()
return choice
def confirm(self, prompt, *, default=False):
choices = ["Yes", "No"]
default_choice = "Yes" if default else "No"
result = self.choose(prompt, choices, header=f"Default: {default_choice}")
return result == "Yes"
def ask(self, prompt, *, default=None):
return self.choose(prompt, [])
def choose_multiple(
self, prompt: str, choices: list[str], preview: str | None = None
) -> list[str]:
rofi_input = "\n".join(choices)
args = [
self.executable,
"-no-config",
"-theme",
self.config.theme_main,
"-multi-select",
"-p",
prompt,
"-i",
@@ -34,14 +74,18 @@ class RofiSelector(BaseSelector):
if result:
choice = result.stdout.strip()
return choice
return choice.split()
return []
def confirm(self, prompt, *, default=False):
# Maps directly to your existing `confirm` method
# ... (logic from your `Rofi.confirm` method) ...
pass
def ask(self, prompt, *, default=None):
# Maps directly to your existing `ask` method
# ... (logic from your `Rofi.ask` method) ...
pass
if __name__ == "__main__":
config = RofiConfig()
selector = RofiSelector(config)
choice = selector.ask("Hello dev :)")
print(choice)
choice = selector.confirm("Hello dev :)")
print(choice)
choice = selector.choose_multiple("What comes first", ["a", "b"])
print(choice)
choice = selector.choose("What comes first", ["a", "b"])
print(choice)

View File

@@ -23,7 +23,10 @@ standard = [
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
]
notifications = ["plyer>=2.1.0"]
notifications = [
"dbus-python>=1.4.0",
"plyer>=2.1.0",
]
mpv = [
"mpv>=1.0.7",
]

View File

@@ -1,47 +0,0 @@
[tool.pytest.ini_options]
minversion = "6.0"
addopts = [
"-ra",
"--strict-markers",
"--strict-config",
"--cov=fastanime.cli.interactive",
"--cov-report=term-missing",
"--cov-report=html:htmlcov",
"--cov-report=xml",
"-v",
]
testpaths = [
"tests",
]
python_files = [
"test_*.py",
"*_test.py",
]
python_classes = [
"Test*",
]
python_functions = [
"test_*",
]
markers = [
"unit: Unit tests",
"integration: Integration tests",
"slow: Slow running tests",
"network: Tests requiring network access",
"auth: Tests requiring authentication",
]
filterwarnings = [
"ignore::DeprecationWarning",
"ignore::PendingDeprecationWarning",
]
# Test discovery patterns
collect_ignore = [
"setup.py",
]
# Pytest plugins
required_plugins = [
"pytest-cov",
"pytest-mock",
]

10
uv.lock generated
View File

@@ -74,6 +74,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "dbus-python"
version = "1.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ff/24/63118050c7dd7be04b1ccd60eab53fef00abe844442e1b6dec92dae505d6/dbus-python-1.4.0.tar.gz", hash = "sha256:991666e498f60dbf3e49b8b7678f5559b8a65034fdf61aae62cdecdb7d89c770", size = 232490, upload-time = "2025-03-13T19:57:54.212Z" }
[[package]]
name = "distlib"
version = "0.4.0"
@@ -97,7 +103,7 @@ wheels = [
[[package]]
name = "fastanime"
version = "2.9.9"
version = "3.0.0"
source = { editable = "." }
dependencies = [
{ name = "click" },
@@ -122,6 +128,7 @@ mpv = [
{ name = "mpv" },
]
notifications = [
{ name = "dbus-python" },
{ name = "plyer" },
]
standard = [
@@ -150,6 +157,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.7" },
{ name = "dbus-python", marker = "extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },