# (c) Copyright 2009-2013. CodeWeavers, Inc.
import errno
import fcntl
import locale
import mmap
import os
import re
import stat
import subprocess
import sys
import time
import traceback
if sys.version_info >= (3,):
from urllib.parse import urlparse, unquote # pylint: disable=E0611,F0401
else:
from urllib import unquote # pylint: disable=E0611
from urlparse import urlparse # pylint: disable=F0401
import cxlog
import distversion
import cxobjc
class CXUtils(cxobjc.Proxy):
pass
#####
#
# CX_ROOT
#
#####
# Points to CrossOver's root directory.
# This should be set at startup and is then considered to be constant for the
# duration of the process.
CX_ROOT = None
#####
#
# File and directory helpers
#
#####
def mkdirs(path, *args, **kwargs):
"""Similar to os.makedirs() but returns False to signal errors and logs
errors.
"""
try:
os.makedirs(path, *args, **kwargs)
except OSError:
ose = sys.exc_info()[1]
if ose.errno != errno.EEXIST:
cxlog.err("unable to create the '%s' directory: %s" % (cxlog.debug_str(path), ose.strerror))
return False
return True
PERMISSIONS_OK = 0
PERMISSIONS_NOREAD = 1
PERMISSIONS_NOEXEC = 2
def check_mmap_permissions(filename):
try:
f = open(filename, 'rb')
except IOError:
return PERMISSIONS_NOREAD
try:
try:
mm = mmap.mmap(f.fileno(), 1, mmap.MAP_SHARED, mmap.PROT_READ|mmap.PROT_EXEC)
except mmap.error:
return PERMISSIONS_NOEXEC
mm.close()
finally:
f.close()
return PERMISSIONS_OK
def check_fake_dll(path):
if os.path.exists(path):
sfile = open(path, "rb")
sfile.seek(0x40)
tag = sfile.read(20)
if tag == "Wine placeholder DLL":
return True
return False
#####
#
# Cross-process lock
#
#####
class FLock(object):
def __init__(self, name):
"""Creates and takes a lock called name using flock().
This is used to avoid races between processes.
"""
self._name = name
# Create the lock directory and perform some sanity checks
lockdir = os.path.join(os.environ.get('TMPDIR', '/tmp'),
".wine-%d" % os.geteuid())
mkdirs(lockdir, int('700', 8))
st = os.lstat(lockdir)
if not stat.S_ISDIR(st.st_mode):
raise ValueError("'%s' is not owned by you" % lockdir)
if st.st_uid != os.geteuid():
raise ValueError("'%s' is not owned by you" % lockdir)
if st.st_mode & int('77', 8):
raise ValueError("'%s' must not be accessible by other users" % lockdir)
# Perform some more sanity checks and create the lock file
self._path = os.path.join(lockdir, name + ".lock")
try:
st = os.lstat(self._path)
if not stat.S_ISREG(st.st_mode):
raise ValueError("'%s' is not a regular file" % self._path)
except OSError:
ose = sys.exc_info()[1]
if ose.errno != errno.ENOENT:
raise ose
self._lock = open(self._path, 'w')
# Take the lock
cxlog.log("%d: Grabbing the '%s' lock" % (os.getpid(), self._path))
try:
fcntl.flock(self._lock, fcntl.LOCK_EX)
except Exception:
e = sys.exc_info()[1]
# The filesystem probably does not support locking
cxlog.warn("unable to take the '%s' lock: %s" % (name, e))
self._lock.close()
self._lock = None
raise # re-raise the exception
def _getname(self):
"""Returns the lock name."""
return self._name
name = property(_getname)
def locked(self):
"""Returns True if the lock has not been released yet."""
return self._lock is not None
def release(self):
"""Releases the lock.
Note that deleting a lock carries a very high risk of causing races
outside the scope of this class and has essentially no benefit. So the
file created for the lock is not deleted and no support is provided
for deleting it.
"""
if self._lock:
cxlog.log("%d: Releasing the '%s' lock" % (os.getpid(), self._name))
try:
try:
fcntl.flock(self._lock, fcntl.LOCK_UN)
except Exception:
e = sys.exc_info()[1]
cxlog.warn("unable to release the '%s' lock: %s" % (self._name, e))
finally:
self._lock.close()
self._lock = None
#####
#
# String handling
#
#####
try:
# pylint: disable=E0602
bytes_type = bytes
except NameError:
bytes_type = str
try:
unicode_type = unicode
except NameError:
unicode_type = str
if sys.version_info < (3,):
def b(x):
return x
else:
def b(x):
if isinstance(x, int):
return bytes([x])
return x.encode('utf8')
def string_to_unicode(string):
if isinstance(string, unicode_type):
return string
elif isinstance(string, bytes_type):
return string.decode('utf8')
else:
raise ValueError("string_to_unicode called with non-string")
def expect_unicode(string):
if isinstance(string, unicode_type):
return string
elif isinstance(string, bytes_type):
cxlog.warn("Byte string passed where unicode string was expected:\n%s" % ''.join(traceback.format_stack()[0:-1]))
return string.decode('utf8')
else:
raise ValueError("expect_unicode() called with non-string")
def unescape_string(string):
"""Unescapes a string that was placed in double-quotes."""
string = string.replace('\\\"', '\"')
string = string.replace("\\\\", "\\")
return string
def escape_string(string):
"""Escapes the string so it can be placed in double-quotes."""
string = string.replace("\\", "\\\\")
string = string.replace('"', '\\"')
return string
def shquote_string(string):
"""Quote a string so it can be used in shell commands.
Note that this implies escaping '$'s and '`'s which may not be appropriate
in another context.
"""
string = string.replace('\\', '\\\\')
string = string.replace('$', '\\$')
string = string.replace('\"', '\\\"')
string = string.replace("\'", "\\\'")
return '"' + string + '"'
_BAD_SHELL_CHARS = re.compile(r"[^a-zA-Z0-9\/.,+_-]")
def maybe_shquote_string(string):
if _BAD_SHELL_CHARS.search(string):
return shquote_string(string)
return string
def unmangle(string):
result = []
x = 0
while x < len(string):
char = string[x]
x += 1
if char == '+':
result.append(' ')
elif char == '_':
result.append('/')
elif char == '~':
result.append('\\')
elif char == '^':
try:
result.append(chr(int(string[x:x+2], 16)))
except ValueError:
# not followed by valid hex digits
pass
else:
x += 2
else:
result.append(char)
return ''.join(result)
_ACCELERATOR_RE = re.compile('&(.)')
def remove_accelerators(string):
"""Remove the ampersands that identify the accelerator keys on Windows.
While only the first one is meant to be significant, they should all be
removed, and '&&' should be turned into a regular '&'.
"""
return _ACCELERATOR_RE.sub('\\1', string)
# Compares two strings by alphabetical (rather than lexical) order
def alpha_cmp(s1, s2):
return locale.strcoll(string_to_unicode(s1).lower(), string_to_unicode(s2).lower())
def md5_hasher():
"""Returns an object for computing MD5 hashes."""
try:
# pylint: disable=F0401,E1101
import hashlib
return hashlib.md5()
except ImportError:
import md5
return md5.new()
def is_valid_windows_filename(filename):
"""Checks that the given filename is valid."""
if not filename:
return False
for x in filename:
if x in "\\/*?:|<>\"'":
return False
return True
def sanitize_windows_filename(filename):
"""Turns a filename that may contain invalid characters into a valid
Windows filename."""
result = []
for c in filename:
if c in '\\:*?"<>|\x7f' or ord(c) < 0x20:
result.append('_')
else:
result.append(c)
return ''.join(result)
def allow_bottlename_char(character):
"""Returns False for characters that are forbidden in bottle names."""
# \/*?:|<>" are not valid characters in Windows paths
# {} and space are not valid in paths stored in Debian and Solaris packages
prohibited = "\\/*?:|<>\"'{}"
# Spaces cause trouble in Nautilus (GNOME bug 700320).
# So allow them but avoid them by default
if not distversion.IS_MACOSX:
prohibited = prohibited + " "
return character not in prohibited
def is_valid_bottlename(bottlename):
"""Checks that the bottle name is valid."""
if bottlename and bottlename[0] == '.':
return False
for x in bottlename:
if not allow_bottlename_char(x):
return False
return True
@cxobjc.method(CXUtils, 'sanitizedBottleName_')
def sanitize_bottlename(name_hint):
"""Builds a valid and nice-looking bottle name based on the given hint."""
result = []
prev_is_edu = False # equal, dash, underscore
for x in name_hint:
if x == '.':
# Avoid leading dots as they cause the bottle to be hidden
if result:
result.append('.')
prev_is_edu = False
elif x == ' ' and result and result[-1] in '_ ':
# Remove the preceding space or underscore
result[-1] = ' '
prev_is_edu = False
elif x == '_':
# Avoid consecutive and leading underscores
if not prev_is_edu and result:
result.append('_')
prev_is_edu = True
elif prev_is_edu and (x == '=' or x == '-'):
# Avoid consecutive equal and dash characters and avoid underscores
if result and result[-1] == '_':
result[-1] = x
elif allow_bottlename_char(x):
result.append(x)
prev_is_edu = False
elif not prev_is_edu and result:
# Again avoid consecutive and leading underscores
result.append('_')
prev_is_edu = True
# Avoid trailing underscores and spaces
bottlename = ''.join(result).rstrip("_ ")
# The bottle name should be valid but check just to be sure
if not is_valid_bottlename(bottlename):
cxlog.err("sanitize_bottlename() generated an invalid bottle name: '%s'" % bottlename)
return bottlename
def path_to_uri(path):
result = ['file://']
if not isinstance(path, bytes_type):
# FIXME: use filesystem encoding?
path = path.encode('utf8')
for char in path:
if ord(char) >= 0x80 or char in '%#?*!&=()[],:;@$+ ':
result.append('%%%02X' % ord(char))
else:
result.append(char)
return ''.join(result)
def uri_to_path(uri):
scheme, _netloc, path, _params, _query, _fragment = urlparse(uri)
if scheme == 'file':
return unquote(path)
return uri
def html_escape(string):
"""Escape characters that have special meaning in html."""
replacements = {
'<': '<',
'>': '>',
'&': '&'}
result = []
for char in string:
result.append(replacements.get(char, char))
return ''.join(result)
_HTML_BR_RE = None
_HTML_P_RE = None
_HTML_LI_RE = None
_HTML_A_RE = None
_HTML_URL_RE = None
_HTML_TAGS_RE = None
_LF_RE = None
def html_to_text(string):
"""Use some heuristics to turn an HTML string into a readable plain
text string."""
if string is None:
return None
# pylint: disable=W0603
global _HTML_BR_RE, _HTML_P_RE, _HTML_LI_RE, _HTML_A_RE, _HTML_URL_RE
global _HTML_TAGS_RE, _LF_RE
if not _HTML_BR_RE:
_HTML_BR_RE = re.compile('<br/?>(?:\\s*\n)*', re.IGNORECASE)
_HTML_P_RE = re.compile(r'''</?(?:div|p|ol|ul)(\s+[a-z]+=(['"])(?:[^\\'"]|\.)*\2)*>(?:\s*\n)*''', re.IGNORECASE)
_HTML_LI_RE = re.compile(r'''(\s*\n)*\s*<li(\s+[a-z]+=(['"])(?:[^\\'"]|\\.)*\3)*>''', re.IGNORECASE)
_HTML_A_RE = re.compile(r'''</?a(\s+[a-z]+=(['"])(?:[^\\'"]|\.)*\2)*>''', re.IGNORECASE)
_HTML_URL_RE = re.compile(r'''\shref=(['"])((?:[^\\'"]|\.)*)\1''', re.IGNORECASE)
_HTML_TAGS_RE = re.compile(r'''(?:</?[a-z]+(\s+[a-z]+=(['"])(?:[^\\'"]|\\.)*\2)*/?>)+''', re.IGNORECASE)
_LF_RE = re.compile(r'''\s*\n\s*\n(?:\s*\n)+''')
# Remove carriage returns and linefeeds
string = string.replace('\r', '').replace('\n', ' ')
# Some tags deserve special treatment
string = _HTML_BR_RE.sub('\n', string) # End of line tag
string = _HTML_LI_RE.sub('\n * ', string) # Enum tag
string = _HTML_P_RE.sub('\n\n', string) # 'Paragraph' tags
# Preserve the URL for links so users can copy/paste them to their browser
def replace_link(match):
atag = match.group(0)
url = _HTML_URL_RE.search(atag)
if url:
return "[" + url.group(2) + "] "
return ""
string = _HTML_A_RE.sub(replace_link, string)
# Simply remove all other tags
string = _HTML_TAGS_RE.sub('', string)
# Replace the most common entities
string = string.replace('<', '<')
string = string.replace('>', '>')
string = string.replace('&', '&')
string = string.replace(' ', ' ')
# Finally remove excess spaces and line feeds
string = re.sub(' +', ' ', string)
string = _LF_RE.sub('\n\n', string).strip()
return string
@cxobjc.method(CXUtils, 'cmdLineToArgv_')
def cmdlinetoargv(string):
"""Convert a windows-style quoted command line to an argument list. This is
NOT equivalent to CommandLineToArgvW."""
in_quotes = False
pos = 0
result = []
arg = []
while pos < len(string):
char = string[pos]
if char == ' ' and not in_quotes:
if arg:
result.append(''.join(arg))
del arg[:]
elif char == '\\' and pos+1 < len(string) and string[pos+1] in '\\"':
pos += 1
arg.append(string[pos])
elif char == '"':
in_quotes = not in_quotes
else:
arg.append(string[pos])
pos += 1
if arg:
result.append(''.join(arg))
return result
def argvtocmdline(argv):
"""Convert an argument list to a windows-style quoted command line."""
result = []
for arg in argv:
arg = arg.replace("\\", "\\\\").replace('"', '\\"')
if ' ' in arg:
arg = '"%s"' % arg
result.append(arg)
return ' '.join(result)
@cxobjc.method(CXUtils, 'basename_')
def basename(path):
"""Get the base name of a Windows or Unix path.
Note that Windows paths can mix forward and backward slashes.
"""
index = max(path.rfind('/'), path.rfind('\\'))
if index >= 0:
return path[index+1:]
return path
@cxobjc.method(CXUtils, 'dirname_')
def dirname(path):
"""Get the directory name of a Windows or Unix path.
Note that Windows paths can mix forward and backward slashes.
"""
index = max(path.rfind('/'), path.rfind('\\'))
if index >= 0:
return path[:index].rstrip('/\\')
return ''
_regex_specials = set('.^$*+?[]|()')
def unescape_regex(string):
"""If a regex can match multiple strings, return None. If it can match only
one string, this function MAY return that string."""
in_curly_brace = False
escape = False
result = []
string = string.lstrip('^').rstrip('$')
for ch in string:
if ch == '{' and not escape:
in_curly_brace = True
elif in_curly_brace:
if ch == '}':
return None
elif not ch.isdigit() and ch != ',':
in_curly_brace = False
if escape:
if ch.isalnum():
# An escaped letter or number might have special meaning
return None
result.append(ch)
escape = False
elif ch in _regex_specials:
return None
elif ch == '\\':
escape = True
else:
result.append(ch)
return ''.join(result)
_OCTALCHAR_RE = None
def expand_octal_chars(string):
r"""Expand escaped octal characters of the form '\040' in the specified
string.
"""
if string is None:
return None
# pylint: disable=W0603
global _OCTALCHAR_RE
if not _OCTALCHAR_RE:
_OCTALCHAR_RE = re.compile('(\\\\[0-7]{3})')
i = 0
expanded = list()
for match in _OCTALCHAR_RE.finditer(string):
expanded.append(string[i:match.start()])
expanded.append(chr(int(string[match.start()+1:match.end()], 8)))
i = match.end()
expanded.append(string[i:])
return ''.join(expanded)
_UNIXVAR_RE = None
def expand_unix_string(environ, string):
"""Expand references to environment variables of the form '${VARNAME}' in
the specified string.
Note that unlike os.path.expandvars(), if a given environment variable is
not set it expands to an empty string.
"""
if string is None:
return None
# pylint: disable=W0603
global _UNIXVAR_RE
if not _UNIXVAR_RE:
_UNIXVAR_RE = re.compile(r'\${(\w+)}')
i = 0
expanded = list()
for match in _UNIXVAR_RE.finditer(string):
expanded.append(string[i:match.start()])
if match.group(1) in environ:
expanded.append(environ[match.group(1)])
i = match.end()
expanded.append(string[i:])
return ''.join(expanded)
def _int_for_version_component(comp):
"""A version component can have a non-numeric suffix (e.g. the "local" in
"25889local"). That interferes with converting to integer. This function
collects just the numeric prefix and converts that.
"""
if comp.isdigit():
return int(comp)
n = 0
for x in comp:
if x.isdigit():
n += 1
else:
break
if n:
return int(comp[0:n])
return 0
def subtract_version(v1, v2):
if v1:
v1 = v1.split('.')
else:
v1 = []
if v2:
v2 = v2.split('.')
else:
v2 = []
if len(v1) < len(v2):
v1.extend(["0"] * (len(v2) - len(v1)))
elif len(v2) < len(v1):
v2.extend(["0"] * (len(v1) - len(v2)))
result = []
sign = 0
for i in range(len(v1)):
if sign == 0:
sign = cmp(_int_for_version_component(v1[i]), _int_for_version_component(v2[i]))
result.append((_int_for_version_component(v1[i]) - _int_for_version_component(v2[i])) * sign)
return tuple(result)
def cmp_versions(v1, v2):
if v1:
v1 = v1.split('.')
else:
v1 = []
if v2:
v2 = v2.split('.')
else:
v2 = []
if len(v1) < len(v2):
v1.extend(["0"] * (len(v2) - len(v1)))
elif len(v2) < len(v1):
v2.extend(["0"] * (len(v1) - len(v2)))
for i in range(len(v1)):
result = cmp(_int_for_version_component(v1[i]), _int_for_version_component(v2[i]))
if result != 0:
return result
return 0
def split_version(s):
result = tuple(_int_for_version_component(x) for x in s.split('.'))
i = len(result)-1
while result[i] == 0 and i > 0:
i -= 1
return result[0:i+1]
_version_as_tuple = None
def version_as_tuple():
# pylint: disable=W0603
global _version_as_tuple
if _version_as_tuple is None:
_version_as_tuple = split_version(distversion.CX_VERSION)
return _version_as_tuple
#####
#
# Locale functions
#
#####
_SINGLETON_RE = re.compile("(-x)?-[a-z]$")
_PREFERRED_LANGUAGES = {}
def get_preferred_languages(language=None):
"""Returns a sequence of the user's preferred languages if language is
None; and a sequence of variants otherwise.
All languages must be identified by their RFC 1766 / RFC 3066 / RFC 5646 /
BCP 47 language id.
In the None case the result corresponds to the user's preferences on
Mac OS X, while on Unix it is derived from the user's locale.
When starting from a single language, the variants added allow broader
matches to be established. For instance 'pt-br' gives ('pt-br', 'pt', '').
"""
if language in _PREFERRED_LANGUAGES:
return _PREFERRED_LANGUAGES[language]
if language:
languages = [language]
else:
languages = distversion.get_user_languages()
if not languages:
languages = ['en']
# Add the user's preferred languages
seen = set()
preferred = []
fallbacks = []
for lang in languages:
lang = lang.lower()
if lang in seen:
continue
preferred.append(lang)
seen.add(lang)
# Compute the fallbacks
while '-' in lang:
lang = lang.rsplit('-', 1)[0]
# Strip trailing singleton subtags such as '-x' as they have no
# meaning on their own. See 'Extension Subtags' in RFC 5646 and
# the lookup algorithm in RFC 4647.
lang = _SINGLETON_RE.sub('', lang)
if lang not in seen:
fallbacks.append((lang, len(preferred)))
# Take extra care to insert the fallbacks where they are useful and yet
# don't override the user's preferences. Here are a few scenarios:
# ('en-us, 'fr') -> ('en-us, 'en', 'fr')
# ('en-us, 'fr', 'en') -> ('en-us, 'fr', 'en')
# ('en-us, 'en-ca') -> ('en-us, 'en-ca', 'en')
# ('zh-hant-tw', 'fr', 'zh-tw') -> ('zh-hant-tw', 'zh-hant', 'fr', 'zh-tw', 'zh')
# ('en-a-bbb-x-a-ccc-ddd') -> ('en-a-bbb-x-a-ccc-ddd', 'en-a-bbb-x-a-ccc', 'en-a-bbb', 'en')
# See also the lookup algorithm in RFC 4647
# Insert the fallbacks starting from the end of the array to not mess up
# the first indices.
for fallback in reversed(fallbacks):
if fallback[0] in seen:
continue
preferred.insert(fallback[1], fallback[0])
seen.add(fallback[0])
# Add the ultimate fallback
preferred.append('')
_PREFERRED_LANGUAGES[language] = preferred
return preferred
@cxobjc.method(CXUtils, 'getLanguageValue_forLanguage_')
def get_language_value(dictionary, language=None):
"""Returns the appropriate value for the specified language from a
dictionary mapping language names to values.
The given dictionary should either contain a single '' key that is used
for all languages, or only have locales as its keys for locale-specific
values. However for compatibility with pre-10.0 releases, dictionaries
containing both types of keys are supported too.
Empty values are ignored.
This function will always return a non-empty value if there is at least one
in the dictionary.
"""
languages = get_preferred_languages(language)
for lang in languages:
if lang in dictionary and dictionary[lang]:
return (lang, dictionary[lang])
if dictionary:
# The dictionary is not empty, but there is no default. Just pick
# something, predictably.
languages = dictionary.keys()
languages.sort()
for lang in languages:
if dictionary[lang]:
return (lang, dictionary[lang])
return ('', '')
_LOCALE_PATH = None
def setup_textdomain():
# pylint: disable=W0603
global _LOCALE_PATH
if _LOCALE_PATH is None:
_LOCALE_PATH = os.path.join(CX_ROOT, "share", "locale")
cxlog.log("locale path %s" % cxlog.debug_str(_LOCALE_PATH))
if hasattr(locale, 'bindtextdomain'):
# Use locale to set the C-level textdomain for the benefit of C
# libraries such as GTK+. But note that bindtextdomain() is missing
# on some platforms, like Mac OS X.
locale.bindtextdomain('crossover', _LOCALE_PATH)
import gettext
gettext.bindtextdomain("crossover", _LOCALE_PATH)
gettext.bind_textdomain_codeset("crossover", "UTF-8")
gettext.textdomain("crossover")
_GETTEXT_TRANSLATOR = None
def _translator():
# pylint: disable=W0603
global _GETTEXT_TRANSLATOR
if not _GETTEXT_TRANSLATOR:
setup_textdomain()
languages = []
for lang in get_preferred_languages():
# We must convert the language id to match the naming of the
# $CX_ROOT/share/locale folders. It's not case sensitive, but the
# dashes must be replaced by underscores.
languages.append(lang.replace('-', '_'))
import gettext
_GETTEXT_TRANSLATOR = gettext.translation("crossover", _LOCALE_PATH, languages=languages, fallback=True)
return _GETTEXT_TRANSLATOR
def cxgettext(message):
return string_to_unicode(_translator().gettext(message))
# So xgettext picks up the strings to translate
_ = cxgettext
#####
#
# Dealing with subprocesses
#
#####
GRAB = 1
NULL = 2
STDOUT = 3
NOLOG = 4
def run(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None,
preexec_fn=None, shell=False, background=False, cwd=None, env=None,
universal_newlines=True, logprefix=""):
"""Works the same way as subprocess.Popen, but simplifies retrieving the
output and exit code. Returns (retcode, out, err) triplets.
If stdout or stderr are set to GRAB, then they are captured and returned.
If they or stdin are set to NULL, then they are redirected to/from /dev/null
instead. If stderr is set to STDOUT, then it is redirected to stdout.
If the command cannot be run, then returns -1 as the exit code instead of
raising an exception.
Finally, the execution is timed and traced with cxlog. If stdout or stderr
are being captured, then they are logged too, but that can be disabled by
or-ing NOLOG. If logprefix is set, then the trace is prefixed with it.
"""
if cxlog.is_on():
start = time.time()
if logprefix != "":
logprefix = logprefix + ": "
bg = ""
if background:
bg = " &"
cxlog.log("%sRunning: %s%s" % (logprefix, " ".join(cxlog.debug_str(arg) for arg in args), bg))
out_log = False
if stdin is None:
subin = None
elif stdin == NULL:
subin = open("/dev/null", "r")
else:
raise TypeError("invalid value for stdin")
if stdout:
if (stdout & ~NOLOG) == GRAB and not background:
subout = subprocess.PIPE
if stdout == GRAB:
out_log = True
elif stdout == NULL:
subout = open("/dev/null", "w")
else:
raise TypeError("invalid value for stdout")
else:
subout = None
err_log = False
if stderr:
if (stderr & ~NOLOG) == GRAB and not background:
suberr = subprocess.PIPE
if stderr == GRAB:
err_log = True
elif stderr == STDOUT:
suberr = subprocess.STDOUT
elif stderr == NULL:
if cxlog.is_on():
suberr = None
else:
suberr = open("/dev/null", "w")
else:
raise TypeError("invalid value for stderr")
else:
suberr = None
try:
subp = subprocess.Popen(args, bufsize, executable, subin, subout,
suberr, preexec_fn, close_fds=True,
shell=shell, cwd=cwd, env=env,
universal_newlines=universal_newlines)
if not background:
out, err = subp.communicate()
retcode = subp.wait()
else:
out = err = ""
retcode = 0
except OSError:
exception = sys.exc_info()[1]
retcode = -1
out = ""
err = cxlog.to_str(exception)
if hasattr(subin, 'close'):
# pylint: disable=E0012,E1103
subin.close()
if hasattr(subout, 'close'):
# pylint: disable=E0012,E1103
subout.close()
if hasattr(suberr, 'close'):
# pylint: disable=E0012,E1103
suberr.close()
if not background and cxlog.is_on():
cxlog.log("%s%s -> rc=%d (took %0.3f seconds)" % (logprefix, cxlog.debug_str(os.path.basename(args[0])), retcode, time.time() - start))
if out_log:
count = out.count("\n")
if count > 20:
cxlog.log("out=<%d lines>" % count)
else:
cxlog.log("out=[%s]" % out)
if err_log:
count = err.count("\n")
if count > 20:
cxlog.log("err=<%d lines>" % count)
else:
cxlog.log("err=[%s]" % err)
return (retcode, out, err)
def system(args, executable=None, preexec_fn=None, shell=False,
background=False, cwd=None, env=None, logprefix=""):
"""Works the same way as run(), except that neither stdout nor stderr are
captured, and only the retcode is returned.
"""
retcode, _out, _err = run(args, executable=executable,
preexec_fn=preexec_fn, shell=shell,
background=background, cwd=cwd, env=env,
logprefix=logprefix)
return retcode
def launch_url(url):
"""Uses launchurl to open the specified URL in a web browser."""
return system([os.path.join(CX_ROOT, "bin", "launchurl"), url],
background=True)
#####
#
# Select wrapper
#
#####
# Functions and module imports are stored in the same namespace. This means a
# simple 'import select' would get overridden by our select() function, thus
# preventing it from accessing the content of the module, and necessitating
# annoying workarounds (and rightful Pylint warnings).
#
# So we just import the parts we need, renaming them in the process to avoid
# any conflict.
from select import select as select_select
from select import error as select_error
def select(iwtd, owtd, ewtd, timeout=None):
"""Same as select.select() but retries on EINTR (with the same timeout)."""
while True:
try:
return select_select(iwtd, owtd, ewtd, timeout)
except select_error:
err = sys.exc_info()[1]
if err.args[0] == errno.EINTR:
if timeout is not None:
timeout = 0
continue
else:
raise
#####
#
# User directories
#
#####
def _xdg_dirs_skip_whitespace(pos, string):
while pos < len(string) and string[pos] in ' \t':
pos += 1
return pos
def _internal_load_xdg_dirs():
try:
# Try the system's Python module first
# pylint: disable=F0401
import xdg.BaseDirectory
config_file = xdg.BaseDirectory.load_first_config('user-dirs.dirs')
except:
# Otherwise use our builtin module
import BaseDirectory
config_file = BaseDirectory.load_first_config('user-dirs.dirs')
if config_file is None:
config_file = os.path.join(os.environ.get('HOME', '/'), ".config", "user-dirs.dirs")
result = {}
try:
f = open(config_file, 'U')
except:
return {}
try:
for line in f:
line = line.rstrip('\n')
# Based on MIT code in xdg-user-dir-lookup.c in the xdg-user-dirs package.
pos = _xdg_dirs_skip_whitespace(0, line)
if line[pos:pos+4] != 'XDG_':
continue
pos += 4
dir_pos = line.find('_DIR', pos)
if dir_pos == -1:
continue
name = line[pos:dir_pos]
pos = dir_pos + 4
pos = _xdg_dirs_skip_whitespace(pos, line)
if line[pos:pos+1] != '=':
continue
pos += 1
pos = _xdg_dirs_skip_whitespace(pos, line)
if line[pos:pos+1] != '"':
continue
pos += 1
if line[pos:pos+6] == '$HOME/':
pos += 6
value_base = os.environ.get('HOME', '/') + '/'
elif line[pos:pos+1] == '/':
value_base = ''
else:
continue
value_end = line.find('"', pos)
if value_end != -1:
value = value_base + line[pos:value_end].replace('\\', '')
else:
# Yes, the standard parser does not mind if there's no terminating quote.
value = value_base + line[pos:].replace('\\', '')
result[name] = value
finally:
f.close()
return result
_xdg_dirs = None
def _load_xdg_dirs():
global _xdg_dirs # pylint: disable=W0603
if _xdg_dirs is None:
_xdg_dirs = _internal_load_xdg_dirs()
return _xdg_dirs
def get_download_dir():
if not distversion.IS_MACOSX:
xdg_dirs = _load_xdg_dirs()
if 'DOWNLOAD' in xdg_dirs:
return xdg_dirs['DOWNLOAD']
path = os.path.join(os.environ.get('HOME', '/'), _('Downloads'))
if os.path.isdir(path):
return path
return os.path.join(os.environ.get('HOME', '/'), 'Downloads')
def get_desktop_dir():
if not distversion.IS_MACOSX:
xdg_dirs = _load_xdg_dirs()
if 'DESKTOP' in xdg_dirs:
return xdg_dirs['DESKTOP']
path = os.path.join(os.environ.get('HOME', '/'), _('Desktop'))
if os.path.isdir(path):
return path
return os.path.join(os.environ.get('HOME', '/'), 'Desktop')
#####
#
# Locating icons
#
#####
S_MEDIUM = ('48x48', '32x32', '')
def get_icon_path(root, subdir, iconname, sizes=S_MEDIUM):
for ext in ('.png', '.xpm'):
for size in sizes:
filename = os.path.join(root, size, subdir, iconname + ext)
if os.path.exists(filename):
return filename
return None
def get_icon_paths(root, subdir, iconname):
for ext in ('.png', '.xpm'):
for size in os.listdir(root):
filename = os.path.join(root, size, subdir, iconname + ext)
if os.path.exists(filename):
yield filename
#####
#
# pylint utilities
#
#####
def not_yet_implemented():
"""If a method raises NotImplementedError(), pylint regards it as an
"abstract method" that must be implemented in all subclasses, and the
class as an "abstract class". Sometimes this is desired, but in some cases
we use this exception for parts of the API that we are not using now but
plan to implement and use in the future. Those functions should instead
raise cxutils.not_yet_implemented()."""
return NotImplementedError()
#####
#
# umask cache
#
#####
# Retrieve the umask on startup, that is before we have multiple threads all
# trying to get it at the same time. os.umask() MUST NOT be used anywhere
# else. Should we need to modify the umask, then we will need to write a
# proper umask() wrapper.
UMASK = os.umask(0)
os.umask(UMASK)