Repository URL to install this package:
|
Version:
0.10.0 ▾
|
pyogrio
/
util.py
|
|---|
"""Utility functions."""
import re
import sys
from packaging.version import Version
from pathlib import Path
from typing import Union
from urllib.parse import urlparse
from pyogrio._vsi import vsimem_rmtree_toplevel as _vsimem_rmtree_toplevel
def get_vsi_path_or_buffer(path_or_buffer):
    """Resolve the input to a GDAL-ready path string or to raw bytes.

    The result depends on the type of ``path_or_buffer``:

    * ``pathlib.Path``: handed to ``vsi_path`` as-is.
    * ``bytes``: returned unchanged.
    * object exposing a ``read`` attribute: the result of ``read()`` is
      returned; if the object reports itself as seekable it is rewound to
      position 0 afterwards.
    * anything else: converted with ``str`` and handed to ``vsi_path``.

    Parameters
    ----------
    path_or_buffer : str, pathlib.Path, bytes, or file-like
        A dataset path or URI, raw buffer, or file-like object with a read
        method.

    Returns
    -------
    str or bytes

    """
    # Path objects are dispatched first so they are never stringified here;
    # vsi_path takes care of the Windows backslash handling itself.
    if isinstance(path_or_buffer, Path):
        return vsi_path(path_or_buffer)

    # Raw bytes need no processing.
    if isinstance(path_or_buffer, bytes):
        return path_or_buffer

    # Everything that is not file-like is treated as a path / URI.
    if not hasattr(path_or_buffer, "read"):
        return vsi_path(str(path_or_buffer))

    contents = path_or_buffer.read()

    # Rewind when possible, so callers reusing the object do not have to.
    if hasattr(path_or_buffer, "seekable") and path_or_buffer.seekable():
        path_or_buffer.seek(0)

    return contents
def vsi_path(path: Union[str, Path]) -> str:
    """Return a local path or a GDAL-compatible VSI path for ``path``.

    Paths that already start with ``/vsi`` are returned untouched. Other
    inputs are parsed as URIs and, when they carry a scheme, an archive part
    (``archive!inner``) or end in ``.zip``, rewritten to the matching GDAL
    ``/vsi*/`` form.

    Parameters
    ----------
    path : str or pathlib.Path
        Dataset path or URI.

    Returns
    -------
    str

    """
    if isinstance(path, Path):
        # On Windows str(Path) uses backslashes, which would break a VSI path,
        # so the posix rendering is kept for those.
        posix_form = path.as_posix()
        keep_posix = sys.platform == "win32" and posix_form.startswith("/vsi")
        path = posix_form if keep_posix else str(path)

    # Nothing to do: already in GDAL format.
    if path.startswith("/vsi"):
        return path

    # A Windows drive letter (e.g. "C:\") looks like a URL scheme to
    # `urlparse`, so such paths are handled before parsing.
    if sys.platform == "win32" and re.match("^[a-zA-Z]\\:", path):
        container = path.split("!")[0]
        if not container.endswith(".zip"):
            return path
        # Add an explicit scheme so that the regular parsing below applies.
        path = f"zip://{path}"

    parsed_path, archive, scheme = _parse_uri(path)

    needs_vsi_prefix = scheme or archive or parsed_path.endswith(".zip")
    if not needs_vsi_prefix:
        return parsed_path

    return _construct_vsi_path(parsed_path, archive, scheme)
# Maps every URI scheme accepted in dataset paths to the suffix of the GDAL
# virtual file system handler ("/vsi<suffix>/") used for it.
SCHEMES = {
    # plain files and archive / compression wrappers
    "file": "file",
    "zip": "zip",
    "tar": "tar",
    "gzip": "gzip",
    # generic network protocols, all mapped to the curl handler
    "http": "curl",
    "https": "curl",
    "ftp": "curl",
    # cloud and distributed storage
    "s3": "s3",
    "gs": "gs",
    "az": "az",
    "adls": "adls",
    "adl": "adls",  # alias used by fsspec
    "hdfs": "hdfs",
    "webhdfs": "webhdfs",
    # GDAL also supports oss and swift for remote filesystems, but these are
    # not accepted as URI schemes for now.
}

# Schemes that are mapped to the curl handler.
CURLSCHEMES = {scheme for scheme, handler in SCHEMES.items() if handler == "curl"}
def _parse_uri(path: str):
"""Parse a URI.
Returns a tuples of (path, archive, scheme)
path : str
Parsed path. Includes the hostname and query string in the case
of a URI.
archive : str
Parsed archive path.
scheme : str
URI scheme such as "https" or "zip+s3".
"""
parts = urlparse(path, allow_fragments=False)
# if the scheme is not one of GDAL's supported schemes, return raw path
if parts.scheme and not all(p in SCHEMES for p in parts.scheme.split("+")):
return path, "", ""
# we have a URI
path = parts.path
scheme = parts.scheme or ""
if parts.query:
path += "?" + parts.query
if parts.scheme and parts.netloc:
path = parts.netloc + path
parts = path.split("!")
path = parts.pop() if parts else ""
archive = parts.pop() if parts else ""
return (path, archive, scheme)
def _construct_vsi_path(path, archive, scheme) -> str:
    """Build a GDAL VSI path from the components of a parsed URI.

    Parameters
    ----------
    path : str
        Path component (for URIs including host name and query string).
    archive : str
        Archive component, or "" if there is none.
    scheme : str
        URI scheme, possibly chained with "+" (e.g. "zip+https"), or "".

    Returns
    -------
    str
        The VSI path, e.g. ``("host/a.zip", "", "https")`` gives
        ``"/vsizip/vsicurl/https://host/a.zip"``. If no VSI handler applies,
        ``path`` is returned unchanged.

    """
    schemes = scheme.split("+")

    # A .zip archive or path implies the zip handler even when not spelled
    # out in the scheme.
    looks_zipped = archive.endswith(".zip") or path.endswith(".zip")
    if looks_zipped and "zip" not in schemes:
        schemes.insert(0, "zip")

    # Chain the handlers; empty entries and "file" contribute no handler.
    handlers = [f"vsi{SCHEMES[name]}" for name in schemes if name and name != "file"]
    prefix = "/".join(handlers)
    if not prefix:
        return path

    # For curl based schemes the original protocol is kept in the path.
    innermost = schemes[-1]
    suffix = f"{innermost}://" if innermost in CURLSCHEMES else ""

    if archive:
        inner_path = path.lstrip("/")
        return f"/{prefix}/{suffix}{archive}/{inner_path}"

    return f"/{prefix}/{suffix}{path}"
def _preprocess_options_key_value(options):
"""Preprocess options.
For example, `spatial_index=True` gets converted to `SPATIAL_INDEX="YES"`.
"""
if not isinstance(options, dict):
raise TypeError(f"Expected options to be a dict, got {type(options)}")
result = {}
for k, v in options.items():
if v is None:
continue
k = k.upper()
if isinstance(v, bool):
v = "ON" if v else "OFF"
else:
v = str(v)
result[k] = v
return result
def _mask_to_wkb(mask):
"""Convert a Shapely mask geometry to WKB.
Parameters
----------
mask : Shapely geometry
The geometry to convert to WKB.
Returns
-------
WKB bytes or None
Raises
------
ValueError
raised if Shapely >= 2.0 is not available or mask is not a Shapely
Geometry object
"""
if mask is None:
return mask
try:
import shapely
if Version(shapely.__version__) < Version("2.0.0"):
shapely = None
except ImportError:
shapely = None
if not shapely:
raise ValueError("'mask' parameter requires Shapely >= 2.0")
if not isinstance(mask, shapely.Geometry):
raise ValueError("'mask' parameter must be a Shapely geometry")
return shapely.to_wkb(mask)
def vsimem_rmtree_toplevel(path: Union[str, Path]):
    """Remove the parent directory of the file path recursively.

    Thin wrapper that normalizes ``path`` and delegates the actual removal to
    ``pyogrio._vsi.vsimem_rmtree_toplevel``.

    This is used for final cleanup of an in-memory dataset, which may have been
    created within a directory to contain sibling files.

    Additional VSI handlers may be chained to the left of /vsimem/ in path and
    will be ignored.

    Remark: function is defined here to be able to run tests on it.

    Parameters
    ----------
    path : str or pathlib.Path
        path to in-memory file

    """
    # Path objects are passed on in posix form (forward slashes); strings are
    # passed on unchanged.
    target = path.as_posix() if isinstance(path, Path) else path
    _vsimem_rmtree_toplevel(target)