# (c) Copyright 2009-2012, 2015. CodeWeavers, Inc.
import os
import fcntl
import subprocess
import threading
import time
import traceback
import cxproduct
import cxlog
import cxutils
import cxevent
import cxconfig
import cxobjc
# for localization
from cxutils import cxgettext as _
class BottleQuery(cxobjc.Proxy):
    """Proxy class that the @cxobjc.method decorators in this module target.

    It defines no members of its own.
    """
class NotFoundError(Exception):
    """Raised when a requested bottle, registry key, drive or path is missing."""
# Directory entries whose name starts with this prefix (compared
# case-insensitively) are left out of the bottle lists built by
# get_bottle_list() and get_games_bottle_list().
registrationBottlePrefix = "registrationTestBottle"
def _validate_wineprefix(prefix):
    """Tell whether prefix looks like a usable bottle directory.

    Returns True only if prefix is a non-empty path to a readable directory
    that contains a 'system.reg' entry and a regular 'cxbottle.conf' file.
    """
    if prefix == "" or not os.path.isdir(prefix):
        # Empty, or not a directory.
        return False
    if not os.access(prefix, os.R_OK):
        # Unreadable.
        return False
    has_registry = os.path.exists(os.path.join(prefix, "system.reg"))
    # Without cxbottle.conf this is probably an orphaned stub.
    has_config = os.path.isfile(os.path.join(prefix, "cxbottle.conf"))
    return has_registry and has_config
@cxobjc.method(BottleQuery, 'bottles')
def get_bottle_list():
    """Return the names of the valid bottles found in the bottle search paths.

    Both the regular and the managed bottle paths are scanned (each is a
    ':'-separated list of directories). The 'default' entry and the
    registration test bottles are skipped, as is anything that does not pass
    _validate_wineprefix(). Each name appears once in the returned list.
    """
    skipped_prefix = registrationBottlePrefix.lower()
    found = set()
    search_paths = [cxproduct.get_bottle_path(), cxproduct.get_managed_bottle_path()]
    for search_path in search_paths:
        for bottle_dir in search_path.split(":"):
            if not os.path.exists(bottle_dir):
                continue
            for entry in os.listdir(cxutils.string_to_unicode(bottle_dir)):
                lowered = entry.lower()
                if lowered == "default" or lowered.startswith(skipped_prefix):
                    continue
                if _validate_wineprefix(os.path.join(bottle_dir, entry)):
                    found.add(entry)
    return list(found)
def get_games_bottle_list():
    """Return the names of the valid bottles found in the games bottle path.

    The path is a ':'-separated list of directories. The 'default' entry,
    the registration test bottles and anything failing
    _validate_wineprefix() are skipped. Each name appears once.
    """
    skipped_prefix = registrationBottlePrefix.lower()
    found = set()
    for bottle_dir in cxproduct.get_games_bottle_path().split(":"):
        if not os.path.exists(bottle_dir):
            continue
        for entry in os.listdir(cxutils.string_to_unicode(bottle_dir)):
            lowered = entry.lower()
            if lowered == "default" or lowered.startswith(skipped_prefix):
                continue
            if _validate_wineprefix(os.path.join(bottle_dir, entry)):
                found.add(entry)
    return list(found)
def unique_bottle_key(inBasename):
    """Return a name based on inBasename that no known bottle uses.

    unique_bottle_name() provides a unique name for an active bottle in the
    bottle directory; this function does that but also avoids collisions
    with pending games bottles. Names are compared case-insensitively and
    '-2', '-3', ... is appended to inBasename until the name is free.
    """
    known_bottles = get_bottle_list() + get_games_bottle_list()
    candidate = inBasename
    suffix = 1
    while any(candidate.lower() == name.lower() for name in known_bottles):
        suffix += 1
        candidate = inBasename + "-" + unicode(suffix)
    return candidate
def unique_bottle_name(inBasename):
    """Return a name based on inBasename that is free in the bottle directory.

    A candidate is rejected if it matches an existing bottle name
    (case-insensitively) or if a path of that name already exists in the
    first directory of the bottle path. '-2', '-3', ... is appended to
    inBasename until a free name is found.
    """
    known_bottles = get_bottle_list()
    candidate = inBasename
    suffix = 1
    while True:
        lowered = candidate.lower()
        taken = any(lowered == name.lower() for name in known_bottles)
        if not taken:
            # Also refuse names of stray files or invalid bottle directories.
            wineprefix = os.path.join(cxproduct.get_bottle_path().split(":")[0], candidate)
            taken = os.path.exists(wineprefix)
        if not taken:
            return candidate
        suffix += 1
        candidate = inBasename + "-" + unicode(suffix)
@cxobjc.method(BottleQuery, 'defaultBottle')
def get_default_bottle():
    """Return the name of the default bottle, or u"" if there is none.

    The first 'default' entry found in the bottle and managed bottle paths
    is resolved with os.path.realpath() and its base name is returned as a
    unicode string.
    """
    search_paths = [cxproduct.get_bottle_path(), cxproduct.get_managed_bottle_path()]
    for search_path in search_paths:
        for bottle_dir in search_path.split(":"):
            if not os.path.exists(bottle_dir):
                continue
            default_entry = os.path.join(bottle_dir, "default")
            if not os.path.exists(default_entry):
                continue
            target_name = os.path.basename(os.path.realpath(default_entry))
            return cxutils.string_to_unicode(target_name)
    return u""
@cxobjc.method(BottleQuery, 'prefixForBottle_')
def get_prefix_for_bottle(bottlename):
    """Return the directory of the named bottle.

    The bottle and managed bottle paths are searched in order and the first
    candidate accepted by _validate_wineprefix() is returned.

    Raises NotFoundError if no such bottle exists.
    """
    search_paths = [cxproduct.get_bottle_path(), cxproduct.get_managed_bottle_path()]
    for search_path in search_paths:
        for bottle_dir in search_path.split(":"):
            if os.path.exists(bottle_dir):
                candidate = os.path.join(bottle_dir, bottlename)
                if _validate_wineprefix(candidate):
                    return candidate
    raise NotFoundError("There is no bottle named %s" % repr(bottlename))
@cxobjc.method(BottleQuery, 'isExistingBottle_')
def is_existing_bottle(bottlename):
    """Return True if a valid bottle of that name exists in a bottle path.

    Both the regular and the managed bottle paths are searched.
    """
    search_paths = [cxproduct.get_bottle_path(), cxproduct.get_managed_bottle_path()]
    for search_path in search_paths:
        for bottle_dir in search_path.split(":"):
            if os.path.exists(bottle_dir):
                if _validate_wineprefix(os.path.join(bottle_dir, bottlename)):
                    return True
    return False
# Maps a bottle name to a {windows path: native path} dictionary.
_nativePathCache = {}


def get_native_paths(bottlename, windowspaths):
    """Convert a list of Windows paths to paths usable on the host.

    Conversions are requested from the bottle's manipulator with the
    'unixpath' command and remembered in _nativePathCache, so only paths
    that have not been seen before for this bottle are sent. A path for
    which the reply does not start with 'path ' is mapped to ''.

    Returns the converted paths, in the same order as windowspaths.
    """
    cache = _nativePathCache.get(bottlename)
    if cache is None:
        cache = {}

    uncached = [path for path in windowspaths if path not in cache]
    if uncached:
        # Convert those paths which are not in the cache.
        manipulator = BottleManipulator(bottlename)
        if cxlog.is_on('timing'):
            start_time = time.time()
        # Queue all the requests before collecting any reply.
        replies = [manipulator.send('unixpath %s' % path) for path in uncached]
        if cxlog.is_on('timing'):
            cxlog.log_('timing', "got %s unix paths in %0.2f seconds" % (' '.join(cxlog.debug_str(x) for x in uncached), time.time()-start_time))
        for path, reply in zip(uncached, replies):
            result = reply.get()
            if result.startswith('path '):
                cache[path] = result[5:]
            else:
                # FIXME
                cache[path] = ''
        _nativePathCache[bottlename] = cache

    # Now our cache has everything we need in it.
    return [cache[path] for path in windowspaths]
@cxobjc.method(BottleQuery, 'forBottle_getNativePath_')
def get_native_path(bottlename, windowspath):
    """Convert a single Windows path; see get_native_paths()."""
    converted = get_native_paths(bottlename, [windowspath])
    return converted[0]
# Maps a bottle name to a {unix path: windows path} dictionary.
_windowsPathCache = {}


def get_windows_paths(bottlename, unixpaths):
    """Convert a list of Unix paths to Windows paths for the given bottle.

    Conversions are requested from the bottle's manipulator with the
    'windowspath' command and remembered in _windowsPathCache, so only
    paths that have not been seen before for this bottle are sent. A path
    for which the reply does not start with 'path ' is mapped to ''.

    Returns the converted paths, in the same order as unixpaths.
    """
    cache = _windowsPathCache.get(bottlename)
    if cache is None:
        cache = {}

    uncached = [path for path in unixpaths if path not in cache]
    if uncached:
        # Convert those paths which are not in the cache.
        manipulator = BottleManipulator(bottlename)
        if cxlog.is_on('timing'):
            start_time = time.time()
        # Queue all the requests before collecting any reply.
        replies = [manipulator.send('windowspath %s' % path) for path in uncached]
        if cxlog.is_on('timing'):
            # This function produces Windows paths, so say so in the log.
            cxlog.log_('timing', "got %s windows paths in %0.2f seconds" % (' '.join(cxlog.debug_str(x) for x in uncached), time.time()-start_time))
        for path, reply in zip(uncached, replies):
            result = reply.get()
            if result.startswith('path '):
                cache[path] = result[5:]
            else:
                # FIXME
                cache[path] = ''
        _windowsPathCache[bottlename] = cache

    # Now our cache has everything we need in it.
    return [cache[path] for path in unixpaths]
def get_windows_path(bottlename, unixpath):
    """Convert a single Unix path; see get_windows_paths()."""
    converted = get_windows_paths(bottlename, [unixpath])
    return converted[0]
def get_control_panel_info(bottlename, windowsdir=None):
    """Return the list of control panel applets of the given bottle.

    cxcplinfo.exe is run in the bottle to write 'control-panel.db' and the
    'ControlPanelDB' icon directory inside the bottle's Windows directory;
    the database is then read back. windowsdir is the Windows path of that
    directory; it is queried from the bottle when not given.

    Each entry is a [path, name, description, icon] list. Entries for
    winecfg, the task manager and the reboot tool are always appended.
    """
    manipulator = BottleManipulator(bottlename)
    if not windowsdir:
        windowsdir = manipulator.send('getwindowsdir').get()

    native_windows_dir = get_native_path(bottlename, windowsdir)
    db_filename = os.path.join(native_windows_dir, "control-panel.db")
    icon_cache_dir = os.path.join(native_windows_dir, "ControlPanelDB")

    try:
        os.mkdir(icon_cache_dir)
    except OSError:
        # Probably the dir already existed -- no problem.
        pass

    wine = os.path.join(cxutils.CX_ROOT, "bin", "wine")
    command = [wine, "--bottle", bottlename, '--no-gui', "--wl-app",
               "cxcplinfo.exe", "--all", db_filename, icon_cache_dir]
    cxutils.run(command, stderr=cxutils.NULL, logprefix=bottlename)

    applets = []
    if os.path.exists(db_filename):
        database = cxconfig.Raw()
        database.read(db_filename)
        for applet_id in database:
            if applet_id.lower() == "appwiz.cpl/0":
                # appwiz.cpl is installed because we need it as a back-end.
                # It's pretty much meaningless in this context, though.
                continue
            section = database[applet_id]
            path = section.get('Path', '')
            if path == '':
                continue
            icon_path = resolve_icon_path(section.get('Icon', ''), bottlename)
            name = section.get('Name', '')
            description = section.get('Description', '')
            applets.append([path, name, description, icon_path])

    # Special cases: winecfg, the task manager and the reboot tool.
    applets.extend([
        ["winecfg.exe.so", _("Wine Configuration"), _("Built-in tool for configuring Bottle Settings"), "winecfg"],
        ["taskmgr.exe.so", _("Task Manager"), _("Manage the applications and processes running in this bottle"), "winetaskmgr"],
        ["reboot.exe.so", _("Simulate Reboot"), _("Simulate a system reboot in this bottle"), "cxreboot"],
    ])
    return applets
@cxobjc.method(BottleQuery, 'controlPanelInfoForBottle_')
def _controlPanelInfoForBottle_(bottlename):
    """Variant of get_control_panel_info() that never raises.

    On any error the traceback is printed and False is returned.
    """
    # FIXME: only the exception handling differs -> merge with get_control_panel_info()
    try:
        applets = get_control_panel_info(bottlename)
    except:
        # Deliberately broad: nothing may propagate out of this entry point.
        traceback.print_exc()
        applets = False
    return applets
def config_file_path(bottlename):
    """Return the path of the 'cxbottle.conf' file of the given bottle.

    Raises NotFoundError (via get_prefix_for_bottle()) if there is no such
    bottle.
    """
    return os.path.join(get_prefix_for_bottle(bottlename), "cxbottle.conf")
def get_config(bottlename):
    """Build the configuration stack that applies to the specified bottle.

    The result is a copy of the product configuration (a cxconfig.Stack
    object) to which the bottle's own cxbottle.conf has been added, so it
    takes into account all the configuration files relevant to the bottle.
    """
    bottle_config = cxproduct.get_config().copy()
    bottle_config.addconfig(config_file_path(bottlename))
    return bottle_config
def is_managed(bottlename):
    """Return True if the bottle's configuration has a non-empty 'Updater'
    setting in its 'Bottle' section."""
    return get_config(bottlename)["Bottle"].get("Updater", "") != ""
def get_bottle_properties(bottlename):
    """Return a dictionary describing the given bottle.

    The keys are 'writable', 'template', 'managed', 'description',
    'version' and 'arch'; all but the first come from the 'Bottle' section
    of the bottle's configuration.
    """
    wineprefix = get_prefix_for_bottle(bottlename)
    if not wineprefix:
        # This isn't necessarily a disaster... it just means
        # that the bottle has been deleted since we were called.
        # NOTE(review): get_prefix_for_bottle() raises NotFoundError rather
        # than returning an empty value, so this looks unreachable.
        return {}

    writable = os.access(wineprefix, os.W_OK)
    bottle_section = get_config(bottlename)["Bottle"]
    return {
        "writable": writable,
        # Some very old bottles don't have a template set, so assume win98
        # for them.
        "template": expand_unix_string(bottlename, bottle_section.get("Template", 'win98')),
        "managed": bottle_section.get("Updater", "") != "",
        "description": bottle_section.get("Description", ''),
        "version": bottle_section.get("Version", ''),
        "arch": bottle_section.get("WineArch", 'win32'),
    }
def get_system_drive(bottlename):
    """Return the native path of the bottle's %SystemDrive%."""
    win_environ = get_win_environ(bottlename, ("SystemDrive",))
    system_drive = expand_win_string(win_environ, "%SystemDrive%")
    return get_native_path(bottlename, system_drive)
@cxobjc.method(BottleQuery, 'propertiesForBottle_')
def _propertiesForBottle_(bottlename):
    """Variant of get_bottle_properties() that never raises.

    On any error the traceback is printed and None is returned.
    """
    # FIXME: only the exception handling differs -> merge with get_bottle_properties()
    try:
        properties = get_bottle_properties(bottlename)
    except:
        # Deliberately broad: nothing may propagate out of this entry point.
        traceback.print_exc()
        properties = None
    return properties
@cxobjc.method(BottleQuery, 'resolveIconPath_forBottle_')
def resolve_icon_path(iconPath, bottlename):
    """Return an existing file path for iconPath, or "" if none is found.

    Three candidates are tried in order: the icon's base name joined to the
    directory reported by the 'getwindowsdir' command, iconPath itself, and
    iconPath converted with get_native_path().
    """
    windows_dir = BottleManipulator(bottlename).send('getwindowsdir').get()
    local_icon = os.path.join(windows_dir, os.path.basename(iconPath))
    for candidate in (local_icon, iconPath):
        if os.path.exists(candidate):
            return candidate

    # Only ask the bottle for a conversion when the cheap checks failed.
    native_path = get_native_path(bottlename, iconPath)
    if os.path.exists(native_path):
        return native_path
    return ""
@cxobjc.method(BottleQuery, 'shutdownManipulatorForBottle_')
def shutdown_manipulator(bottlename):
    """Ask the bottle's manipulator to shut down cleanly, i.e. once it has no
    pending requests."""
    BottleManipulator(bottlename).clean_shutdown()
#####
#
# The RPC implementation
#
#####
# FIXME: This should be made independent from cxmanip/bottlequery (it's close)
# and moved to a separate module to improve code readability and
# maintainability.
class AsyncManipResult(cxevent.AsyncResult):
    """Asynchronous result of a command sent to a bottle manipulator."""

    def __init__(self, manip):
        """manip is the manipulator object that will serve the query."""
        cxevent.AsyncResult.__init__(self)
        self.manip = manip

    def get(self):
        """Return the result, blocking until it is available.

        Raises IOError if the cxmanip process quit before answering.
        """
        if not self.event.isSet():
            # Let the manipulator process its pipes until our event fires.
            self.manip.wait(self.event.r_pipe)
            if not self.event.isSet():
                # wait() returned early because cxmanip quit
                raise IOError("The cxmanip process quit before serving this query.")
        return self.poll()
# Maps a bottle name to its _BottleManipulator instance; entries are created
# on demand by BottleManipulator().
_BottleManipulator_cache = {}
class _BottleManipulator(object):
    """Sends commands to a cxmanip process running in a bottle.

    The process is started on demand by send(). Commands are written to its
    stdin and the replies, each terminated by an 'end' line, are read from
    its stdout and handed to the AsyncManipResult objects in self.results in
    the order the commands were queued. A helper thread shuts the process
    down after a period of inactivity.

    self.lock protects the state shared between the callers and the helper
    thread.
    """

    def __init__(self, bottlename):
        self.bottlename = bottlename
        self.lock = threading.Lock()
        self.started = False
        self.got_ok = False
        self.closing = False
        # stdin and stdout hold the pipe file descriptors once started.
        self.thread = self.subp = self.stdin = self.stdout = None
        self.buf = ""
        self.results = []

        # writes to a pipe can block so delay them if necessary
        self.to_write = ''
        self.chars_written = 0

        self._last_activity_time = time.time()

    def _fail_pending_requests(self):
        """self.lock must be acquired when this is called.

        Fails every pending result with an IOError and drops the unsent data.
        """
        for result in self.results:
            result.set_error(IOError("The cxmanip process quit before serving this query."))
        self.results = []
        self.to_write = ''
        self.chars_written = 0

    def _start(self):
        """self.lock must be acquired when this is called.

        Starts the cxmanip process and its helper thread if not running.
        """
        if self.started:
            return
        self._fail_pending_requests()
        winebin = os.path.join(cxutils.CX_ROOT, "bin", "wine")
        args = [winebin, '--no-wait', '--no-gui',
                '--bottle', self.bottlename, '--wl-app', 'cxmanip']
        cxlog.log("starting command %s in %s" % (' '.join(cxlog.debug_str(x) for x in args), cxlog.to_str(self.bottlename)))
        self.subp = subprocess.Popen(args, stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE,
                                     close_fds=True)
        # use the fd's for stdout and stdin directly to avoid caching
        self.stdin = self.subp.stdin.fileno()
        self.stdout = self.subp.stdout.fileno()
        # make sure called processes will never, ever, ever inherit the pipes
        fcntl.fcntl(self.stdin, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        fcntl.fcntl(self.stdout, fcntl.F_SETFD, fcntl.FD_CLOEXEC)
        self.buf = ""
        self.started = True
        self.got_ok = False
        self.closing = False
        self.thread = threading.Thread(target=self._thread_proc, args=(self.subp,))
        self.thread.start()

    def _read(self):
        """self.lock must be acquired when this is called

        read from the pipe, without blocking.
        """
        if not self.started:
            return
        readable, _unused, err = cxutils.select([self.stdout], [], [self.stdout], 0)
        if err:
            self.started = False
            cxlog.log("cxmanip process in %s died (read error)" % cxlog.to_str(self.bottlename))
            return
        if readable:
            instr = os.read(self.stdout, 4096)
            if not instr:
                # end of file
                self.started = False
                cxlog.log("cxmanip process in %s died (read error)" % cxlog.to_str(self.bottlename))
                return
            self.buf = '%s%s' % (self.buf, instr)
            self._last_activity_time = time.time()

        # Discard the lines that precede the "CXMANIP OK" handshake.
        while not self.got_ok and '\n' in self.buf:
            if self.buf.startswith("CXMANIP OK\n"):
                self.got_ok = True
            _dummy, self.buf = self.buf.split('\n', 1)

        if self.got_ok:
            # Each complete reply goes to the oldest pending request.
            while '\nend\n' in self.buf:
                this_result = self.results.pop(0)
                result, self.buf = self.buf.split('\nend\n', 1)
                this_result.set(result)
                if cxlog.is_on('manip'):
                    cxlog.log_('manip', "%s: <- %s" % (cxlog.to_str(self.bottlename), cxlog.to_str(result)))

        if self.closing and not self.results:
            # A clean shutdown was requested and the last reply is in.
            self._close()
            self.closing = False

    def _write(self):
        """self.lock must be acquired when this is called

        write any remaining bytes to the pipe, without blocking.

        Returns True if it worked, False if the process needs to be started.
        """
        if not self.started:
            return False
        while self.chars_written < len(self.to_write):
            _unused, writable, err = cxutils.select([], [self.stdin], [self.stdin], 0)
            if err:
                self.started = False
                cxlog.log("cxmanip process in %s died (write error)" % cxlog.to_str(self.bottlename))
                return False
            if not writable:
                return True #call would block
            self._last_activity_time = time.time()
            try:
                self.chars_written += os.write(self.stdin, self.to_write[self.chars_written:])
            except OSError:
                # broken pipe
                cxlog.log("cxmanip process in %s died (write error)" % cxlog.to_str(self.bottlename))
                self.started = False
                return False
        self.chars_written = 0
        self.to_write = ''
        return True

    def wait(self, r=None, timeout=None):
        """blocks until r becomes readable, the cxmanip process dies, or there
        is no activity for at least timeout seconds"""
        while self.started:
            if self.to_write:
                write_sockets = [self.stdin]
                err_sockets = [self.stdin, self.stdout]
            else:
                write_sockets = []
                err_sockets = [self.stdout]
            if r is not None:
                read_sockets = [r, self.stdout]
            else:
                read_sockets = [self.stdout]
            try:
                if timeout is None:
                    readable, writable, err = cxutils.select(read_sockets, write_sockets, err_sockets)
                else:
                    # Time left before the idle period expires.
                    remaining = self._last_activity_time + timeout - time.time()
                    if remaining < 0:
                        if not self.results:
                            # no activity for timeout seconds
                            return
                        # Requests are still pending: wait for another
                        # full period rather than giving up on them.
                        remaining = timeout
                    readable, writable, err = cxutils.select(read_sockets, write_sockets, err_sockets, remaining + 0.1)
            except cxutils.select_error:
                # one of the sockets has been closed?
                return
            if r is not None and r in readable:
                return
            if err:
                return
            if writable:
                with self.lock:
                    self._write()
            if readable:
                with self.lock:
                    self._read()

    def _thread_proc(self, subp):
        """Helper thread body: services the pipes, closes them after a period
        of inactivity, then waits for the subp process to exit."""
        timeout = 15.0 # end the cxmanip process after 15 seconds of inactivity
        while self.started:
            self.wait(r=None, timeout=timeout)
            with self.lock:
                if not self.results and time.time() - self._last_activity_time > timeout:
                    self._close()
                    break
        subp.wait()

    def _send_command(self, string):
        """self.lock must be acquired when this is called.

        Send the given command to cxmanip and return an async result for it,
        if cxmanip is started. Otherwise, return None."""
        result = AsyncManipResult(self)
        self.to_write = '%s%s\n' % (self.to_write, string)
        self.results.append(result)
        if self._write():
            return result
        return None

    def send(self, string):
        """Queue a command for cxmanip, starting the process if needed.

        Unicode strings are sent UTF-8 encoded. Returns an AsyncManipResult;
        if the process could not be started the result carries an IOError.
        """
        cxlog.log_('manip', "%s: -> %s" % (cxlog.to_str(self.bottlename), cxlog.debug_str(string)))
        if isinstance(string, unicode):
            string = string.encode('utf8')
        with self.lock:
            result = self._send_command(string)
            if result is None:
                # Not running (anymore): start it and try once more.
                self._start()
                result = self._send_command(string)
            if result is None:
                result = AsyncManipResult(self)
                result.set_error(IOError("Could not start the cxmanip process"))
        return result

    def _close(self):
        """self.lock must be acquired when this is called"""
        if self.started:
            self.subp.stdin.close()
            self.subp.stdout.close()
            self.started = False
            cxlog.log("shutting down cxmanip process in %s" % cxlog.to_str(self.bottlename))

    def close(self):
        """Close the pipes to the cxmanip process right away."""
        with self.lock:
            self._close()

    def clean_shutdown(self):
        """cleanly shut down the cxmanip process when there are no pending requests"""
        with self.lock:
            if not self.results:
                self._close()
            else:
                # _read() closes the pipes once the last reply has arrived.
                self.closing = True

    def __del__(self):
        self.close()
def BottleManipulator(bottlename):
    """Return the shared _BottleManipulator of the given bottle, creating and
    caching it on first use."""
    manipulator = _BottleManipulator_cache.get(bottlename)
    if manipulator is None:
        manipulator = _BottleManipulator(bottlename)
        _BottleManipulator_cache[bottlename] = manipulator
    return manipulator
#####
#
# The remainder of the bottlequery implementation
#
#####
# Values for the 'arch' parameter of the registry functions below. The value
# is prepended as-is to the registry path in the command sent to cxmanip.
# NOTE(review): presumably these select the 32-bit view, the bottle's native
# view, or both views of the registry -- confirm against cxmanip.
REG_WIN32 = '32\\'
REG_NATIVE = ''
REG_BOTH = '3264\\'
def deserialize_reg(lines):
    """Parse the lines of a registry key dump.

    Returns a (subkeys, values) tuple: the list of lowercased subkey names
    and a dictionary mapping lowercased value names to their data. Data
    given in '=' form is unescaped to a string, other data of type '4' or
    '11' is converted to an integer, and anything else is kept as received.

    Raises NotFoundError if the dump reports a missing key, and Exception
    on any line that is not understood.
    """
    subkeys = []
    values = {}
    for line in lines:
        if line.startswith('value '):
            _keyword, remainder = line.split(' ', 1)
            raw_name, remainder = remainder.split('\\0 ', 1)
            valuename = raw_name.encode('string_escape').rstrip('\0').lower()
            typecode, data = remainder.split(' ', 1)
            if data.startswith('='):
                # FIXME: multistrings and such not handled properly
                values[valuename] = data[1:].decode('string_escape').rstrip('\0')
            elif typecode in ('4', '11'):
                # little endian integer: reverse the two-digit byte groups
                byte_groups = [data[x:x+2] for x in range(len(data)-2, -2, -2)]
                values[valuename] = int(''.join(byte_groups), 16)
            else:
                # some other kind of data
                values[valuename] = data
        elif line.startswith('subkey '):
            _keyword, keyname = line.split(' ', 1)
            subkeys.append(keyname.lower())
        elif line == '':
            continue
        elif line in ('errno 2', 'error not a valid root key'):
            raise NotFoundError("registry key not found")
        else:
            raise Exception("got unexpected data: %s" % line)
    return subkeys, values
@cxobjc.method(BottleQuery, 'getRegistryKeyInBottle_atPath_')
def get_registry_key(bottlename, path, arch=REG_NATIVE):
    """Read a registry key of the given bottle.

    arch is one of the REG_* constants. Returns the (subkeys, values) tuple
    built by deserialize_reg(), which also raises NotFoundError if the key
    does not exist.
    """
    manipulator = BottleManipulator(bottlename)
    cxlog.log("getting registry tree for %s in %s" % (cxlog.debug_str(path), cxlog.to_str(bottlename)))

    if cxlog.is_on('timing'):
        start_time = time.time()

    reply = manipulator.send('getregkey %s%s' % (arch, path))
    serialdata = reply.get()

    if cxlog.is_on('timing'):
        cxlog.log_('timing', "got registry tree for %s in %0.3f seconds" % (cxlog.debug_str(path), time.time()-start_time))

    return deserialize_reg(serialdata.split('\n'))
@cxobjc.method(BottleQuery, 'setRegistryKeyInBottle_atPath_withName_andValue_')
def set_registry_key(bottlename, path, name, value, arch=REG_NATIVE):
    """Set a registry value in the given bottle.

    value may start with one of the 'dword:', 'str:', 'str(2):', 'str(7):'
    or 'hex:' type prefixes; otherwise it is sent as a 'str:' value. arch is
    one of the REG_* constants.

    Returns True on success, False (after logging the error) otherwise.
    """
    manipulator = BottleManipulator(bottlename)
    cxlog.log("setting registry [%s/%s] in %s to %s" % (cxlog.debug_str(path), cxlog.debug_str(name), cxlog.to_str(bottlename), cxlog.debug_str(value)))

    if cxlog.is_on('timing'):
        start_time = time.time()

    if isinstance(path, unicode):
        path = path.encode('utf8', 'ignore')
    if isinstance(name, unicode):
        name = name.encode('utf8', 'ignore')
    if isinstance(value, unicode):
        value = value.encode('utf8', 'ignore')

    # The fields are separated by a literal backslash followed by 'n'.
    command = arch + path + "\\n" + name + "\\n"
    typed_prefixes = ('dword:', 'str:', 'str(2):', 'str(7):', 'hex:')
    if value.startswith(typed_prefixes):
        command = command + value
    else:
        command = command + "str:" + value

    result = manipulator.send('setregkey %s' % command).get()

    if cxlog.is_on('timing'):
        cxlog.log_('timing', "set registry tree for %s in %0.3f seconds" % (cxlog.debug_str(path), time.time()-start_time))

    if result == "success":
        return True
    cxlog.log("setting registry error is %s" % (cxlog.to_str(result)))
    return False
def unset_registry_value(bottlename, path, name, arch=REG_NATIVE):
    """Remove a registry value from the given bottle.

    arch is one of the REG_* constants. Returns True on success, False
    (after logging the error) otherwise.
    """
    manipulator = BottleManipulator(bottlename)

    if isinstance(path, unicode):
        path = path.encode('utf8', 'ignore')
    if isinstance(name, unicode):
        name = name.encode('utf8', 'ignore')

    # The fields are separated by a literal backslash followed by 'n'.
    command = arch + path + "\\n" + name
    result = manipulator.send('rmregvalue %s' % command).get()

    if result == "success":
        return True
    cxlog.log("removing registry value error is %s" % (cxlog.to_str(result)))
    return False
# from winecfg/appdefaults.c and ntdll/version.c
#
# Maps a Windows version identifier to a tuple whose fields are used as
# follows by set_windows_version() and get_windows_version():
#   [0]    human-readable name
#   [1:4]  major version, minor version, build number
#   [4]    platform id (0: VER_PLATFORM_WIN32s, 1: VER_PLATFORM_WIN32_WINDOWS,
#          2: VER_PLATFORM_WIN32_NT)
#   [5]    CSDVersion / SubVersionNumber string
#   [6:8]  the two numbers written to the CSDVersion dword
#          (presumably service pack major and minor -- confirm)
#   [8]    ProductType string
_windows_versions = {
# pylint: disable=C0326
"win10": ("Windows 10", 10, 0, 0x2800, 2, " ", 0, 0, "WinNT"),
"win81": ("Windows 8.1", 6, 3, 0x2580, 2, " ", 0, 0, "WinNT"),
"win8": ("Windows 8", 6, 2, 0x23F0, 2, " ", 0, 0, "WinNT"),
"win2008r2": ("Windows 2008 R2", 6, 1, 0x1DB1, 2, "Service Pack 1", 1, 0, "ServerNT"),
"win7": ("Windows 7", 6, 1, 0x1DB1, 2, "Service Pack 1", 1, 0, "WinNT"),
"win2008": ("Windows 2008", 6, 0, 0x1771, 2, "Service Pack 1", 0, 0, "ServerNT"),
"vista": ("Windows Vista", 6, 0, 0x1772, 2, "Service Pack 2", 2, 0, "WinNT"),
"win2003": ("Windows 2003", 5, 2, 0xECE, 2, "Service Pack 2", 2, 0, "ServerNT"),
"winxp": ("Windows XP", 5, 1, 0xA28, 2, "Service Pack 3", 3, 0, "WinNT"),
"win2k": ("Windows 2000", 5, 0, 0x893, 2, "Service Pack 4", 4, 0, "WinNT"),
"winme": ("Windows ME", 4, 90, 0xBB8, 1, " ", 0, 0, ""),
"win98": ("Windows 98", 4, 10, 0x8AE, 1, " A ", 0, 0, ""),
"win95": ("Windows 95", 4, 0, 0x3B6, 1, "", 0, 0, ""),
"nt40": ("Windows NT 4.0", 4, 0, 0x565, 2, "Service Pack 6a", 6, 0, "WinNT"),
"nt351": ("Windows NT 3.5", 3, 51, 0x421, 2, "Service Pack 2", 0, 0, "WinNT"),
"win31": ("Windows 3.1", 3, 10, 0, 0, "Win32s 1.3", 0, 0, ""),
"win30": ("Windows 3.0", 3, 0, 0, 0, "Win32s 1.3", 0, 0, ""),
"win20": ("Windows 2.0", 2, 0, 0, 0, "Win32s 1.3", 0, 0, ""),
}
# Alternate identifiers accepted by set_windows_version(), each mapped to the
# _windows_versions key it stands for.
version_nicknames = {
"win2000": 'win2k',
"nt2k": 'win2k',
"nt2000": 'win2k',
"win2k3": 'win2003',
"winvista": 'vista',
"win2k8": 'win2008',
"win2k8r2": 'win2008r2',
}
# Every version set_windows_version() accepts: two extra Windows XP service
# pack levels, all of _windows_versions, and the nicknames.
# pylint: disable=C0326
_settable_versions = dict(
    winxpsp1=("Windows XP", 5, 1, 0xA28, 2, "Service Pack 1", 1, 0, "WinNT"),
    winxpsp2=("Windows XP", 5, 1, 0xA28, 2, "Service Pack 2", 2, 0, "WinNT"),
)
_settable_versions.update(_windows_versions)

# A nickname shares the tuple of the version it stands for.
for nickname in version_nicknames:
    _settable_versions[nickname] = _windows_versions[version_nicknames[nickname]]
def set_windows_version(bottlename, version):
    """Make the given bottle report the specified Windows version.

    version must be a key of _settable_versions (KeyError otherwise).
    Depending on the platform id of that version, the matching registry
    values are set and the ones belonging to the other platforms are
    removed, in both registry views (REG_BOTH).
    """
    nt_key = 'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows NT\\CurrentVersion'
    win9x_key = 'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows\\CurrentVersion'
    product_key = 'HKEY_LOCAL_MACHINE\\System\\CurrentControlSet\\Control\\ProductOptions'
    control_key = 'HKEY_LOCAL_MACHINE\\System\\CurrentControlSet\\Control\\Windows'
    environ_key = 'HKEY_LOCAL_MACHINE\\System\\CurrentControlSet\\Control\\Session Manager\\Environment'
    wine_key = 'HKEY_CURRENT_USER\\Software\\Wine'

    # The values that only make sense for the NT platform...
    nt_values = (
        (nt_key, 'CSDVersion'),
        (nt_key, 'CurrentVersion'),
        (nt_key, 'CurrentBuildNumber'),
        (product_key, 'ProductType'),
        (control_key, 'CSDVersion'),
        (environ_key, 'OS'),
    )
    # ... and those that only make sense for the Windows 9x platform.
    win9x_values = (
        (win9x_key, 'VersionNumber'),
        (win9x_key, 'SubVersionNumber'),
    )

    info = _settable_versions[version]
    if version == 'winxp' and get_bottle_properties(bottlename)['arch'] == 'win64':
        # 64-bit winxp version info is different
        info = ("Windows XP", 5, 2, 0xECE, 2, "Service Pack 3", 3, 0, "WinNT")

    platform_id = info[4]
    if platform_id == 0: #VER_PLATFORM_WIN32s
        for key, name in nt_values + win9x_values:
            unset_registry_value(bottlename, key, name, REG_BOTH)
        set_registry_key(bottlename, wine_key, 'Version', 'str:' + version, REG_BOTH)
    elif platform_id == 1: #VER_PLATFORM_WIN32_WINDOWS
        for key, name in nt_values:
            unset_registry_value(bottlename, key, name, REG_BOTH)
        set_registry_key(bottlename, win9x_key, 'VersionNumber', 'str:%d.%d.%d' % info[1:4], REG_BOTH)
        set_registry_key(bottlename, win9x_key, 'SubVersionNumber', 'str:' + info[5], REG_BOTH)
        unset_registry_value(bottlename, wine_key, 'Version', REG_BOTH)
    elif platform_id == 2: #VER_PLATFORM_WIN32_NT
        set_registry_key(bottlename, nt_key, 'CSDVersion', 'str:' + info[5], REG_BOTH)
        set_registry_key(bottlename, nt_key, 'CurrentVersion', 'str:%d.%d' % info[1:3], REG_BOTH)
        set_registry_key(bottlename, nt_key, 'CurrentBuildNumber', 'str:%d' % info[3], REG_BOTH)
        set_registry_key(bottlename, product_key, 'ProductType', 'str:' + info[8], REG_BOTH)
        set_registry_key(bottlename, control_key, 'CSDVersion', 'dword:%02x%02x' % info[6:8], REG_BOTH)
        set_registry_key(bottlename, environ_key, 'OS', 'Windows_NT', REG_BOTH)
        for key, name in win9x_values:
            unset_registry_value(bottlename, key, name, REG_BOTH)
        unset_registry_value(bottlename, wine_key, 'Version', REG_BOTH)
def get_windows_version(bottlename):
    """Return the _windows_versions key matching the bottle's registry.

    Three places are checked in order: the 'Version' value of the Wine key,
    the Windows 9x 'VersionNumber', and the Windows NT 'CurrentVersion'
    together with the 'ProductType'. Registry keys that do not exist are
    skipped. 'winxp' is returned if nothing matches.
    """
    try:
        _subkeys, values = get_registry_key(bottlename, 'HKEY_CURRENT_USER\\Software\\Wine')
    except NotFoundError:
        pass
    else:
        if 'version' in values and values['version'] in _windows_versions:
            return values['version']

    try:
        _subkeys, values = get_registry_key(bottlename, 'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows\\CurrentVersion')
    except NotFoundError:
        pass
    else:
        if 'versionnumber' in values:
            wanted = tuple(int(x) for x in values['versionnumber'].split('.'))
            for version, version_info in _windows_versions.items():
                # major.minor.build of a Windows 9x platform entry
                if version_info[4] == 1 and version_info[1:4] == wanted:
                    return version

    try:
        _subkeys, values = get_registry_key(bottlename, 'HKEY_LOCAL_MACHINE\\Software\\Microsoft\\Windows NT\\CurrentVersion')
        _subkeys, prod_values = get_registry_key(bottlename, 'HKEY_LOCAL_MACHINE\\System\\CurrentControlSet\\Control\\ProductOptions')
    except NotFoundError:
        pass
    else:
        if 'currentversion' in values and 'producttype' in prod_values:
            wanted = tuple(int(x) for x in values['currentversion'].split('.'))
            product_type = prod_values['producttype']
            for version, version_info in _windows_versions.items():
                # major.minor and product type of a Windows NT platform entry
                if version_info[4] == 2 and version_info[1:3] == wanted and version_info[8] == product_type:
                    return version

    return 'winxp'
@cxobjc.method(BottleQuery, 'setNativeBrowserForBottle_')
def set_native_browser(bottlename):
    """Register winebrowser as the default web browser in the given bottle, so
    links are opened in the system native browser."""
    # (key template, value name, value) triplets written for each association
    entries = (
        ('HKEY_CLASSES_ROOT\\%s\\shell\\open\\command',
         '', '\"c:\\windows\\system32\\winebrowser.exe\" -nohome'),
        ('HKEY_CLASSES_ROOT\\%s\\shell\\open\\ddeexec',
         '', '"%1",,-1,0,,,,'),
        ('HKEY_CLASSES_ROOT\\%s\\shell\\open\\ddeexec',
         'NoActivateHandler', ''),
        ('HKEY_CLASSES_ROOT\\%s\\shell\\open\\ddeexec\\Application',
         '', 'IExplore'),
        ('HKEY_CLASSES_ROOT\\%s\\shell\\open\\ddeexec\\Topic',
         '', 'WWW_OpenURL'),
    )
    for assoc in ('htmlfile', 'xmlfile', 'ftp', 'http', 'https'):
        for key_template, name, value in entries:
            set_registry_key(bottlename, key_template % assoc, name, value)
def get_unix_environ(bottlename=None):
    """Get the Unix environment that should be in effect for the specified
    bottle.

    The result is a copy of os.environ with CX_ROOT set and, when a bottle
    is specified, CX_BOTTLE and WINEPREFIX too. If bottlename is None, then
    only the non-bottle specific variables (i.e. CX_ROOT) are set.
    """
    # This should be in BottleWrapper but this would make it impossible to use
    # it from installtask!
    environ = os.environ.copy()
    environ['CX_ROOT'] = cxutils.CX_ROOT
    if not bottlename:
        return environ

    environ['CX_BOTTLE'] = bottlename
    environ['WINEPREFIX'] = get_prefix_for_bottle(bottlename)
    # FIXME: We should probably also take into account the
    # [EnvironmentVariables] section
    # FIXME: We should also cache this data, however this would mean
    # having a mechanism in place to update it.
    return environ
def expand_unix_string(bottlename, string):
    """Expand references to environment variables of the form '${VARNAME}' in
    the specified string.

    The environment is the one appropriate for the specified bottle (see
    get_unix_environ()).
    """
    # This should be in BottleWrapper but this would make it impossible to use
    # it from installtask!
    bottle_environ = get_unix_environ(bottlename)
    return cxutils.expand_unix_string(bottle_environ, string)
def get_win_environ(bottlename, varnames=None):
    """Returns a mapping of the specified environment variable names to their
    values. If no variable name is specified, then all the Windows environment
    variables are returned.

    The keys of the mapping are the lowercased variable names.

    Note that the actual Windows environment is augmented with a couple of
    variables: accessories, desktop, startmenu and programs. In order to get
    these, they must be requested by name though. FIXME: Or not, we'll see.

    NOTE(review): retrieving all the variables is not implemented here; the
    default varnames=None is iterated as-is and raises TypeError.
    """
    manipulator = BottleManipulator(bottlename)
    environ = {}
    for varname in varnames:
        # FIXME: It would be better to retrieve them all at once
        reply = manipulator.send('getexpandedstr %s' % varname)
        environ[varname.lower()] = reply.get()
    return environ
def expand_win_string(environ, string):
    """Expands references to Windows environment variables using the specified
    environment block.

    References have the form '%VARNAME%' and are looked up by their
    lowercased name. Unknown variables are replaced with an empty string
    after a warning. A lone '%' is left untouched.
    """
    while '%' in string:
        pieces = string.split('%', 2)
        if len(pieces) != 3:
            # Only one '%' left: there is no reference to expand.
            break
        pre, varname, post = pieces
        key = varname.lower()
        if key in environ:
            value = environ[key]
        else:
            cxlog.warn("unable to expand %s" % cxlog.to_str(key))
            value = ''
        string = '%s%s%s' % (pre, value, post)
    return string
def add_drive(bottlename, unix_path):
    """Ask the bottle to add a drive for unix_path.

    Returns what follows 'drive ' in the reply, or None if the reply is
    empty.

    Raises NotFoundError if the reply is 'errno 2', and Exception on any
    other reply.
    """
    reply = BottleManipulator(bottlename).send('adddrive %s' % unix_path)
    output = reply.get().strip()
    if output == '':
        return None
    if output.startswith('drive '):
        return output[6:]
    if output == 'errno 2':
        raise NotFoundError('Path not found %s' % repr(unix_path))
    raise Exception("got unexpected data: %s" % output)
def rm_drive(bottlename, drive_letter):
    """Ask the bottle to remove the given drive.

    Raises NotFoundError if the reply is 'errno 2', and Exception on any
    other non-empty reply.
    """
    reply = BottleManipulator(bottlename).send('rmdrive %s' % drive_letter)
    output = reply.get().strip()
    if output == 'errno 2':
        raise NotFoundError('No such drive letter as %s in %s' % (repr(drive_letter), repr(bottlename)))
    if output:
        raise Exception("got unexpected data: %s" % output)
def rm_reg_tree(bottlename, regtree, arch=REG_NATIVE):
    """Remove a registry tree from the given bottle.

    arch is one of the REG_* constants. Returns True on success, False
    (after logging the error) otherwise.
    """
    reply = BottleManipulator(bottlename).send('rmregtree %s%s' % (arch, regtree))
    result = reply.get().strip()
    if result == "success":
        return True
    cxlog.log("removing registry tree error is %s" % (cxlog.to_str(result)))
    return False
def find_files(bottlename, path, max_results=-1):
    """Run the bottle's 'findfiles' command on path.

    Returns a (dirs, files) tuple of name lists; the '.' and '..'
    directories are left out. max_results is passed on to the command.

    Raises NotFoundError if the reply contains one of the 'errno 2',
    'errno 3', 'errno 5' or 'errno 267' lines, and Exception on any other
    line that is not understood.
    """
    reply = BottleManipulator(bottlename).send('findfiles %s %s' % (max_results, path))
    dirs = []
    files = []
    for line in reply.get().split('\n'):
        if line.startswith('file '):
            files.append(line[5:])
        elif line.startswith('dir '):
            dirname = line[4:]
            if dirname not in ('.', '..'):
                dirs.append(dirname)
        elif line in ('errno 2', 'errno 3', 'errno 5', 'errno 267'):
            raise NotFoundError
        elif line:
            raise Exception("got unexpected data: %s" % line)
    return dirs, files