Repository URL to install this package:
|
Version:
15.1.0-1 ▾
|
# (c) Copyright 2014. CodeWeavers, Inc.
"""Updates a file, checking the signature if needed."""
import os
import shutil
import datetime
import time
import subprocess
import tempfile
import binascii
import cxlog
import cxutils
import cxurlget
def prepare(filename, urlname=None):
"""Unzip the file and/or extract its signature so it can be checked.
Returns a (datafile, sigfile) tuple containing the filenames of the prepared
file and its signature. If an error occurs, datafile will be None.
"""
basename = filename
gunzip_proc = None
if urlname is None:
urlname = filename
if urlname.endswith(".gz"):
if basename.endswith(".gz"):
basename = basename[0:-3]
import cxwhich
gzip = cxwhich.which(os.environ["PATH"], "pigz")
if gzip is None:
gzip = "gzip"
# Use single-letter flags for backward-compatibility.
args = (gzip, "-d", "-c", filename)
gunzip_proc = subprocess.Popen(args, stdout=subprocess.PIPE)
srcfh = gunzip_proc.stdout
else:
srcfh = open(filename, "r")
(basename, ext) = os.path.splitext(basename)
basename = os.path.basename(basename)
(datafd, datafile) = tempfile.mkstemp(suffix=ext, prefix=basename)
datafh = os.fdopen(datafd, "w+b")
sigannounce = "<!-- Signature"
signed = False
sigfile = None
for line in srcfh:
if sigannounce in line:
(sigfd, sigfile) = tempfile.mkstemp(suffix=".sig", prefix=datafile)
sigfh = os.fdopen(sigfd, "w+b")
signed = True
if signed:
# We have a signature embedded in an XML comment.
# Strip out the commenty bits.
strippedline = line.replace(sigannounce, "")
strippedline = strippedline.replace("-->", "")
strippedline = strippedline.rstrip()
splitline = strippedline.split("= ")
if len(splitline) > 1:
hexline = splitline[1].strip()
else:
hexline = strippedline.strip()
sigfh.write(binascii.unhexlify(hexline))
else:
datafh.write(line)
datafh.close()
if signed:
sigfh.close()
if gunzip_proc:
gunzip_proc.communicate()
if gunzip_proc.returncode != 0:
cxlog.err("an error occurred while unzipping %s. This file will be ignored." % filename)
os.unlink(datafile)
if signed:
os.unlink(sigfile)
return (None, None)
else:
srcfh.close()
return (datafile, sigfile)
def is_signed(datafile, sigfile=None):
"""Returns True if the file has a valid signature."""
if not sigfile:
sigfile = datafile + ".sig"
if not os.path.exists(sigfile):
return False
keyfile = os.path.join(cxutils.CX_ROOT, "share", "crossover", "data", "tie.pub")
args = ["openssl", "dgst", "-sha1", "-verify", keyfile, "-signature", sigfile, datafile]
(retcode, _out, _err) = cxutils.run(args, stdout=cxutils.NULL, stderr=cxutils.NULL)
if retcode == 0:
return True
cxlog.err("The file %s is signed, but the signature is invalid." % datafile)
return False
def install(dstfile, tmpfile, tmpsig):
"""Replace filename and filename.sig with the specified temporary files so
as to minimize the impact of race conditions and errors.
Returns a tuple containing the filenames of the new data and signature
files."""
dstdir = os.path.dirname(dstfile)
if not cxutils.mkdirs(dstdir):
return (None, None)
# Do a little dance so we can always find a destination file with a valid
# signature even if we're interrupted.
dstsig = None
if tmpsig:
shutil.move(tmpfile, dstfile + ".new")
tmpfile = dstfile + ".new"
if tmpsig is not None:
dstsig = dstfile + ".sig"
# Move tmpsig into the destination directory before the final
# rename, so the replacement is more likely to be atomic.
shutil.move(tmpsig, dstsig + ".new")
os.rename(dstsig + ".new", dstsig)
# If we're interrupted here, then it's up to the caller to notice the
# .new file on the next run and to move it into place.
shutil.move(tmpfile, dstfile)
return (dstfile, dstsig)
def update(filename, url, needs_signature=False, timeout=None, leeway=7200):
"""Updates the specified file from the URL and returns True if it was indeed
updated. Returns False if the download failed or the file was already up to
date."""
if os.path.exists(filename):
mtime = os.path.getmtime(filename)
if leeway and time.time() < mtime + leeway:
# No need for an update, the file is still recent enough
return False
last_modified = cxurlget.format_http_date(datetime.datetime.utcfromtimestamp(mtime))
else:
dirname = os.path.dirname(filename)
if not cxutils.mkdirs(dirname):
return False
last_modified = None
# Download the file if it is newer than the one we have.
(basename, ext) = os.path.splitext(filename)
(newfd, newfile) = tempfile.mkstemp(suffix=ext, prefix=basename)
newfh = os.fdopen(newfd, "w+b")
getter = cxurlget.UrlGetter(url, newfh, last_modified=last_modified)
getter.timeout = timeout
getter.fetch()
if not getter.finished:
# The file did not change since last time
os.unlink(newfile)
if leeway:
# Update the timestamp so we don't try the download again
# in the next leeway seconds
os.utime(filename, None)
return False
(datafile, sigfile) = prepare(newfile, getter.basename)
if datafile is None:
success = False
elif needs_signature:
# Make sure that the downloaded file is properly signed.
if sigfile is not None and is_signed(datafile, sigfile):
success = True
else:
cxlog.warn("The newly downloaded %s file is not properly signed. Discarding." % filename)
success = False
else:
success = True
if success:
# Replace filename and filename.sig with the new files.
install(filename, datafile, sigfile)
os.unlink(newfile)
return success