feat: cli search

This commit is contained in:
Benexl
2025-07-13 01:39:52 +03:00
parent be1babbedc
commit a2da6974fa
4 changed files with 156 additions and 326 deletions

View File

@@ -11,6 +11,7 @@ from .utils.logging import setup_logging
commands = {
"config": ".config",
"search": ".search",
}

View File

@@ -1,3 +1,4 @@
from .config import config
from .search import search
__all__ = ["config"]
__all__ = ["config", "search"]

View File

@@ -1,11 +1,24 @@
from typing import TYPE_CHECKING
import click
from fastanime.core.exceptions import FastAnimeError
from ...core.config import AppConfig
from ..utils.completion_functions import anime_titles_shell_complete
if TYPE_CHECKING:
from ...cli.config import Config
from typing import TypedDict
from typing_extensions import Unpack
from ...libs.players.base import BasePlayer
from ...libs.providers.anime.base import BaseAnimeProvider
from ...libs.providers.anime.types import Anime
from ...libs.selectors.base import BaseSelector
class Options(TypedDict):
anime_title: list[str]
episode_range: str | None
@click.command(
@@ -36,8 +49,7 @@ if TYPE_CHECKING:
""",
)
@click.option(
"--anime-titles",
"--anime_title",
"--anime-title",
"-t",
required=True,
shell_complete=anime_titles_shell_complete,
@@ -50,341 +62,157 @@ if TYPE_CHECKING:
help="A range of episodes to binge (start-end)",
)
@click.pass_obj
def search(config: "Config", anime_titles: str, episode_range: str):
from click import clear
def search(config: AppConfig, **options: "Unpack[Options]"):
from rich import print
from rich.progress import Progress
from thefuzz import fuzz
from ...libs.fzf import fzf
from ...libs.rofi import Rofi
from ..utils.tools import exit_app
from ..utils.utils import fuzzy_inquirer
from ...core.exceptions import FastAnimeError
from ...libs.players.player import create_player
from ...libs.providers.anime.params import (
AnimeParams,
SearchParams,
)
from ...libs.providers.anime.provider import create_provider
from ...libs.selectors.selector import create_selector
if config.manga:
from InquirerPy.prompts.number import NumberPrompt
from yt_dlp.utils import sanitize_filename
provider = create_provider(config.general.provider)
player = create_player(config)
selector = create_selector(config)
from ...MangaProvider import MangaProvider
from ..utils.feh import feh_manga_viewer
from ..utils.icat import icat_manga_viewer
anime_titles = options["anime_title"]
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
print(f"[green bold]Searching for:[/] {anime_title}")
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = provider.search(
SearchParams(
query=anime_title, translation_type=config.stream.translation_type
)
)
if not search_results:
raise FastAnimeError("No results were found matching your query")
manga_title = anime_titles[0]
manga_provider = MangaProvider()
search_data = manga_provider.search_for_manga(manga_title)
if not search_data:
print("No search results")
exit(1)
search_results = search_data["results"]
search_results_ = {
sanitize_filename(search_result["title"]): search_result
for search_result in search_results
_search_results = {
search_result.title: search_result
for search_result in search_results.results
}
if config.auto_select:
search_result_manga_title = max(
search_results_.keys(),
key=lambda title: fuzz.ratio(title, manga_title),
)
print("[cyan]Auto Selecting:[/] ", search_result_manga_title)
selected_anime_title = selector.choose(
"Select Anime", list(_search_results.keys())
)
if not selected_anime_title:
raise FastAnimeError("No title selected")
anime_result = _search_results[selected_anime_title]
# ---- fetch selected anime ----
with Progress() as progress:
progress.add_task("Fetching Anime...", total=None)
anime = provider.get(AnimeParams(id=anime_result.id))
if not anime:
raise FastAnimeError(f"Failed to fetch anime {anime_result.title}")
episodes_range = []
episodes: list[str] = sorted(
getattr(anime.episodes, config.stream.translation_type), key=float
)
if options["episode_range"]:
if ":" in options["episode_range"]:
ep_range_tuple = options["episode_range"].split(":")
if len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[int(episodes_start) : int(episodes_end)]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(options["episode_range"]) :]
episodes_range = iter(episodes_range)
for episode in episodes_range:
stream_anime(config, provider, selector, player, anime, episode)
else:
choices = list(search_results_.keys())
preview = None
if config.preview:
from ..interfaces.utils import get_fzf_manga_preview
episode = selector.choose(
"Select Episode",
getattr(anime.episodes, config.stream.translation_type),
)
if not episode:
raise FastAnimeError("No episode selected")
stream_anime(config, provider, selector, player, anime, episode)
preview = get_fzf_manga_preview(search_results)
if config.use_fzf:
search_result_manga_title = fzf.run(
choices, "Please Select title", preview=preview
)
elif config.use_rofi:
search_result_manga_title = Rofi.run(choices, "Please Select Title")
else:
search_result_manga_title = fuzzy_inquirer(
choices,
"Please Select Title",
)
anilist_id = search_results_[search_result_manga_title]["id"]
manga_info = manga_provider.get_manga(anilist_id)
if not manga_info:
print("No manga info")
exit(1)
def stream_anime(
config: AppConfig,
provider: "BaseAnimeProvider",
selector: "BaseSelector",
player: "BasePlayer",
anime: "Anime",
episode: str,
):
from rich import print
from rich.progress import Progress
anilist_helper = None
if config.user:
from ...anilist import AniList
from ...libs.players.params import PlayerParams
from ...libs.providers.anime.params import EpisodeStreamsParams
AniList.login_user(config.user["token"])
anilist_helper = AniList
def _manga_viewer():
chapter_number = NumberPrompt("Select a chapter number").execute()
chapter_info = manga_provider.get_chapter_thumbnails(
manga_info["id"], str(chapter_number)
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
streams = provider.episode_streams(
EpisodeStreamsParams(
anime_id=anime.id,
episode=episode,
translation_type=config.stream.translation_type,
)
)
if not streams:
raise FastAnimeError(
f"Failed to get streams for anime: {anime.title}, episode: {episode}"
)
if not chapter_info:
print("No chapter info")
input("Enter to retry...")
_manga_viewer()
return
print(
f"[purple bold]Now Reading: [/] {search_result_manga_title} [cyan bold]Chapter:[/] {chapter_info['title']}"
)
if config.manga_viewer == "feh":
feh_manga_viewer(chapter_info["thumbnails"], str(chapter_info["title"]))
elif config.manga_viewer == "icat":
icat_manga_viewer(
chapter_info["thumbnails"], str(chapter_info["title"])
if config.stream.server == "TOP":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
raise FastAnimeError(
f"Failed to get server for anime: {anime.title}, episode: {episode}"
)
if anilist_helper:
anilist_helper.update_anime_list(
{"mediaId": anilist_id, "progress": chapter_number}
)
_manga_viewer()
_manga_viewer()
else:
from ...BaseAnimeProvider import BaseAnimeProvider
from ...libs.anime_provider.types import Anime
from ...Utility.data import anime_normalizer
from ..utils.mpv import run_mpv
from ..utils.utils import filter_by_quality, move_preferred_subtitle_lang_to_top
anime_provider = BaseAnimeProvider(config.provider)
anilist_anime_info = None
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = anime_provider.search_for_anime(
anime_title, config.translation_type
)
if not search_results:
print("Search results not found")
input("Enter to retry")
search(config, anime_title, episode_range)
return
search_results = search_results["results"]
if not search_results:
print("Anime not found :cry:")
exit_app()
search_results_ = {
search_result["title"]: search_result
for search_result in search_results
}
if config.auto_select:
search_result_manga_title = max(
search_results_.keys(),
key=lambda title: fuzz.ratio(
anime_normalizer.get(title, title), anime_title
),
)
print("[cyan]Auto Selecting:[/] ", search_result_manga_title)
else:
choices = list(search_results_.keys())
if config.use_fzf:
search_result_manga_title = fzf.run(
choices, "Please Select title", "FastAnime"
)
elif config.use_rofi:
search_result_manga_title = Rofi.run(choices, "Please Select Title")
else:
search_result_manga_title = fuzzy_inquirer(
choices,
"Please Select Title",
)
# ---- fetch selected anime ----
with Progress() as progress:
progress.add_task("Fetching Anime...", total=None)
anime: Anime | None = anime_provider.get_anime(
search_results_[search_result_manga_title]["id"]
)
if not anime:
print("Sth went wring anime no found")
input("Enter to continue...")
search(config, anime_title, episode_range)
return
episodes_range = []
episodes: list[str] = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
episodes_range = iter(episodes_range)
if config.normalize_titles:
from ...libs.common.mini_anilist import get_basic_anime_info_by_title
anilist_anime_info = get_basic_anime_info_by_title(anime["title"])
def stream_anime(anime: "Anime"):
clear()
episode = None
if episodes_range:
try:
episode = next(episodes_range) # pyright:ignore
print(
f"[cyan]Auto selecting:[/] {search_result_manga_title} [cyan]Episode:[/] {episode}"
)
except StopIteration:
print("[green]Completed binge sequence[/]:smile:")
return
if not episode or episode not in episodes:
choices = [*episodes, "end"]
if config.use_fzf:
episode = fzf.run(
choices,
"Select an episode",
header=search_result_manga_title,
)
elif config.use_rofi:
episode = Rofi.run(choices, "Select an episode")
else:
episode = fuzzy_inquirer(
choices,
"Select episode",
)
if episode == "end":
return
# ---- fetch streams ----
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
streams = anime_provider.get_episode_streams(
anime["id"], episode, config.translation_type
)
if not streams:
print("Failed to get streams")
return
try:
# ---- fetch servers ----
if config.server == "top":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
print("Sth went wrong when fetching the episode")
input("Enter to continue")
stream_anime(anime)
return
stream_link = filter_by_quality(config.quality, server["links"])
if not stream_link:
print("Quality not found")
input("Enter to continue")
stream_anime(anime)
return
link = stream_link["link"]
subtitles = server["subtitles"]
stream_headers = server["headers"]
episode_title = server["episode_title"]
else:
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
# prompt for server selection
servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server = config.server
elif config.use_fzf:
server = fzf.run(servers_names, "Select an link")
elif config.use_rofi:
server = Rofi.run(servers_names, "Select an link")
else:
server = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server]["links"]
)
if not stream_link:
print("Quality not found")
input("Enter to continue")
stream_anime(anime)
return
link = stream_link["link"]
stream_headers = servers[server]["headers"]
subtitles = servers[server]["subtitles"]
episode_title = servers[server]["episode_title"]
selected_anime_title = search_result_manga_title
if anilist_anime_info:
selected_anime_title = (
anilist_anime_info["title"][config.preferred_language]
or anilist_anime_info["title"]["romaji"]
or anilist_anime_info["title"]["english"]
)
import re
for episode_detail in anilist_anime_info["episodes"]:
if re.match(f"Episode {episode} ", episode_detail["title"]):
episode_title = episode_detail["title"]
break
print(
f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}"
)
subtitles = move_preferred_subtitle_lang_to_top(
subtitles, config.sub_lang
)
if config.sync_play:
from ..utils.syncplay import SyncPlayer
SyncPlayer(
link,
episode_title,
headers=stream_headers,
subtitles=subtitles,
)
else:
run_mpv(
link,
episode_title,
headers=stream_headers,
subtitles=subtitles,
player=config.player,
)
except IndexError as e:
print(e)
input("Enter to continue")
stream_anime(anime)
stream_anime(anime)
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
servers = {server.name: server for server in streams}
servers_names = list(servers.keys())
if config.stream.server in servers_names:
server = servers[config.stream.server]
else:
server_name = selector.choose("Select Server", servers_names)
if not server_name:
raise FastAnimeError("Server not selected")
server = servers[server_name]
stream_link = server.links[0].link
if not stream_link:
raise FastAnimeError(
f"Failed to get stream link for anime: {anime.title}, episode: {episode}"
)
print(f"[green bold]Now Streaming:[/] {anime.title} Episode: {episode}")
player.play(
PlayerParams(
url=stream_link,
title=f"{anime.title}; Episode {episode}",
subtitles=server.subtitles, # type:ignore
headers=server.headers,
)
)

View File

@@ -6,12 +6,11 @@ PLAYERS = ["mpv", "vlc", "syncplay"]
class PlayerFactory:
@staticmethod
def create(player_name: str, config: AppConfig) -> BasePlayer:
def create(config: AppConfig) -> BasePlayer:
"""
Factory method to create a player instance based on its name.
Args:
player_name: The name of the player (e.g., 'mpv', 'vlc').
config: The full application configuration object.
Returns:
@@ -20,6 +19,7 @@ class PlayerFactory:
Raises:
ValueError: If the player_name is not supported.
"""
player_name = config.stream.player
if player_name not in PLAYERS:
raise ValueError(