Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lutris / usr / lib / python3 / dist-packages / lutris / gui / views / store.py
Size: Mime:
"""Store object for a list of games"""
# Standard Library
# pylint: disable=not-an-iterable
import concurrent.futures

# Third Party Libraries
from gi.repository import GLib, GObject, Gtk
from gi.repository.GdkPixbuf import Pixbuf

# Lutris Modules
from lutris import api, pga
from lutris.gui.views.pga_game import PgaGame
from lutris.gui.widgets.utils import get_pixbuf_for_game
from lutris.util import system
from lutris.util.jobs import AsyncCall
from lutris.util.log import logger
from lutris.util.resources import download_media, get_icon_path, update_desktop_icons

from . import (
    COL_ICON, COL_ID, COL_INSTALLED, COL_INSTALLED_AT, COL_INSTALLED_AT_TEXT, COL_LASTPLAYED, COL_LASTPLAYED_TEXT,
    COL_NAME, COL_PLATFORM, COL_PLAYTIME, COL_PLAYTIME_TEXT, COL_RUNNER, COL_RUNNER_HUMAN_NAME, COL_SLUG, COL_YEAR
)


def try_lower(value):
    try:
        out = value.lower()
    except AttributeError:
        out = value
    return out


def sort_func(model, row1, row2, sort_col):
    """Sorting function for the GameStore"""
    value1 = model.get_value(row1, sort_col)
    value2 = model.get_value(row2, sort_col)
    if value1 is None and value2 is None:
        value1 = value2 = 0
    elif value1 is None:
        value1 = type(value2)()
    elif value2 is None:
        value2 = type(value1)()
    value1 = try_lower(value1)
    value2 = try_lower(value2)
    diff = -1 if value1 < value2 else 0 if value1 == value2 else 1
    if diff == 0:
        value1 = try_lower(model.get_value(row1, COL_NAME))
        value2 = try_lower(model.get_value(row2, COL_NAME))
        try:
            diff = -1 if value1 < value2 else 0 if value1 == value2 else 1
        except TypeError:
            diff = 0
    if diff == 0:
        value1 = try_lower(model.get_value(row1, COL_RUNNER_HUMAN_NAME))
        value2 = try_lower(model.get_value(row2, COL_RUNNER_HUMAN_NAME))
    try:
        return -1 if value1 < value2 else 0 if value1 == value2 else 1
    except TypeError:
        return 0


class GameStore(GObject.Object):
    __gsignals__ = {
        "media-loaded": (GObject.SIGNAL_RUN_FIRST, None, ()),
        "icon-loaded": (GObject.SIGNAL_RUN_FIRST, None, (str, str)),
        "icons-changed": (GObject.SIGNAL_RUN_FIRST, None, (str, )),
        "sorting-changed": (GObject.SIGNAL_RUN_FIRST, None, (str, bool)),
    }

    sort_columns = {
        "name": COL_NAME,
        "year": COL_YEAR,
        "runner": COL_RUNNER_HUMAN_NAME,
        "platform": COL_PLATFORM,
        "lastplayed": COL_LASTPLAYED,
        "lastplayed_text": COL_LASTPLAYED_TEXT,
        "installed_at": COL_INSTALLED_AT,
        "installed_at_text": COL_INSTALLED_AT_TEXT,
        "playtime": COL_PLAYTIME,
        "playtime_text": COL_PLAYTIME_TEXT,
    }

    def __init__(
        self,
        games,
        icon_type,
        filter_installed,
        sort_key,
        sort_ascending,
        show_hidden_games,
        show_installed_first=False,
    ):
        super(GameStore, self).__init__()
        self.games = games or pga.get_games(show_installed_first=show_installed_first)
        if not show_hidden_games:
            # Check if the PGA contains game IDs that the user does not
            # want to see
            self.games = [game for game in self.games if game["id"] not in pga.get_hidden_ids()]

        self.search_mode = False
        self.games_to_refresh = set()
        self.icon_type = icon_type
        self.filter_installed = filter_installed
        self.show_installed_first = show_installed_first
        self.filter_text = None
        self.filter_runner = None
        self.filter_platform = None
        self.store = Gtk.ListStore(
            int,
            str,
            str,
            Pixbuf,
            str,
            str,
            str,
            str,
            int,
            str,
            bool,
            int,
            str,
            float,
            str,
        )
        sort_col = COL_NAME
        if show_installed_first:
            sort_col = COL_INSTALLED
            self.store.set_sort_column_id(sort_col, Gtk.SortType.DESCENDING)
        else:
            self.store.set_sort_column_id(sort_col, Gtk.SortType.ASCENDING)
        self.prevent_sort_update = False  # prevent recursion with signals
        self.modelfilter = self.store.filter_new()
        self.modelfilter.set_visible_func(self.filter_view)
        try:
            self.modelsort = Gtk.TreeModelSort.sort_new_with_model(self.modelfilter)
        except AttributeError:
            # Apparently some API breaking changes on GTK minor versions.
            self.modelsort = Gtk.TreeModelSort.new_with_model(self.modelfilter)  # pylint: disable=no-member  # NOQA
        self.modelsort.connect("sort-column-changed", self.on_sort_column_changed)
        self.modelsort.set_sort_func(sort_col, sort_func, sort_col)
        self.sort_view(sort_key, sort_ascending)
        self.medias = {"banner": {}, "icon": {}}
        self.banner_misses = set()
        self.icon_misses = set()
        self.media_loaded = False
        self.connect("media-loaded", self.on_media_loaded)
        self.connect("icon-loaded", self.on_icon_loaded)

    def __str__(self):
        return (
            "GameStore: <filter_installed: {filter_installed}, "
            "filter_text: {filter_text}>".format(**self.__dict__)
        )

    def load(self, from_search=False):
        if not self.games:
            return
        self.search_mode = from_search
        self.add_games(self.games)

    @property
    def game_slugs(self):
        return [game["slug"] for game in self.games]

    def add_games(self, games):
        """Add games to the store"""
        self.media_loaded = False
        if games:
            AsyncCall(self.get_missing_media, None, [game["slug"] for game in games])
        for game in list(games):
            GLib.idle_add(self.add_game, game)

    def has_icon(self, game_slug, media_type=None):
        """Return True if the game_slug has the icon of `icon_type`"""
        media_type = media_type or self.icon_type
        return system.path_exists(get_icon_path(game_slug, media_type))

    def get_missing_media(self, slugs=None):
        """Query the Lutris.net API for missing icons"""
        slugs = slugs or self.game_slugs
        unavailable_banners = {slug for slug in slugs if not self.has_icon(slug, "banner")}
        unavailable_icons = {slug for slug in slugs if not self.has_icon(slug, "icon")}

        # Remove duplicate slugs
        missing_media_slugs = ((unavailable_banners - self.banner_misses) | (unavailable_icons - self.icon_misses))
        if not missing_media_slugs:
            return
        if len(missing_media_slugs) > 10:
            logger.debug("Requesting missing icons from API for %d games", len(missing_media_slugs))
        else:
            logger.debug("Requesting missing icons from API for %s", ", ".join(missing_media_slugs))

        lutris_media = api.get_api_games(list(missing_media_slugs), inject_aliases=True)
        if not lutris_media:
            return

        for game in lutris_media:
            if game["slug"] in unavailable_banners and game["banner_url"]:
                self.medias["banner"][game["slug"]] = game["banner_url"]
                unavailable_banners.remove(game["slug"])
            if game["slug"] in unavailable_icons and game["icon_url"]:
                self.medias["icon"][game["slug"]] = game["icon_url"]
                unavailable_icons.remove(game["slug"])
        self.banner_misses = unavailable_banners
        self.icon_misses = unavailable_icons
        self.media_loaded = True
        self.emit("media-loaded")

    def filter_view(self, model, _iter, _filter_data=None):
        """Filter function for the game model"""
        if self.search_mode:
            return True
        if self.filter_installed:
            installed = model.get_value(_iter, COL_INSTALLED)
            if not installed and not self.search_mode:
                return False
        if self.filter_text:
            name = model.get_value(_iter, COL_NAME)
            if not self.filter_text.lower() in name.lower():
                return False
        if self.filter_runner:
            runner = model.get_value(_iter, COL_RUNNER)
            if not self.filter_runner == runner:
                return False
        if self.filter_platform:
            platform = model.get_value(_iter, COL_PLATFORM)
            if platform != self.filter_platform:
                return False
        return True

    def sort_view(self, key="name", ascending=True):
        """Sort the model on a given column name"""
        try:
            sort_column = self.sort_columns[key]
        except KeyError:
            logger.error("Invalid column name '%s'", key)
            sort_column = COL_NAME
        self.modelsort.set_sort_column_id(
            sort_column,
            Gtk.SortType.ASCENDING if ascending else Gtk.SortType.DESCENDING,
        )

    def on_sort_column_changed(self, model):
        if self.prevent_sort_update:
            return
        (col, direction) = model.get_sort_column_id()
        key = next((c for c, k in self.sort_columns.items() if k == col), None)
        ascending = direction == Gtk.SortType.ASCENDING
        self.prevent_sort_update = True
        if not key:
            raise ValueError("Invalid sort key for col %s" % col)
        self.sort_view(key, ascending)
        self.prevent_sort_update = False
        self.emit("sorting-changed", key, ascending)

    def get_row_by_id(self, game_id, filtered=False):
        if filtered:
            store = self.modelsort
        else:
            store = self.store
        for model_row in store:
            if model_row[COL_ID] == int(game_id):
                return model_row

    def get_row_by_slug(self, slug):
        """Return a row by its slug.
        Requires slugs to be unique, thus only works for search mode
        """
        if not self.search_mode:
            raise RuntimeError("get_row_by_slug can only be used with search_mode")
        for model_row in self.store:
            if model_row[COL_SLUG] == slug:
                return model_row

    def remove_game(self, game_id):
        """Remove a game from the view."""
        game_index = 0
        for index, game in enumerate(self.games):
            if game["id"] == game_id:
                game_index = index
                break
        if game_index:
            self.games.pop(game_index)
        else:
            logger.warning("Can't find game %s in game list", game_id)
        row = self.get_row_by_id(game_id)
        if row:
            self.store.remove(row.iter)

    def update_game_by_id(self, game_id):
        pga_game = pga.get_game_by_field(game_id, "id")
        if pga_game:
            return self.update(pga_game)
        return self.remove_game(game_id)

    def update(self, pga_game):
        """Update game informations."""
        game = PgaGame(pga_game)
        if self.search_mode:
            row = self.get_row_by_slug(game.slug)
        else:
            row = self.get_row_by_id(game.id)
        if not row:
            raise ValueError("No existing row for game %s" % game.slug)
        row[COL_ID] = game.id
        row[COL_SLUG] = game.slug
        row[COL_NAME] = game.name
        row[COL_ICON] = game.get_pixbuf(self.icon_type)
        row[COL_YEAR] = game.year
        row[COL_RUNNER] = game.runner
        row[COL_RUNNER_HUMAN_NAME] = game.runner_text
        row[COL_PLATFORM] = game.platform
        row[COL_LASTPLAYED] = game.lastplayed
        row[COL_LASTPLAYED_TEXT] = game.lastplayed_text
        row[COL_INSTALLED] = game.installed
        row[COL_INSTALLED_AT] = game.installed_at
        row[COL_INSTALLED_AT_TEXT] = game.installed_at_text
        row[COL_PLAYTIME] = game.playtime
        row[COL_PLAYTIME_TEXT] = game.playtime_text
        if not self.has_icon(game.slug):
            self.refresh_icon(game.slug)

    def refresh_icon(self, game_slug):
        AsyncCall(self.fetch_icon, None, game_slug)

    def on_icon_loaded(self, _store, game_slug, media_type):
        if not self.has_icon(game_slug):
            logger.debug("%s has no %s", game_slug, media_type)
            return
        if media_type != self.icon_type:
            return
        if self.search_mode:
            GLib.idle_add(self.update_icon, game_slug)
            return
        for pga_game in pga.get_games_by_slug(game_slug):
            logger.debug("Updating %s", pga_game["id"])
            GLib.idle_add(self.update, pga_game)

    def update_icon(self, game_slug):
        row = self.get_row_by_slug(game_slug)
        row[COL_ICON] = get_pixbuf_for_game(game_slug, self.icon_type, True)

    def fetch_icon(self, slug):
        if not self.media_loaded:
            self.games_to_refresh.add(slug)
            return

        for media_type in ("banner", "icon"):
            url = self.medias[media_type].get(slug)
            if url:
                logger.debug("Getting %s for %s: %s", media_type, slug, url)
                download_media(url, get_icon_path(slug, media_type))
                self.emit("icon-loaded", slug, media_type)

    def on_media_loaded(self, _response):
        """Callback to handle a response from the API with the new media"""
        if not self.medias:
            return
        for media_type in ("banner", "icon"):
            self.download_icons(
                [
                    (slug, self.medias[media_type][slug], get_icon_path(slug, media_type))
                    for slug in self.medias[media_type]
                ], media_type
            )

    def download_icons(self, downloads, media_type):
        """Download a list of media files concurrently.

        Limits the number of simultaneous downloads to avoid API throttling
        and UI being overloaded with signals.
        """
        if not downloads:
            return

        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            future_downloads = {
                executor.submit(download_media, url, dest_path): slug
                for slug, url, dest_path in downloads
            }
            for future in concurrent.futures.as_completed(future_downloads):
                slug = future_downloads[future]
                try:
                    future.result()
                except Exception as ex:  # pylint: disable=broad-except
                    logger.exception('%r failed: %s', slug, ex)
                else:
                    self.emit("icon-loaded", slug, media_type)
        if media_type == "icon":
            update_desktop_icons()

    def add_games_by_ids(self, game_ids):
        self.add_games(pga.get_games_by_ids(game_ids))

    def add_game_by_id(self, game_id):
        """Add a game into the store."""
        return self.add_games_by_ids([game_id])

    def add_game(self, pga_game):
        """Add a PGA game to the store"""
        game = PgaGame(pga_game)
        self.games.append(pga_game)
        self.store.append(
            (
                game.id,
                game.slug,
                game.name,
                game.get_pixbuf(self.icon_type),
                game.year,
                game.runner,
                game.runner_text,
                game.platform,
                game.lastplayed,
                game.lastplayed_text,
                game.installed,
                game.installed_at,
                game.installed_at_text,
                game.playtime,
                game.playtime_text,
            )
        )
        if not self.has_icon(game.slug):
            self.refresh_icon(game.slug)

    def add_or_update(self, game_id):
        try:
            self.update_game_by_id(game_id)
        except ValueError:
            self.add_game_by_id(game_id)

    def set_icon_type(self, icon_type):
        """Change the icon type"""
        if icon_type == self.icon_type:
            return
        self.icon_type = icon_type
        for row in self.store:
            row[COL_ICON] = get_pixbuf_for_game(
                row[COL_SLUG],
                icon_type,
                is_installed=row[COL_INSTALLED] if not self.search_mode else True,
            )
        self.emit("icons-changed", icon_type)