doc:Added typing to the Anilist module

This commit is contained in:
Benex254
2024-08-05 09:46:54 +03:00
parent 047ce29da3
commit 2eca3f480d
28 changed files with 786 additions and 389 deletions

3
app/.vscode/settings.json vendored Normal file
View File

@@ -0,0 +1,3 @@
{
"python.analysis.typeCheckingMode": "basic"
}

View File

@@ -5,30 +5,31 @@ from kivy.cache import Cache
from Model import AnimeScreenModel
from View import AnimeScreenView
Cache.register("data.anime",limit=20,timeout=600)
Cache.register("data.anime", limit=20, timeout=600)
class AnimeScreenController:
def __init__(self, model:AnimeScreenModel):
def __init__(self, model: AnimeScreenModel):
self.model = model
self.view = AnimeScreenView(controller=self, model=self.model)
def get_view(self) -> AnimeScreenView:
return self.view
def update_anime_view(self,id:int,caller_screen_name:str):
def update_anime_view(self, id: int, caller_screen_name: str):
if self.model.anime_id != id:
if cached_anime_data:=Cache.get("data.anime",f"{id}"):
if cached_anime_data := Cache.get("data.anime", f"{id}"):
data = cached_anime_data
else:
data = self.model.get_anime_data(id)
if data[0]:
d = data[1]["data"]["Page"]["media"][0]
self.model.anime_id = id
Clock.schedule_once(lambda _:self.view.update_layout(d,caller_screen_name))
Clock.schedule_once(
lambda _: self.view.update_layout(
data[1]["data"]["Page"]["media"][0], caller_screen_name
)
)
Logger.info(f"Anime Screen:Success in opening anime of id: {id}")
Cache.append("data.anime",f"{id}",data)
def update_my_list(self,*args):
self.model.update_user_anime_list(*args)
Cache.append("data.anime", f"{id}", data)

View File

@@ -23,7 +23,7 @@ class HomeScreenController:
def __init__(self, model:HomeScreenModel):
self.model = model # Model.main_screen.MainScreenModel
self.view = HomeScreenView(controller=self, model=self.model)
if self.view.app.config.get("Preferences","is_startup_anime_enable")=="1":
if self.view.app.config.get("Preferences","is_startup_anime_enable")=="1": # type: ignore
Clock.schedule_once(lambda _:self.populate_home_screen())
def get_view(self) -> HomeScreenView:
return self.view

View File

@@ -5,39 +5,39 @@ from kivy.logger import Logger
from View import SearchScreenView
from Model import SearchScreenModel
from Utility import show_notification
class SearchScreenController:
def __init__(self, model:SearchScreenModel):
self.model = model # Model.main_screen.MainScreenModel
def __init__(self, model: SearchScreenModel):
self.model = model
self.view = SearchScreenView(controller=self, model=self.model)
def get_view(self) -> SearchScreenView:
return self.view
def update_trending_anime(self):
trending_cards_generator = self.model.get_trending_anime()
if isgenerator(trending_cards_generator):
self.view.trending_anime_sidebar.clear_widgets()
for card in trending_cards_generator:
card.screen = self.view
card.pos_hint = {'center_x': 0.5}
card.pos_hint = {"center_x": 0.5}
self.view.update_trending_sidebar(card)
else:
Logger.error("Home Screen:Failed to load trending anime")
self.populate_errors.append("trending Anime")
def requested_search_for_anime(self,anime_title,**kwargs):
def requested_search_for_anime(self, anime_title, **kwargs):
self.view.is_searching = True
search_Results = self.model.search_for_anime(anime_title,**kwargs)
search_Results = self.model.search_for_anime(anime_title, **kwargs)
if isgenerator(search_Results):
for result_card in search_Results:
result_card.screen = self.view
self.view.update_layout(result_card)
Clock.schedule_once(lambda _:self.view.update_pagination(self.model.pagination_info))
Clock.schedule_once(lambda _:self.update_trending_anime())
Clock.schedule_once(
lambda _: self.view.update_pagination(self.model.pagination_info)
)
Clock.schedule_once(lambda _: self.update_trending_anime())
else:
Logger.error(f"Home Screen:Failed to search for {anime_title}")
show_notification("Failed to search",f"{search_Results.get('Error')}")
self.view.is_searching = False

View File

@@ -13,7 +13,6 @@ class AnimeScreenModel(BaseScreenModel):
def media_card_generator(self):
for anime_item in self.data["data"]["Page"]["media"]:
yield MediaCardLoader.media_card(anime_item)
self.pagination_info = self.extract_pagination_info()
def get_anime_data(self,id:int):
return AniList.get_anime(id)

View File

@@ -10,10 +10,10 @@ from Utility import show_notification
class SearchScreenModel(BaseScreenModel):
data = {}
def get_trending_anime(self)->MediaCard|dict:
def get_trending_anime(self):
success,data = AniList.get_trending()
if success:
def _data_generator()->Generator[MediaCard,MediaCard,MediaCard]:
def _data_generator():
for anime_item in data["data"]["Page"]["media"]:
yield MediaCardLoader.media_card(anime_item)
return _data_generator()
@@ -25,7 +25,7 @@ class SearchScreenModel(BaseScreenModel):
if success:
return self.media_card_generator()
else:
show_notification(f"Failed to search for {anime_title}",self.data["Error"])
show_notification(f"Failed to search for {anime_title}",self.data.get("Error"))
def media_card_generator(self):
for anime_item in self.data["data"]["Page"]["media"]:

View File

@@ -1,20 +1,31 @@
from datetime import datetime
from libs.anilist.anilist_data_schema import AnilistDateObject,AnilistMediaNextAiringEpisode
# TODO: Add formating options for the final date
def format_anilist_date_object(anilist_date_object:dict|None):
def format_anilist_date_object(anilist_date_object: AnilistDateObject):
if anilist_date_object:
return f"{anilist_date_object['day']}/{anilist_date_object['month']}/{anilist_date_object['year']}"
else:
return "Unknown"
def format_anilist_timestamp(anilist_timestamp:int|None):
if anilist_timestamp:
return datetime.fromtimestamp(anilist_timestamp).strftime("%d/%m/%Y %H:%M:%S")
return f"{anilist_date_object['day']}/{anilist_date_object['month']}/{anilist_date_object['year']}"
else:
return "Unknown"
def format_list_data_with_comma(data:list|None):
def format_anilist_timestamp(anilist_timestamp: int | None):
if anilist_timestamp:
return datetime.fromtimestamp(anilist_timestamp).strftime("%d/%m/%Y %H:%M:%S")
else:
return "Unknown"
def format_list_data_with_comma(data: list | None):
if data:
return ", ".join(data)
else:
return "None"
return "None"
def extract_next_airing_episode(airing_episode:AnilistMediaNextAiringEpisode):
if airing_episode:
return f"Episode: {airing_episode['episode']} on {format_anilist_timestamp(airing_episode['airingAt'])}"
else:
return "Completed"

View File

@@ -6,11 +6,12 @@ from pytube import YouTube
from kivy.clock import Clock
from kivy.cache import Cache
from kivy.loader import _ThreadPool
from kivy.loader import _ThreadPool
from kivy.logger import Logger
from View.components import MediaCard
from Utility import anilist_data_helper, user_data_helper
from libs.anilist.anilist_data_schema import AnilistBaseMediaDataSchema
# Register anime cache in memory
@@ -20,8 +21,8 @@ user_anime_list = user_data_helper.get_user_anime_list()
yt_stream_links = user_data_helper.get_anime_trailer_cache()
for link in yt_stream_links:
Cache.append("yt_stream_links.anime",link[0],tuple(link[1]))
Cache.append("yt_stream_links.anime", link[0], tuple(link[1]))
# for youtube video links gotten from from pytube which is blocking
class MediaCardDataLoader(object):
@@ -36,30 +37,31 @@ class MediaCardDataLoader(object):
self._running = False
self._start_wanted = False
self._trigger_update = Clock.create_trigger(self._update)
def start(self):
'''Start the loader thread/process.'''
"""Start the loader thread/process."""
self._running = True
def run(self, *largs):
'''Main loop for the loader.'''
"""Main loop for the loader."""
pass
def stop(self):
'''Stop the loader thread/process.'''
"""Stop the loader thread/process."""
self._running = False
def pause(self):
'''Pause the loader, can be useful during interactions.
"""Pause the loader, can be useful during interactions.
.. versionadded:: 1.6.0
'''
"""
self._paused = True
def resume(self):
'''Resume the loader, after a :meth:`pause`.
"""Resume the loader, after a :meth:`pause`.
.. versionadded:: 1.6.0
'''
"""
self._paused = False
self._resume_cond.acquire()
self._resume_cond.notify_all()
@@ -71,27 +73,28 @@ class MediaCardDataLoader(object):
self._resume_cond.wait(0.25)
self._resume_cond.release()
def cached_fetch_data(self,yt_watch_url):
data:tuple = Cache.get("yt_stream_links.anime",yt_watch_url) # type: ignore # trailer_url is the yt_watch_link
def cached_fetch_data(self, yt_watch_url):
data: tuple = Cache.get("yt_stream_links.anime", yt_watch_url) # type: ignore # trailer_url is the yt_watch_link
if not data[0]:
yt = YouTube(yt_watch_url)
preview_image = yt.thumbnail_url
preview_image = yt.thumbnail_url
try:
video_stream_url = yt.streams.filter(progressive=True,file_extension="mp4")[-1].url
data = preview_image,video_stream_url
yt_stream_links.append((yt_watch_url,data))
video_stream_url = yt.streams.filter(
progressive=True, file_extension="mp4"
)[-1].url
data = preview_image, video_stream_url
yt_stream_links.append((yt_watch_url, data))
user_data_helper.update_anime_trailer_cache(yt_stream_links)
except:
data = preview_image,None
data = preview_image, None
return data
def _load(self, kwargs):
while len(self._q_done) >= (
self._max_upload_per_frame * self._num_workers):
sleep(0.1) # type: ignore
while len(self._q_done) >= (self._max_upload_per_frame * self._num_workers):
sleep(0.1) # type: ignore
self._wait_for_resume()
yt_watch_link = kwargs['yt_watch_link']
yt_watch_link = kwargs["yt_watch_link"]
try:
data = self.cached_fetch_data(yt_watch_link)
except Exception as e:
@@ -101,7 +104,7 @@ class MediaCardDataLoader(object):
self._q_done.appendleft((yt_watch_link, data))
self._trigger_update()
def _update(self,*largs):
def _update(self, *largs):
if self._start_wanted:
if not self._running:
self.start()
@@ -114,7 +117,7 @@ class MediaCardDataLoader(object):
for _ in range(self._max_upload_per_frame):
try:
yt_watch_link, data= self._q_done.pop()
yt_watch_link, data = self._q_done.pop()
except IndexError:
return
# update client
@@ -128,55 +131,84 @@ class MediaCardDataLoader(object):
if trailer_url:
client.set_trailer_url(trailer_url)
Logger.info(f"Pytube:Found trailer url for {client.title}")
Cache.append("yt_stream_links.anime",yt_watch_link,data)
Cache.append("yt_stream_links.anime", yt_watch_link, data)
self._client.remove((c_yt_watch_link, client))
self._trigger_update()
def media_card(self,anime_item,load_callback=None, post_callback=None,
**kwargs):
def media_card(
self,
anime_item: AnilistBaseMediaDataSchema,
load_callback=None,
post_callback=None,
**kwargs,
):
media_card = MediaCard()
media_card.anime_id = anime_id = anime_item["id"]
# TODO: ADD language preference
if anime_item["title"].get("english"):
media_card.title = anime_item["title"]["english"]
media_card.title = anime_item["title"]["english"]
else:
media_card.title = anime_item["title"]["romaji"]
media_card.title = anime_item["title"]["romaji"]
media_card.cover_image_url = anime_item["coverImage"]["medium"]
media_card.cover_image_url = anime_item["coverImage"]["medium"]
media_card.popularity = str(anime_item["popularity"])
media_card.popularity = str(anime_item["popularity"])
media_card.favourites = str(anime_item["favourites"])
media_card.favourites = str(anime_item["favourites"])
media_card.episodes = str(anime_item["episodes"])
media_card.episodes = str(anime_item["episodes"])
if anime_item.get("description"):
media_card.description = anime_item["description"]
media_card.description = anime_item["description"]
else:
media_card.description = "None"
# TODO: switch to season and year
media_card.first_aired_on = f'{anilist_data_helper.format_anilist_date_object(anime_item["startDate"])}'
media_card.first_aired_on = (
f'{anilist_data_helper.format_anilist_date_object(anime_item["startDate"])}'
)
# TODO: update it to separate studio and producers
media_card.studios = anilist_data_helper.format_list_data_with_comma([studio["name"] for studio in anime_item["studios"]["nodes"]])
media_card.studios = anilist_data_helper.format_list_data_with_comma(
[
studio["name"]
for studio in anime_item["studios"]["nodes"]
if studio["isAnimationStudio"]
]
)
media_card.producers = anilist_data_helper.format_list_data_with_comma(
[
studio["name"]
for studio in anime_item["studios"]["nodes"]
if not studio["isAnimationStudio"]
]
)
media_card.next_airing_episode = "{}".format(
anilist_data_helper.extract_next_airing_episode(
anime_item["nextAiringEpisode"]
)
)
if anime_item.get("tags"):
media_card.tags = anilist_data_helper.format_list_data_with_comma([tag["name"] for tag in anime_item["tags"]])
media_card.tags = anilist_data_helper.format_list_data_with_comma(
[tag["name"] for tag in anime_item["tags"]]
)
media_card.media_status = anime_item["status"]
media_card.media_status = anime_item["status"]
if anime_item.get("genres"):
media_card.genres = anilist_data_helper.format_list_data_with_comma(anime_item["genres"])
media_card.genres = anilist_data_helper.format_list_data_with_comma(
anime_item["genres"]
)
if anime_id in user_anime_list:
media_card.is_in_my_list = True
if anime_item["averageScore"]:
stars = int(anime_item["averageScore"]/100*6)
stars = int(anime_item["averageScore"] / 100 * 6)
if stars:
for i in range(stars):
media_card.stars[i] = 1
@@ -184,33 +216,36 @@ class MediaCardDataLoader(object):
# TODO: ADD a default image if trailer not available
# Setting up trailer info to be gotten if available
if anime_item["trailer"]:
yt_watch_link = "https://youtube.com/watch?v="+anime_item["trailer"]["id"]
data = Cache.get("yt_stream_links.anime",yt_watch_link) # type: ignore # trailer_url is the yt_watch_link
yt_watch_link = "https://youtube.com/watch?v=" + anime_item["trailer"]["id"]
data = Cache.get("yt_stream_links.anime", yt_watch_link) # type: ignore # trailer_url is the yt_watch_link
if data:
if data[1] not in (None,False):
if data[1] not in (None, False):
media_card.set_preview_image(data[0])
media_card.set_trailer_url(data[1])
return media_card
else:
# if data is None, this is really the first time
self._client.append((yt_watch_link,media_card))
self._q_load.appendleft({
'yt_watch_link': yt_watch_link,
'load_callback': load_callback,
'post_callback': post_callback,
'current_anime':anime_item["id"],
'kwargs': kwargs})
if not kwargs.get('nocache', False):
Cache.append('yt_stream_links.anime',yt_watch_link, (False,False))
self._client.append((yt_watch_link, media_card))
self._q_load.appendleft(
{
"yt_watch_link": yt_watch_link,
"load_callback": load_callback,
"post_callback": post_callback,
"current_anime": anime_item["id"],
"kwargs": kwargs,
}
)
if not kwargs.get("nocache", False):
Cache.append("yt_stream_links.anime", yt_watch_link, (False, False))
self._start_wanted = True
self._trigger_update()
return media_card
class LoaderThreadPool(MediaCardDataLoader):
def __init__(self):
super(LoaderThreadPool, self).__init__()
self.pool:_ThreadPool|None = None
self.pool: _ThreadPool | None = None
def start(self):
super(LoaderThreadPool, self).start()
@@ -220,7 +255,7 @@ class LoaderThreadPool(MediaCardDataLoader):
def stop(self):
super(LoaderThreadPool, self).stop()
Clock.unschedule(self.run)
self.pool.stop()
self.pool.stop() # type: ignore
def run(self, *largs):
while self._running:
@@ -228,8 +263,12 @@ class LoaderThreadPool(MediaCardDataLoader):
parameters = self._q_load.pop()
except:
return
self.pool.add_task(self._load, parameters)
self.pool.add_task(self._load, parameters) # type: ignore
MediaCardLoader = LoaderThreadPool()
Logger.info('MediaCardLoader: using a thread pool of {} workers'.format(
MediaCardLoader._num_workers))
Logger.info(
"MediaCardLoader: using a thread pool of {} workers".format(
MediaCardLoader._num_workers
)
)

View File

@@ -1,28 +1,38 @@
from datetime import datetime
from kivy.properties import ObjectProperty,DictProperty,StringProperty
from kivy.properties import ObjectProperty, DictProperty, StringProperty
from Utility import anilist_data_helper
from libs.anilist import AnilistBaseMediaDataSchema
from View.base_screen import BaseScreenView
from .components import (AnimeHeader,AnimeSideBar,AnimeDescription,AnimeReviews,AnimeCharacters,AnimdlStreamDialog,DownloadAnimeDialog,RankingsBar)
from .components import (
AnimeHeader,
AnimeSideBar,
AnimeDescription,
AnimeReviews,
AnimeCharacters,
AnimdlStreamDialog,
DownloadAnimeDialog,
RankingsBar,
)
class AnimeScreenView(BaseScreenView):
caller_screen_name = StringProperty()
header:AnimeHeader = ObjectProperty()
side_bar:AnimeSideBar = ObjectProperty()
rankings_bar:RankingsBar = ObjectProperty()
anime_description:AnimeDescription = ObjectProperty()
anime_characters:AnimeCharacters = ObjectProperty()
anime_reviews:AnimeReviews = ObjectProperty()
header: AnimeHeader = ObjectProperty()
side_bar: AnimeSideBar = ObjectProperty()
rankings_bar: RankingsBar = ObjectProperty()
anime_description: AnimeDescription = ObjectProperty()
anime_characters: AnimeCharacters = ObjectProperty()
anime_reviews: AnimeReviews = ObjectProperty()
data = DictProperty()
anime_id = 0
def update_layout(self,data:dict,caller_screen_name:str):
def update_layout(self, data: AnilistBaseMediaDataSchema, caller_screen_name: str):
self.caller_screen_name = caller_screen_name
self.data = data
# uitlity functions
extract_next_airing_episode = lambda airing_episode: f"Episode: {airing_episode['episode']} on {anilist_data_helper.format_anilist_timestamp(airing_episode['airingAt'])}" if airing_episode else "Completed"
# variables
english_title = data["title"]["english"]
@@ -31,10 +41,9 @@ class AnimeScreenView(BaseScreenView):
# update header
self.header.titles = f"{english_title}\n{jp_title}"
if banner_image:=data["bannerImage"]:
if banner_image := data["bannerImage"]:
self.header.banner_image = banner_image
# -----side bar-----
# update image
@@ -42,63 +51,69 @@ class AnimeScreenView(BaseScreenView):
# update alternative titles
alternative_titles = {
"synonyms":anilist_data_helper.format_list_data_with_comma(data["synonyms"]), # list
"japanese":jp_title,
"english":english_title,
"synonyms": anilist_data_helper.format_list_data_with_comma(
data["synonyms"]
), # list
"japanese": jp_title,
"english": english_title,
}
self.side_bar.alternative_titles = alternative_titles
# update information
information = {
"episodes":data["episodes"],
"status":data["status"],
"nextAiringEpisode":extract_next_airing_episode(data["nextAiringEpisode"]),
"aired":f"{anilist_data_helper.format_anilist_date_object(data['startDate'])} to {anilist_data_helper.format_anilist_date_object(data['endDate'])}",
"premiered":f"{data['season']} {data['seasonYear']}",
"broadcast":data["format"],
"countryOfOrigin":data["countryOfOrigin"],
"hashtag":data["hashtag"],
"studios": anilist_data_helper.format_list_data_with_comma([studio["name"] for studio in studios if studio["isAnimationStudio"]]), # { "name": "Sunrise", "isAnimationStudio": true }
"producers": anilist_data_helper.format_list_data_with_comma([studio["name"] for studio in studios if not studio["isAnimationStudio"]]), # { "name": "Sunrise", "isAnimationStudio": true }
"source":data["source"],
"episodes": data["episodes"],
"status": data["status"],
"nextAiringEpisode": anilist_data_helper.extract_next_airing_episode(
data["nextAiringEpisode"]
),
"aired": f"{anilist_data_helper.format_anilist_date_object(data['startDate'])} to {anilist_data_helper.format_anilist_date_object(data['endDate'])}",
"premiered": f"{data['season']} {data['seasonYear']}",
"broadcast": data["format"],
"countryOfOrigin": data["countryOfOrigin"],
"hashtag": data["hashtag"],
"studios": anilist_data_helper.format_list_data_with_comma(
[studio["name"] for studio in studios if studio["isAnimationStudio"]]
), # { "name": "Sunrise", "isAnimationStudio": true }
"producers": anilist_data_helper.format_list_data_with_comma(
[
studio["name"]
for studio in studios
if not studio["isAnimationStudio"]
]
),
"source": data["source"],
"genres": anilist_data_helper.format_list_data_with_comma(data["genres"]),
"duration":data["duration"],
# "rating":data["rating"],
"duration": data["duration"],
}
self.side_bar.information = information
# update statistics
statistics = [
# { "rank": 44, "context": "highest rated all time" }
*[(stat["context"],stat["rank"]) for stat in data["rankings"]]
]
statistics = [*[(stat["context"], stat["rank"]) for stat in data["rankings"]]]
self.side_bar.statistics = statistics
# update tags
self.side_bar.tags = [
(tag["name"],tag["rank"])
for tag in data["tags"]
]
self.side_bar.tags = [(tag["name"], tag["rank"]) for tag in data["tags"]]
# update external links
external_links = [
("AniList",data["siteUrl"]),
*[(site["site"],site["url"]) for site in data["externalLinks"]]
("AniList", data["siteUrl"]),
*[(site["site"], site["url"]) for site in data["externalLinks"]],
]
self.side_bar.external_links = external_links
self.rankings_bar.rankings = {
"Popularity":data["popularity"],
"Favourites":data["favourites"],
"AverageScore":data["averageScore"] if data["averageScore"] else 0,
"Popularity": data["popularity"],
"Favourites": data["favourites"],
"AverageScore": data["averageScore"] if data["averageScore"] else 0,
}
self.anime_description.description = data["description"]
self.anime_characters.characters = [(character["node"],character["voiceActors"])for character in data["characters"]["edges"]] # list (character,actor)
self.anime_characters.characters = [
(character["node"], character["voiceActors"])
for character in data["characters"]["edges"]
] # list (character,actor)
self.anime_reviews.reviews = data["reviews"]["nodes"]
@@ -116,5 +131,5 @@ class AnimeScreenView(BaseScreenView):
DownloadAnimeDialog(self.data).open()
def add_to_user_anime_list(self,*args):
self.app.add_anime_to_user_anime_list(self.model.anime_id)
def add_to_user_anime_list(self, *args):
self.app.add_anime_to_user_anime_list(self.model.anime_id)

View File

@@ -23,4 +23,5 @@
text:"Status"
FilterDropDown:
id:status_filter
text:root.filters["status"]
on_release: root.open_filter_menu(self,"status")

View File

@@ -1,37 +1,80 @@
from kivy.properties import StringProperty,DictProperty
from kivy.properties import StringProperty, DictProperty
from kivymd.uix.boxlayout import MDBoxLayout
from kivymd.uix.dropdownitem import MDDropDownItem
from kivymd.uix.menu import MDDropdownMenu
class FilterDropDown(MDDropDownItem):
text:str = StringProperty()
text: str = StringProperty()
class Filters(MDBoxLayout):
filters:dict = DictProperty({
"sort":"SEARCH_MATCH"
})
filters: dict = DictProperty({"sort": "SEARCH_MATCH", "status": "FINISHED"})
def open_filter_menu(self, menu_item,filter_name):
def open_filter_menu(self, menu_item, filter_name):
items = []
match filter_name:
case "sort":
items = ["ID","ID_DESC", "TITLE_ROMANJI", "TITLE_ROMANJI_DESC", "TITLE_ENGLISH", "TITLE_ENGLISH_DESC", "TITLE_NATIVE", "TITLE_NATIVE_DESC", "TYPE", "TYPE_DESC", "FORMAT", "FORMAT_DESC", "START_DATE", "START_DATE_DESC", "END_DATE", "END_DATE_DESC", "SCORE", "SCORE_DESC", "TRENDING", "TRENDING_DESC", "EPISODES", "EPISODES_DESC", "DURATION", "DURATION_DESC", "STATUS", "STATUS_DESC", "UPDATED_AT", "UPDATED_AT_DESC", "SEARCH_MATCH" "POPULARITY","POPULARITY_DESC","FAVOURITES","FAVOURITES_DESC"]
items = [
"ID",
"ID_DESC",
"TITLE_ROMANJI",
"TITLE_ROMANJI_DESC",
"TITLE_ENGLISH",
"TITLE_ENGLISH_DESC",
"TITLE_NATIVE",
"TITLE_NATIVE_DESC",
"TYPE",
"TYPE_DESC",
"FORMAT",
"FORMAT_DESC",
"START_DATE",
"START_DATE_DESC",
"END_DATE",
"END_DATE_DESC",
"SCORE",
"SCORE_DESC",
"TRENDING",
"TRENDING_DESC",
"EPISODES",
"EPISODES_DESC",
"DURATION",
"DURATION_DESC",
"STATUS",
"STATUS_DESC",
"UPDATED_AT",
"UPDATED_AT_DESC",
"SEARCH_MATCH",
"POPULARITY",
"POPULARITY_DESC",
"FAVOURITES",
"FAVOURITES_DESC",
]
case "status":
items = ["FINISHED", "RELEASING", "NOT_YET_RELEASED", "CANCELLED", "HIATUS"]
items = [
"FINISHED",
"RELEASING",
"NOT_YET_RELEASED",
"CANCELLED",
"HIATUS",
]
case _:
items = []
if items:
if items:
menu_items = [
{
"text": f"{item}",
"on_release": lambda filter_value=f"{item}": self.filter_menu_callback(filter_name,filter_value),
} for item in items
"on_release": lambda filter_value=f"{item}": self.filter_menu_callback(
filter_name, filter_value
),
}
for item in items
]
MDDropdownMenu(caller=menu_item, items=menu_items).open()
def filter_menu_callback(self, filter_name,filter_value):
def filter_menu_callback(self, filter_name, filter_value):
match filter_name:
case "sort":
self.ids.sort_filter.text = filter_value
@@ -39,4 +82,3 @@ class Filters(MDBoxLayout):
case "status":
self.ids.status_filter.text = filter_value
self.filters["status"] = filter_value

View File

@@ -1,12 +1,12 @@
<PaginationLabel@MDLabel>
<PaginationLabel@MDLabel>:
max_lines:0
shorten:False
markup:True
adaptive_height:True
font_style: "Label"
pos_hint:{"center_y":.5}
halign:"center"
role: "medium"
<SearchResultsPagination>:
md_bg_color:self.theme_cls.surfaceContainerLowColor
radius:8

View File

@@ -1,9 +1,9 @@
from kivy.properties import ObjectProperty,NumericProperty
from kivy.properties import ObjectProperty, NumericProperty
from kivymd.uix.boxlayout import MDBoxLayout
class SearchResultsPagination(MDBoxLayout):
current_page = NumericProperty()
total_pages = NumericProperty()
search_view = ObjectProperty()

View File

@@ -1,4 +1,5 @@
from kivymd.uix.boxlayout import MDBoxLayout
class TrendingAnimeSideBar(MDBoxLayout):
pass

View File

@@ -50,18 +50,16 @@
size_hint_x:None
width: dp(250)
MDLabel:
md_bg_color:self.theme_cls.secondaryContainerColor
text:"Trending"
adaptive_height:True
halign:"center"
max_lines:0
# md_bg_color:Stat
shorten:False
bold:True
markup:True
font_style: "Label"
role: "large"
text:"Trending"
padding:"10dp"
MDScrollView:
TrendingAnimeSideBar:
id:trending_anime_sidebar

View File

@@ -1,44 +1,56 @@
from kivy.properties import ObjectProperty,StringProperty
from kivy.properties import ObjectProperty, StringProperty
from kivy.clock import Clock
from View.base_screen import BaseScreenView
from .components import TrendingAnimeSideBar,Filters,SearchResultsPagination
from .components import TrendingAnimeSideBar, Filters, SearchResultsPagination
class SearchScreenView(BaseScreenView):
trending_anime_sidebar:TrendingAnimeSideBar = ObjectProperty()
search_results_pagination:SearchResultsPagination = ObjectProperty()
filters:Filters = ObjectProperty()
class SearchScreenView(BaseScreenView):
trending_anime_sidebar: TrendingAnimeSideBar = ObjectProperty()
search_results_pagination: SearchResultsPagination = ObjectProperty()
filters: Filters = ObjectProperty()
search_results_container = ObjectProperty()
search_term:str = StringProperty()
search_term: str = StringProperty()
is_searching = False
has_next_page = False
current_page = 0
total_pages = 0
def handle_search_for_anime(self,search_widget=None,page=None):
def handle_search_for_anime(self, search_widget=None, page=None):
if search_widget:
search_term = search_widget.text
elif page:
search_term = self.search_term
else:
return
if search_term and not(self.is_searching):
if search_term and not (self.is_searching):
self.search_term = search_term
self.search_results_container.clear_widgets()
if filters:=self.filters.filters:
Clock.schedule_once(lambda _:self.controller.requested_search_for_anime(search_term,**filters,page=page))
if filters := self.filters.filters:
Clock.schedule_once(
lambda _: self.controller.requested_search_for_anime(
search_term, **filters, page=page
)
)
else:
Clock.schedule_once(lambda _:self.controller.requested_search_for_anime(search_term,page=page))
def update_layout(self,widget):
Clock.schedule_once(
lambda _: self.controller.requested_search_for_anime(
search_term, page=page
)
)
def update_layout(self, widget):
self.search_results_container.add_widget(widget)
def update_pagination(self,pagination_info):
self.search_results_pagination.current_page =self.current_page = pagination_info["currentPage"]
self.search_results_pagination.total_pages = self.total_pages = max(int(pagination_info["total"]/30),1)
def update_pagination(self, pagination_info):
self.search_results_pagination.current_page = self.current_page = (
pagination_info["currentPage"]
)
self.search_results_pagination.total_pages = self.total_pages = max(
int(pagination_info["total"] / 30), 1
)
self.has_next_page = pagination_info["hasNextPage"]
def next_page(self):
@@ -51,5 +63,5 @@ class SearchScreenView(BaseScreenView):
page = self.current_page - 1
self.handle_search_for_anime(page=page)
def update_trending_sidebar(self,trending_anime):
def update_trending_sidebar(self, trending_anime):
self.trending_anime_sidebar.add_widget(trending_anime)

View File

@@ -1,29 +1,28 @@
from kivy.properties import ObjectProperty,StringProperty
from kivy.properties import ObjectProperty, StringProperty
from kivymd.app import MDApp
from kivymd.uix.screen import MDScreen
from kivymd.uix.navigationrail import MDNavigationRail
from kivymd.uix.boxlayout import MDBoxLayout
from kivymd.uix.button import MDIconButton
from kivymd.uix.tooltip import MDTooltip
from kivymd.uix.tooltip import MDTooltip
from Utility.observer import Observer
class NavRail(MDNavigationRail):
screen=ObjectProperty()
screen = ObjectProperty()
class SearchBar(MDBoxLayout):
screen=ObjectProperty()
screen = ObjectProperty()
class Tooltip(MDTooltip):
pass
class TooltipMDIconButton(Tooltip,MDIconButton):
class TooltipMDIconButton(Tooltip, MDIconButton):
tooltip_text = StringProperty()
@@ -61,6 +60,8 @@ class BaseScreenView(MDScreen, Observer):
super().__init__(**kw)
# Often you need to get access to the application object from the view
# class. You can do this using this attribute.
self.app = MDApp.get_running_app()
from main import AniXStreamApp
self.app: AniXStreamApp = MDApp.get_running_app() # type: ignore
# Adding a view class as observer.
self.model.add_observer(self)

View File

@@ -146,21 +146,21 @@
# footer
PopupBoxLayout:
orientation:"vertical"
SingleLineLabel:
font_style:"Label"
markup:True
role:"small"
text: f"[color={get_hex_from_color(self.theme_cls.primaryColor)}]"+"Next Airing Episode: "+"[/color]"+root.caller.next_airing_episode
SingleLineLabel:
font_style:"Label"
role:"small"
markup:True
text: f"[color={get_hex_from_color(self.theme_cls.primaryColor)}]"+"Author: "+"[/color]"+root.caller.author
text: f"[color={get_hex_from_color(self.theme_cls.primaryColor)}]"+"Studios: " + "[/color]"+root.caller.studios
SingleLineLabel:
font_style:"Label"
markup:True
role:"small"
text: f"[color={get_hex_from_color(self.theme_cls.primaryColor)}]"+"Studios: "+"[/color]"+root.caller.studios
SingleLineLabel:
font_style:"Label"
markup:True
role:"small"
text: f"[color={get_hex_from_color(self.theme_cls.primaryColor)}]"+"Characters: "+"[/color]"+root.caller.characters
text: f"[color={get_hex_from_color(self.theme_cls.primaryColor)}]"+"Producers: " + "[/color]"+root.caller.producers
SingleLineLabel:
font_style:"Label"
markup:True

View File

@@ -1,4 +1,10 @@
from kivy.properties import ObjectProperty,StringProperty,BooleanProperty,ListProperty,NumericProperty
from kivy.properties import (
ObjectProperty,
StringProperty,
BooleanProperty,
ListProperty,
NumericProperty,
)
from kivy.clock import Clock
from kivy.uix.behaviors import ButtonBehavior
@@ -7,7 +13,8 @@ from kivymd.uix.behaviors import HoverBehavior
from .components import MediaPopup
class MediaCard(ButtonBehavior,HoverBehavior,MDBoxLayout):
class MediaCard(ButtonBehavior, HoverBehavior, MDBoxLayout):
screen = ObjectProperty()
anime_id = NumericProperty()
title = StringProperty()
@@ -22,17 +29,16 @@ class MediaCard(ButtonBehavior,HoverBehavior,MDBoxLayout):
genres = StringProperty()
first_aired_on = StringProperty()
description = StringProperty()
author = StringProperty()
producers = StringProperty()
studios = StringProperty()
characters = StringProperty()
next_airing_episode = StringProperty()
tags = StringProperty()
stars = ListProperty([0,0,0,0,0,0])
stars = ListProperty([0, 0, 0, 0, 0, 0])
cover_image_url = StringProperty()
preview_image = StringProperty()
has_trailer_color = ListProperty([1,1,1,0])
has_trailer_color = ListProperty([1, 1, 1, 0])
def __init__(self,trailer_url=None,**kwargs):
def __init__(self, trailer_url=None, **kwargs):
super().__init__(**kwargs)
self.orientation = "vertical"
@@ -40,49 +46,46 @@ class MediaCard(ButtonBehavior,HoverBehavior,MDBoxLayout):
self.trailer_url = trailer_url
self.adaptive_size = True
# self.app = MDApp.get_running_app()
# def on_screen_name(self,instance,value):
# if self.app:
# self.screen = self.app.manager_screens.get_screen(value)
# def
def on_enter(self):
def _open_popup(dt):
if self.hovering:
window = self.get_parent_window()
if window:
for widget in window.children: # type: ignore
if isinstance(widget,MediaPopup):
for widget in window.children: # type: ignore
if isinstance(widget, MediaPopup):
return
self.open()
Clock.schedule_once(_open_popup,5)
def on_popup_open(self,popup:MediaPopup):
Clock.schedule_once(_open_popup, 5)
def on_popup_open(self, popup: MediaPopup):
popup.center = self.center
def on_dismiss(self,popup:MediaPopup):
def on_dismiss(self, popup: MediaPopup):
popup.player.unload()
def set_preview_image(self,image):
def set_preview_image(self, image):
self.preview_image = image
def set_trailer_url(self,trailer_url):
def set_trailer_url(self, trailer_url):
self.trailer_url = trailer_url
self.has_trailer_color = self.theme_cls.primaryColor
def open(self,*_):
def open(self, *_):
popup = MediaPopup(self)
popup.title = self.title
popup.bind(on_dismiss=self.on_dismiss,on_open=self.on_popup_open)
popup.bind(on_dismiss=self.on_dismiss, on_open=self.on_popup_open)
popup.open(self)
# ---------------respond to user actions and call appropriate model-------------------------
def on_is_in_my_list(self,instance,in_user_anime_list):
def on_is_in_my_list(self, instance, in_user_anime_list):
if self.screen:
if in_user_anime_list:
self.screen.app.add_anime_to_user_anime_list(self.anime_id)
else:
self.screen.app.remove_anime_from_user_anime_list(self.anime_id)
def on_trailer_url(self,*args):
def on_trailer_url(self, *args):
pass

View File

@@ -1,5 +1,21 @@
from Controller import (SearchScreenController,HomeScreenController,MyListScreenController,AnimeScreenController,DownloadsScreenController,HelpScreenController,CrashLogScreenController)
from Model import (HomeScreenModel,SearchScreenModel,MyListScreenModel,AnimeScreenModel,DownloadsScreenModel,HelpScreenModel,CrashLogScreenModel)
from Controller import (
SearchScreenController,
HomeScreenController,
MyListScreenController,
AnimeScreenController,
DownloadsScreenController,
HelpScreenController,
CrashLogScreenController,
)
from Model import (
HomeScreenModel,
SearchScreenModel,
MyListScreenModel,
AnimeScreenModel,
DownloadsScreenModel,
HelpScreenModel,
CrashLogScreenModel,
)
screens = {
@@ -31,4 +47,4 @@ screens = {
"model": HelpScreenModel,
"controller": HelpScreenController,
},
}
}

0
app/__init__.py Normal file
View File

View File

@@ -1 +1,6 @@
"""
This module contains an abstraction for interaction with the anilist api making it easy and efficient
"""
from .anilist import AniList
from .anilist_data_schema import AnilistBaseMediaDataSchema

View File

@@ -1,3 +1,8 @@
"""
This is the core module availing all the abstractions of the anilist api
"""
import requests
from .queries_graphql import (
most_favourite_query,
most_recently_updated_query,
@@ -12,34 +17,46 @@ from .queries_graphql import (
upcoming_anime_query,
anime_query
)
import requests
from .anilist_data_schema import AnilistDataSchema
# from kivy.network.urlrequest import UrlRequestRequests
class AniList:
"""
This class provides an abstraction for the anilist api
"""
@classmethod
def get_data(cls,query:str,variables:dict = {})->tuple[bool,dict]:
def get_data(cls,query:str,variables:dict = {})->tuple[bool,AnilistDataSchema]:
"""
The core abstraction for getting data from the anilist api
Parameters:
----------
query:str
a valid anilist graphql query
variables:dict
variables to pass to the anilist api
"""
url = "https://graphql.anilist.co"
# req=UrlRequestRequests(url, cls.got_data,)
try:
# TODO: check if data is as expected
response = requests.post(url,json={"query":query,"variables":variables},timeout=5)
return (True,response.json())
anilist_data:AnilistDataSchema = response.json()
return (True,anilist_data)
except requests.exceptions.Timeout:
return (False,{"Error":"Timeout Exceeded for connection there might be a problem with your internet or anilist is down."})
return (False,{"Error":"Timeout Exceeded for connection there might be a problem with your internet or anilist is down."}) # type: ignore
except requests.exceptions.ConnectionError:
return (False,{"Error":"There might be a problem with your internet or anilist is down."})
return (False,{"Error":"There might be a problem with your internet or anilist is down."}) # type: ignore
except Exception as e:
return (False,{"Error":f"{e}"})
return (False,{"Error":f"{e}"}) # type: ignore
@classmethod
def got_data(cls):
pass
@classmethod
def search(cls,
query:str|None=None,
sort:list[str]|None=None,
genre_in:list[str]|None=None,
id_in:list[int]|None=None,
genre_not_in:list[str]|None=None,
genre_not_in:list[str]=["hentai"],
popularity_greater:int|None=None,
popularity_lesser:int|None=None,
averageScore_greater:int|None=None,
@@ -54,8 +71,10 @@ class AniList:
start_greater:int|None=None,
start_lesser:int|None=None,
page:int|None=None
)->tuple[bool,dict]:
):
"""
A powerful method for searching anime using the anilist api availing most of its options
"""
variables = {}
for key, val in list(locals().items())[1:]:
if val is not None and key not in ["variables"]:
@@ -64,87 +83,86 @@ class AniList:
return search_results
@classmethod
def get_anime(cls,id:int)->tuple[bool,dict]:
def get_anime(cls,id:int):
"""
Gets a single anime by a valid anilist anime id
"""
variables = {
"id":id
}
return cls.get_data(anime_query,variables)
@classmethod
def get_trending(cls)->tuple[bool,dict]:
def get_trending(cls):
"""
Gets the currently trending anime
"""
trending = cls.get_data(trending_query)
return trending
@classmethod
def get_most_favourite(cls)->tuple[bool,dict]:
def get_most_favourite(cls):
"""
Gets the most favoured anime on anilist
"""
most_favourite = cls.get_data(most_favourite_query)
return most_favourite
@classmethod
def get_most_scored(cls)->tuple[bool,dict]:
def get_most_scored(cls):
"""
Gets most scored anime on anilist
"""
most_scored = cls.get_data(most_scored_query)
return most_scored
@classmethod
def get_most_recently_updated(cls)->tuple[bool,dict]:
def get_most_recently_updated(cls):
"""
Gets most recently updated anime from anilist
"""
most_recently_updated = cls.get_data(most_recently_updated_query)
return most_recently_updated
@classmethod
def get_most_popular(cls)->tuple[bool,dict]:
def get_most_popular(cls):
"""
Gets most popular anime on anilist
"""
most_popular = cls.get_data(most_popular_query)
return most_popular
# FIXME:dont know why its not giving useful data
@classmethod
def get_recommended_anime_for(cls,id:int)->tuple[bool,dict]:
def get_recommended_anime_for(cls,id:int):
recommended_anime = cls.get_data(recommended_query)
return recommended_anime
@classmethod
def get_charcters_of(cls,id:int)->tuple[bool,dict]:
def get_charcters_of(cls,id:int):
variables = {"id":id}
characters = cls.get_data(anime_characters_query,variables)
return characters
@classmethod
def get_related_anime_for(cls,id:int)->tuple[bool,dict]:
def get_related_anime_for(cls,id:int):
variables = {"id":id}
related_anime = cls.get_data(anime_relations_query,variables)
return related_anime
@classmethod
def get_airing_schedule_for(cls,id:int)->tuple[bool,dict]:
def get_airing_schedule_for(cls,id:int):
variables = {"id":id}
airing_schedule = cls.get_data(airing_schedule_query,variables)
return airing_schedule
@classmethod
def get_upcoming_anime(cls,page:int)->tuple[bool,dict]:
def get_upcoming_anime(cls,page:int):
"""
Gets upcoming anime from anilist
"""
variables = {"page":page}
upcoming_anime = cls.get_data(upcoming_anime_query,variables)
return upcoming_anime
if __name__ == "__main__":
import json
# data = AniList.get_most_popular()
# data = AniList.get_most_favourite()
# data = AniList.get_most_recently_updated()
# data = AniList.get_trending()
# data = AniList.get_most_scored()
# term = input("enter term: ")
# data = AniList.search(query="Ninja")+
# data = AniList.get_anime(1)
data = AniList.search(query="one",status="RELEASING")
print(data)
# data = AniList.get_recommended_anime_for(21)
# data = AniList.get_related_anime_for(21)
# data = AniList.get_airing_schedule_for(21)
# data = AniList.get_upcoming_anime(1)
if data[0]:
with open("search.json","w") as file:
json.dump(data[1],file)
else:
print(data)

View File

@@ -0,0 +1,156 @@
"""
This module defines the shape of the anilist data that can be received in order to enhance dev experience
"""
from typing import TypedDict
class AnilistMediaTitle(TypedDict):
english:str
romaji:str
class AnilistImage(TypedDict):
medium:str
extraLarge:str
small:str
large:str
class AnilistMediaTrailer(TypedDict):
id:str
site:str
class AnilistStudio(TypedDict):
name:str
favourites:int
isAnimationStudio:bool
class AnilistStudioNodes(TypedDict):
nodes:list[AnilistStudio]
class AnilistMediaTag(TypedDict):
name:str
rank:int
class AnilistDateObject(TypedDict):
day:int
month:int
year:int
class AnilistMediaNextAiringEpisode(TypedDict):
timeUntilAiring:int
airingAt:int
episode:int
class AnilistUser(TypedDict):
name:str
avatar:AnilistImage
class AnilistReview(TypedDict):
summary:str
user:AnilistUser
class AnilistReviewNodes(TypedDict):
nodes:list[AnilistReview]
class AnilistMediaRanking(TypedDict):
rank:int
context:str
class AnilistExternalLink(TypedDict):
url:str
site:str
icon:str
class AnilistName(TypedDict):
full:str
class AnilistCharacter(TypedDict):
name:AnilistName
gender:str|None
dateOfBirth:AnilistDateObject|None
age:int
image:AnilistImage
description:str
# class AnilistCharacterNode(TypedDict):
# node
class AnilistVoiceActor(TypedDict):
name:AnilistName
image:AnilistImage
class AnilistCharactersEdge(TypedDict):
node:list[AnilistCharacter]
voiceActors:list[AnilistVoiceActor]
class AnilistCharactersEdges(TypedDict):
edges:list[AnilistCharactersEdge]
class AnilistBaseMediaDataSchema(TypedDict):
"""
This a convenience class is used to type the received Anilist data to enhance dev experience
"""
id:str
title:AnilistMediaTitle
coverImage:AnilistImage
trailer:AnilistMediaTrailer|None
popularity:int
favourites:int
averageScore:int
genres:list[str]
episodes:int|None
description:str|None
studios:AnilistStudioNodes
tags:list[AnilistMediaTag]
startDate:AnilistDateObject
endDate:AnilistDateObject
status:str
nextAiringEpisode:AnilistMediaNextAiringEpisode
season:str
seasonYear:int
duration:int
synonyms:list[str]
countryOfOrigin:str
source:str
hashtag:str|None
siteUrl:str
reviews:AnilistReviewNodes
bannerImage:str|None
rankings:list[AnilistMediaRanking]
externalLinks:list[AnilistExternalLink]
characters:AnilistCharactersEdges
format:str
class AnilistPageInfo(TypedDict):
total:int
perPage:int
currentPage:int
hasNextPage:bool
class AnilistPage(TypedDict):
media:list[AnilistBaseMediaDataSchema]
pageInfo:AnilistPageInfo
class AnilistPages(TypedDict):
Page: AnilistPage
class AnilistDataSchema(TypedDict):
data:AnilistPages
Error:str

View File

@@ -1,3 +1,8 @@
"""
This module contains all the preset queries for the sake of neatness and convinience
Mostly for internal usage
"""
optional_variables = "\
$page:Int,\
$sort:[MediaSort],\
@@ -20,7 +25,8 @@ $endDate_lesser:FuzzyDateInt\
"
# FuzzyDateInt = (yyyymmdd)
# MediaStatus = (FINISHED,RELEASING,NOT_YET_RELEASED,CANCELLED,HIATUS)
search_query = """
search_query = (
"""
query($query:String,%s){
Page(perPage:30,page:$page){
pageInfo{
@@ -71,7 +77,7 @@ query($query:String,%s){
studios{
nodes{
name
favourites
isAnimationStudio
}
}
tags {
@@ -97,13 +103,15 @@ query($query:String,%s){
}
}
}
""" % optional_variables
"""
% optional_variables
)
trending_query = """
query{
Page(perPage:15){
media(sort:TRENDING_DESC,type:ANIME){
media(sort:TRENDING_DESC,type:ANIME,genre_not_in:["hentai"]){
id
title{
romaji
@@ -125,7 +133,7 @@ query{
studios {
nodes {
name
favourites
isAnimationStudio
}
}
tags {
@@ -156,7 +164,7 @@ query{
most_favourite_query = """
query{
Page(perPage:15){
media(sort:FAVOURITES_DESC,type:ANIME){
media(sort:FAVOURITES_DESC,type:ANIME,genre_not_in:["hentai"]){
id
title{
romaji
@@ -179,7 +187,7 @@ query{
studios {
nodes {
name
favourites
isAnimationStudio
}
}
tags {
@@ -209,7 +217,7 @@ query{
most_scored_query = """
query{
Page(perPage:15){
media(sort:SCORE_DESC,type:ANIME){
media(sort:SCORE_DESC,type:ANIME,genre_not_in:["hentai"]){
id
title{
romaji
@@ -232,7 +240,7 @@ query{
studios {
nodes {
name
favourites
isAnimationStudio
}
}
tags {
@@ -262,7 +270,7 @@ query{
most_popular_query = """
query{
Page(perPage:15){
media(sort:POPULARITY_DESC,type:ANIME){
media(sort:POPULARITY_DESC,type:ANIME,genre_not_in:["hentai"]){
id
title{
romaji
@@ -285,7 +293,7 @@ query{
studios {
nodes {
name
favourites
isAnimationStudio
}
}
tags {
@@ -314,7 +322,7 @@ query{
most_recently_updated_query = """
query{
Page(perPage:15){
Page(perPage:15,genre_not_in:["hentai"]){
media(sort:UPDATED_AT_DESC,type:ANIME,averageScore_greater:50){
id
title{
@@ -337,7 +345,7 @@ query{
studios {
nodes {
name
favourites
isAnimationStudio
}
}
tags {
@@ -367,7 +375,7 @@ query{
recommended_query = """
query {
Page(perPage:15) {
media( type: ANIME) {
media( type: ANIME,genre_not_in:["hentai"]) {
recommendations(sort:RATING_DESC){
nodes{
media{
@@ -455,7 +463,7 @@ query($id:Int){
anime_relations_query = """
query ($id: Int) {
Page(perPage: 20) {
media(id: $id, sort: POPULARITY_DESC, type: ANIME) {
media(id: $id, sort: POPULARITY_DESC, type: ANIME,genre_not_in:["hentai"]) {
relations {
nodes {
id
@@ -529,7 +537,7 @@ query ($page: Int) {
currentPage
hasNextPage
}
media(type: ANIME, status: NOT_YET_RELEASED,sort:POPULARITY_DESC) {
media(type: ANIME, status: NOT_YET_RELEASED,sort:POPULARITY_DESC,genre_not_in:["hentai"]) {
id
title {
romaji
@@ -551,7 +559,7 @@ query ($page: Int) {
studios {
nodes {
name
favourites
isAnimationStudio
}
}
tags {
@@ -704,4 +712,4 @@ query($id:Int){
}
"""
# print(search_query)
# print(search_query)

View File

@@ -23,7 +23,7 @@ class AnimdlApi:
return run([py_path,"-m", "animdl", *cmds])
@classmethod
def run_custom_command(cls,*cmds:tuple[str])->Popen:
def run_custom_command(cls,cmds:list[str])->Popen|None:
"""
Runs an AnimDl custom command with the full power of animdl and returns a subprocess(popen) for full control
"""
@@ -33,14 +33,17 @@ class AnimdlApi:
if py_path:=shutil.which("python"):
base_cmds = [py_path,"-m","animdl"]
child_process = Popen([*base_cmds,*parsed_cmds])
cmds_ = [*base_cmds,*parsed_cmds]
child_process = Popen(cmds_)
return child_process
else:
return None
@classmethod
def stream_anime_by_title(cls,title,episodes_range=None):
def stream_anime_by_title(cls,title,episodes_range=None)->Popen|None:
anime = cls.get_anime_url_by_title(title)
if not anime:
return False
return None
if py_path:=shutil.which("python"):
base_cmds = [py_path,"-m", "animdl","stream",anime[1]]
cmd = [*base_cmds,"-r",episodes_range] if episodes_range else base_cmds
@@ -140,7 +143,7 @@ class AnimdlApi:
process = Popen([mpv,url,f"--stream-dump={output_path}"],stderr=PIPE,text=True,stdout=PIPE)
progress_regex = re.compile(r"\d+/\d+") # eg Dumping 2044776/125359745
for stream in process.stderr:
for stream in process.stderr: # type: ignore
if matches:=progress_regex.findall(stream):
current_bytes,total_bytes = [float(val) for val in matches[0].split("/")]
on_progress(current_bytes,total_bytes)
@@ -212,7 +215,7 @@ class AnimdlApi:
try:
cmd = ["grab",anime_url,"-r",episodes_range] if episodes_range else ["grab",anime_url]
result = cls.run_animdl_command(cmd)
return [json.loads(episode.strip()) for episode in result.stdout.strip().split("\n")]
return [json.loads(episode.strip()) for episode in result.stdout.strip().split("\n")] # type: ignore
except:
return None
@@ -228,8 +231,8 @@ class AnimdlApi:
return all(char.isspace() for char in input_string)
@classmethod
def output_parser(cls,result_of_cmd:str):
data = result_of_cmd.stderr.split("\n")[3:]
def output_parser(cls,result_of_cmd):
data = result_of_cmd.stderr.split("\n")[3:] # type: ignore
parsed_data = {}
pass_next = False
for i,data_item in enumerate(data[:]):

View File

@@ -1,5 +1,5 @@
import os
os.environ["KIVY_VIDEO"] = "ffpyplayer"
from queue import Queue
@@ -10,27 +10,32 @@ import webbrowser
import plyer
from kivy.config import Config
# Config.set('kivy', 'window_icon', "logo.ico")
# Config.write()
from kivy.loader import Loader
Loader.num_workers = 5
Loader.max_upload_per_frame = 5
from kivy.clock import Clock
from kivy.logger import Logger
from kivy.uix.screenmanager import ScreenManager,FadeTransition
from kivy.uix.settings import SettingsWithSidebar,Settings
from kivy.uix.screenmanager import ScreenManager, FadeTransition
from kivy.uix.settings import SettingsWithSidebar, Settings
from kivymd.icon_definitions import md_icons
from kivymd.app import MDApp
from View.screens import screens
from libs.animdl.animdl_api import AnimdlApi
from Utility import themes_available,show_notification,user_data_helper
from Utility import themes_available, show_notification, user_data_helper
# Ensure the user data fields exist
if not(user_data_helper.user_data.exists("user_anime_list")):
if not (user_data_helper.user_data.exists("user_anime_list")):
user_data_helper.update_user_anime_list([])
if not(user_data_helper.yt_cache.exists("yt_stream_links")):
if not (user_data_helper.yt_cache.exists("yt_stream_links")):
user_data_helper.update_anime_trailer_cache([])
@@ -38,18 +43,17 @@ if not(user_data_helper.yt_cache.exists("yt_stream_links")):
class AniXStreamApp(MDApp):
queue = Queue()
downloads_queue = Queue()
animdl_streaming_subprocess:Popen|None = None
animdl_streaming_subprocess: Popen | None = None
def worker(self,queue:Queue):
def worker(self, queue: Queue):
while True:
task = queue.get() # task should be a function
task = queue.get() # task should be a function
task()
self.queue.task_done()
def downloads_worker(self,queue:Queue):
def downloads_worker(self, queue: Queue):
while True:
download_task = queue.get() # task should be a function
download_task = queue.get() # task should be a function
download_task()
self.downloads_queue.task_done()
@@ -62,13 +66,15 @@ class AniXStreamApp(MDApp):
self.manager_screens.transition = FadeTransition()
# initialize worker
self.worker_thread = Thread(target=self.worker,args=(self.queue,))
self.worker_thread = Thread(target=self.worker, args=(self.queue,))
self.worker_thread.daemon = True
self.worker_thread.start()
Logger.info("AniXStream:Successfully started background tasks worker")
# initialize downloads worker
self.downloads_worker_thread = Thread(target=self.downloads_worker,args=(self.downloads_queue,))
self.downloads_worker_thread = Thread(
target=self.downloads_worker, args=(self.downloads_queue,)
)
self.downloads_worker_thread.daemon = True
self.downloads_worker_thread.start()
Logger.info("AniXStream:Successfully started download worker")
@@ -78,18 +84,18 @@ class AniXStreamApp(MDApp):
self.generate_application_screens()
if config:=self.config:
if theme_color:=config.get("Preferences","theme_color"):
if config := self.config:
if theme_color := config.get("Preferences", "theme_color"):
self.theme_cls.primary_palette = theme_color
if theme_style:=config.get("Preferences","theme_style"):
if theme_style := config.get("Preferences", "theme_style"):
self.theme_cls.theme_style = theme_style
self.anime_screen = self.manager_screens.get_screen("anime screen")
self.search_screen = self.manager_screens.get_screen("search screen")
self.download_screen = self.manager_screens.get_screen("downloads screen")
return self.manager_screens
def on_start(self,*args):
def on_start(self, *args):
pass
def generate_application_screens(self) -> None:
@@ -102,108 +108,160 @@ class AniXStreamApp(MDApp):
self.manager_screens.add_widget(view)
def build_config(self, config):
config.setdefaults('Preferences', {
'theme_color': 'Cyan',
"theme_style": "Dark",
"downloads_dir": plyer.storagepath.get_videos_dir() if plyer.storagepath.get_videos_dir() else ".",
"is_startup_anime_enable":False
})
config.setdefaults(
"Preferences",
{
"theme_color": "Cyan",
"theme_style": "Dark",
"downloads_dir": plyer.storagepath.get_videos_dir() if plyer.storagepath.get_videos_dir() else ".", # type: ignore
"is_startup_anime_enable": False,
},
)
def build_settings(self, settings: Settings):
settings.add_json_panel("Settings", self.config, "settings.json")
def build_settings(self,settings:Settings):
settings.add_json_panel("Settings",self.config,"settings.json")
def on_config_change(self, config, section, key, value):
if section=="Preferences":
if section == "Preferences":
match key:
case "theme_color":
if value in themes_available:
self.theme_cls.primary_palette = value
else:
Logger.warning("Settings:An invalid theme has been entered and will be ignored")
config.set("Preferences","theme_color","Cyan")
Logger.warning(
"Settings:An invalid theme has been entered and will be ignored"
)
config.set("Preferences", "theme_color", "Cyan")
config.write()
case "theme_style":
self.theme_cls.theme_style = value
def on_stop(self):
if self.animdl_streaming_subprocess:
self.animdl_streaming_subprocess.terminate()
Logger.info("Animdl:Successfully terminated existing animdl subprocess")
# custom methods
# TODO: may move theme to a personalized class
def search_for_anime(self,search_field,**kwargs):
# custom methods
def search_for_anime(self, search_field, **kwargs):
if self.manager_screens.current != "search screen":
self.manager_screens.current = "search screen"
self.search_screen.handle_search_for_anime(search_field,**kwargs)
self.search_screen.handle_search_for_anime(search_field, **kwargs)
def add_anime_to_user_anime_list(self,id:int):
def add_anime_to_user_anime_list(self, id: int):
updated_list = user_data_helper.get_user_anime_list()
updated_list.append(id)
user_data_helper.update_user_anime_list(updated_list)
def remove_anime_from_user_anime_list(self,id:int):
def remove_anime_from_user_anime_list(self, id: int):
updated_list = user_data_helper.get_user_anime_list()
if updated_list.count(id): updated_list.remove(id)
if updated_list.count(id):
updated_list.remove(id)
user_data_helper.update_user_anime_list(updated_list)
def add_anime_to_user_downloads_list(self,id:int):
def add_anime_to_user_downloads_list(self, id: int):
updated_list = user_data_helper.get_user_downloads()
updated_list.append(id)
user_data_helper.get_user_downloads(updated_list)
user_data_helper.get_user_downloads()
def show_anime_screen(self,id:int,caller_screen_name:str):
def show_anime_screen(self, id: int, caller_screen_name: str):
self.manager_screens.current = "anime screen"
self.anime_screen.controller.update_anime_view(id,caller_screen_name)
self.anime_screen.controller.update_anime_view(id, caller_screen_name)
def watch_on_allanime(self,title_):
def download_anime_complete(
self, successful_downloads: list, failed_downloads: list, anime_title: str
):
show_notification(
f"Finished Dowloading {anime_title}",
f"There were {len(successful_downloads)} successful downloads and {len(failed_downloads)} failed downloads",
)
Logger.info(
f"Downloader:Finished Downloading {anime_title} and there were {len(failed_downloads)} failed downloads"
)
def download_anime(self, anime_id: int, default_cmds: dict):
show_notification(
"New Download Task Queued",
f"{default_cmds.get('title')} has been queued for downloading",
)
self.add_anime_to_user_downloads_list(anime_id)
# TODO:Add custom download cmds functionality
on_progress = lambda *args: self.download_screen.on_episode_download_progress(
*args
)
output_path = self.config.get("Preferences", "downloads_dir") # type: ignore
if episodes_range := default_cmds.get("episodes_range"):
download_task = lambda: AnimdlApi.download_anime_by_title(
default_cmds["title"],
on_progress,
self.download_anime_complete,
output_path,
episodes_range,
) # ,default_cmds["quality"]
self.downloads_queue.put(download_task)
Logger.info(
f"Downloader:Successfully Queued {default_cmds['title']} for downloading"
)
else:
download_task = lambda: AnimdlApi.download_anime_by_title(
default_cmds["title"],
on_progress,
self.download_anime_complete,
output_path,
) # ,default_cmds.get("quality")
self.downloads_queue.put(download_task)
def watch_on_allanime(self, title_):
"""
Opens the given anime in your default browser on allanimes site
Parameters:
----------
title_: The anime title requested to be opened
"""
if anime:=AnimdlApi.get_anime_url_by_title(title_):
title,link = anime
if anime := AnimdlApi.get_anime_url_by_title(title_):
title, link = anime
parsed_link = f"https://allmanga.to/bangumi/{link.split('/')[-1]}"
else:
Logger.error(f"AniXStream:Failed to open {title} in browser on allanime site")
show_notification("Failure",f"Failed to open {title} in browser on allanime site")
Logger.error(
f"AniXStream:Failed to open {title_} in browser on allanime site"
)
show_notification(
"Failure", f"Failed to open {title_} in browser on allanime site"
)
if webbrowser.open(parsed_link):
Logger.info(f"AniXStream:Successfully opened {title} in browser allanime site")
show_notification("Success",f"Successfully opened {title} in browser allanime site")
Logger.info(
f"AniXStream:Successfully opened {title} in browser allanime site"
)
show_notification(
"Success", f"Successfully opened {title} in browser allanime site"
)
else:
Logger.error(f"AniXStream:Failed to open {title} in browser on allanime site")
show_notification("Failure",f"Failed to open {title} in browser on allanime site")
Logger.error(
f"AniXStream:Failed to open {title} in browser on allanime site"
)
show_notification(
"Failure", f"Failed to open {title} in browser on allanime site"
)
def download_anime_complete(self,successful_downloads:list,failed_downloads:list,anime_title:str):
show_notification(f"Finished Dowloading {anime_title}",f"There were {len(successful_downloads)} successful downloads and {len(failed_downloads)} failed downloads")
Logger.info(f"Downloader:Finished Downloading {anime_title} and there were {len(failed_downloads)} failed downloads")
def stream_anime_with_custom_input_cmds(self, *cmds):
self.animdl_streaming_subprocess = AnimdlApi.run_custom_command(
["stream", *cmds]
)
def download_anime(self,anime_id:int,default_cmds:dict):
show_notification("New Download Task Queued",f"{default_cmds.get('title')} has been queued for downloading")
def stream_anime_by_title_with_animdl(
self, title, episodes_range: str | None = None
):
self.animdl_streaming_subprocess = AnimdlApi.stream_anime_by_title(
title, episodes_range
)
self.add_anime_to_user_downloads_list(anime_id)
# TODO:Add custom download cmds functionality
on_progress = lambda *args:self.download_screen.on_episode_download_progress(*args)
output_path = self.config.get("Preferences","downloads_dir")
if episodes_range:=default_cmds.get("episodes_range"):
download_task =lambda: AnimdlApi.download_anime_by_title(default_cmds['title'],on_progress,self.download_anime_complete,output_path,episodes_range) # ,default_cmds["quality"]
self.downloads_queue.put(download_task)
Logger.info(f"Downloader:Successfully Queued {default_cmds['title']} for downloading")
else:
download_task =lambda: AnimdlApi.download_anime_by_title(default_cmds["title"],on_progress,self.download_anime_complete,output_path) # ,default_cmds.get("quality")
self.downloads_queue.put(download_task)
def stream_anime_with_custom_input_cmds(self,*cmds):
self.animdl_streaming_subprocess = AnimdlApi.run_custom_command("stream",*cmds)
def stream_anime_by_title_with_animdl(self,title,episodes_range:str|None=None):
self.animdl_streaming_subprocess = AnimdlApi.stream_anime_by_title(title,episodes_range)
# self.stop_streaming = False
def watch_on_animdl(self,title_dict:dict|None=None,episodes_range:str|None=None,custom_options:tuple[str]|None=None):
def watch_on_animdl(
self,
title_dict: dict | None = None,
episodes_range: str | None = None,
custom_options: tuple[str] | None = None,
):
"""
Enables you to stream an anime using animdl either by parsing a title or custom animdl options
@@ -218,17 +276,24 @@ class AniXStreamApp(MDApp):
self.animdl_streaming_subprocess.terminate()
if title_dict:
if title:=title_dict.get("japanese"):
stream_func = lambda: self.stream_anime_by_title_with_animdl(title,episodes_range)
if title := title_dict.get("japanese"):
stream_func = lambda: self.stream_anime_by_title_with_animdl(
title, episodes_range
)
self.queue.put(stream_func)
elif title:=title_dict.get("english"):
stream_func = lambda:self.stream_anime_by_title_with_animdl(title,episodes_range)
Logger.info(f"Animdl:Successfully started to stream {title}")
elif title := title_dict.get("english"):
stream_func = lambda: self.stream_anime_by_title_with_animdl(
title, episodes_range
)
self.queue.put(stream_func)
Logger.info(f"Animdl:Successfully started to stream {title}")
else:
stream_func = lambda:self.stream_anime_with_custom_input_cmds(*custom_options)
stream_func = lambda: self.stream_anime_with_custom_input_cmds(
*custom_options
)
self.queue.put(stream_func)
Logger.info(f"Animdl:Successfully started to stream {title}")
if __name__ == "__main__":
AniXStreamApp().run()
AniXStreamApp().run()

View File

@@ -1 +1 @@
{"user_anime_list": {"user_anime_list": [116674, 21640, 17641, 117612, 269, 6702, 20657, 20626, 8247, 107226, 19163, 15583]}}
{"user_anime_list": {"user_anime_list": [166531, 21640, 269, 21519, 150672, 20626, 21, 9756, 9253, 6702, 20657, 11061, 8247, 116674, 21827, 107226, 19163, 15583, 21857, 17641, 117612, 21745, 21874, 104051, 5114]}}