Compare commits

..

4 Commits

Author SHA1 Message Date
Benex254
6346ea7343 docs: update readme 2024-08-23 11:40:08 +03:00
Benex254
32de01047f chore:bump version 2024-08-23 11:39:57 +03:00
Benex254
35c7f81afb fix: no chapter title 2024-08-23 11:39:45 +03:00
Benex254
2dbbb1c4df feat: add experimental manga support 2024-08-23 11:19:25 +03:00
19 changed files with 763 additions and 436 deletions

View File

@@ -177,6 +177,7 @@ The only required external dependency, unless you won't be streaming, is [MPV](h
- [ani-skip](https://github.com/synacktraa/ani-skip) used for skipping the opening and ending theme songs - [ani-skip](https://github.com/synacktraa/ani-skip) used for skipping the opening and ending theme songs
- [ffmpegthumbnailer](https://github.com/dirkvdb/ffmpegthumbnailer) used for local previews of downloaded anime - [ffmpegthumbnailer](https://github.com/dirkvdb/ffmpegthumbnailer) used for local previews of downloaded anime
- [syncplay](https://syncplay.pl/) to enable watch together. - [syncplay](https://syncplay.pl/) to enable watch together.
- [feh]() used in manga mode
## Usage ## Usage
@@ -239,6 +240,7 @@ Available options for the fastanime include:
- `--sync-play` or `-sp` use syncplay for streaming anime so you can watch with your friends - `--sync-play` or `-sp` use syncplay for streaming anime so you can watch with your friends
- `--sub-lang <en/or any other common shortform for country>` regex is used to determine the appropriate. Only works when provider is aniwatch. - `--sub-lang <en/or any other common shortform for country>` regex is used to determine the appropriate. Only works when provider is aniwatch.
- `--normalize-titles/--no-normalize-titles` whether to normalize provider titles - `--normalize-titles/--no-normalize-titles` whether to normalize provider titles
- `--manga` toggle experimental manga mode
Example usage of the above options Example usage of the above options
@@ -259,6 +261,9 @@ fastanime --icons --preview --fzf anilist
# use icons with default ui # use icons with default ui
fastanime --icons --default anilist fastanime --icons --default anilist
# viewing manga
fastanime --manga search -t <manga-title>
``` ```
#### The anilist command :fire: :fire: :fire: #### The anilist command :fire: :fire: :fire:

105
fastanime/MangaProvider.py Normal file
View File

@@ -0,0 +1,105 @@
"""An abstraction over all providers offering added features with a simple and well typed api
[TODO:description]
"""
import importlib
import logging
from typing import TYPE_CHECKING
from .libs.manga_provider import manga_sources
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class MangaProvider:
"""Class that manages all anime sources adding some extra functionality to them.
Attributes:
PROVIDERS: [TODO:attribute]
provider: [TODO:attribute]
provider: [TODO:attribute]
dynamic: [TODO:attribute]
retries: [TODO:attribute]
manga_provider: [TODO:attribute]
"""
PROVIDERS = list(manga_sources.keys())
provider = PROVIDERS[0]
def __init__(self, provider="mangadex", dynamic=False, retries=0) -> None:
self.provider = provider
self.dynamic = dynamic
self.retries = retries
self.lazyload_provider(self.provider)
def lazyload_provider(self, provider):
"""updates the current provider being used"""
_, anime_provider_cls_name = manga_sources[provider].split(".", 1)
package = f"fastanime.libs.manga_provider.{provider}"
provider_api = importlib.import_module(".api", package)
manga_provider = getattr(provider_api, anime_provider_cls_name)
self.manga_provider = manga_provider()
def search_for_manga(
self,
user_query,
nsfw=True,
unknown=True,
):
"""core abstraction over all providers search functionality
Args:
user_query ([TODO:parameter]): [TODO:description]
translation_type ([TODO:parameter]): [TODO:description]
nsfw ([TODO:parameter]): [TODO:description]
manga_provider ([TODO:parameter]): [TODO:description]
anilist_obj: [TODO:description]
Returns:
[TODO:return]
"""
manga_provider = self.manga_provider
try:
results = manga_provider.search_for_manga(user_query, nsfw, unknown)
except Exception as e:
logger.error(e)
results = None
return results
def get_manga(
self,
anime_id: str,
):
"""core abstraction over getting info of an anime from all providers
Args:
anime_id: [TODO:description]
anilist_obj: [TODO:description]
Returns:
[TODO:return]
"""
manga_provider = self.manga_provider
try:
results = manga_provider.get_manga(anime_id)
except Exception as e:
logger.error(e)
results = None
return results
def get_chapter_thumbnails(
self,
manga_id: str,
chapter: str,
):
manga_provider = self.manga_provider
try:
results = manga_provider.get_chapter_thumbnails(manga_id, chapter)
except Exception as e:
logger.error(e)
results = None
return results # pyright:ignore

View File

@@ -6,7 +6,7 @@ if sys.version_info < (3, 10):
) # noqa: F541 ) # noqa: F541
__version__ = "v2.4.2" __version__ = "v2.4.3"
APP_NAME = "FastAnime" APP_NAME = "FastAnime"
AUTHOR = "Benex254" AUTHOR = "Benex254"

View File

@@ -40,6 +40,7 @@ signal.signal(signal.SIGINT, handle_exit)
short_help="Stream Anime", short_help="Stream Anime",
) )
@click.version_option(__version__, "--version") @click.version_option(__version__, "--version")
@click.option("--manga", "-m", help="Enable manga mode", is_flag=True)
@click.option("--log", help="Allow logging to stdout", is_flag=True) @click.option("--log", help="Allow logging to stdout", is_flag=True)
@click.option("--log-file", help="Allow logging to a file", is_flag=True) @click.option("--log-file", help="Allow logging to a file", is_flag=True)
@click.option("--rich-traceback", help="Use rich to output tracebacks", is_flag=True) @click.option("--rich-traceback", help="Use rich to output tracebacks", is_flag=True)
@@ -150,6 +151,7 @@ signal.signal(signal.SIGINT, handle_exit)
@click.pass_context @click.pass_context
def run_cli( def run_cli(
ctx: click.Context, ctx: click.Context,
manga,
log, log,
log_file, log_file,
rich_traceback, rich_traceback,
@@ -183,6 +185,7 @@ def run_cli(
from .config import Config from .config import Config
ctx.obj = Config() ctx.obj = Config()
ctx.obj.manga = manga
if log: if log:
import logging import logging

View File

@@ -56,26 +56,19 @@ def grab(
from thefuzz import fuzz from thefuzz import fuzz
from ...AnimeProvider import AnimeProvider
logger = getLogger(__name__) logger = getLogger(__name__)
if config.manga:
manga_title = anime_titles[0]
from ...MangaProvider import MangaProvider
anime_provider = AnimeProvider(config.provider) manga_provider = MangaProvider()
search_data = manga_provider.search_for_manga(manga_title)
grabbed_animes = [] if not search_data:
for anime_title in anime_titles:
# ---- search for anime ----
search_results = anime_provider.search_for_anime(
anime_title, translation_type=config.translation_type
)
if not search_results:
exit(1) exit(1)
if search_results_only: if search_results_only:
# grab only search results skipping all lines after this print(json.dumps(search_data))
grabbed_animes.append(search_results) exit(0)
continue search_results = search_data["results"]
search_results = search_results["results"]
if not search_results: if not search_results:
logger.error("no results for your search") logger.error("no results for your search")
exit(1) exit(1)
@@ -83,83 +76,133 @@ def grab(
search_result["title"]: search_result for search_result in search_results search_result["title"]: search_result for search_result in search_results
} }
search_result = max( search_result_anime_title = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title) search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_titles[0])
) )
manga_info = manga_provider.get_manga(
# ---- fetch anime ---- search_results_[search_result_anime_title]["id"]
anime = anime_provider.get_anime(search_results_[search_result]["id"]) )
if not anime: if not manga_info:
exit(1) return
if anime_info_only: if anime_info_only:
# grab only the anime data skipping all lines after this print(json.dumps(manga_info))
grabbed_animes.append(anime) exit(0)
continue
episodes = sorted( chapter_info = manga_provider.get_chapter_thumbnails(
anime["availableEpisodesDetail"][config.translation_type], key=float manga_info["id"], str(episode_range)
) )
if not chapter_info:
exit(1)
print(json.dumps(chapter_info))
# where the magic happens
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[int(episodes_start) : int(episodes_end)]
elif len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
else:
episodes_range = sorted(episodes, key=float)
if not episode_streams_only:
grabbed_anime = dict(anime)
grabbed_anime["requested_episodes"] = episodes_range
grabbed_anime["translation_type"] = config.translation_type
grabbed_anime["episodes_streams"] = {}
else:
grabbed_anime = {}
# lets download em
for episode in episodes_range:
try:
if episode not in episodes:
continue
streams = anime_provider.get_episode_streams(
anime, episode, config.translation_type
)
if not streams:
continue
episode_streams = {server["server"]: server for server in streams}
if episode_streams_only:
grabbed_anime[episode] = episode_streams
else:
grabbed_anime["episodes_streams"][ # pyright:ignore
episode
] = episode_streams
except Exception as e:
logger.error(e)
# grab the full data for single title and appen to final result or episode streams
grabbed_animes.append(grabbed_anime)
# print out the final result either {} or [] depending if more than one title os requested
if len(grabbed_animes) == 1:
print(json.dumps(grabbed_animes[0]))
else: else:
print(json.dumps(grabbed_animes)) from ...AnimeProvider import AnimeProvider
anime_provider = AnimeProvider(config.provider)
grabbed_animes = []
for anime_title in anime_titles:
# ---- search for anime ----
search_results = anime_provider.search_for_anime(
anime_title, translation_type=config.translation_type
)
if not search_results:
exit(1)
if search_results_only:
# grab only search results skipping all lines after this
grabbed_animes.append(search_results)
continue
search_results = search_results["results"]
if not search_results:
logger.error("no results for your search")
exit(1)
search_results_ = {
search_result["title"]: search_result
for search_result in search_results
}
search_result_anime_title = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title)
)
# ---- fetch anime ----
anime = anime_provider.get_anime(
search_results_[search_result_anime_title]["id"]
)
if not anime:
exit(1)
if anime_info_only:
# grab only the anime data skipping all lines after this
grabbed_animes.append(anime)
continue
episodes = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
# where the magic happens
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end)
]
elif len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
else:
episodes_range = sorted(episodes, key=float)
if not episode_streams_only:
grabbed_anime = dict(anime)
grabbed_anime["requested_episodes"] = episodes_range
grabbed_anime["translation_type"] = config.translation_type
grabbed_anime["episodes_streams"] = {}
else:
grabbed_anime = {}
# lets download em
for episode in episodes_range:
try:
if episode not in episodes:
continue
streams = anime_provider.get_episode_streams(
anime, episode, config.translation_type
)
if not streams:
continue
episode_streams = {server["server"]: server for server in streams}
if episode_streams_only:
grabbed_anime[episode] = episode_streams
else:
grabbed_anime["episodes_streams"][ # pyright:ignore
episode
] = episode_streams
except Exception as e:
logger.error(e)
# grab the full data for single title and appen to final result or episode streams
grabbed_animes.append(grabbed_anime)
# print out the final result either {} or [] depending if more than one title os requested
if len(grabbed_animes) == 1:
print(json.dumps(grabbed_animes[0]))
else:
print(json.dumps(grabbed_animes))

View File

@@ -1,8 +1,12 @@
from typing import TYPE_CHECKING
import click import click
from ...cli.config import Config
from ..completion_functions import anime_titles_shell_complete from ..completion_functions import anime_titles_shell_complete
if TYPE_CHECKING:
from ...cli.config import Config
@click.command( @click.command(
help="This subcommand directly interacts with the provider to enable basic streaming. Useful for binging anime.", help="This subcommand directly interacts with the provider to enable basic streaming. Useful for binging anime.",
@@ -23,240 +27,335 @@ from ..completion_functions import anime_titles_shell_complete
help="A range of episodes to binge (start-end)", help="A range of episodes to binge (start-end)",
) )
@click.pass_obj @click.pass_obj
def search(config: Config, anime_titles: str, episode_range: str): def search(config: "Config", anime_titles: str, episode_range: str):
from click import clear from click import clear
from rich import print from rich import print
from rich.progress import Progress from rich.progress import Progress
from thefuzz import fuzz from thefuzz import fuzz
from ...AnimeProvider import AnimeProvider
from ...libs.anime_provider.types import Anime
from ...libs.fzf import fzf from ...libs.fzf import fzf
from ...libs.rofi import Rofi from ...libs.rofi import Rofi
from ...Utility.data import anime_normalizer
from ..utils.mpv import run_mpv
from ..utils.tools import exit_app from ..utils.tools import exit_app
from ..utils.utils import ( from ..utils.utils import fuzzy_inquirer
filter_by_quality,
fuzzy_inquirer,
move_preferred_subtitle_lang_to_top,
)
anime_provider = AnimeProvider(config.provider) if config.manga:
anilist_anime_info = None from InquirerPy.prompts.number import NumberPrompt
from yt_dlp.utils import sanitize_filename
from ...MangaProvider import MangaProvider
from ..utils.feh import feh_manga_viewer
manga_title = anime_titles[0]
manga_provider = MangaProvider()
search_data = manga_provider.search_for_manga(manga_title)
if not search_data:
print("No search results")
exit(1)
search_results = search_data["results"]
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = anime_provider.search_for_anime(
anime_title, config.translation_type
)
if not search_results:
print("Search results not found")
input("Enter to retry")
search(config, anime_title, episode_range)
return
search_results = search_results["results"]
if not search_results:
print("Anime not found :cry:")
exit_app()
search_results_ = { search_results_ = {
search_result["title"]: search_result for search_result in search_results sanitize_filename(search_result["title"]): search_result
for search_result in search_results
} }
if config.auto_select: if config.auto_select:
search_result = max( search_result_manga_title = max(
search_results_.keys(), search_results_.keys(),
key=lambda title: fuzz.ratio( key=lambda title: fuzz.ratio(title, manga_title),
anime_normalizer.get(title, title), anime_title
),
) )
print("[cyan]Auto Selecting:[/] ", search_result) print("[cyan]Auto Selecting:[/] ", search_result_manga_title)
else: else:
choices = list(search_results_.keys()) choices = list(search_results_.keys())
preview = None
if config.preview:
from ..interfaces.utils import get_fzf_manga_preview
preview = get_fzf_manga_preview(search_results)
if config.use_fzf: if config.use_fzf:
search_result = fzf.run(choices, "Please Select title: ", "FastAnime") search_result_manga_title = fzf.run(
choices, "Please Select title: ", preview=preview
)
elif config.use_rofi: elif config.use_rofi:
search_result = Rofi.run(choices, "Please Select Title") search_result_manga_title = Rofi.run(choices, "Please Select Title")
else: else:
search_result = fuzzy_inquirer( search_result_manga_title = fuzzy_inquirer(
choices, choices,
"Please Select Title", "Please Select Title",
) )
# ---- fetch selected anime ---- anilist_id = search_results_[search_result_manga_title]["id"]
with Progress() as progress: manga_info = manga_provider.get_manga(anilist_id)
progress.add_task("Fetching Anime...", total=None) if not manga_info:
anime: Anime | None = anime_provider.get_anime( print("No manga info")
search_results_[search_result]["id"] exit(1)
anilist_helper = None
if config.user:
from ...anilist import AniList
AniList.login_user(config.user["token"])
anilist_helper = AniList
def _manga_viewer():
chapter_number = NumberPrompt("Select a chapter number").execute()
chapter_info = manga_provider.get_chapter_thumbnails(
manga_info["id"], str(chapter_number)
) )
if not anime: if not chapter_info:
print("Sth went wring anime no found") print("No chapter info")
input("Enter to continue...") input("Enter to retry...")
search(config, anime_title, episode_range) _manga_viewer()
return return
episodes_range = [] print(
episodes: list[str] = sorted( f"[purple bold]Now Reading: [/] {search_result_manga_title} [cyan bold]Chapter:[/] {chapter_info['title']}"
anime["availableEpisodesDetail"][config.translation_type], key=float )
) feh_manga_viewer(chapter_info["thumbnails"], str(chapter_info["title"]))
if episode_range: if anilist_helper:
if ":" in episode_range: anilist_helper.update_anime_list(
ep_range_tuple = episode_range.split(":") {"mediaId": anilist_id, "progress": chapter_number}
if len(ep_range_tuple) == 3 and all(ep_range_tuple): )
episodes_start, episodes_end, step = ep_range_tuple _manga_viewer()
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step) _manga_viewer()
] else:
from ...AnimeProvider import AnimeProvider
from ...libs.anime_provider.types import Anime
from ...Utility.data import anime_normalizer
from ..utils.mpv import run_mpv
from ..utils.utils import filter_by_quality, move_preferred_subtitle_lang_to_top
anime_provider = AnimeProvider(config.provider)
anilist_anime_info = None
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = anime_provider.search_for_anime(
anime_title, config.translation_type
)
if not search_results:
print("Search results not found")
input("Enter to retry")
search(config, anime_title, episode_range)
return
search_results = search_results["results"]
if not search_results:
print("Anime not found :cry:")
exit_app()
search_results_ = {
search_result["title"]: search_result
for search_result in search_results
}
if config.auto_select:
search_result_manga_title = max(
search_results_.keys(),
key=lambda title: fuzz.ratio(
anime_normalizer.get(title, title), anime_title
),
)
print("[cyan]Auto Selecting:[/] ", search_result_manga_title)
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[int(episodes_start) : int(episodes_end)]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else: else:
episodes_range = episodes[int(episode_range) :] choices = list(search_results_.keys())
episodes_range = iter(episodes_range)
if config.normalize_titles:
from ...libs.common.mini_anilist import get_basic_anime_info_by_title
anilist_anime_info = get_basic_anime_info_by_title(anime["title"])
def stream_anime():
clear()
episode = None
if episodes_range:
try:
episode = next(episodes_range) # pyright:ignore
print(
f"[cyan]Auto selecting:[/] {search_result} [cyan]Episode:[/] {episode}"
)
except StopIteration:
print("[green]Completed binge sequence[/]:smile:")
return
if not episode or episode not in episodes:
choices = [*episodes, "end"]
if config.use_fzf: if config.use_fzf:
episode = fzf.run( search_result_manga_title = fzf.run(
choices, "Select an episode: ", header=search_result choices, "Please Select title: ", "FastAnime"
) )
elif config.use_rofi: elif config.use_rofi:
episode = Rofi.run(choices, "Select an episode") search_result_manga_title = Rofi.run(choices, "Please Select Title")
else: else:
episode = fuzzy_inquirer( search_result_manga_title = fuzzy_inquirer(
choices, choices,
"Select episode", "Please Select Title",
) )
if episode == "end":
return
# ---- fetch streams ---- # ---- fetch selected anime ----
with Progress() as progress: with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None) progress.add_task("Fetching Anime...", total=None)
streams = anime_provider.get_episode_streams( anime: Anime | None = anime_provider.get_anime(
anime, episode, config.translation_type search_results_[search_result_manga_title]["id"]
) )
if not streams:
print("Failed to get streams") if not anime:
print("Sth went wring anime no found")
input("Enter to continue...")
search(config, anime_title, episode_range)
return
episodes_range = []
episodes: list[str] = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
episodes_range = iter(episodes_range)
if config.normalize_titles:
from ...libs.common.mini_anilist import get_basic_anime_info_by_title
anilist_anime_info = get_basic_anime_info_by_title(anime["title"])
def stream_anime():
clear()
episode = None
if episodes_range:
try:
episode = next(episodes_range) # pyright:ignore
print(
f"[cyan]Auto selecting:[/] {search_result_manga_title} [cyan]Episode:[/] {episode}"
)
except StopIteration:
print("[green]Completed binge sequence[/]:smile:")
return
if not episode or episode not in episodes:
choices = [*episodes, "end"]
if config.use_fzf:
episode = fzf.run(
choices,
"Select an episode: ",
header=search_result_manga_title,
)
elif config.use_rofi:
episode = Rofi.run(choices, "Select an episode")
else:
episode = fuzzy_inquirer(
choices,
"Select episode",
)
if episode == "end":
return return
try: # ---- fetch streams ----
# ---- fetch servers ---- with Progress() as progress:
if config.server == "top": progress.add_task("Fetching Episode Streams...", total=None)
with Progress() as progress: streams = anime_provider.get_episode_streams(
progress.add_task("Fetching top server...", total=None) anime, episode, config.translation_type
server = next(streams, None) )
if not server: if not streams:
print("Sth went wrong when fetching the episode") print("Failed to get streams")
return
try:
# ---- fetch servers ----
if config.server == "top":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
print("Sth went wrong when fetching the episode")
input("Enter to continue")
stream_anime()
return
stream_link = filter_by_quality(config.quality, server["links"])
if not stream_link:
print("Quality not found")
input("Enter to continue") input("Enter to continue")
stream_anime() stream_anime()
return return
stream_link = filter_by_quality(config.quality, server["links"]) link = stream_link["link"]
if not stream_link: subtitles = server["subtitles"]
print("Quality not found") stream_headers = server["headers"]
input("Enter to continue") episode_title = server["episode_title"]
stream_anime()
return
link = stream_link["link"]
subtitles = server["subtitles"]
stream_headers = server["headers"]
episode_title = server["episode_title"]
else:
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
# prompt for server selection
servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server = config.server
else: else:
if config.use_fzf: with Progress() as progress:
server = fzf.run(servers_names, "Select an link: ") progress.add_task("Fetching servers", total=None)
elif config.use_rofi: # prompt for server selection
server = Rofi.run(servers_names, "Select an link") servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server = config.server
else: else:
server = fuzzy_inquirer( if config.use_fzf:
servers_names, server = fzf.run(servers_names, "Select an link: ")
"Select link", elif config.use_rofi:
) server = Rofi.run(servers_names, "Select an link")
stream_link = filter_by_quality( else:
config.quality, servers[server]["links"] server = fuzzy_inquirer(
) servers_names,
if not stream_link: "Select link",
print("Quality not found") )
input("Enter to continue") stream_link = filter_by_quality(
stream_anime() config.quality, servers[server]["links"]
return )
link = stream_link["link"] if not stream_link:
stream_headers = servers[server]["headers"] print("Quality not found")
subtitles = servers[server]["subtitles"] input("Enter to continue")
episode_title = servers[server]["episode_title"] stream_anime()
return
link = stream_link["link"]
stream_headers = servers[server]["headers"]
subtitles = servers[server]["subtitles"]
episode_title = servers[server]["episode_title"]
selected_anime_title = search_result selected_anime_title = search_result_manga_title
if anilist_anime_info: if anilist_anime_info:
selected_anime_title = ( selected_anime_title = (
anilist_anime_info["title"][config.preferred_language] anilist_anime_info["title"][config.preferred_language]
or anilist_anime_info["title"]["romaji"] or anilist_anime_info["title"]["romaji"]
or anilist_anime_info["title"]["english"] or anilist_anime_info["title"]["english"]
) )
import re import re
for episode_detail in anilist_anime_info["episodes"]: for episode_detail in anilist_anime_info["episodes"]:
if re.match(f"Episode {episode} ", episode_detail["title"]): if re.match(f"Episode {episode} ", episode_detail["title"]):
episode_title = episode_detail["title"] episode_title = episode_detail["title"]
break break
print( print(
f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}" f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}"
) )
subtitles = move_preferred_subtitle_lang_to_top( subtitles = move_preferred_subtitle_lang_to_top(
subtitles, config.sub_lang subtitles, config.sub_lang
) )
if config.sync_play: if config.sync_play:
from ..utils.syncplay import SyncPlayer from ..utils.syncplay import SyncPlayer
SyncPlayer(
link,
episode_title,
headers=stream_headers,
subtitles=subtitles,
)
else:
run_mpv(
link,
episode_title,
headers=stream_headers,
subtitles=subtitles,
)
except IndexError as e:
print(e)
input("Enter to continue")
stream_anime()
SyncPlayer(
link, episode_title, headers=stream_headers, subtitles=subtitles
)
else:
run_mpv(
link, episode_title, headers=stream_headers, subtitles=subtitles
)
except IndexError as e:
print(e)
input("Enter to continue")
stream_anime() stream_anime()
stream_anime()

View File

@@ -55,6 +55,7 @@ class Config(object):
user: [TODO:attribute] user: [TODO:attribute]
""" """
manga = False
sync_play = False sync_play = False
anime_list: list anime_list: list
watch_history: dict watch_history: dict

View File

@@ -7,7 +7,7 @@ import textwrap
from threading import Thread from threading import Thread
import requests import requests
from yt_dlp.utils import clean_html from yt_dlp.utils import clean_html, sanitize_filename
from ...constants import APP_CACHE_DIR from ...constants import APP_CACHE_DIR
from ...libs.anilist.types import AnilistBaseMediaDataSchema from ...libs.anilist.types import AnilistBaseMediaDataSchema
@@ -168,6 +168,63 @@ def get_rofi_icons(
logger.error("%r generated an exception: %s" % (url, e)) logger.error("%r generated an exception: %s" % (url, e))
# get rofi icons
def get_fzf_manga_preview(manga_results, workers=None, wait=False):
"""A helper function to make sure that the images are downloaded so they can be used as icons
Args:
titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images
workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible
anilist_results: the anilist results from an anilist action
"""
def _worker():
# use concurrency to download the images as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# load the jobs
future_to_url = {}
for manga in manga_results:
image_url = manga["poster"]
future_to_url[
executor.submit(
save_image_from_url,
image_url,
sanitize_filename(manga["title"]),
)
] = image_url
# execute the jobs
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
future.result()
except Exception as e:
logger.error("%r generated an exception: %s" % (url, e))
background_worker = Thread(
target=_worker,
)
# ensure images and info exists
background_worker.daemon = True
background_worker.start()
# the preview script is in bash so making sure fzf doesnt use any other shell lang to process the preview script
os.environ["SHELL"] = shutil.which("bash") or "bash"
preview = """
%s
if [ -s %s/{} ]; then fzf-preview %s/{}
else echo Loading...
fi
""" % (
fzf_preview,
IMAGES_CACHE_DIR,
IMAGES_CACHE_DIR,
)
if wait:
background_worker.join()
return preview
# get rofi icons # get rofi icons
def get_fzf_episode_preview( def get_fzf_episode_preview(
anilist_result: AnilistBaseMediaDataSchema, episodes, workers=None, wait=False anilist_result: AnilistBaseMediaDataSchema, episodes, workers=None, wait=False

View File

@@ -0,0 +1,12 @@
import shutil
import subprocess
from sys import exit
def feh_manga_viewer(image_links: list[str], window_title: str):
FEH_EXECUTABLE = shutil.which("feh")
if not FEH_EXECUTABLE:
print("feh not found")
exit(1)
commands = [FEH_EXECUTABLE, *image_links, "--title", window_title]
subprocess.run(commands)

View File

@@ -165,6 +165,7 @@ class AnilistBaseMediaDataSchema(TypedDict):
nextAiringEpisode: AnilistMediaNextAiringEpisode nextAiringEpisode: AnilistMediaNextAiringEpisode
season: str season: str
streamingEpisodes: list[StreamingEpisode] streamingEpisodes: list[StreamingEpisode]
chapters: int
seasonYear: int seasonYear: int
duration: int duration: int
synonyms: list[str] synonyms: list[str]

View File

@@ -1,153 +0,0 @@
import logging
from typing import TYPE_CHECKING
from requests import post
from thefuzz import fuzz
if TYPE_CHECKING:
from ..anilist.types import AnilistDataSchema
logger = logging.getLogger(__name__)
ANILIST_ENDPOINT = "https://graphql.anilist.co"
"""
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
episodes
status
nextAiringEpisode {
timeUntilAiring
airingAt
episode
}
}
}
}
"""
def search_for_anime_with_anilist(anime_title: str):
query = """
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
episodes
status
nextAiringEpisode {
timeUntilAiring
airingAt
episode
}
}
}
}
"""
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": {"query": anime_title}},
timeout=10,
)
if response.status_code == 200:
anilist_data: "AnilistDataSchema" = response.json()
return {
"pageInfo": anilist_data["data"]["Page"]["pageInfo"],
"results": [
{
"id": anime_result["id"],
"title": anime_result["title"]["romaji"]
or anime_result["title"]["english"],
"type": "anime",
"availableEpisodes": list(
range(
1,
(
anime_result["episodes"]
if not anime_result["status"] == "RELEASING"
and anime_result["episodes"]
else (
anime_result["nextAiringEpisode"]["episode"] - 1
if anime_result["nextAiringEpisode"]
else 0
)
),
)
),
}
for anime_result in anilist_data["data"]["Page"]["media"]
],
}
def get_mal_id_and_anilist_id(anime_title: str) -> "dict[str,int] | None":
"""the abstraction over all none authenticated requests and that returns data of a similar type
Args:
query: the anilist query
variables: the anilist api variables
Returns:
a boolean indicating success and none or an anilist object depending on success
"""
query = """
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
}
}
}
"""
try:
variables = {"query": anime_title}
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": variables},
timeout=10,
)
anilist_data: "AnilistDataSchema" = response.json()
if response.status_code == 200:
anime = max(
anilist_data["data"]["Page"]["media"],
key=lambda anime: max(
(
fuzz.ratio(anime, str(anime["title"]["romaji"])),
fuzz.ratio(anime_title, str(anime["title"]["english"])),
)
),
)
return {"id_anilist": anime["id"], "id_mal": anime["idMal"]}
except Exception as e:
logger.error(f"Something unexpected occured {e}")

View File

@@ -0,0 +1,15 @@
import logging
from requests import get
logger = logging.getLogger(__name__)
def fetch_anime_info_from_bal(anilist_id):
try:
url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json"
response = get(url, timeout=11)
if response.status_code == 200:
return response.json()
except Exception as e:
logger.error(e)

View File

@@ -37,7 +37,66 @@ query($query:String){
""" """
def search_foranime_with_anilist(anime_title: str): def search_for_manga_with_anilist(manga_title: str):
query = """
query($query:String){
Page(perPage:50){
pageInfo{
currentPage
}
media(search:$query,type:MANGA){
id
idMal
title{
romaji
english
}
chapters
status
coverImage{
medium
large
}
}
}
}
"""
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": {"query": manga_title}},
timeout=10,
)
if response.status_code == 200:
anilist_data: "AnilistDataSchema" = response.json()
return {
"pageInfo": anilist_data["data"]["Page"]["pageInfo"],
"results": [
{
"id": anime_result["id"],
"poster": anime_result["coverImage"]["large"],
"title": (
anime_result["title"]["romaji"]
or anime_result["title"]["english"]
)
+ f" [Chapters: {anime_result['chapters']}]",
"type": "manga",
"availableChapters": list(
range(
1,
(
anime_result["chapters"]
if anime_result["chapters"]
else 0
),
)
),
}
for anime_result in anilist_data["data"]["Page"]["media"]
],
}
def search_for_anime_with_anilist(anime_title: str):
query = """ query = """
query($query:String){ query($query:String){
Page(perPage:50){ Page(perPage:50){

View File

@@ -0,0 +1 @@
manga_sources = {"mangadex": "api.MangaDexApi"}

View File

@@ -0,0 +1,13 @@
import requests
from yt_dlp.utils.networking import random_user_agent
class MangaProvider:
session: requests.Session
USER_AGENT = random_user_agent()
HEADERS = {}
def __init__(self) -> None:
self.session = requests.session()
self.session.headers.update({"User-Agent": self.USER_AGENT, **self.HEADERS})

View File

@@ -0,0 +1,15 @@
import logging
from requests import get
logger = logging.getLogger(__name__)
def fetch_manga_info_from_bal(anilist_id):
try:
url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/manga/{anilist_id}.json"
response = get(url, timeout=11)
if response.ok:
return response.json()
except Exception as e:
logger.error(e)

View File

@@ -0,0 +1,51 @@
import logging
from ...common.mini_anilist import search_for_manga_with_anilist
from ..base_provider import MangaProvider
from ..common import fetch_manga_info_from_bal
logger = logging.getLogger(__name__)
class MangaDexApi(MangaProvider):
def search_for_manga(self, title: str, *args):
try:
search_results = search_for_manga_with_anilist(title)
return search_results
except Exception as e:
logger.error(f"[MANGADEX-ERROR]: {e}")
def get_manga(self, anilist_manga_id: str):
bal_data = fetch_manga_info_from_bal(anilist_manga_id)
if not bal_data:
return
manga_id, MangaDexManga = next(iter(bal_data["Sites"]["Mangadex"].items()))
return {
"id": manga_id,
"title": MangaDexManga["title"],
"poster": MangaDexManga["image"],
"availableChapters": [],
}
def get_chapter_thumbnails(self, manga_id, chapter):
chapter_info_url = f"https://api.mangadex.org/chapter?manga={manga_id}&translatedLanguage[]=en&chapter={chapter}&includeEmptyPages=0"
chapter_info_response = self.session.get(chapter_info_url)
if not chapter_info_response.ok:
return
chapter_info = next(iter(chapter_info_response.json()["data"]))
chapters_thumbnails_url = (
f"https://api.mangadex.org/at-home/server/{chapter_info['id']}"
)
chapter_thumbnails_response = self.session.get(chapters_thumbnails_url)
if not chapter_thumbnails_response.ok:
return
chapter_thumbnails_info = chapter_thumbnails_response.json()
base_url = chapter_thumbnails_info["baseUrl"]
hash = chapter_thumbnails_info["chapter"]["hash"]
return {
"thumbnails": [
f"{base_url}/data/{hash}/{chapter_thumbnail}"
for chapter_thumbnail in chapter_thumbnails_info["chapter"]["data"]
],
"title": chapter_info["attributes"]["title"],
}

View File

@@ -1,6 +1,6 @@
[tool.poetry] [tool.poetry]
name = "fastanime" name = "fastanime"
version = "2.4.2" version = "2.4.3"
description = "A browser anime site experience from the terminal" description = "A browser anime site experience from the terminal"
authors = ["Benextempest <benextempest@gmail.com>"] authors = ["Benextempest <benextempest@gmail.com>"]
license = "UNLICENSE" license = "UNLICENSE"