Repository URL to install this package:
|
Version:
5.0-r2 ▾
|
enigma2-plugin-systemplugins-gutemine
/
usr
/
lib
/
enigma2
/
python
/
Plugins
/
SystemPlugins
/
gutemine
/
unix_ar.py
|
|---|
import os
import io
import struct
# Version string of this module.
__version__ = '0.2.1'
# Keep a reference to the builtin open(): this module defines its own
# module-level open() function, which shadows the builtin name.
_open = open
# Maximum number of bytes read per call when copying member contents.
CHUNKSIZE = 4096
def utf8(s):
    """
    Return `s` as bytes: bytes pass through, unicode is encoded as UTF-8.

    Used for filenames, which callers may supply as unicode but which are
    always kept as bytes inside the archive.
    """
    return s if isinstance(s, bytes) else s.encode('utf-8')
class ArInfo(object):
    """
    Information on a file in an archive.

    This has the filename and all the metadata for a file in an archive.
    It is returned by :meth:`~unix_ar.ArFile.infolist()` and
    :meth:`~unix_ar.ArFile.getinfo()`, and can be passed when adding or
    extracting a file to or from the archive.

    Missing fields will be autocompleted when passed to `ArFile`, but note that
    things like `size` will be respected, allowing you to store or extract only
    part of a file.

    `ArInfo` objects returned by `ArFile` have the offset to the file in the
    archive, allowing to extract the correct one even if multiple files with
    the same name are present; if you change the `name` attribute, the initial
    file will be extracted with the new name (and new metadata).
    """

    # Size in bytes of one encoded member header (16+12+6+6+8+10+2).
    HEADER_SIZE = 60

    def __init__(self, name, size=None, mtime=None, perms=None, uid=None, gid=None):
        """
        :param name: Member name (bytes or unicode; stored as bytes).
        :param size: Size of the member data in bytes, or None if unknown.
        :param mtime: Modification timestamp, or None if unknown.
        :param perms: File mode, or None if unknown.
        :param uid: Owner ID, or None if unknown.
        :param gid: Group ID, or None if unknown.
        """
        self.name = name
        self.size = size
        self.mtime = mtime
        self.perms = perms
        self.uid = uid
        self.gid = gid
        # Position of the member header in the archive; set by `ArFile`.
        self.offset = None

    def __repr__(self):
        return '<%s name=%r size=%r offset=%r>' % (
            self.__class__.__name__, self._name, self.size, self.offset)

    @property
    def name(self):
        """Member name, always as bytes."""
        return self._name

    @name.setter
    def name(self, value):
        self._name = utf8(value)

    @staticmethod
    def _parse_number(field, base):
        """
        Parse a space-padded numeric header field; a blank field reads as 0.
        """
        # NOTE(review): some ar writers reportedly leave the metadata fields
        # blank for special members (e.g. GNU's `//` name table) -- confirm
        # against the archives this module is used with.
        field = field.strip()
        return int(field, base) if field else 0

    @classmethod
    def frombuffer(cls, buffer):
        """
        Decode the archive header.

        :param buffer: The 60 header bytes of a member.
        :returns: A new `ArInfo`; its `offset` is left as None.
        :raises ValueError: If the header magic is wrong or a numeric field
            cannot be parsed.
        """
        # 0 16 File name ASCII
        # 16 12 File modification timestamp Decimal
        # 28 6 Owner ID Decimal
        # 34 6 Group ID Decimal
        # 40 8 File mode Octal
        # 48 10 File size in bytes Decimal
        # 58 2 File magic 0x60 0x0A
        name, mtime, uid, gid, perms, size, magic = (
            struct.unpack('16s12s6s6s8s10s2s', buffer))
        # Validate the magic first, so a garbage header is reported as such
        # rather than as an unrelated integer parsing failure.
        if magic != b'\x60\n':
            raise ValueError("Invalid file signature")
        return cls(
            name.rstrip(b' '),
            int(size, 10),
            cls._parse_number(mtime, 10),
            cls._parse_number(perms, 8),
            cls._parse_number(uid, 10),
            cls._parse_number(gid, 10),
        )

    def tobuffer(self):
        """
        Encode as an archive header.

        :returns: Exactly 60 bytes.
        :raises ValueError: If any field is None, or if a field is too wide
            for its fixed-size slot (e.g. a name longer than 16 bytes).
        """
        values = (self._name, self.mtime, self.uid,
                  self.gid, self.perms, self.size)
        if any(value is None for value in values):
            raise ValueError("ArInfo object has None fields")
        header = '{0: <16}{1: <12}{2: <6}{3: <6}{4: <8o}{5: <10}\x60\n'.format(
            self.name.decode('iso-8859-1'),
            self.mtime, self.uid, self.gid, self.perms, self.size,
        ).encode('iso-8859-1')
        # Every field is padded to its minimum width, so the header is longer
        # than expected exactly when some field overflowed its slot. Writing
        # such a header would shift everything after it and corrupt the archive.
        if len(header) != self.HEADER_SIZE:
            raise ValueError(
                "ArInfo fields do not fit in a %d-byte ar header"
                % self.HEADER_SIZE)
        return header

    def updatefromdisk(self, path=None):
        """
        Fill in the missing attributes from an actual file.

        This is called by `ArFile` when adding a file to the archive. If some
        attributes are missing, they have to be provided from the disk.

        If the file doesn't exist, adding will fail. There is currently no
        default values for the attributes, it is thus your responsibility to
        provide them.

        :param path: File to stat for the missing attributes; defaults to the
            member name.
        :returns: A new `ArInfo` (this object is not modified, and the
            returned object does not carry over `offset`).
        """
        name, size, mtime = self._name, self.size, self.mtime
        perms, uid, gid = self.perms, self.uid, self.gid
        attrs = (name, size, mtime, perms, uid, gid)
        if all(attr is not None for attr in attrs):
            # Nothing missing: no need to touch the disk at all.
            return self.__class__(*attrs)
        if path is None:
            path = name
        st = os.stat(path)
        if name is None:
            name = utf8(path)
        if size is None:
            size = st.st_size
        if mtime is None:
            mtime = int(st.st_mtime)
        if perms is None:
            perms = st.st_mode
        if uid is None:
            uid = st.st_uid
        if gid is None:
            gid = st.st_gid
        return self.__class__(name, size, mtime, perms, uid, gid)

    def __copy__(self):
        """Return a copy of this object, including its `offset`."""
        member = self.__class__(self._name, self.size,
                                self.mtime, self.perms, self.uid, self.gid)
        member.offset = self.offset
        return member
class ArFile(object):
    """
    An UNIX ar archive.

    This object allows you to either read or write an AR archive. It can be
    used as a context manager, in which case it is closed on exit.
    """

    def __init__(self, file, mode='r'):
        """
        Create an `ArFile` from an opened file (in 'rb' or 'wb' mode).

        Don't use this constructor, call :func:`unix_ar.open()` instead.

        :raises ValueError: If `mode` is neither 'r' nor 'w', or if the
            archive being read is invalid.
        """
        self._file = file
        self._mode = mode
        if mode == 'r':
            self._read_entries()
        elif mode == 'w':
            self._file.write(b'!<arch>\n')
        else:
            raise ValueError("mode must be one of 'r' or 'w'")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False

    def _read_entries(self):
        """
        Read every member header, filling `_entries` and `_name_map`.
        """
        if self._file.read(8) != b'!<arch>\n':
            raise ValueError("Invalid archive signature")
        self._entries = []
        self._name_map = {}
        pos = 8
        while True:
            buffer = self._file.read(60)
            if not buffer:
                break
            if len(buffer) != 60:
                raise ValueError("Truncated archive?")
            member = ArInfo.frombuffer(buffer)
            member.offset = pos
            # Later members overwrite earlier ones of the same name.
            self._name_map[member.name] = len(self._entries)
            self._entries.append(member)
            # Member data is padded to an even number of bytes.
            skip = member.size + (member.size % 2)
            pos += 60 + skip
            self._file.seek(skip, 1)
            if self._file.tell() != pos:
                raise ValueError("Truncated archive?")

    def _check(self, expected_mode):
        """
        Raise ValueError if the archive is closed or not in `expected_mode`.
        """
        if self._file is None:
            raise ValueError("Attempted to use a closed %s" %
                             self.__class__.__name__)
        if self._mode != expected_mode:
            if self._mode == 'r':
                raise ValueError("Can't change a read-only archive")
            else:
                raise ValueError("Can't read from a write-only archive")

    def add(self, name, arcname=None):
        """
        Add a file to the archive.

        :param name: Path to the file to be added.
        :type name: bytes | unicode
        :param arcname: Name the file will be stored as in the archive, or
            a full :class:`~unix_ar.ArInfo`. If unset, `name` will be used.
        :type arcname: None | bytes | unicode | unix_ar.ArInfo
        """
        self._check('w')
        if arcname is None:
            arcname = ArInfo(name)
        elif not isinstance(arcname, ArInfo):
            arcname = ArInfo(arcname)
        arcname = arcname.updatefromdisk(name)
        with _open(name, 'rb') as fp:
            self.addfile(arcname, fp)

    def addfile(self, name, fileobj=None):
        """
        Add a file to the archive from a file object.

        :param name: Name the file will be stored as in the archive, or
            a full :class:`~unix_ar.ArInfo`.
        :type name: bytes | unicode | unix_ar.ArInfo
        :param fileobj: File object to read from. If unset, the file named
            by `name` is opened and read.
        :raises RuntimeError: If fewer bytes than the recorded size could be
            read from the file.
        """
        self._check('w')
        if not isinstance(name, ArInfo):
            name = ArInfo(name)
        name = name.updatefromdisk()
        # Encode the header and open the source before writing anything, so
        # that a failure at this stage leaves the archive untouched.
        header = name.tobuffer()
        if fileobj is None:
            fp = _open(name.name, 'rb')
        else:
            fp = fileobj
        try:
            self._file.write(header)
            for pos in range(0, name.size, CHUNKSIZE):
                wanted = min(CHUNKSIZE, name.size - pos)
                chunk = fp.read(wanted)
                if len(chunk) != wanted:
                    raise RuntimeError("File changed size?")
                self._file.write(chunk)
            # Pad the data to an even number of bytes.
            if name.size % 2 == 1:
                self._file.write(b'\n')
        finally:
            # Only close what was opened here, never the caller's object.
            if fileobj is None:
                fp.close()

    def infolist(self):
        """
        Return a list of :class:`~unix_ar.ArInfo` for files in the archive.

        These objects are copy, so feel free to change them before feeding them
        to :meth:`~unix_ar.ArFile.add()` or :meth:`~unix_ar.ArFile.addfile()`.

        :rtype: [unix_ar.ArInfo]
        """
        self._check('r')
        return [entry.__copy__() for entry in self._entries]

    def getinfo(self, member):
        """
        Return an :class:`~unix_ar.ArInfo` for a specific file.

        This object is a copy, so feel free to change it before feeding them to
        :meth:`~unix_ar.ArFile.add()` or :meth:`~unix_ar.ArFile.addfile()`.

        :param member: Either a file name or an incomplete
            :class:`unix_ar.ArInfo` object to search for.
        :type member: bytes | unicode | unix_ar.ArInfo
        :rtype: unix_ar.ArInfo
        :raises KeyError: If no member has that name.
        """
        self._check('r')
        if isinstance(member, ArInfo):
            if member.offset is not None:
                # Re-read the header at the given position; this picks the
                # right member even when several share the same name.
                self._file.seek(member.offset, 0)
                info = ArInfo.frombuffer(self._file.read(60))
                info.offset = member.offset
                return info
            key = member.name
        else:
            key = utf8(member)
        return self._entries[self._name_map[key]].__copy__()

    def _extract(self, member, path):
        """
        Copy the data of `member` into `path` and return the file object.

        `path` is either an object with a `write` method, used as is, or a
        file name, opened for binary writing. The returned file object is
        rewound to position 0 and left open.
        """
        owns_fp = not hasattr(path, 'write')
        if owns_fp:
            # Strip with a separator of the same type as the path, so that
            # both bytes and unicode paths are accepted.
            slash = b'/' if isinstance(path, bytes) else '/'
            fp = _open(path.rstrip(slash), 'wb')
        else:
            fp = path
        try:
            self._file.seek(member.offset + 60, 0)
            for pos in range(0, member.size, CHUNKSIZE):
                fp.write(self._file.read(min(CHUNKSIZE, member.size - pos)))
            fp.flush()
            fp.seek(0)
        except Exception:
            # Don't leak a file opened here; the caller's object is left alone.
            if owns_fp:
                fp.close()
            raise
        return fp

    def extract(self, member, path=''):
        """
        Extract a single file from the archive.

        :param member: Either a file name or an :class:`unix_ar.ArInfo` object
            to extract. A passed `ArInfo` has its missing `offset` and its
            missing or too large `size` filled in from the archive.
        :type member: bytes | unicode | unix_ar.ArInfo
        :param path: Destination path (current directory by default), or an
            object with a `write` method to extract into. You can
            also change the `name` attribute on the `ArInfo` you pass this
            method to extract to any file name.
        :type path: bytes | unicode
        :returns: The file object the data was written to, rewound to the
            start and still open; closing it is up to the caller.
        """
        self._check('r')
        actualmember = self.getinfo(member)
        if isinstance(member, ArInfo):
            if member.offset is None:
                member.offset = actualmember.offset
            # An incomplete ArInfo may have no size at all; comparing None
            # with an int would fail on Python 3.
            if member.size is None or member.size > actualmember.size:
                member.size = actualmember.size
        else:
            member = actualmember
        if not hasattr(path, 'write'):
            if not path or os.path.isdir(path):
                path = os.path.join(utf8(path), member.name)
        return self._extract(member, path)

    def extractfile(self, member):
        self._check('r')
        raise NotImplementedError("extractfile() is not yet implemented")

    def extractall(self, path=''):
        """
        Extract all the files in the archive.

        :param path: Destination path (current directory by default).
        :type path: bytes | unicode
        """
        self._check('r')
        # NOTE(review): member names come straight from the archive and are
        # joined to `path` unchecked; consider validating them before
        # extracting archives from untrusted sources.
        # Iterate on _name_map instead of plain _entries so we don't extract
        # multiple files with the same name, just the last one
        for index in self._name_map.values():
            member = self._entries[index]
            target = os.path.join(utf8(path), member.name)
            # Nobody receives the file object here, so close it right away.
            self._extract(member, target).close()

    def open(self, member):
        """
        Extract a member into memory.

        :param member: Either a file name or an :class:`unix_ar.ArInfo`.
        :type member: bytes | unicode | unix_ar.ArInfo
        :returns: An `io.BytesIO` positioned at the start of the data, with a
            `name` attribute holding the member name stripped of slashes.
        """
        filelike = self.extract(member, path=io.BytesIO())
        label = member.name if isinstance(member, ArInfo) else member
        slash = b'/' if isinstance(label, bytes) else '/'
        filelike.name = label.strip(slash)
        return filelike

    def close(self):
        """
        Close this archive and the underlying file.

        No method should be called on the object after this.
        """
        if self._file is not None:
            self._file.close()
            self._file = None
            self._entries = None
            self._name_map = None
def open(file, mode='r'):
    """
    Open an archive file.

    :param file: File name to open, or an already opened binary file object.
    :type file: bytes | unicode
    :param mode: Either 'r' or 'w' ('rb' and 'wb' are accepted as synonyms).
    :rtype: unix_ar.ArFile
    :raises ValueError: If `mode` is not one of the accepted values, or if
        the archive being read is invalid.
    """
    # ArFile only understands 'r' and 'w', so normalise the binary spellings
    # before handing the mode over.
    if mode in ('r', 'rb'):
        armode, omode = 'r', 'rb'
    elif mode in ('w', 'wb'):
        armode, omode = 'w', 'wb'
    else:
        raise ValueError("mode must be one of 'r' or 'w'")
    if hasattr(file, 'read') or hasattr(file, 'write'):
        return ArFile(file, armode)
    fp = _open(file, omode)
    try:
        return ArFile(fp, armode)
    except Exception:
        # The file was opened here, so nobody else can close it on failure.
        fp.close()
        raise