Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
lutris / usr / lib / python3 / dist-packages / lutris / api.py
Size: Mime:
"""Functions to interact with the Lutris REST API"""
# Standard Library
import json
import os
import re
import socket
import urllib.error
import urllib.parse
import urllib.request

# Lutris Modules
from lutris import settings
from lutris.util import http, resources, system
from lutris.util.log import logger

API_KEY_FILE_PATH = os.path.join(settings.CACHE_DIR, "auth-token")
USER_INFO_FILE_PATH = os.path.join(settings.CACHE_DIR, "user.json")
USER_ICON_FILE_PATH = os.path.join(settings.CACHE_DIR, "user.png")


def read_api_key():
    """Read the API token from disk"""
    if not system.path_exists(API_KEY_FILE_PATH):
        return None
    with open(API_KEY_FILE_PATH, "r") as token_file:
        api_string = token_file.read()
    try:
        username, token = api_string.split(":")
    except ValueError:
        logger.error("Unable to read Lutris token in %s", API_KEY_FILE_PATH)
        return None
    return {"token": token, "username": username}


def connect(username, password):
    """Connect to the Lutris API"""
    credentials = urllib.parse.urlencode({"username": username, "password": password}).encode("utf-8")
    login_url = settings.SITE_URL + "/api/accounts/token"
    try:
        request = urllib.request.urlopen(login_url, credentials, 10)
    except (socket.timeout, urllib.error.URLError) as ex:
        logger.error("Unable to connect to server (%s): %s", login_url, ex)
        return False
    response = json.loads(request.read().decode())
    if "token" in response:
        token = response["token"]
        with open(API_KEY_FILE_PATH, "w") as token_file:
            token_file.write(":".join((username, token)))
        get_user_info()
        return response["token"]
    return False


def disconnect():
    """Removes the API token, disconnecting the user"""
    for file_path in [API_KEY_FILE_PATH, USER_INFO_FILE_PATH, USER_ICON_FILE_PATH]:
        if system.path_exists(file_path):
            os.remove(file_path)


def get_user_info():
    """Retrieves the user info to cache it locally"""
    credentials = read_api_key()
    if not credentials:
        return
    url = settings.SITE_URL + "/api/users/me"
    request = http.Request(url, headers={"Authorization": "Token " + credentials["token"]})
    response = request.get()
    account_info = response.json
    if not account_info:
        logger.warning("Unable to fetch user info for %s", credentials["username"])
    with open(USER_INFO_FILE_PATH, "w") as token_file:
        json.dump(account_info, token_file, indent=2)
    if account_info.get("avatar_url"):
        resources.download_media(account_info["avatar_url"], USER_ICON_FILE_PATH)


def get_library():
    """Return the remote library as a list of dicts."""
    credentials = read_api_key()
    if not credentials:
        return []
    url = settings.SITE_URL + "/api/games/library/%s" % urllib.parse.quote(credentials["username"])
    request = http.Request(url, headers={"Authorization": "Token " + credentials["token"]})
    try:
        response = request.get()
    except http.HTTPError as ex:
        logger.error("Unable to load library: %s", ex)
        return []
    response_data = response.json
    if response_data:
        return response_data["games"]
    return []


def get_runners(runner_name):
    """Return the available runners for a given runner name"""
    api_url = settings.SITE_URL + "/api/runners/" + runner_name
    response = http.Request(api_url).get()
    return response.json


def get_game_api_page(game_ids, page="1", query_type="games"):
    """Read a single page of games from the API and return the response

    Args:
        game_ids (list): list of game IDs, the ID type is determined by `query_type`
        page (str): Page of results to get
        query_type (str): Type of the IDs in game_ids, by default 'games' queries
                          games by their Lutris slug. 'gogid' can also be used.
    """
    url = settings.SITE_URL + "/api/games"

    if int(page) > 1:
        url += "?page={}".format(page)

    response = http.Request(url, headers={"Content-Type": "application/json"})
    if game_ids:
        payload = json.dumps({query_type: game_ids, "page": page}).encode("utf-8")
    else:
        raise ValueError("No game id provided will fetch all games from the API")
    try:
        response.get(data=payload)
    except http.HTTPError as ex:
        logger.error("Unable to get games from API: %s", ex)
        return None
    response_data = response.json
    num_games = len(response_data.get("results"))
    if num_games:
        logger.debug("Loaded %s games from page %s", num_games, page)
    else:
        logger.debug("No game found for %s", ', '.join(game_ids))

    if not response_data:
        logger.warning("Unable to get games from API, status code: %s", response.status_code)
        return None
    return response_data


def get_api_games(game_slugs=None, page="1", query_type="games", inject_aliases=False):
    """Return all games from the Lutris API matching the given game slugs"""
    response_data = get_game_api_page(game_slugs, page=page, query_type=query_type)
    if not response_data:
        return []
    results = response_data.get("results", [])
    while response_data.get("next"):
        page_match = re.search(r"page=(\d+)", response_data["next"])
        if page_match:
            next_page = page_match.group(1)
        else:
            logger.error("No page found in %s", response_data["next"])
            break
        response_data = get_game_api_page(game_slugs, page=next_page, query_type=query_type)
        if not response_data:
            logger.warning("Unable to get response for page %s", next_page)
            break
        results += response_data.get("results")
    if game_slugs and inject_aliases:
        matched_games = []
        for game in results:
            for alias_slug in [alias["slug"] for alias in game.get("aliases", [])]:
                if alias_slug in game_slugs:
                    matched_games.append((alias_slug, game))
        for alias_slug, game in matched_games:
            game["slug"] = alias_slug
            results.append(game)
    return results


def search_games(query):
    if not query:
        return []
    query = query.lower().strip()[:32]
    if query == "open source games":
        url = "/api/bundles/open-source"
    elif query == "free to play games":
        url = "/api/bundles/free-to-play"
    else:
        url = "/api/games?%s" % urllib.parse.urlencode({"search": query})
    response = http.Request(settings.SITE_URL + url, headers={"Content-Type": "application/json"})
    try:
        response.get()
    except http.HTTPError as ex:
        logger.error("Unable to get games from API: %s", ex)
        return None
    response_data = response.json
    if "bundles" in url:
        api_games = response_data.get("games", [])
    else:
        api_games = response_data.get("results", [])
    for index, game in enumerate(api_games, 1):
        game["id"] = index * -1
        game["installed"] = 1
        game["runner"] = None
        game["platform"] = None
        game["lastplayed"] = None
        game["installed_at"] = None
        game["playtime"] = None
    return api_games


def parse_installer_url(url):
    """
    Parses `lutris:` urls, extracting any info necessary to install or run a game.
    """
    action = None
    try:
        parsed_url = urllib.parse.urlparse(url, scheme="lutris")
    except Exception:  # pylint: disable=broad-except
        logger.warning("Unable to parse url %s", url)
        return False
    if parsed_url.scheme != "lutris":
        return False
    url_path = parsed_url.path
    if not url_path:
        return False
    # urlparse can't parse if the path only contain numbers
    # workaround to remove the scheme manually:
    if url_path.startswith("lutris:"):
        url_path = url_path[7:]

    url_parts = url_path.split("/")
    if len(url_parts) == 2:
        action = url_parts[0]
        game_slug = url_parts[1]
    elif len(url_parts) == 1:
        game_slug = url_parts[0]
    else:
        raise ValueError("Invalid lutris url %s" % url)

    revision = None
    if parsed_url.query:
        query = dict(urllib.parse.parse_qsl(parsed_url.query))
        revision = query.get("revision")
    return {"game_slug": game_slug, "revision": revision, "action": action}