chore: format with ruff

This commit is contained in:
Benexl
2025-07-27 12:49:44 +03:00
parent abe36296c1
commit 5b06039cef
16 changed files with 96 additions and 83 deletions

View File

@@ -237,9 +237,7 @@ def _create_local_recent_media_action(ctx: Context, state: State) -> MenuAction:
),
)
else:
ctx.feedback.info(
"No recently watched media found in local registry"
)
ctx.feedback.info("No recently watched media found in local registry")
return InternalDirective.RELOAD
return action

View File

@@ -2,7 +2,6 @@ import json
import logging
import os
import tempfile
from pathlib import Path
from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from .....libs.media_api.params import MediaSearchParams
@@ -30,20 +29,22 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
# Read the GraphQL search query
from .....libs.media_api.anilist import gql
search_query = gql.SEARCH_MEDIA.read_text(encoding="utf-8")
# Properly escape the GraphQL query for JSON
search_query_escaped = json.dumps(search_query)
# Prepare the search script
auth_header = ""
if ctx.media_api.is_authenticated() and hasattr(ctx.media_api, 'token'):
if ctx.media_api.is_authenticated() and hasattr(ctx.media_api, "token"):
auth_header = f"Bearer {ctx.media_api.token}"
# Create a temporary search script
with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as temp_script:
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False
) as temp_script:
script_content = SEARCH_TEMPLATE_SCRIPT
replacements = {
"GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
"GRAPHQL_QUERY": search_query_escaped,
@@ -51,17 +52,17 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
"SEARCH_RESULTS_FILE": str(SEARCH_RESULTS_FILE),
"AUTH_HEADER": auth_header,
}
for key, value in replacements.items():
script_content = script_content.replace(f"{{{key}}}", str(value))
temp_script.write(script_content)
temp_script_path = temp_script.name
try:
# Make the script executable
os.chmod(temp_script_path, 0o755)
# Use the selector's search functionality
try:
# Prepare preview functionality
@@ -76,56 +77,69 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
prompt="Search Anime",
search_command=f"bash {temp_script_path} {{q}}",
preview=preview_command,
header="Type to search for anime dynamically"
header="Type to search for anime dynamically",
)
else:
choice = ctx.selector.search(
prompt="Search Anime",
search_command=f"bash {temp_script_path} {{q}}",
header="Type to search for anime dynamically"
header="Type to search for anime dynamically",
)
except NotImplementedError:
feedback.error("Dynamic search is not supported by your current selector")
feedback.info("Please use the regular search option or switch to fzf selector")
feedback.info(
"Please use the regular search option or switch to fzf selector"
)
return InternalDirective.MAIN
if not choice:
return InternalDirective.MAIN
# Read the cached search results
if not SEARCH_RESULTS_FILE.exists():
logger.error("Search results file not found")
return InternalDirective.MAIN
try:
with open(SEARCH_RESULTS_FILE, 'r', encoding='utf-8') as f:
with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f:
raw_data = json.load(f)
# Transform the raw data into MediaSearchResult
search_result = ctx.media_api.transform_raw_search_data(raw_data)
if not search_result or not search_result.media:
feedback.info("No results found")
return InternalDirective.MAIN
# Find the selected media item by matching the choice with the displayed format
selected_media = None
for media_item in search_result.media:
title = media_item.title.english or media_item.title.romaji or media_item.title.native or "Unknown"
year = media_item.start_date.year if media_item.start_date else "Unknown"
title = (
media_item.title.english
or media_item.title.romaji
or media_item.title.native
or "Unknown"
)
year = (
media_item.start_date.year if media_item.start_date else "Unknown"
)
status = media_item.status.value if media_item.status else "Unknown"
genres = ", ".join([genre.value for genre in media_item.genres[:3]]) if media_item.genres else "Unknown"
genres = (
", ".join([genre.value for genre in media_item.genres[:3]])
if media_item.genres
else "Unknown"
)
display_format = f"{title} ({year}) [{status}] - {genres}"
if choice.strip() == display_format.strip():
selected_media = media_item
break
if not selected_media:
logger.error(f"Could not find selected media for choice: {choice}")
return InternalDirective.MAIN
# Navigate to media actions with the selected item
return State(
menu_name=MenuName.MEDIA_ACTIONS,
@@ -136,12 +150,12 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
page_info=search_result.page_info,
),
)
except (json.JSONDecodeError, KeyError, Exception) as e:
logger.error(f"Error processing search results: {e}")
feedback.error("Failed to process search results")
return InternalDirective.MAIN
finally:
# Clean up the temporary script
try:

View File

@@ -39,7 +39,9 @@ def main(ctx: Context, state: State) -> State | InternalDirective:
ctx, state, UserMediaListStatus.PLANNING
),
f"{'🔎 ' if icons else ''}Search": _create_search_media_list(ctx, state),
f"{'🔍 ' if icons else ''}Dynamic Search": _create_dynamic_search_action(ctx, state),
f"{'🔍 ' if icons else ''}Dynamic Search": _create_dynamic_search_action(
ctx, state
),
f"{'🏠 ' if icons else ''}Downloads": _create_downloads_action(ctx, state),
f"{'🔔 ' if icons else ''}Recently Updated": _create_media_list_action(
ctx, state, MediaSort.UPDATED_AT_DESC

View File

@@ -82,18 +82,17 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
# TODO: Refine implementation mpv ipc player
# Check if IPC player should be used and if we have the required data
if (config.mpv.use_ipc and
state.provider.anime and
provider_anime and
episode_number):
if (
config.mpv.use_ipc
and state.provider.anime
and provider_anime
and episode_number
):
# Get available episodes for current translation type
available_episodes = getattr(
provider_anime.episodes,
config.stream.translation_type,
[]
provider_anime.episodes, config.stream.translation_type, []
)
# Create player params with IPC dependencies for episode navigation
player_result = ctx.player.play(
PlayerParams(
@@ -109,7 +108,7 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
current_episode=episode_number,
current_anime_id=provider_anime.id,
current_anime_title=provider_anime.title,
current_translation_type=config.stream.translation_type
current_translation_type=config.stream.translation_type,
)
)
else: