Repository URL to install this package:
|
Version:
1.0.18-1pclos ▾
|
#!/usr/bin/python3
# Version: 1.0.18
# Licence: WTFP
# CopyCat is free software: you can redistribute it and/or modify
# it under the terms of WTFP Public License This program is distributed
# in the hope that it will be useful, but WITHOUT ANY WARRANTY.
from subprocess import Popen,PIPE,call,STDOUT
import os
import signal
import re
import gettext
import locale
import sys
import getopt
import time
import gi
gi.require_version('Polkit', '1.0')
gi.require_version('Gtk', '3.0')
gi.require_version('UDisks', '2.0')
gi.require_version('XApp', '1.0')
from gi.repository import GObject, Gio, Polkit, Gtk, GLib, UDisks, XApp
try:
from gi.repository import CopyCat
Using_CopyCat = True
except ImportError:
Using_CopyCat = False
if Using_CopyCat :
launcher = CopyCat.LauncherEntry.get_for_desktop_id ("copycat.desktop")
APP = 'copycat'
LOCALE_DIR = "/usr/share/locale"
locale.bindtextdomain(APP, LOCALE_DIR)
gettext.bindtextdomain(APP, LOCALE_DIR)
gettext.textdomain(APP)
_ = gettext.gettext
def print_timing(func):
def wrapper(*arg):
t1 = time.time()
res = func(*arg)
t2 = time.time()
print('%s took %0.3f ms' % (func.__name__, (t2-t1)*1000.0))
return res
return wrapper
class CopyCat :
def __init__(self, iso_path=None, usb_path=None, filesystem=None, mode=None, debug=False):
self.debug = debug
def devices_changed_callback(client):
self.get_devices()
self.udisks_client = UDisks.Client.new_sync()
self.udisks_client.connect("changed", devices_changed_callback)
# get glade tree
self.gladefile = "/usr/share/copycat/copycat.ui"
self.wTree = Gtk.Builder()
self.process = None
self.source_id = None
self.wTree.set_translation_domain(APP)
self.wTree.add_from_file(self.gladefile)
self.ddpid = 0
self.emergency_dialog = self.wTree.get_object("emergency_dialog")
self.confirm_dialog = self.wTree.get_object("confirm_dialog")
self.success_dialog = self.wTree.get_object("success_dialog")
if mode == "iso":
self.mode = "normal"
self.devicelist = self.wTree.get_object("device_combobox")
self.label = self.wTree.get_object("to_label")
self.expander = self.wTree.get_object("detail_expander")
self.go_button = self.wTree.get_object("write_button")
self.go_button.set_label(_("Write"))
self.logview = self.wTree.get_object("detail_text")
self.progress = self.wTree.get_object("progressbar")
self.chooser = self.wTree.get_object("filechooserbutton")
# Devicelist model
self.devicemodel = Gtk.ListStore(str, str)
# Renderer
renderer_text = Gtk.CellRendererText()
self.devicelist.pack_start(renderer_text, True)
self.devicelist.add_attribute(renderer_text, "text", 1)
self.get_devices()
# get globally needed widgets
self.window = self.wTree.get_object("main_dialog")
self.window.connect("destroy", self.close)
# set default file filter to *.iso/*.img
filt = Gtk.FileFilter()
filt.add_pattern("*.[iI][mM][gG]")
filt.add_pattern("*.[iI][sS][oO]")
filt.add_pattern("*.img")
self.chooser.set_filter(filt)
# set callbacks
dict = {
"on_cancel_button_clicked" : self.close,
"on_emergency_button_clicked" : self.emergency_ok,
"on_success_button_clicked" : self.success_ok,
"on_confirm_cancel_button_clicked" : self.confirm_cancel}
self.wTree.connect_signals(dict)
self.devicelist.connect("changed", self.device_selected)
self.go_button.connect("clicked", self.do_write)
self.chooser.connect("file-set", self.file_selected)
if iso_path:
if os.path.exists(iso_path):
self.chooser.set_filename(iso_path)
self.file_selected(self.chooser)
if usb_path is not None:
iter = self.devicemodel.get_iter_first()
while iter is not None:
value = self.devicemodel.get_value(iter, 0)
if usb_path in value:
self.devicelist.set_active_iter(iter)
iter = self.devicemodel.iter_next(iter)
self.window.show_all()
if self.mode=="format":
self.expander.hide()
self.log = self.logview.get_buffer()
def get_devices(self):
self.go_button.set_sensitive(False)
self.devicemodel.clear()
dct = []
self.dev = None
manager = self.udisks_client.get_object_manager()
for obj in manager.get_objects():
if obj is not None:
block = obj.get_block()
if block is not None:
drive = self.udisks_client.get_drive_for_block(block)
if drive is not None:
is_usb = str(drive.get_property("connection-bus")) == 'usb'
size = int(drive.get_property('size'))
optical = bool(drive.get_property('optical'))
removable = bool(drive.get_property('removable'))
if is_usb and size > 0 and removable and not optical:
name = _("unknown")
block = obj.get_block()
if block is not None:
name = block.get_property('device')
name = ''.join([i for i in name if not i.isdigit()])
driveVendor = str(drive.get_property('vendor'))
driveModel = str(drive.get_property('model'))
if driveVendor.strip() != "":
driveModel = "%s %s" % (driveVendor, driveModel)
if size >= 1000000000000:
size = "%.0fTB" % round(size / 1000000000000)
elif size >= 1000000000:
size = "%.0fGB" % round(size / 1000000000)
elif size >= 1000000:
size = "%.0fMB" % round(size / 1000000)
elif size >= 1000:
size = "%.0fkB" % round(size / 1000)
else:
size = "%.0fB" % round(size)
item = "%s (%s) - %s" % (driveModel, name, size)
if item not in dct:
dct.append(item)
self.devicemodel.append([name, item])
self.devicelist.set_model(self.devicemodel)
def device_selected(self, widget):
iter = self.devicelist.get_active_iter()
if iter is not None:
self.dev = self.devicemodel.get_value(iter, 0)
self.go_button.set_sensitive(True)
def filesystem_selected(self, widget):
iter = self.filesystemlist.get_active_iter()
if iter is not None:
self.filesystem = self.fsmodel.get_value(iter, 0)
self.activate_devicelist()
def file_selected(self, widget):
self.activate_devicelist()
def do_format(self, widget):
if self.debug:
print("DEBUG: Format %s as %s % (self.dev, self.filesystem")
return
self.devicelist.set_sensitive(False)
self.filesystemlist.set_sensitive(False)
self.go_button.set_sensitive(False)
label = self.wTree.get_object("volume_label_entry").get_text()
if os.geteuid() > 0:
self.raw_format(self.dev, self.filesystem, label)
else:
# We are root, display confirmation dialog
resp = self.confirm_dialog.run()
if resp == Gtk.ResponseType.OK:
self.confirm_dialog.hide()
self.raw_format(self.dev, self.filesystem, label)
else:
self.confirm_dialog.hide()
self.set_format_sensitive()
def check_format_job(self):
self.process.poll()
if self.process.returncode is None:
self.format_progressbar.pulse()
return True
else:
GObject.idle_add(self.format_job_done, self.process.returncode)
self.process = None
return False
def do_write(self, widget):
if self.debug:
print("DEBUG: Write %s to %s" % (self.chooser.get_filename(), self.dev))
return
self.go_button.set_sensitive(False)
self.devicelist.set_sensitive(False)
self.chooser.set_sensitive(False)
source = self.chooser.get_filename()
target = self.dev
self.logger(_('Write:') + ' ' + source)
self.logger(_('USB device:')+ ' ' + self.dev)
if os.geteuid() > 0:
self.raw_write(source, target)
else:
# We are root, display confirmation dialog
resp = self.confirm_dialog.run()
if resp == Gtk.ResponseType.OK:
self.confirm_dialog.hide()
self.raw_write(source, target)
else:
self.confirm_dialog.hide()
self.set_iso_sensitive()
def set_progress_bar_fraction(self, size):
self.progress.set_fraction(size)
str_progress = "%3.0f%%" % (float(size)*100)
self.progress.set_text(str_progress)
self.window.set_title("%s - %s" % (str_progress, _("CopyCat 1.0.18")))
def update_progress(self, fd, condition):
if Using_CopyCat :
launcher.set_property("progress_visible", True)
if condition is GLib.IO_IN:
line = fd.readline()
try:
size = float(line.strip())
progress = round(size * 100)
if progress > self.write_progress:
self.write_progress = progress
GObject.idle_add(self.set_progress_bar_fraction, size)
if Using_CopyCat :
launcher.set_property("progress", size)
except:
pass
return True
else:
GLib.source_remove(self.source_id)
return False
def check_write_job(self):
self.process.poll()
if self.process.returncode is None:
return True
else:
GObject.idle_add(self.write_job_done, self.process.returncode)
self.process = None
return False
def raw_write(self, source, target):
self.progress.set_sensitive(True)
self.progress.set_text(_('Writing %(VAR_FILE)s to %(VAR_DEV)s') % {'VAR_FILE': source.split('/')[-1], 'VAR_DEV': self.dev})
self.logger(_('Starting copy from %(VAR_SOURCE)s to %(VAR_TARGET)s') % {'VAR_SOURCE':source, 'VAR_TARGET':target})
if os.geteuid() > 0:
launcher='pkexec'
self.process = Popen([launcher,'/usr/bin/python3', '-u', '/usr/share/copycat/raw_write.py','-s',source,'-t',target], shell=False, stdout=PIPE, preexec_fn=os.setsid)
else:
self.process = Popen(['/usr/bin/python3', '-u', '/usr/share/copycat/raw_write.py','-s',source,'-t',target], shell=False, stdout=PIPE, preexec_fn=os.setsid)
self.write_progress = 0
self.source_id = GLib.io_add_watch(self.process.stdout, GLib.IO_IN|GLib.IO_HUP, self.update_progress)
GObject.timeout_add(500, self.check_write_job)
def write_job_done(self, rc):
if rc == 0:
if Using_CopyCat :
launcher.set_property("progress_visible", False)
launcher.set_property("urgent", True)
message = _('The iso was successfully written.')
self.set_progress_bar_fraction(1.0)
self.logger(message)
self.success(_('The iso was successfully written.'))
return False
elif rc == 3:
message = _('Not enough space on the USB device.')
elif rc == 4:
message = _('An error occured while copying the iso.')
elif rc == 127:
message = _('Authentication Error.')
else:
message = _('An error occurred.')
self.logger(message)
self.emergency(message)
return False
def success(self,message):
label = self.wTree.get_object("label5")
label.set_text(message)
if self.mode == "normal":
self.final_unsensitive()
resp = self.success_dialog.run()
if resp == Gtk.ResponseType.OK:
self.success_dialog.hide()
def emergency(self, message):
if self.mode == "normal":
self.final_unsensitive()
label = self.wTree.get_object("label6")
label.set_text(message)
#self.expander.set_expanded(True)
mark = self.log.create_mark("end", self.log.get_end_iter(), False)
self.logview.scroll_to_mark(mark, 0.05, True, 0.0, 1.0)
resp = self.emergency_dialog.run()
if resp == Gtk.ResponseType.OK:
self.emergency_dialog.hide()
def final_unsensitive(self):
self.chooser.set_sensitive(False)
self.devicelist.set_sensitive(False)
self.go_button.set_sensitive(False)
self.progress = self.wTree.get_object("progressbar")
self.progress.set_sensitive(False)
self.window.set_title(_("CopyCat 1.0.18"))
def close(self, widget):
self.write_logfile()
if self.process is not None:
try:
os.killpg(self.process.pid, signal.SIGTERM)
except:
pass
finally:
Gtk.main_quit()
else:
Gtk.main_quit()
def write_logfile(self):
start = self.log.get_start_iter()
end = self.log.get_end_iter()
print(self.log.get_text(start, end, False))
def logger(self, text):
self.log.insert_at_cursor(text+"\n")
def activate_devicelist(self):
self.devicelist.set_sensitive(True)
self.expander.set_sensitive(True)
self.label.set_sensitive(True)
def confirm_cancel(self,widget):
self.confirm_dialog.hide()
if self.mode == "normal": self.set_iso_sensitive()
if self.mode == "format": self.set_format_sensitive()
def emergency_ok(self,widget):
self.emergency_dialog.hide()
if self.mode == "normal": self.set_iso_sensitive()
if self.mode == "format":
self.set_format_sensitive()
self.go_button.set_sensitive(False)
def success_ok(self,widget):
self.success_dialog.hide()
if self.mode == "normal":
self.set_iso_sensitive()
if self.mode == "format":
self.set_format_sensitive()
self.go_button.set_sensitive(False)
def set_iso_sensitive(self):
self.chooser.set_sensitive(True)
self.devicelist.set_sensitive(True)
self.go_button.set_sensitive(True)
def set_format_sensitive(self):
self.filesystemlist.set_sensitive(True)
self.devicelist.set_sensitive(True)
self.go_button.set_sensitive(True)
if __name__ == "__main__":
usb_path=None
iso_path=None
filesystem=None
mode=None
def usage():
print("Usage: copycat [--debug] -m [iso] : mode (burn iso image)")
print(" copycat [--debug] -m iso [-i|--iso] iso_path")
print(" [-f|--filesystem] filesystem")
exit (0)
try:
opts, args = getopt.getopt(sys.argv[1:], "hm:i:u:f:", ["debug", "help", "mode=", "iso=","usb=","filesystem="])
except getopt.error as msg:
print(msg)
print("for help use --help")
sys.exit(2)
debug = False
for o, a in opts:
if o in ("-h", "--help"):
usage()
elif o in ("-i", "--iso"):
iso_path = a
elif o in ("-u", "--usb"):
usb_path = ''.join([i for i in a if not i.isdigit()])
elif o in ("-f", "--filesystem"):
filesystem = a
elif o in ("-m", "--mode"):
mode=a
elif o in ("--debug"):
debug = True
argc = len(sys.argv)
if argc > 8:
print("Too many arguments")
print("for help use --help")
exit(2)
# Mandatory argument
if (mode is None) or ((mode != "iso")):
usage()
CopyCat(iso_path, usb_path, filesystem, mode, debug)
Gtk.main()