Implement watch history management system with tracking and data models

- Added WatchHistoryManager for managing local watch history storage, including methods for adding, updating, removing, and retrieving entries.
- Introduced WatchHistoryTracker to automatically track episode viewing and progress updates.
- Created data models for watch history entries and overall history management, including serialization to and from JSON.
- Implemented comprehensive error handling and logging throughout the system.
- Developed a test script to validate the functionality of the watch history management system, covering basic operations and statistics.
This commit is contained in:
Benexl
2025-07-14 22:00:44 +03:00
parent 222c50b4b2
commit f8992d46dd
10 changed files with 1696 additions and 24 deletions

View File

@@ -35,33 +35,46 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
chosen_episode: str | None = None
if config.stream.continue_from_watch_history and False:
progress = (
anilist_anime.user_status.progress
if anilist_anime.user_status and anilist_anime.user_status.progress
else 0
)
# Calculate the next episode based on progress
next_episode_num = str(progress + 1)
if next_episode_num in available_episodes:
click.echo(
f"[cyan]Continuing from history. Auto-selecting episode {next_episode_num}.[/cyan]"
if config.stream.continue_from_watch_history:
# Use our new watch history system
from ...utils.watch_history_tracker import get_continue_episode, track_episode_viewing
# Try to get continue episode from watch history
if config.stream.preferred_watch_history == "local":
chosen_episode = get_continue_episode(anilist_anime, available_episodes, prefer_history=True)
if chosen_episode:
click.echo(
f"[cyan]Continuing from local watch history. Auto-selecting episode {chosen_episode}.[/cyan]"
)
# Fallback to AniList progress if local history doesn't have info or preference is remote
if not chosen_episode and config.stream.preferred_watch_history == "remote":
progress = (
anilist_anime.user_status.progress
if anilist_anime.user_status and anilist_anime.user_status.progress
else 0
)
chosen_episode = next_episode_num
else:
# If the next episode isn't available, fall back to the last watched one
last_watched_num = str(progress)
if last_watched_num in available_episodes:
# Calculate the next episode based on progress
next_episode_num = str(progress + 1)
if next_episode_num in available_episodes:
click.echo(
f"[cyan]Next episode ({next_episode_num}) not found. Falling back to last watched episode {last_watched_num}.[/cyan]"
f"[cyan]Continuing from AniList history. Auto-selecting episode {next_episode_num}.[/cyan]"
)
chosen_episode = last_watched_num
chosen_episode = next_episode_num
else:
click.echo(
f"[yellow]Could not find episode based on your watch history. Please select manually.[/yellow]"
)
# If the next episode isn't available, fall back to the last watched one
last_watched_num = str(progress)
if last_watched_num in available_episodes:
click.echo(
f"[cyan]Next episode ({next_episode_num}) not found. Falling back to last watched episode {last_watched_num}.[/cyan]"
)
chosen_episode = last_watched_num
else:
click.echo(
f"[yellow]Could not find episode based on your watch history. Please select manually.[/yellow]"
)
if not chosen_episode:
choices = [*sorted(available_episodes, key=float), "Back"]
@@ -78,6 +91,15 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
chosen_episode = chosen_episode_str
# Track episode selection in watch history (if enabled in config)
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local":
from ...utils.watch_history_tracker import track_episode_viewing
try:
episode_num = int(chosen_episode)
track_episode_viewing(anilist_anime, episode_num, start_tracking=True)
except (ValueError, AttributeError):
pass # Skip tracking if episode number is invalid
return State(
menu_name="SERVERS",
media_api=state.media_api,

View File

@@ -58,8 +58,10 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
f"{'🔁 ' if icons else ''}Rewatching": _create_user_list_action(
ctx, "REPEATING"
),
# --- Local Watch History ---
f"{'📖 ' if icons else ''}Local Watch History": lambda: ("WATCH_HISTORY", None),
# --- Control Flow and Utility Options ---
f"{'<EFBFBD> ' if icons else ''}Session Management": lambda: ("SESSION_MANAGEMENT", None),
f"{'🔧 ' if icons else ''}Session Management": lambda: ("SESSION_MANAGEMENT", None),
f"{'<EFBFBD>📝 ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None),
f"{'' if icons else ''}Exit": lambda: ("EXIT", None),
}

View File

@@ -36,6 +36,7 @@ def media_actions(ctx: Context, state: State) -> State | ControlFlow:
f"{'📼 ' if icons else ''}Watch Trailer": _watch_trailer(ctx, state),
f"{' ' if icons else ''}Add/Update List": _add_to_list(ctx, state),
f"{'' if icons else ''}Score Anime": _score_anime(ctx, state),
f"{'📚 ' if icons else ''}Add to Local History": _add_to_local_history(ctx, state),
f"{' ' if icons else ''}View Info": _view_info(ctx, state),
f"{'🔙 ' if icons else ''}Back to Results": lambda: ControlFlow.BACK,
}
@@ -218,3 +219,71 @@ def _update_user_list_with_feedback(
error_msg="Failed to update list entry",
show_loading=False,
)
def _add_to_local_history(ctx: Context, state: State) -> MenuAction:
"""Add anime to local watch history with status selection."""
def action() -> State | ControlFlow:
anime = state.media_api.anime
if not anime:
click.echo("[bold red]No anime data available.[/bold red]")
return ControlFlow.CONTINUE
feedback = create_feedback_manager(ctx.config.general.icons)
# Check if already in watch history
from ...utils.watch_history_manager import WatchHistoryManager
history_manager = WatchHistoryManager()
existing_entry = history_manager.get_entry(anime.id)
if existing_entry:
# Ask if user wants to update existing entry
if not feedback.confirm(f"'{existing_entry.get_display_title()}' is already in your local watch history. Update it?"):
return ControlFlow.CONTINUE
# Status selection
statuses = ["watching", "completed", "planning", "paused", "dropped"]
status_choices = [status.title() for status in statuses]
chosen_status = ctx.selector.choose(
"Select status for local watch history:",
choices=status_choices + ["Cancel"]
)
if not chosen_status or chosen_status == "Cancel":
return ControlFlow.CONTINUE
status = chosen_status.lower()
# Episode number if applicable
episode = 0
if status in ["watching", "completed"]:
if anime.episodes and anime.episodes > 1:
episode_str = ctx.selector.ask(f"Enter current episode (1-{anime.episodes}, default: 0):")
try:
episode = int(episode_str) if episode_str else 0
episode = max(0, min(episode, anime.episodes))
except ValueError:
episode = 0
# Mark as completed if status is completed
if status == "completed" and anime.episodes:
episode = anime.episodes
# Add to watch history
from ...utils.watch_history_tracker import watch_tracker
success = watch_tracker.add_anime_to_history(anime, status)
if success and episode > 0:
# Update episode progress
history_manager.mark_episode_watched(anime.id, episode, 1.0 if status == "completed" else 0.0)
if success:
feedback.success(f"Added '{anime.title.english or anime.title.romaji}' to local watch history with status: {status}")
else:
feedback.error("Failed to add anime to local watch history")
return ControlFlow.CONTINUE
return action

View File

@@ -83,6 +83,14 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
_update_progress_in_background(
ctx, anilist_anime.id, int(current_episode_num)
)
# Also update local watch history if enabled
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local":
from ...utils.watch_history_tracker import update_episode_progress
try:
update_episode_progress(anilist_anime.id, int(current_episode_num), completion_pct)
except (ValueError, AttributeError):
pass # Skip if episode number conversion fails
# --- Auto-Next Logic ---
available_episodes = getattr(
@@ -93,6 +101,15 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
if config.stream.auto_next and current_index < len(available_episodes) - 1:
console.print("[cyan]Auto-playing next episode...[/cyan]")
next_episode_num = available_episodes[current_index + 1]
# Track next episode in watch history
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local" and anilist_anime:
from ...utils.watch_history_tracker import track_episode_viewing
try:
track_episode_viewing(anilist_anime, int(next_episode_num), start_tracking=True)
except (ValueError, AttributeError):
pass
return State(
menu_name="SERVERS",
media_api=state.media_api,
@@ -105,6 +122,15 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
def next_episode() -> State | ControlFlow:
if current_index < len(available_episodes) - 1:
next_episode_num = available_episodes[current_index + 1]
# Track next episode in watch history
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local" and anilist_anime:
from ...utils.watch_history_tracker import track_episode_viewing
try:
track_episode_viewing(anilist_anime, int(next_episode_num), start_tracking=True)
except (ValueError, AttributeError):
pass
# Transition back to the SERVERS menu with the new episode number.
return State(
menu_name="SERVERS",

View File

@@ -0,0 +1,524 @@
"""
Watch History Management Menu for the interactive CLI.
Provides comprehensive watch history viewing, editing, and management capabilities.
"""
import logging
from pathlib import Path
from typing import Callable, Dict, List
from rich.console import Console
from rich.table import Table
from rich.text import Text
from ....core.constants import APP_DATA_DIR
from ...utils.feedback import create_feedback_manager
from ...utils.watch_history_manager import WatchHistoryManager
from ...utils.watch_history_types import WatchHistoryEntry
from ..session import Context, session
from ..state import ControlFlow, State
logger = logging.getLogger(__name__)
MenuAction = Callable[[], str]
@session.menu
def watch_history(ctx: Context, state: State) -> State | ControlFlow:
"""
Watch history management menu for viewing and managing local watch history.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
# Initialize watch history manager
history_manager = WatchHistoryManager()
# Show watch history stats
_display_history_stats(console, history_manager, icons)
options: Dict[str, MenuAction] = {
f"{'📺 ' if icons else ''}Currently Watching": lambda: _view_watching(ctx, history_manager, feedback),
f"{'' if icons else ''}Completed Anime": lambda: _view_completed(ctx, history_manager, feedback),
f"{'🕒 ' if icons else ''}Recently Watched": lambda: _view_recent(ctx, history_manager, feedback),
f"{'📋 ' if icons else ''}View All History": lambda: _view_all_history(ctx, history_manager, feedback),
f"{'🔍 ' if icons else ''}Search History": lambda: _search_history(ctx, history_manager, feedback),
f"{'✏️ ' if icons else ''}Edit Entry": lambda: _edit_entry(ctx, history_manager, feedback),
f"{'🗑️ ' if icons else ''}Remove Entry": lambda: _remove_entry(ctx, history_manager, feedback),
f"{'📊 ' if icons else ''}View Statistics": lambda: _view_stats(ctx, history_manager, feedback),
f"{'💾 ' if icons else ''}Export History": lambda: _export_history(ctx, history_manager, feedback),
f"{'📥 ' if icons else ''}Import History": lambda: _import_history(ctx, history_manager, feedback),
f"{'🧹 ' if icons else ''}Clear All History": lambda: _clear_history(ctx, history_manager, feedback),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
choice_str = ctx.selector.choose(
prompt="Select Watch History Action",
choices=list(options.keys()),
header="Watch History Management",
)
if not choice_str:
return ControlFlow.BACK
result = options[choice_str]()
if result == "BACK":
return ControlFlow.BACK
else:
return ControlFlow.CONTINUE
def _display_history_stats(console: Console, history_manager: WatchHistoryManager, icons: bool):
"""Display current watch history statistics."""
stats = history_manager.get_stats()
# Create a stats table
table = Table(title=f"{'📊 ' if icons else ''}Watch History Overview")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="green")
table.add_row("Total Anime", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
table.add_row("Dropped", str(stats["dropped"]))
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
console.print(table)
console.print()
def _view_watching(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View currently watching anime."""
entries = history_manager.get_watching_entries()
if not entries:
feedback.info("No anime currently being watched")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Currently Watching", feedback)
def _view_completed(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View completed anime."""
entries = history_manager.get_completed_entries()
if not entries:
feedback.info("No completed anime found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Completed Anime", feedback)
def _view_recent(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View recently watched anime."""
entries = history_manager.get_recently_watched(20)
if not entries:
feedback.info("No recent watch history found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Recently Watched", feedback)
def _view_all_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View all watch history entries."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history found")
return "CONTINUE"
# Sort by last watched date
entries.sort(key=lambda x: x.last_watched, reverse=True)
return _display_entries_list(ctx, entries, "All Watch History", feedback)
def _search_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Search watch history by title."""
query = ctx.selector.ask("Enter search query:")
if not query:
return "CONTINUE"
entries = history_manager.search_entries(query)
if not entries:
feedback.info(f"No anime found matching '{query}'")
return "CONTINUE"
return _display_entries_list(ctx, entries, f"Search Results for '{query}'", feedback)
def _display_entries_list(ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback) -> str:
"""Display a list of watch history entries and allow selection."""
console = Console()
console.clear()
# Create table for entries
table = Table(title=title)
table.add_column("Status", style="yellow", width=6)
table.add_column("Title", style="cyan")
table.add_column("Progress", style="green", width=12)
table.add_column("Last Watched", style="blue", width=12)
choices = []
entry_map = {}
for i, entry in enumerate(entries):
# Format last watched date
last_watched = entry.last_watched.strftime("%Y-%m-%d")
# Add to table
table.add_row(
entry.get_status_emoji(),
entry.get_display_title(),
entry.get_progress_display(),
last_watched
)
# Create choice for selector
choice_text = f"{entry.get_status_emoji()} {entry.get_display_title()} - {entry.get_progress_display()}"
choices.append(choice_text)
entry_map[choice_text] = entry
console.print(table)
console.print()
if not choices:
feedback.info("No entries to display")
feedback.pause_for_user()
return "CONTINUE"
choices.append("Back")
choice = ctx.selector.choose(
"Select an anime for details:",
choices=choices
)
if not choice or choice == "Back":
return "CONTINUE"
selected_entry = entry_map[choice]
return _show_entry_details(ctx, selected_entry, feedback)
def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Show detailed information about a watch history entry."""
console = Console()
console.clear()
# Display detailed entry information
console.print(f"[bold cyan]{entry.get_display_title()}[/bold cyan]")
console.print(f"Status: {entry.get_status_emoji()} {entry.status.title()}")
console.print(f"Progress: {entry.get_progress_display()}")
console.print(f"Times Watched: {entry.times_watched}")
console.print(f"First Watched: {entry.first_watched.strftime('%Y-%m-%d %H:%M')}")
console.print(f"Last Watched: {entry.last_watched.strftime('%Y-%m-%d %H:%M')}")
if entry.notes:
console.print(f"Notes: {entry.notes}")
# Show media details if available
media = entry.media_item
if media.description:
console.print(f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}")
if media.genres:
console.print(f"Genres: {', '.join(media.genres)}")
if media.average_score:
console.print(f"Score: {media.average_score}/100")
console.print()
# Action options
actions = [
"Mark Episode as Watched",
"Change Status",
"Edit Notes",
"Remove from History",
"Back to List"
]
choice = ctx.selector.choose(
"Select action:",
choices=actions
)
if choice == "Mark Episode as Watched":
return _mark_episode_watched(ctx, entry, feedback)
elif choice == "Change Status":
return _change_entry_status(ctx, entry, feedback)
elif choice == "Edit Notes":
return _edit_entry_notes(ctx, entry, feedback)
elif choice == "Remove from History":
return _confirm_remove_entry(ctx, entry, feedback)
else:
return "CONTINUE"
def _mark_episode_watched(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Mark a specific episode as watched."""
current_episode = entry.last_watched_episode
max_episodes = entry.media_item.episodes or 999
episode_str = ctx.selector.ask(f"Enter episode number (current: {current_episode}, max: {max_episodes}):")
try:
episode = int(episode_str)
if episode < 1 or (max_episodes and episode > max_episodes):
feedback.error(f"Invalid episode number. Must be between 1 and {max_episodes}")
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.mark_episode_watched(entry.media_item.id, episode)
if success:
feedback.success(f"Marked episode {episode} as watched")
else:
feedback.error("Failed to update watch progress")
except ValueError:
feedback.error("Invalid episode number entered")
return "CONTINUE"
def _change_entry_status(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Change the status of a watch history entry."""
statuses = ["watching", "completed", "paused", "dropped", "planning"]
current_status = entry.status
choices = [f"{status.title()} {'(current)' if status == current_status else ''}" for status in statuses]
choices.append("Cancel")
choice = ctx.selector.choose(
f"Select new status (current: {current_status}):",
choices=choices
)
if not choice or choice == "Cancel":
return "CONTINUE"
new_status = choice.split()[0].lower()
history_manager = WatchHistoryManager()
success = history_manager.change_status(entry.media_item.id, new_status)
if success:
feedback.success(f"Changed status to {new_status}")
else:
feedback.error("Failed to update status")
return "CONTINUE"
def _edit_entry_notes(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Edit notes for a watch history entry."""
current_notes = entry.notes or ""
new_notes = ctx.selector.ask(f"Enter notes (current: '{current_notes}'):")
if new_notes is None: # User cancelled
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.update_notes(entry.media_item.id, new_notes)
if success:
feedback.success("Notes updated successfully")
else:
feedback.error("Failed to update notes")
return "CONTINUE"
def _confirm_remove_entry(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Confirm and remove a watch history entry."""
if feedback.confirm(f"Remove '{entry.get_display_title()}' from watch history?"):
history_manager = WatchHistoryManager()
success = history_manager.remove_entry(entry.media_item.id)
if success:
feedback.success("Entry removed from watch history")
else:
feedback.error("Failed to remove entry")
return "CONTINUE"
def _edit_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Edit a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to edit")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [f"{entry.get_display_title()} - {entry.get_progress_display()}" for entry in entries]
choices.append("Cancel")
choice = ctx.selector.choose(
"Select anime to edit:",
choices=choices
)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next((entry for entry in entries if entry.get_display_title() == choice_title), None)
if selected_entry:
return _show_entry_details(ctx, selected_entry, feedback)
return "CONTINUE"
def _remove_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Remove a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to remove")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [f"{entry.get_display_title()} - {entry.get_progress_display()}" for entry in entries]
choices.append("Cancel")
choice = ctx.selector.choose(
"Select anime to remove:",
choices=choices
)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next((entry for entry in entries if entry.get_display_title() == choice_title), None)
if selected_entry:
return _confirm_remove_entry(ctx, selected_entry, feedback)
return "CONTINUE"
def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View detailed watch history statistics."""
console = Console()
console.clear()
stats = history_manager.get_stats()
# Create detailed stats table
table = Table(title="Detailed Watch History Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Anime Entries", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
table.add_row("Dropped", str(stats["dropped"]))
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes Watched", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
# Calculate additional stats
if stats["total_entries"] > 0:
completion_rate = (stats["completed"] / stats["total_entries"]) * 100
table.add_row("Completion Rate", f"{completion_rate:.1f}%")
avg_episodes = stats["total_episodes_watched"] / stats["total_entries"]
table.add_row("Avg Episodes per Anime", f"{avg_episodes:.1f}")
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _export_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Export watch history to a file."""
export_name = ctx.selector.ask("Enter export filename (without extension):")
if not export_name:
return "CONTINUE"
export_path = APP_DATA_DIR / f"{export_name}.json"
if export_path.exists():
if not feedback.confirm(f"File '{export_name}.json' already exists. Overwrite?"):
return "CONTINUE"
success = history_manager.export_history(export_path)
if success:
feedback.success(f"Watch history exported to {export_path}")
else:
feedback.error("Failed to export watch history")
return "CONTINUE"
def _import_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Import watch history from a file."""
import_name = ctx.selector.ask("Enter import filename (without extension):")
if not import_name:
return "CONTINUE"
import_path = APP_DATA_DIR / f"{import_name}.json"
if not import_path.exists():
feedback.error(f"File '{import_name}.json' not found in {APP_DATA_DIR}")
return "CONTINUE"
merge = feedback.confirm("Merge with existing history? (No = Replace existing history)")
success = history_manager.import_history(import_path, merge=merge)
if success:
action = "merged with" if merge else "replaced"
feedback.success(f"Watch history imported and {action} existing data")
else:
feedback.error("Failed to import watch history")
return "CONTINUE"
def _clear_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Clear all watch history with confirmation."""
if not feedback.confirm("Are you sure you want to clear ALL watch history? This cannot be undone."):
return "CONTINUE"
if not feedback.confirm("Final confirmation: Clear all watch history?"):
return "CONTINUE"
# Create backup before clearing
backup_success = history_manager.backup_history()
if backup_success:
feedback.info("Backup created before clearing")
success = history_manager.clear_history()
if success:
feedback.success("All watch history cleared")
else:
feedback.error("Failed to clear watch history")
return "CONTINUE"

View File

@@ -0,0 +1,15 @@
"""
Utility modules for the FastAnime CLI.
"""
from .watch_history_manager import WatchHistoryManager
from .watch_history_tracker import WatchHistoryTracker, watch_tracker
from .watch_history_types import WatchHistoryEntry, WatchHistoryData
__all__ = [
"WatchHistoryManager",
"WatchHistoryTracker",
"watch_tracker",
"WatchHistoryEntry",
"WatchHistoryData",
]

View File

@@ -0,0 +1,329 @@
"""
Watch history manager for local storage operations.
Handles saving, loading, and managing local watch history data.
"""
import json
import logging
from pathlib import Path
from typing import List, Optional
from ...core.constants import USER_WATCH_HISTORY_PATH
from ...libs.api.types import MediaItem
from .watch_history_types import WatchHistoryData, WatchHistoryEntry
logger = logging.getLogger(__name__)
class WatchHistoryManager:
"""
Manages local watch history storage and operations.
Provides comprehensive watch history management with error handling.
"""
def __init__(self, history_file_path: Path = USER_WATCH_HISTORY_PATH):
self.history_file_path = history_file_path
self._data: Optional[WatchHistoryData] = None
self._ensure_history_file()
def _ensure_history_file(self):
"""Ensure the watch history file and directory exist."""
try:
self.history_file_path.parent.mkdir(parents=True, exist_ok=True)
if not self.history_file_path.exists():
# Create empty watch history file
empty_data = WatchHistoryData()
self._save_data(empty_data)
logger.info(f"Created new watch history file at {self.history_file_path}")
except Exception as e:
logger.error(f"Failed to ensure watch history file: {e}")
def _load_data(self) -> WatchHistoryData:
"""Load watch history data from file."""
if self._data is not None:
return self._data
try:
if not self.history_file_path.exists():
self._data = WatchHistoryData()
return self._data
with self.history_file_path.open('r', encoding='utf-8') as f:
data = json.load(f)
self._data = WatchHistoryData.from_dict(data)
logger.debug(f"Loaded watch history with {len(self._data.entries)} entries")
return self._data
except json.JSONDecodeError as e:
logger.error(f"Watch history file is corrupted: {e}")
# Create backup of corrupted file
backup_path = self.history_file_path.with_suffix('.backup')
self.history_file_path.rename(backup_path)
logger.info(f"Corrupted file moved to {backup_path}")
# Create new empty data
self._data = WatchHistoryData()
self._save_data(self._data)
return self._data
except Exception as e:
logger.error(f"Failed to load watch history: {e}")
self._data = WatchHistoryData()
return self._data
def _save_data(self, data: WatchHistoryData) -> bool:
"""Save watch history data to file."""
try:
# Create backup of existing file
if self.history_file_path.exists():
backup_path = self.history_file_path.with_suffix('.bak')
self.history_file_path.rename(backup_path)
with self.history_file_path.open('w', encoding='utf-8') as f:
json.dump(data.to_dict(), f, indent=2, ensure_ascii=False)
# Remove backup on successful save
backup_path = self.history_file_path.with_suffix('.bak')
if backup_path.exists():
backup_path.unlink()
logger.debug(f"Saved watch history with {len(data.entries)} entries")
return True
except Exception as e:
logger.error(f"Failed to save watch history: {e}")
# Restore backup if save failed
backup_path = self.history_file_path.with_suffix('.bak')
if backup_path.exists():
backup_path.rename(self.history_file_path)
return False
def add_or_update_entry(
self,
media_item: MediaItem,
episode: int = 0,
progress: float = 0.0,
status: str = "watching",
notes: str = ""
) -> bool:
"""Add or update a watch history entry."""
try:
data = self._load_data()
entry = data.add_or_update_entry(media_item, episode, progress, status)
if notes:
entry.notes = notes
success = self._save_data(data)
if success:
self._data = data # Update cached data
logger.info(f"Updated watch history for {entry.get_display_title()}")
return success
except Exception as e:
logger.error(f"Failed to add/update watch history entry: {e}")
return False
def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]:
"""Get a specific watch history entry."""
try:
data = self._load_data()
return data.get_entry(media_id)
except Exception as e:
logger.error(f"Failed to get watch history entry: {e}")
return None
def remove_entry(self, media_id: int) -> bool:
"""Remove an entry from watch history."""
try:
data = self._load_data()
removed = data.remove_entry(media_id)
if removed:
success = self._save_data(data)
if success:
self._data = data
logger.info(f"Removed watch history entry for media ID {media_id}")
return success
return False
except Exception as e:
logger.error(f"Failed to remove watch history entry: {e}")
return False
def get_all_entries(self) -> List[WatchHistoryEntry]:
"""Get all watch history entries."""
try:
data = self._load_data()
return list(data.entries.values())
except Exception as e:
logger.error(f"Failed to get all entries: {e}")
return []
def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]:
"""Get entries by status (watching, completed, etc.)."""
try:
data = self._load_data()
return data.get_entries_by_status(status)
except Exception as e:
logger.error(f"Failed to get entries by status: {e}")
return []
def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]:
"""Get recently watched entries."""
try:
data = self._load_data()
return data.get_recently_watched(limit)
except Exception as e:
logger.error(f"Failed to get recently watched: {e}")
return []
def search_entries(self, query: str) -> List[WatchHistoryEntry]:
"""Search entries by title."""
try:
data = self._load_data()
return data.search_entries(query)
except Exception as e:
logger.error(f"Failed to search entries: {e}")
return []
def get_watching_entries(self) -> List[WatchHistoryEntry]:
"""Get entries that are currently being watched."""
return self.get_entries_by_status("watching")
def get_completed_entries(self) -> List[WatchHistoryEntry]:
"""Get completed entries."""
return self.get_entries_by_status("completed")
def mark_episode_watched(self, media_id: int, episode: int, progress: float = 1.0) -> bool:
"""Mark a specific episode as watched."""
entry = self.get_entry(media_id)
if entry:
return self.add_or_update_entry(
entry.media_item,
episode,
progress,
entry.status
)
return False
def mark_completed(self, media_id: int) -> bool:
"""Mark an anime as completed."""
entry = self.get_entry(media_id)
if entry:
entry.mark_completed()
data = self._load_data()
return self._save_data(data)
return False
def change_status(self, media_id: int, new_status: str) -> bool:
"""Change the status of an entry."""
entry = self.get_entry(media_id)
if entry:
return self.add_or_update_entry(
entry.media_item,
entry.last_watched_episode,
entry.watch_progress,
new_status
)
return False
def update_notes(self, media_id: int, notes: str) -> bool:
"""Update notes for an entry."""
entry = self.get_entry(media_id)
if entry:
return self.add_or_update_entry(
entry.media_item,
entry.last_watched_episode,
entry.watch_progress,
entry.status,
notes
)
return False
def get_stats(self) -> dict:
"""Get watch history statistics."""
try:
data = self._load_data()
return data.get_stats()
except Exception as e:
logger.error(f"Failed to get stats: {e}")
return {
"total_entries": 0,
"watching": 0,
"completed": 0,
"dropped": 0,
"paused": 0,
"total_episodes_watched": 0,
"last_updated": "Unknown"
}
def export_history(self, export_path: Path) -> bool:
"""Export watch history to a file."""
try:
data = self._load_data()
with export_path.open('w', encoding='utf-8') as f:
json.dump(data.to_dict(), f, indent=2, ensure_ascii=False)
logger.info(f"Exported watch history to {export_path}")
return True
except Exception as e:
logger.error(f"Failed to export watch history: {e}")
return False
def import_history(self, import_path: Path, merge: bool = True) -> bool:
"""Import watch history from a file."""
try:
if not import_path.exists():
logger.error(f"Import file does not exist: {import_path}")
return False
with import_path.open('r', encoding='utf-8') as f:
import_data = json.load(f)
imported_history = WatchHistoryData.from_dict(import_data)
if merge:
# Merge with existing data
current_data = self._load_data()
for media_id, entry in imported_history.entries.items():
current_data.entries[media_id] = entry
success = self._save_data(current_data)
else:
# Replace existing data
success = self._save_data(imported_history)
if success:
self._data = None # Force reload on next access
logger.info(f"Imported watch history from {import_path}")
return success
except Exception as e:
logger.error(f"Failed to import watch history: {e}")
return False
def clear_history(self) -> bool:
"""Clear all watch history."""
try:
empty_data = WatchHistoryData()
success = self._save_data(empty_data)
if success:
self._data = empty_data
logger.info("Cleared all watch history")
return success
except Exception as e:
logger.error(f"Failed to clear watch history: {e}")
return False
def backup_history(self, backup_path: Path = None) -> bool:
"""Create a backup of watch history."""
try:
if backup_path is None:
from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = self.history_file_path.parent / f"watch_history_backup_{timestamp}.json"
return self.export_history(backup_path)
except Exception as e:
logger.error(f"Failed to backup watch history: {e}")
return False

View File

@@ -0,0 +1,273 @@
"""
Watch history tracking utilities for integration with episode viewing and player controls.
Provides automatic watch history updates during episode viewing.
"""
import logging
from typing import Optional
from ...libs.api.types import MediaItem
from ..utils.watch_history_manager import WatchHistoryManager
logger = logging.getLogger(__name__)
class WatchHistoryTracker:
"""
Tracks watch history automatically during episode viewing.
Integrates with the episode selection and player control systems.
"""
def __init__(self):
self.history_manager = WatchHistoryManager()
def track_episode_start(self, media_item: MediaItem, episode: int) -> bool:
"""
Track when an episode starts being watched.
Args:
media_item: The anime being watched
episode: Episode number being started
Returns:
True if tracking was successful
"""
try:
# Update or create watch history entry
success = self.history_manager.add_or_update_entry(
media_item=media_item,
episode=episode,
progress=0.0,
status="watching"
)
if success:
logger.info(f"Started tracking episode {episode} of {media_item.title.english or media_item.title.romaji}")
return success
except Exception as e:
logger.error(f"Failed to track episode start: {e}")
return False
def track_episode_progress(self, media_id: int, episode: int, progress: float) -> bool:
"""
Track progress within an episode.
Args:
media_id: ID of the anime
episode: Episode number
progress: Progress within the episode (0.0-1.0)
Returns:
True if tracking was successful
"""
try:
success = self.history_manager.mark_episode_watched(media_id, episode, progress)
if success and progress >= 0.8: # Consider episode "watched" at 80%
logger.info(f"Episode {episode} marked as watched (progress: {progress:.1%})")
return success
except Exception as e:
logger.error(f"Failed to track episode progress: {e}")
return False
def track_episode_completion(self, media_id: int, episode: int) -> bool:
"""
Track when an episode is completed.
Args:
media_id: ID of the anime
episode: Episode number completed
Returns:
True if tracking was successful
"""
try:
# Mark episode as fully watched
success = self.history_manager.mark_episode_watched(media_id, episode, 1.0)
if success:
# Check if this was the final episode and mark as completed
entry = self.history_manager.get_entry(media_id)
if entry and entry.media_item.episodes and episode >= entry.media_item.episodes:
self.history_manager.mark_completed(media_id)
logger.info(f"Anime completed: {entry.get_display_title()}")
else:
logger.info(f"Episode {episode} completed")
return success
except Exception as e:
logger.error(f"Failed to track episode completion: {e}")
return False
def get_watch_progress(self, media_id: int) -> Optional[dict]:
"""
Get current watch progress for an anime.
Args:
media_id: ID of the anime
Returns:
Dictionary with progress info or None if not found
"""
try:
entry = self.history_manager.get_entry(media_id)
if entry:
return {
"last_episode": entry.last_watched_episode,
"progress": entry.watch_progress,
"status": entry.status,
"next_episode": entry.last_watched_episode + 1,
"title": entry.get_display_title(),
}
return None
except Exception as e:
logger.error(f"Failed to get watch progress: {e}")
return None
def should_continue_from_history(self, media_id: int, available_episodes: list) -> Optional[str]:
"""
Determine if we should continue from watch history and which episode.
Args:
media_id: ID of the anime
available_episodes: List of available episode numbers
Returns:
Episode number to continue from, or None if no history
"""
try:
progress = self.get_watch_progress(media_id)
if not progress:
return None
last_episode = progress["last_episode"]
next_episode = last_episode + 1
# Check if next episode is available
if str(next_episode) in available_episodes:
logger.info(f"Continuing from episode {next_episode} based on watch history")
return str(next_episode)
# Fall back to last watched episode if next isn't available
elif str(last_episode) in available_episodes and last_episode > 0:
logger.info(f"Next episode not available, falling back to episode {last_episode}")
return str(last_episode)
return None
except Exception as e:
logger.error(f"Failed to determine continue episode: {e}")
return None
def update_anime_status(self, media_id: int, status: str) -> bool:
"""
Update the status of an anime in watch history.
Args:
media_id: ID of the anime
status: New status (watching, completed, dropped, paused)
Returns:
True if update was successful
"""
try:
success = self.history_manager.change_status(media_id, status)
if success:
logger.info(f"Updated anime status to {status}")
return success
except Exception as e:
logger.error(f"Failed to update anime status: {e}")
return False
def add_anime_to_history(self, media_item: MediaItem, status: str = "planning") -> bool:
"""
Add an anime to watch history without watching any episodes.
Args:
media_item: The anime to add
status: Initial status
Returns:
True if successful
"""
try:
success = self.history_manager.add_or_update_entry(
media_item=media_item,
episode=0,
progress=0.0,
status=status
)
if success:
logger.info(f"Added {media_item.title.english or media_item.title.romaji} to watch history")
return success
except Exception as e:
logger.error(f"Failed to add anime to history: {e}")
return False
# Global tracker instance for use throughout the application
watch_tracker = WatchHistoryTracker()
def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool:
"""
Convenience function to track episode viewing.
Args:
media_item: The anime being watched
episode: Episode number
start_tracking: Whether to start tracking (True) or just update progress
Returns:
True if tracking was successful
"""
if start_tracking:
return watch_tracker.track_episode_start(media_item, episode)
else:
return watch_tracker.track_episode_completion(media_item.id, episode)
def get_continue_episode(media_item: MediaItem, available_episodes: list, prefer_history: bool = True) -> Optional[str]:
"""
Get the episode to continue from based on watch history.
Args:
media_item: The anime
available_episodes: List of available episodes
prefer_history: Whether to prefer local history over remote
Returns:
Episode number to continue from
"""
if prefer_history:
return watch_tracker.should_continue_from_history(media_item.id, available_episodes)
return None
def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool:
"""
Update progress for an episode based on completion percentage.
Args:
media_id: ID of the anime
episode: Episode number
completion_percentage: Completion percentage (0-100)
Returns:
True if update was successful
"""
progress = completion_percentage / 100.0
if completion_percentage >= 80: # Consider episode completed at 80%
return watch_tracker.track_episode_completion(media_id, episode)
else:
return watch_tracker.track_episode_progress(media_id, episode, progress)

View File

@@ -0,0 +1,296 @@
"""
Watch history data models and types for the interactive CLI.
Provides comprehensive data structures for tracking and managing local watch history.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional
from ...libs.api.types import MediaItem
logger = logging.getLogger(__name__)
@dataclass
class WatchHistoryEntry:
"""
Represents a single entry in the watch history.
Contains media information and viewing progress.
"""
media_item: MediaItem
last_watched_episode: int = 0
watch_progress: float = 0.0 # Progress within the episode (0.0-1.0)
times_watched: int = 1
first_watched: datetime = field(default_factory=datetime.now)
last_watched: datetime = field(default_factory=datetime.now)
status: str = "watching" # watching, completed, dropped, paused
notes: str = ""
def to_dict(self) -> dict:
"""Convert entry to dictionary for JSON serialization."""
return {
"media_item": {
"id": self.media_item.id,
"id_mal": self.media_item.id_mal,
"type": self.media_item.type,
"title": {
"romaji": self.media_item.title.romaji,
"english": self.media_item.title.english,
"native": self.media_item.title.native,
},
"status": self.media_item.status,
"format": self.media_item.format,
"cover_image": {
"large": self.media_item.cover_image.large if self.media_item.cover_image else None,
"medium": self.media_item.cover_image.medium if self.media_item.cover_image else None,
} if self.media_item.cover_image else None,
"banner_image": self.media_item.banner_image,
"description": self.media_item.description,
"episodes": self.media_item.episodes,
"duration": self.media_item.duration,
"genres": self.media_item.genres,
"synonyms": self.media_item.synonyms,
"average_score": self.media_item.average_score,
"popularity": self.media_item.popularity,
"favourites": self.media_item.favourites,
},
"last_watched_episode": self.last_watched_episode,
"watch_progress": self.watch_progress,
"times_watched": self.times_watched,
"first_watched": self.first_watched.isoformat(),
"last_watched": self.last_watched.isoformat(),
"status": self.status,
"notes": self.notes,
}
@classmethod
def from_dict(cls, data: dict) -> "WatchHistoryEntry":
"""Create entry from dictionary."""
from ...libs.api.types import MediaImage, MediaTitle
media_data = data["media_item"]
# Reconstruct MediaTitle
title_data = media_data.get("title", {})
title = MediaTitle(
romaji=title_data.get("romaji"),
english=title_data.get("english"),
native=title_data.get("native"),
)
# Reconstruct MediaImage if present
cover_data = media_data.get("cover_image")
cover_image = None
if cover_data:
cover_image = MediaImage(
large=cover_data.get("large", ""),
medium=cover_data.get("medium"),
)
# Reconstruct MediaItem
media_item = MediaItem(
id=media_data["id"],
id_mal=media_data.get("id_mal"),
type=media_data.get("type", "ANIME"),
title=title,
status=media_data.get("status"),
format=media_data.get("format"),
cover_image=cover_image,
banner_image=media_data.get("banner_image"),
description=media_data.get("description"),
episodes=media_data.get("episodes"),
duration=media_data.get("duration"),
genres=media_data.get("genres", []),
synonyms=media_data.get("synonyms", []),
average_score=media_data.get("average_score"),
popularity=media_data.get("popularity"),
favourites=media_data.get("favourites"),
)
return cls(
media_item=media_item,
last_watched_episode=data.get("last_watched_episode", 0),
watch_progress=data.get("watch_progress", 0.0),
times_watched=data.get("times_watched", 1),
first_watched=datetime.fromisoformat(data.get("first_watched", datetime.now().isoformat())),
last_watched=datetime.fromisoformat(data.get("last_watched", datetime.now().isoformat())),
status=data.get("status", "watching"),
notes=data.get("notes", ""),
)
def update_progress(self, episode: int, progress: float = 0.0, status: str = None):
"""Update watch progress for this entry."""
self.last_watched_episode = max(self.last_watched_episode, episode)
self.watch_progress = progress
self.last_watched = datetime.now()
if status:
self.status = status
def mark_completed(self):
"""Mark this entry as completed."""
self.status = "completed"
self.last_watched = datetime.now()
if self.media_item.episodes:
self.last_watched_episode = self.media_item.episodes
self.watch_progress = 1.0
def get_display_title(self) -> str:
"""Get the best available title for display."""
if self.media_item.title.english:
return self.media_item.title.english
elif self.media_item.title.romaji:
return self.media_item.title.romaji
elif self.media_item.title.native:
return self.media_item.title.native
else:
return f"Anime #{self.media_item.id}"
def get_progress_display(self) -> str:
"""Get a human-readable progress display."""
if self.media_item.episodes:
return f"{self.last_watched_episode}/{self.media_item.episodes}"
else:
return f"Ep {self.last_watched_episode}"
def get_status_emoji(self) -> str:
"""Get emoji representation of status."""
status_emojis = {
"watching": "📺",
"completed": "",
"dropped": "🚮",
"paused": "⏸️",
"planning": "📑"
}
return status_emojis.get(self.status, "")
@dataclass
class WatchHistoryData:
"""Complete watch history data container."""
entries: Dict[int, WatchHistoryEntry] = field(default_factory=dict)
last_updated: datetime = field(default_factory=datetime.now)
format_version: str = "1.0"
def to_dict(self) -> dict:
"""Convert to dictionary for JSON serialization."""
return {
"entries": {str(k): v.to_dict() for k, v in self.entries.items()},
"last_updated": self.last_updated.isoformat(),
"format_version": self.format_version,
}
@classmethod
def from_dict(cls, data: dict) -> "WatchHistoryData":
"""Create from dictionary."""
entries = {}
entries_data = data.get("entries", {})
for media_id_str, entry_data in entries_data.items():
try:
media_id = int(media_id_str)
entry = WatchHistoryEntry.from_dict(entry_data)
entries[media_id] = entry
except (ValueError, KeyError) as e:
logger.warning(f"Skipping invalid watch history entry {media_id_str}: {e}")
return cls(
entries=entries,
last_updated=datetime.fromisoformat(data.get("last_updated", datetime.now().isoformat())),
format_version=data.get("format_version", "1.0"),
)
def add_or_update_entry(self, media_item: MediaItem, episode: int = 0, progress: float = 0.0, status: str = "watching") -> WatchHistoryEntry:
"""Add or update a watch history entry."""
media_id = media_item.id
if media_id in self.entries:
# Update existing entry
entry = self.entries[media_id]
entry.update_progress(episode, progress, status)
entry.times_watched += 1
else:
# Create new entry
entry = WatchHistoryEntry(
media_item=media_item,
last_watched_episode=episode,
watch_progress=progress,
status=status,
)
self.entries[media_id] = entry
self.last_updated = datetime.now()
return entry
def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]:
"""Get a specific watch history entry."""
return self.entries.get(media_id)
def remove_entry(self, media_id: int) -> bool:
"""Remove an entry from watch history."""
if media_id in self.entries:
del self.entries[media_id]
self.last_updated = datetime.now()
return True
return False
def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]:
"""Get all entries with a specific status."""
return [entry for entry in self.entries.values() if entry.status == status]
def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]:
"""Get recently watched entries."""
sorted_entries = sorted(
self.entries.values(),
key=lambda x: x.last_watched,
reverse=True
)
return sorted_entries[:limit]
def get_watching_entries(self) -> List[WatchHistoryEntry]:
"""Get entries that are currently being watched."""
return self.get_entries_by_status("watching")
def get_completed_entries(self) -> List[WatchHistoryEntry]:
"""Get completed entries."""
return self.get_entries_by_status("completed")
def search_entries(self, query: str) -> List[WatchHistoryEntry]:
"""Search entries by title."""
query_lower = query.lower()
results = []
for entry in self.entries.values():
title = entry.get_display_title().lower()
if query_lower in title:
results.append(entry)
return results
def get_stats(self) -> dict:
"""Get watch history statistics."""
total_entries = len(self.entries)
watching = len(self.get_entries_by_status("watching"))
completed = len(self.get_entries_by_status("completed"))
dropped = len(self.get_entries_by_status("dropped"))
paused = len(self.get_entries_by_status("paused"))
total_episodes = sum(
entry.last_watched_episode
for entry in self.entries.values()
)
return {
"total_entries": total_entries,
"watching": watching,
"completed": completed,
"dropped": dropped,
"paused": paused,
"total_episodes_watched": total_episodes,
"last_updated": self.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
}

116
test_watch_history.py Normal file
View File

@@ -0,0 +1,116 @@
#!/usr/bin/env python3
"""
Test script for watch history management implementation.
Tests basic functionality without requiring full interactive session.
"""
import sys
from pathlib import Path
# Add the project root to Python path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from fastanime.cli.utils.watch_history_manager import WatchHistoryManager
from fastanime.cli.utils.watch_history_tracker import WatchHistoryTracker
from fastanime.libs.api.types import MediaItem, MediaTitle, MediaImage
def test_watch_history():
"""Test basic watch history functionality."""
print("Testing Watch History Management System")
print("=" * 50)
# Create test media item
test_anime = MediaItem(
id=123456,
id_mal=12345,
title=MediaTitle(
english="Test Anime",
romaji="Test Anime Romaji",
native="テストアニメ"
),
episodes=24,
cover_image=MediaImage(
large="https://example.com/cover.jpg",
medium="https://example.com/cover_medium.jpg"
),
genres=["Action", "Adventure"],
average_score=85.0
)
# Test watch history manager
print("\n1. Testing WatchHistoryManager...")
history_manager = WatchHistoryManager()
# Add anime to history
success = history_manager.add_or_update_entry(
test_anime,
episode=5,
progress=0.8,
status="watching",
notes="Great anime so far!"
)
print(f" Added anime to history: {success}")
# Get entry back
entry = history_manager.get_entry(123456)
if entry:
print(f" Retrieved entry: {entry.get_display_title()}")
print(f" Progress: {entry.get_progress_display()}")
print(f" Status: {entry.status}")
print(f" Notes: {entry.notes}")
else:
print(" Failed to retrieve entry")
# Test tracker
print("\n2. Testing WatchHistoryTracker...")
tracker = WatchHistoryTracker()
# Track episode viewing
success = tracker.track_episode_start(test_anime, 6)
print(f" Started tracking episode 6: {success}")
# Complete episode
success = tracker.track_episode_completion(123456, 6)
print(f" Completed episode 6: {success}")
# Get progress
progress = tracker.get_watch_progress(123456)
if progress:
print(f" Current progress: Episode {progress['last_episode']}")
print(f" Next episode: {progress['next_episode']}")
print(f" Status: {progress['status']}")
# Test stats
print("\n3. Testing Statistics...")
stats = history_manager.get_stats()
print(f" Total entries: {stats['total_entries']}")
print(f" Watching: {stats['watching']}")
print(f" Total episodes watched: {stats['total_episodes_watched']}")
# Test search
print("\n4. Testing Search...")
search_results = history_manager.search_entries("Test")
print(f" Search results for 'Test': {len(search_results)} found")
# Test status updates
print("\n5. Testing Status Updates...")
success = history_manager.change_status(123456, "completed")
print(f" Changed status to completed: {success}")
# Verify status change
entry = history_manager.get_entry(123456)
if entry:
print(f" New status: {entry.status}")
print("\n" + "=" * 50)
print("Watch History Test Complete!")
# Cleanup test data
history_manager.remove_entry(123456)
print("Test data cleaned up.")
if __name__ == "__main__":
test_watch_history()