Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
crossover / opt / cxoffice / lib / python / fileupdate.py
Size: Mime:
# (c) Copyright 2014. CodeWeavers, Inc.

"""Updates a file, checking the signature if needed."""

import os
import shutil
import datetime
import time
import subprocess
import tempfile
import binascii

import cxlog
import cxutils
import cxurlget


def prepare(filename, urlname=None):
    """Unzip the file and/or extract its signature so it can be checked.

    The source is read as bytes, line by line. Lines that come before the
    first "<!-- Signature" marker are copied verbatim to a temporary data
    file. From the marker line on, every line is treated as part of a
    hex-encoded signature wrapped in an XML comment: the comment delimiters
    are stripped, the hex digits are decoded and the resulting bytes are
    written to a temporary signature file.

    filename is the path of the file to read. urlname is the name used to
    decide whether the content is gzip-compressed (it is if the name ends
    with ".gz"); it defaults to filename.

    Returns a (datafile, sigfile) tuple containing the filenames of the prepared
    file and its signature. sigfile is None when no signature was embedded.
    If an error occurs (the decompressor fails or the signature is not valid
    hexadecimal), the temporary files are removed and (None, None) is
    returned.
    """

    basename = filename
    gunzip_proc = None
    if urlname is None:
        urlname = filename
    if urlname.endswith(".gz"):
        if basename.endswith(".gz"):
            basename = basename[0:-3]
        import cxwhich
        gzip = cxwhich.which(os.environ["PATH"], "pigz")
        if gzip is None:
            gzip = "gzip"
        # Use single-letter flags for backward-compatibility.
        args = (gzip, "-d", "-c", filename)
        gunzip_proc = subprocess.Popen(args, stdout=subprocess.PIPE)
        srcfh = gunzip_proc.stdout
    else:
        # Binary mode so both sources yield bytes, matching the binary
        # temporary files written below.
        srcfh = open(filename, "rb")

    sigannounce = b"<!-- Signature"
    datafile = None
    datafh = None
    sigfile = None
    sigfh = None
    success = False
    try:
        (basename, ext) = os.path.splitext(basename)
        basename = os.path.basename(basename)
        (datafd, datafile) = tempfile.mkstemp(suffix=ext, prefix=basename)
        datafh = os.fdopen(datafd, "w+b")

        badsig = False
        for line in srcfh:
            if sigannounce in line:
                if sigfh is None:
                    # datafile is used as the prefix so the signature file
                    # name starts with the data file's full path.
                    (sigfd, sigfile) = tempfile.mkstemp(suffix=".sig", prefix=datafile)
                    sigfh = os.fdopen(sigfd, "w+b")
                else:
                    # A repeated marker restarts the signature instead of
                    # leaking a second temporary file and handle.
                    sigfh.seek(0)
                    sigfh.truncate()

            if sigfh is None:
                datafh.write(line)
                continue

            # We have a signature embedded in an XML comment.
            # Strip out the commenty bits.
            strippedline = line.replace(sigannounce, b"")
            strippedline = strippedline.replace(b"-->", b"")
            strippedline = strippedline.rstrip()
            # Keep only what follows a "name = " prefix, if there is one.
            splitline = strippedline.split(b"= ")
            if len(splitline) > 1:
                hexline = splitline[1].strip()
            else:
                hexline = strippedline.strip()

            try:
                sigbytes = binascii.unhexlify(hexline)
            except (binascii.Error, TypeError, ValueError):
                # Python 2 raises TypeError, Python 3 binascii.Error.
                badsig = True
                break
            sigfh.write(sigbytes)

        if badsig:
            cxlog.err("the signature embedded in %s is not valid hexadecimal. This file will be ignored." % filename)
        elif gunzip_proc is None:
            success = True
        else:
            gunzip_proc.communicate()
            if gunzip_proc.returncode == 0:
                success = True
            else:
                cxlog.err("an error occurred while unzipping %s. This file will be ignored." % filename)
    finally:
        # Release every handle, and drop the temporary files unless they
        # are being handed back to the caller.
        srcfh.close()
        if gunzip_proc is not None and gunzip_proc.returncode is None:
            gunzip_proc.wait()
        if datafh is not None:
            datafh.close()
        if sigfh is not None:
            sigfh.close()
        if not success:
            for leftover in (datafile, sigfile):
                if leftover and os.path.exists(leftover):
                    os.unlink(leftover)

    if not success:
        return (None, None)
    return (datafile, sigfile)


def is_signed(datafile, sigfile=None):
    """Tell whether datafile carries a signature that verifies.

    sigfile is the path of the detached signature; when it is not given,
    datafile with a ".sig" suffix is used. Returns False without logging if
    that signature file does not exist. Otherwise openssl is run to check
    the signature against the tie.pub key under CX_ROOT; True is returned
    when it exits with status 0, and an error is logged and False returned
    for any other status.
    """

    signature = sigfile if sigfile else datafile + ".sig"
    if not os.path.exists(signature):
        return False

    pubkey = os.path.join(cxutils.CX_ROOT, "share", "crossover", "data", "tie.pub")
    command = ["openssl", "dgst", "-sha1", "-verify", pubkey, "-signature", signature, datafile]
    (status, _stdout, _stderr) = cxutils.run(command, stdout=cxutils.NULL, stderr=cxutils.NULL)
    if status != 0:
        cxlog.err("The file %s is signed, but the signature is invalid." % datafile)
        return False
    return True


def install(dstfile, tmpfile, tmpsig):
    """Move the temporary data (and signature) files into their final place.

    The destination directory is created if needed. When a signature is
    supplied, the data file is first staged as dstfile + ".new", the
    signature is staged and renamed to dstfile + ".sig", and only then is
    the staged data file moved onto dstfile. Without a signature the data
    file is moved straight onto dstfile.

    Returns a (dstfile, dstsig) tuple with the final filenames; dstsig is
    None when there was no signature. Returns (None, None) if the
    destination directory could not be created.
    """
    if not cxutils.mkdirs(os.path.dirname(dstfile)):
        return (None, None)

    if not tmpsig:
        # Unsigned file: nothing to keep in sync, a single move is enough.
        shutil.move(tmpfile, dstfile)
        return (dstfile, None)

    # Do a little dance so we can always find a destination file with a valid
    # signature even if we're interrupted.
    staged_data = dstfile + ".new"
    shutil.move(tmpfile, staged_data)

    dstsig = dstfile + ".sig"
    staged_sig = dstsig + ".new"
    # Bring the signature into the destination directory first so the final
    # rename is more likely to be atomic.
    shutil.move(tmpsig, staged_sig)
    os.rename(staged_sig, dstsig)

    # If we're interrupted here, then it's up to the caller to notice the
    # .new file on the next run and to move it into place.
    shutil.move(staged_data, dstfile)
    return (dstfile, dstsig)


def _remove_if_present(path):
    """Delete path when it is set and still exists on disk."""
    if path and os.path.exists(path):
        os.unlink(path)


def update(filename, url, needs_signature=False, timeout=None, leeway=7200):
    """Updates the specified file from the URL and returns True if it was indeed
    updated. Returns False if the download failed or the file was already up to
    date.

    filename is the local file to refresh and url is where to fetch it from.
    If needs_signature is true, the download is only installed when it
    embeds a signature that verifies. timeout is passed on to the URL
    getter. leeway is the number of seconds during which an existing file is
    considered fresh and no download is attempted; a false value disables
    that check.

    Every temporary file created along the way is removed before returning,
    whether the update succeeded or not.
    """

    if os.path.exists(filename):
        mtime = os.path.getmtime(filename)
        if leeway and time.time() < mtime + leeway:
            # No need for an update, the file is still recent enough
            return False
        last_modified = cxurlget.format_http_date(datetime.datetime.utcfromtimestamp(mtime))
    else:
        dirname = os.path.dirname(filename)
        if not cxutils.mkdirs(dirname):
            return False
        last_modified = None

    # Download the file if it is newer than the one we have.
    (basename, ext) = os.path.splitext(filename)
    (newfd, newfile) = tempfile.mkstemp(suffix=ext, prefix=basename)
    newfh = os.fdopen(newfd, "w+b")
    datafile = None
    sigfile = None
    success = False
    try:
        try:
            getter = cxurlget.UrlGetter(url, newfh, last_modified=last_modified)
            getter.timeout = timeout
            getter.fetch()
        finally:
            # NOTE(review): the getter may already close its output file;
            # closing here as well guarantees the data is flushed before it
            # is read back and that the descriptor is not leaked.
            if not newfh.closed:
                newfh.close()

        if not getter.finished:
            # Nothing new was downloaded.
            if leeway and os.path.exists(filename):
                # Update the timestamp so we don't try the download again
                # in the next leeway seconds. The file may not exist yet if
                # this was the very first attempt, hence the check.
                os.utime(filename, None)
            return False

        (datafile, sigfile) = prepare(newfile, getter.basename)
        if datafile is None:
            success = False
        elif needs_signature:
            # Make sure that the downloaded file is properly signed.
            if sigfile is not None and is_signed(datafile, sigfile):
                success = True
            else:
                cxlog.warn("The newly downloaded %s file is not properly signed. Discarding." % filename)
                success = False
        else:
            success = True

        if success:
            # Replace filename and filename.sig with the new files.
            (installed, _installed_sig) = install(filename, datafile, sigfile)
            # install() reports failure with (None, None).
            success = installed is not None

        return success
    finally:
        _remove_if_present(newfile)
        if not success:
            # The prepared files were not moved into place; don't leave
            # them behind.
            _remove_if_present(datafile)
            _remove_if_present(sigfile)