feat: re-add the download cmd

This commit is contained in:
Benexl
2025-07-22 14:59:29 +03:00
parent 384d326fa8
commit 65e4726f82
2 changed files with 272 additions and 0 deletions

View File

@@ -30,6 +30,7 @@ commands = {
"config": "config.config",
"search": "search.search",
"anilist": "anilist.anilist",
"download": "download.download",
}

View File

@@ -0,0 +1,271 @@
from typing import TYPE_CHECKING
import click
from ...core.config import AppConfig
from ...core.exceptions import FastAnimeError
from ..utils.completions import anime_titles_shell_complete
from . import examples
if TYPE_CHECKING:
from pathlib import Path
from typing import TypedDict
from typing_extensions import Unpack
from ...libs.players.base import BasePlayer
from ...libs.providers.anime.base import BaseAnimeProvider
from ...libs.providers.anime.types import Anime
from ...libs.selectors.base import BaseSelector
class Options(TypedDict):
anime_title: tuple
episode_range: str
file: Path | None
force_unknown_ext: bool
silent: bool
verbose: bool
merge: bool
clean: bool
wait_time: int
prompt: bool
force_ffmpeg: bool
hls_use_mpegts: bool
hls_use_h264: bool
@click.command(
help="Download anime using the anime provider for a specified range",
short_help="Download anime",
epilog=examples.download,
)
@click.option(
"--anime_title",
"-t",
required=True,
shell_complete=anime_titles_shell_complete,
multiple=True,
help="Specify which anime to download",
)
@click.option(
"--episode-range",
"-r",
help="A range of episodes to download (start-end)",
)
@click.option(
"--file",
"-f",
type=click.File(),
help="A file to read from all anime to download",
)
@click.option(
"--force-unknown-ext",
"-F",
help="This option forces yt-dlp to download extensions its not aware of",
is_flag=True,
)
@click.option(
"--silent/--no-silent",
"-q/-V",
type=bool,
help="Download silently (during download)",
default=True,
)
@click.option("--verbose", "-v", is_flag=True, help="Download verbosely (everywhere)")
@click.option(
"--merge", "-m", is_flag=True, help="Merge the subfile with video using ffmpeg"
)
@click.option(
"--clean",
"-c",
is_flag=True,
help="After merging delete the original files",
)
@click.option(
"--prompt/--no-prompt",
help="Whether to prompt for anything instead just do the best thing",
default=True,
)
@click.option(
"--force-ffmpeg",
is_flag=True,
help="Force the use of FFmpeg for downloading (supports large variety of streams but slower)",
)
@click.option(
"--hls-use-mpegts",
is_flag=True,
help="Use mpegts for hls streams, resulted in .ts file (useful for some streams: see Docs) (this option forces --force-ffmpeg to be True)",
)
@click.option(
"--hls-use-h264",
is_flag=True,
help="Use H.264 (MP4) for hls streams, resulted in .mp4 file (useful for some streams: see Docs) (this option forces --force-ffmpeg to be True)",
)
@click.pass_obj
def download(config: AppConfig, **options: "Unpack[Options]"):
from rich import print
from rich.progress import Progress
from ...core.exceptions import FastAnimeError
from ...libs.players.player import create_player
from ...libs.providers.anime.params import (
AnimeParams,
SearchParams,
)
from ...libs.providers.anime.provider import create_provider
from ...libs.selectors.selector import create_selector
provider = create_provider(config.general.provider)
player = create_player(config)
selector = create_selector(config)
anime_titles = options["anime_title"]
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
print(f"[green bold]Searching for:[/] {anime_title}")
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = provider.search(
SearchParams(
query=anime_title, translation_type=config.stream.translation_type
)
)
if not search_results:
raise FastAnimeError("No results were found matching your query")
_search_results = {
search_result.title: search_result
for search_result in search_results.results
}
selected_anime_title = selector.choose(
"Select Anime", list(_search_results.keys())
)
if not selected_anime_title:
raise FastAnimeError("No title selected")
anime_result = _search_results[selected_anime_title]
# ---- fetch selected anime ----
with Progress() as progress:
progress.add_task("Fetching Anime...", total=None)
anime = provider.get(AnimeParams(id=anime_result.id))
if not anime:
raise FastAnimeError(f"Failed to fetch anime {anime_result.title}")
episodes_range = []
episodes: list[str] = sorted(
getattr(anime.episodes, config.stream.translation_type), key=float
)
if options["episode_range"]:
if ":" in options["episode_range"]:
ep_range_tuple = options["episode_range"].split(":")
if len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[int(episodes_start) : int(episodes_end)]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(options["episode_range"]) :]
episodes_range = iter(episodes_range)
for episode in episodes_range:
download_anime(
config, options, provider, selector, player, anime, episode
)
else:
episode = selector.choose(
"Select Episode",
getattr(anime.episodes, config.stream.translation_type),
)
if not episode:
raise FastAnimeError("No episode selected")
download_anime(config, options, provider, selector, player, anime, episode)
def download_anime(
config: AppConfig,
download_options: "Options",
provider: "BaseAnimeProvider",
selector: "BaseSelector",
player: "BasePlayer",
anime: "Anime",
episode: str,
):
from rich import print
from rich.progress import Progress
from ...core.downloader import DownloadParams, create_downloader
from ...libs.players.params import PlayerParams
from ...libs.providers.anime.params import EpisodeStreamsParams
downloader = create_downloader(config.downloads)
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
streams = provider.episode_streams(
EpisodeStreamsParams(
anime_id=anime.id,
episode=episode,
translation_type=config.stream.translation_type,
)
)
if not streams:
raise FastAnimeError(
f"Failed to get streams for anime: {anime.title}, episode: {episode}"
)
if config.stream.server == "TOP":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
raise FastAnimeError(
f"Failed to get server for anime: {anime.title}, episode: {episode}"
)
else:
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
servers = {server.name: server for server in streams}
servers_names = list(servers.keys())
if config.stream.server in servers_names:
server = servers[config.stream.server]
else:
server_name = selector.choose("Select Server", servers_names)
if not server_name:
raise FastAnimeError("Server not selected")
server = servers[server_name]
stream_link = server.links[0].link
if not stream_link:
raise FastAnimeError(
f"Failed to get stream link for anime: {anime.title}, episode: {episode}"
)
print(f"[green bold]Now Downloading:[/] {anime.title} Episode: {episode}")
downloader.download(
DownloadParams(
url=stream_link,
anime_title=anime.title,
episode_title=f"{anime.title}; Episode {episode}",
subtitles=[sub.url for sub in server.subtitles],
headers=server.headers,
vid_format=config.stream.ytdlp_format,
force_unknown_ext=download_options["force_unknown_ext"],
verbose=download_options["verbose"],
hls_use_mpegts=download_options["hls_use_mpegts"],
hls_use_h264=download_options["hls_use_h264"],
silent=download_options["silent"],
)
)