# (removed non-Python web-page residue that preceded the shebang:
#  "Repository URL to install this package | Version: 3.10.0")
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
###############################################################################
# $Id$
#
# Project: GDAL/OGR
# Purpose: Test compliance of GeoPackage database w.r.t GeoPackage spec
# Author: Even Rouault <even.rouault at spatialys.com>
#
###############################################################################
# Copyright (c) 2017, Even Rouault <even.rouault at spatialys.com>
#
# SPDX-License-Identifier: MIT
###############################################################################
import datetime
import os
import re
import sqlite3
import struct
import sys
# GDAL may be used for checks on tile content for the tiled gridded extension.
# If not available, those tests will be skipped
try:
from osgeo import gdal, ogr
has_gdal = True
except ImportError:
has_gdal = False
def _esc_literal(literal):
return literal.replace("'", "''")
def _esc_id(identifier):
return '"' + identifier.replace('"', '""') + '"'
def _is_valid_data_type(typ):
return (
typ
in (
"BOOLEAN",
"TINYINT",
"SMALLINT",
"MEDIUMINT",
"INT",
"INTEGER",
"FLOAT",
"DOUBLE",
"REAL",
"TEXT",
"BLOB",
"DATE",
"DATETIME",
)
or typ.startswith("TEXT(")
or typ.startswith("BLOB(")
)
class GPKGCheckException(Exception):
    """Raised on a failed check when abort_at_first_error is enabled."""

    pass
class GPKGChecker(object):
    """Validates a GeoPackage (SQLite) file against the GeoPackage spec.

    Each _check_* method verifies a group of requirements; failures are
    accumulated in self.errors or raised immediately, depending on
    abort_at_first_error.
    """

    # Geometry type names from the GeoPackage core encoding.
    BASE_GEOM_TYPES = (
        "GEOMETRY",
        "POINT",
        "LINESTRING",
        "POLYGON",
        "MULTIPOINT",
        "MULTILINESTRING",
        "MULTIPOLYGON",
        "GEOMETRYCOLLECTION",
    )
    # Non-linear geometry types; each requires a declared
    # gpkg_geom_<TYPE> extension entry when used (see Req 68 checks below).
    EXT_GEOM_TYPES = (
        "CIRCULARSTRING",
        "COMPOUNDCURVE",
        "CURVEPOLYGON",
        "MULTICURVE",
        "MULTISURFACE",
        "CURVE",
        "SURFACE",
    )
def __init__(
    self,
    filename,
    abort_at_first_error=True,
    extra_checks=False,
    log_msg=False,
    warning_msg=True,
    warning_as_error=False,
):
    """Create a checker for *filename*.

    :param filename: path to the GeoPackage file to validate.
    :param abort_at_first_error: raise GPKGCheckException on the first
        failed check instead of accumulating errors.
    :param extra_checks: also validate actual table/row content (slower).
    :param log_msg: print progress messages.
    :param warning_msg: print warnings as they are recorded.
    :param warning_as_error: record warnings as check failures.
    """
    self.filename = filename
    self.conn = None  # sqlite3 connection, opened outside this excerpt
    self.has_tried_spatialite = False
    self.extended_pragma_info = False
    self.abort_at_first_error = abort_at_first_error
    self.extra_checks = extra_checks
    self.log_msg = log_msg
    self.warning_msg = warning_msg
    self.warning_as_error = warning_as_error
    self.errors = []  # list of (requirement_number_or_None, message)
    self.warnings = []
    # NOTE(review): _check_vector_user_table reads self.version, which is
    # not initialized here — presumably set by the main check routine
    # outside this excerpt; confirm.
def _log(self, msg):
if self.log_msg:
print(msg)
def _assert(self, cond, req, msg):
# self._log('Verified requirement %s' % req)
if not cond:
self.errors += [(req, msg)]
if self.abort_at_first_error:
if req:
raise GPKGCheckException("Req %s: %s" % (str(req), msg))
else:
raise GPKGCheckException(msg)
return cond
def _warn(self, msg):
if self.warning_as_error:
self._assert(False, None, msg)
else:
self.warnings += [msg]
if self.warning_msg:
print(msg)
def _check_structure(self, columns, expected_columns, req, table_name):
    """Compare a PRAGMA table_info() result against an expected schema.

    :param columns: actual (cid, name, type, notnull, default, pk) rows.
    :param expected_columns: expected rows of the same shape; the type
        field may be a string or a list of accepted type names (matched
        with ``in``).
    :param req: requirement number reported on failure.
    :param table_name: table name used in failure messages.
    """
    self._assert(
        len(columns) == len(expected_columns),
        req,
        "Table %s has %d columns, whereas %d are expected"
        % (table_name, len(columns), len(expected_columns)),
    )
    for (
        _,
        expected_name,
        expected_type,
        expected_notnull,
        expected_default,
        expected_pk,
    ) in expected_columns:
        found = False
        for (_, name, typ, notnull, default, pk) in columns:
            if name != expected_name:
                continue
            # An INTEGER PRIMARY KEY is implicitly NOT NULL on both the
            # expected and the actual side.
            if "INTEGER" in expected_type and expected_pk:
                expected_notnull = 1
            if typ == "INTEGER" and pk:
                notnull = 1
            # Without SQLite's extended table_info, pk is only 0/1 and
            # does not give the position within a composite primary key.
            if not self.extended_pragma_info and expected_pk > 1:
                expected_pk = 1
            self._assert(
                typ in expected_type,
                req,
                "Wrong type for %s of %s. Expected %s, got %s"
                % (name, table_name, str(expected_type), typ),
            )
            self._assert(
                notnull == expected_notnull,
                req,
                ("Wrong notnull for %s of %s. " + "Expected %s, got %s")
                % (name, table_name, expected_notnull, notnull),
            )
            # GeoPackage ETS suite accepts CURRENT_TIMESTAMP instead of 'now'
            # Spatialite at time of writing uses CURRENT_TIMESTAMP.
            # https://github.com/opengeospatial/ets-gpkg12/blob/04b4a0b8c7d90755b016182e2992684f198da1ba/src/main/java/org/opengis/cite/gpkg12/TableVerifier.java#L173
            if (
                default != expected_default
                and expected_default == "strftime('%Y-%m-%dT%H:%M:%fZ','now')"
                and default.lower().replace(" ", "")
                in (
                    "strftime('%Y-%m-%dT%H:%M:%fZ','now')".lower(),
                    "strftime('%Y-%m-%dT%H:%M:%fZ',CURRENT_TIMESTAMP)".lower(),
                )
            ):
                pass
            else:
                self._assert(
                    default == expected_default,
                    req,
                    ("Wrong default for %s of %s. " + "Expected %s, got %s")
                    % (name, table_name, expected_default, default),
                )
            self._assert(
                pk == expected_pk,
                req,
                "Wrong pk for %s of %s. Expected %s, got %s"
                % (name, table_name, expected_pk, pk),
            )
            found = True
            break
        self._assert(
            found, req, "Column %s of %s not found!" % (expected_name, table_name)
        )
def _check_gpkg_spatial_ref_sys(self, c):
    """Check existence, structure and mandatory content of
    gpkg_spatial_ref_sys (Reqs 10/11), including the optional
    gpkg_crs_wkt / gpkg_crs_wkt_1_1 extension columns (Reqs 116/117/145).
    """
    self._log("Checking gpkg_spatial_ref_sys")
    c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_spatial_ref_sys'")
    if not self._assert(
        c.fetchone() is not None, 10, "gpkg_spatial_ref_sys table missing"
    ):
        return
    c.execute("PRAGMA table_info(gpkg_spatial_ref_sys)")
    columns = c.fetchall()
    # Detect the optional columns brought by the CRS WKT extensions.
    has_definition_12_063 = False
    has_epoch = False
    for (_, name, _, _, _, _) in columns:
        if name == "definition_12_063":
            has_definition_12_063 = True
        if name == "epoch":
            has_epoch = True
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    row = None
    if c.fetchone() is not None:
        c.execute(
            "SELECT scope, extension_name FROM gpkg_extensions WHERE "
            "extension_name IN ('gpkg_crs_wkt', 'gpkg_crs_wkt_1_1')"
        )
        row = c.fetchone()
    if row:
        # Extension declared: the extra column(s) must be present and
        # consistent with the declared extension version.
        scope, extension_name = row
        self._assert(
            scope == "read-write",
            145,
            f"scope of {extension_name} extension should be read-write",
        )
        self._assert(
            has_definition_12_063,
            145,
            "gpkg_spatial_ref_sys should have a definition_12_063 column, "
            f"as {extension_name} extension is declared",
        )
        if has_epoch:
            self._assert(
                extension_name == "gpkg_crs_wkt_1_1",
                145,
                "gpkg_extensions should declare gpkg_crs_wkt_1_1 extension "
                "as gpkg_spatial_ref_sys has a epoch column",
            )
    else:
        # No extension declared: the extra columns must not exist.
        self._assert(
            not has_definition_12_063,
            145,
            "gpkg_extensions should declare gpkg_crs_wkt extension "
            "as gpkg_spatial_ref_sys has a definition_12_063 column",
        )
        self._assert(
            not has_epoch,
            145,
            "gpkg_extensions should declare gpkg_crs_wkt_1_1 extension "
            "as gpkg_spatial_ref_sys has a epoch column",
        )
    if has_definition_12_063:
        expected_columns = [
            (0, "srs_name", "TEXT", 1, None, 0),
            (1, "srs_id", "INTEGER", 1, None, 1),
            (2, "organization", "TEXT", 1, None, 0),
            (3, "organization_coordsys_id", "INTEGER", 1, None, 0),
            (4, "definition", "TEXT", 1, None, 0),
            (5, "description", "TEXT", 0, None, 0),
            (6, "definition_12_063", "TEXT", 1, None, 0),
        ]
        if has_epoch:
            expected_columns += [(7, "epoch", "DOUBLE", 0, None, 0)]
        # "Previous versions of this extension specified default values for
        # definition and definition_12_063. Those defaults have been removed
        # for interoperability reasons but implementers should be aware that
        # some GeoPackages may have these defaults in place."
        # Hence legacy '' defaults are normalized to None before comparing.
        columns = [[v for v in column] for column in columns]
        for column in columns:
            if (
                column[1] in ("definition", "definition_12_063")
                and column[4] == "''"
            ):
                column[4] = None
    else:
        expected_columns = [
            (0, "srs_name", "TEXT", 1, None, 0),
            (1, "srs_id", "INTEGER", 1, None, 1),
            (2, "organization", "TEXT", 1, None, 0),
            (3, "organization_coordsys_id", "INTEGER", 1, None, 0),
            (4, "definition", "TEXT", 1, None, 0),
            (5, "description", "TEXT", 0, None, 0),
        ]
    self._check_structure(columns, expected_columns, 10, "gpkg_spatial_ref_sys")
    # Req 11: the records for srs_id -1 (undefined cartesian),
    # 0 (undefined geographic) and 4326 (WGS 84) must exist.
    if has_definition_12_063:
        c.execute(
            "SELECT srs_id, organization, organization_coordsys_id, "
            "definition, definition_12_063 "
            "FROM gpkg_spatial_ref_sys "
            "WHERE srs_id IN (-1, 0, 4326) ORDER BY srs_id"
        )
    else:
        c.execute(
            "SELECT srs_id, organization, organization_coordsys_id, "
            "definition FROM gpkg_spatial_ref_sys "
            "WHERE srs_id IN (-1, 0, 4326) ORDER BY srs_id"
        )
    ret = c.fetchall()
    self._assert(
        len(ret) == 3,
        11,
        "There should be at least 3 records in " "gpkg_spatial_ref_sys",
    )
    if len(ret) != 3:
        return
    self._assert(
        ret[0][1] == "NONE",
        11,
        "wrong value for organization for srs_id = -1: %s" % ret[0][1],
    )
    self._assert(
        ret[0][2] == -1,
        11,
        "wrong value for organization_coordsys_id for "
        "srs_id = -1: %s" % ret[0][2],
    )
    self._assert(
        ret[0][3] == "undefined",
        11,
        "wrong value for definition for srs_id = -1: %s" % ret[0][3],
    )
    if has_definition_12_063:
        self._assert(
            ret[0][4] == "undefined",
            116,
            "wrong value for definition_12_063 for "
            + "srs_id = -1: %s" % ret[0][4],
        )
    self._assert(
        ret[1][1] == "NONE",
        11,
        "wrong value for organization for srs_id = 0: %s" % ret[1][1],
    )
    self._assert(
        ret[1][2] == 0,
        11,
        "wrong value for organization_coordsys_id for "
        "srs_id = 0: %s" % ret[1][2],
    )
    self._assert(
        ret[1][3] == "undefined",
        11,
        "wrong value for definition for srs_id = 0: %s" % ret[1][3],
    )
    if has_definition_12_063:
        self._assert(
            ret[1][4] == "undefined",
            116,
            "wrong value for definition_12_063 for " + "srs_id = 0: %s" % ret[1][4],
        )
    self._assert(
        ret[2][1].lower() == "epsg",
        11,
        "wrong value for organization for srs_id = 4326: %s" % ret[2][1],
    )
    self._assert(
        ret[2][2] == 4326,
        11,
        "wrong value for organization_coordsys_id for "
        "srs_id = 4326: %s" % ret[2][2],
    )
    self._assert(
        ret[2][3] != "undefined",
        11,
        "wrong value for definition for srs_id = 4326: %s" % ret[2][3],
    )
    if has_definition_12_063:
        self._assert(
            ret[2][4] != "undefined",
            116,
            "wrong value for definition_12_063 for "
            + "srs_id = 4326: %s" % ret[2][4],
        )
    if has_definition_12_063:
        # Req 117: at least one of the two definition columns must be
        # populated for any user-supplied srs.
        c.execute(
            "SELECT srs_id FROM gpkg_spatial_ref_sys "
            "WHERE srs_id NOT IN (0, -1) AND "
            "definition = 'undefined' AND "
            "definition_12_063 = 'undefined'"
        )
        rows = c.fetchall()
        for (srs_id,) in rows:
            self._assert(
                False,
                117,
                "srs_id = %d has both definition and " % srs_id
                + "definition_12_063 undefined",
            )
def _check_gpkg_contents(self, c):
    """Check existence, structure and referential consistency of the
    gpkg_contents table (Reqs 13/14/15/17)."""
    self._log("Checking gpkg_contents")
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_contents'")
    self._assert(c.fetchone() is not None, 13, "gpkg_contents table missing")
    c.execute("PRAGMA table_info(gpkg_contents)")
    columns = c.fetchall()
    expected_columns = [
        (0, "table_name", "TEXT", 1, None, 1),
        (1, "data_type", "TEXT", 1, None, 0),
        (2, "identifier", "TEXT", 0, None, 0),
        (3, "description", "TEXT", 0, "''", 0),
        (
            4,
            "last_change",
            "DATETIME",
            1,
            "strftime('%Y-%m-%dT%H:%M:%fZ','now')",
            0,
        ),
        (5, "min_x", "DOUBLE", 0, None, 0),
        (6, "min_y", "DOUBLE", 0, None, 0),
        (7, "max_x", "DOUBLE", 0, None, 0),
        (8, "max_y", "DOUBLE", 0, None, 0),
        (9, "srs_id", "INTEGER", 0, None, 0),
    ]
    self._check_structure(columns, expected_columns, 13, "gpkg_contents")
    # Only the four standard data types are accepted here.
    c.execute(
        "SELECT table_name, data_type FROM gpkg_contents "
        "WHERE data_type NOT IN "
        "('features', 'tiles', 'attributes', '2d-gridded-coverage')"
    )
    ret = c.fetchall()
    self._assert(
        len(ret) == 0,
        17,  # no longer required actually...
        "Unexpected data types in gpkg_contents: %s" % str(ret),
    )
    c.execute("SELECT table_name, last_change, srs_id FROM gpkg_contents")
    rows = c.fetchall()
    for (table_name, last_change, srs_id) in rows:
        # Req 14: every registered table_name must be an actual table/view.
        c.execute(
            "SELECT 1 FROM sqlite_master WHERE "
            "lower(name) = lower(?) AND type IN ('table', 'view')",
            (table_name,),
        )
        self._assert(
            c.fetchone() is not None,
            14,
            ("table_name=%s in gpkg_contents is not a " + "table or view")
            % table_name,
        )
        # NOTE(review): this format string requires fractional seconds, so a
        # last_change without milliseconds is reported invalid — confirm
        # that strictness is intended.
        try:
            datetime.datetime.strptime(last_change, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            self._assert(
                False,
                15,
                ("last_change = %s for table_name = %s " + "is invalid datetime")
                % (last_change, table_name),
            )
        if srs_id is not None:
            # The declared srs must be registered in gpkg_spatial_ref_sys.
            c.execute(
                "SELECT 1 FROM gpkg_spatial_ref_sys " "WHERE srs_id = ?", (srs_id,)
            )
            self._assert(
                c.fetchone() is not None,
                14,
                (
                    "table_name=%s has srs_id=%d in gpkg_contents "
                    + "which isn't found in gpkg_spatial_ref_sys"
                )
                % (table_name, srs_id),
            )
def _check_user_table_content(self, c, table_name, cols):
    """When extra_checks is enabled, verify that stored values match the
    declared column types (integer ranges, TEXT/BLOB max lengths, and
    DATE/DATETIME text formats). Emits warnings, not errors, and at most
    one warning per column.

    :param cols: the PRAGMA table_info() rows of *table_name*.
    """
    if not self.extra_checks:
        self._log("Skipping checks on actual table content")
        return
    self._log("Checking actual table content")
    warned_col = [False for col in cols]
    # For each column, select value, typeof(value) and length(value).
    c.execute(
        "SELECT %s FROM %s"
        % (
            ",".join(
                (
                    "%s,typeof(%s),length(%s)"
                    % (_esc_id(col[1]), _esc_id(col[1]), _esc_id(col[1]))
                )
                for col in cols
            ),
            _esc_id(table_name),
        )
    )
    geom_types = GPKGChecker.BASE_GEOM_TYPES + GPKGChecker.EXT_GEOM_TYPES
    date_pattern_str = "[0-9]{4}-[0-1][0-9]-[0-3][0-9]"
    # NOTE(review): these patterns are used with re.match and are not
    # end-anchored, and the '.' before the milliseconds is an unescaped
    # regex wildcard — presumably a literal dot was intended; confirm.
    date_pattern = re.compile(date_pattern_str)
    datetime_pattern_no_seconds = re.compile(
        date_pattern_str + "T[0-2][0-9]:[0-5][0-9]Z"
    )
    datetime_pattern_with_seconds_no_ms = re.compile(
        date_pattern_str + "T[0-2][0-9]:[0-5][0-9]:[0-5][0-9]Z"
    )
    datetime_pattern_with_milliseconds = re.compile(
        date_pattern_str + "T[0-2][0-9]:[0-5][0-9]:[0-5][0-9].[0-9]{3}Z"
    )
    for row in c.fetchall():
        # Each column contributes 3 result fields (value, typeof, length).
        for i in range(len(cols)):
            if warned_col[i]:
                continue
            val = row[3 * i + 0]
            got_col_type = row[3 * i + 1].upper()
            length_value = row[3 * i + 2]
            if val is None:
                continue
            expected_col_type = cols[i][2]
            col_name = cols[i][1]
            if expected_col_type == "BOOLEAN" and got_col_type == "INTEGER":
                if val not in (0, 1):
                    warned_col[i] = True
                    self._warn(
                        "In column %s, value %d found whereas 0 or 1 was expected"
                        % (col_name, val)
                    )
                    continue
                continue
            elif expected_col_type == "TINYINT" and got_col_type == "INTEGER":
                if val < -128 or val > 127:
                    warned_col[i] = True
                    self._warn(
                        "In column %s, value %d found whereas it was expected to be in [-128,127]"
                        % (col_name, val)
                    )
                    continue
                continue
            elif expected_col_type == "SMALLINT" and got_col_type == "INTEGER":
                if val < -32768 or val > 32767:
                    warned_col[i] = True
                    self._warn(
                        "In column %s, value %d found whereas it was expected to be in [-32768,32767]"
                        % (col_name, val)
                    )
                    continue
                continue
            elif expected_col_type == "MEDIUMINT" and got_col_type == "INTEGER":
                if val < -2147483648 or val > 2147483647:
                    warned_col[i] = True
                    self._warn(
                        "In column %s, value %d found whereas it was expected to be in [-2147483648,2147483647]"
                        % (col_name, val)
                    )
                    continue
                continue
            elif (
                expected_col_type in ("INT", "INTEGER")
                and got_col_type == "INTEGER"
            ):
                continue
            elif (
                expected_col_type in ("FLOAT", "DOUBLE", "REAL")
                and got_col_type == "REAL"
            ):
                continue
            elif expected_col_type.startswith("TEXT("):
                if got_col_type != "TEXT":
                    warned_col[i] = True
                    self._warn(
                        "In column %s, content of type %s found whereas %s was expected"
                        % (col_name, got_col_type, expected_col_type)
                    )
                    continue
                expected_max_length = int(expected_col_type[len("TEXT(") : -1])
                if length_value > expected_max_length:
                    warned_col[i] = True
                    self._warn(
                        "In column %s, string of length %d found whereas %d max was expected"
                        % (col_name, length_value, expected_max_length)
                    )
                continue
            elif expected_col_type.startswith("BLOB("):
                if got_col_type != "BLOB":
                    warned_col[i] = True
                    self._warn(
                        "In column %s, content of type %s found whereas %s was expected"
                        % (col_name, got_col_type, expected_col_type)
                    )
                    continue
                expected_max_length = int(expected_col_type[len("BLOB(") : -1])
                if length_value > expected_max_length:
                    warned_col[i] = True
                    self._warn(
                        "In column %s, blob of length %d found whereas %d max was expected"
                        % (col_name, length_value, expected_max_length)
                    )
                continue
            elif got_col_type == "BLOB" and expected_col_type in geom_types:
                # Geometry columns are validated elsewhere.
                continue
            elif expected_col_type == "DATE" and got_col_type == "TEXT":
                if date_pattern.match(val) is None:
                    warned_col[i] = True
                    self._warn(
                        "In column %s, text %s found which is not a valid DATE"
                        % (col_name, val)
                    )
                continue
            elif expected_col_type == "DATETIME" and got_col_type == "TEXT":
                if (
                    datetime_pattern_with_milliseconds.match(val) is None
                    and datetime_pattern_with_seconds_no_ms.match(val) is None
                    and datetime_pattern_no_seconds.match(val) is None
                ):
                    warned_col[i] = True
                    self._warn(
                        "In column %s, text %s found which is not a valid DATETIME"
                        % (col_name, val)
                    )
                continue
            elif expected_col_type != got_col_type:
                warned_col[i] = True
                self._warn(
                    "In column %s, content of type %s found whereas %s was expected"
                    % (col_name, got_col_type, expected_col_type)
                )
                continue
def _check_vector_user_table(self, c, table_name):
    """Validate one features table/view: its registration in
    gpkg_geometry_columns, its column types and primary key, every stored
    GeoPackage geometry blob (header, srs, Z/M consistency, geometry-type
    consistency, extension declarations) and, when present, its RTree
    spatial index triggers.
    """
    self._log("Checking vector user table " + table_name)
    c.execute(
        "SELECT column_name, z, m, geometry_type_name, srs_id "
        "FROM gpkg_geometry_columns WHERE table_name = ?",
        (table_name,),
    )
    rows_gpkg_geometry_columns = c.fetchall()
    self._assert(
        len(rows_gpkg_geometry_columns) == 1,
        22,
        ("table_name = %s is not registered in " + "gpkg_geometry_columns")
        % table_name,
    )
    geom_column_name = rows_gpkg_geometry_columns[0][0]
    z = rows_gpkg_geometry_columns[0][1]
    m = rows_gpkg_geometry_columns[0][2]
    geometry_type_name = rows_gpkg_geometry_columns[0][3]
    srs_id = rows_gpkg_geometry_columns[0][4]
    c.execute(
        "SELECT 1 FROM sqlite_master WHERE name = ? and type = 'view'",
        (table_name,),
    )
    is_view = c.fetchone() is not None
    # Views may expose a geometry computed by Spatialite functions; those
    # are exempted from the SQL-declared-type checks below.
    is_spatialite_computed_column = False
    if is_view:
        c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
        if c.fetchone() is not None:
            c.execute(
                "SELECT 1 FROM gpkg_extensions WHERE table_name = ? AND column_name = ? AND extension_name = 'gdal_spatialite_computed_geom_column'",
                (table_name, geom_column_name),
            )
            if c.fetchone() is not None:
                is_spatialite_computed_column = True
    try:
        c.execute("PRAGMA table_info(%s)" % _esc_id(table_name))
    except sqlite3.OperationalError as e:
        # A view definition may reference Spatialite functions; try to
        # load mod_spatialite once, then retry.
        if not self.has_tried_spatialite:
            self.has_tried_spatialite = True
            spatialite_loaded = False
            try:
                self.conn.enable_load_extension(True)
                self.conn.execute('SELECT load_extension("mod_spatialite")')
                spatialite_loaded = True
            except Exception:
                pass
            if not spatialite_loaded:
                raise e
            # Check if we must set PRAGMA trusted_schema = ON
            c.execute("SELECT CountUnsafeTriggers()")
            if c.fetchone()[0] == 0:
                # Probe on a throwaway in-memory DB whether this SQLite
                # build requires trusted_schema for functions in views.
                temp_conn = sqlite3.connect(":memory:")
                temp_conn.enable_load_extension(True)
                temp_conn.execute('SELECT load_extension("mod_spatialite")')
                temp_c = temp_conn.cursor()
                temp_c.execute("CREATE VIEW v AS SELECT ST_Multi(NULL)")
                try:
                    temp_c.execute("SELECT * FROM v")
                except sqlite3.OperationalError:
                    temp_c.execute("PRAGMA trusted_schema = ON")
                    temp_c.execute("SELECT * FROM v")
                    c.execute("PRAGMA trusted_schema = ON")
        c.execute("PRAGMA table_info(%s)" % _esc_id(table_name))
    cols = c.fetchall()
    found_geom = False
    count_pkid = 0
    pkid_column_name = None
    for (_, name, typ, notnull, default, pk) in cols:
        if name.lower() == geom_column_name.lower():
            found_geom = True
            if not is_spatialite_computed_column:
                self._assert(
                    typ in GPKGChecker.BASE_GEOM_TYPES
                    or typ in GPKGChecker.EXT_GEOM_TYPES,
                    25,
                    ("invalid type (%s) for geometry " + "column of table %s")
                    % (typ, table_name),
                )
                self._assert(
                    typ == geometry_type_name,
                    31,
                    (
                        "table %s has geometry column of type %s in "
                        + "SQL and %s in geometry_type_name of "
                        + "gpkg_geometry_columns"
                    )
                    % (table_name, typ, geometry_type_name),
                )
        elif pk == 1:
            if pkid_column_name is None:
                pkid_column_name = name
            count_pkid += 1
            self._assert(
                typ == "INTEGER",
                29,
                ("table %s has a PRIMARY KEY of type %s " + "instead of INTEGER")
                % (table_name, typ),
            )
        else:
            self._assert(
                _is_valid_data_type(typ),
                5,
                (
                    "table %s has column %s of unexpected type %s"
                    % (table_name, name, typ)
                ),
            )
    self._assert(
        found_geom, 24, "table %s has no %s column" % (table_name, geom_column_name)
    )
    c.execute(
        "SELECT 1 FROM sqlite_master WHERE " "type = 'table' AND name = ?",
        (table_name,),
    )
    if c.fetchone():
        # Real table: must have exactly one INTEGER PRIMARY KEY (Req 29).
        self._assert(
            count_pkid == 1, 29, "table %s has no INTEGER PRIMARY KEY" % table_name
        )
        self._check_user_table_content(c, table_name, cols)
    else:
        # View: first column must be an INTEGER with unique values (Req 150).
        self._assert(
            len(cols) > 0 and cols[0][2] == "INTEGER",
            150,
            "view %s has no INTEGER first column" % table_name,
        )
        c.execute(
            "SELECT COUNT(*) - COUNT(DISTINCT %s) FROM %s"
            % (_esc_id(cols[0][1]), _esc_id(table_name))
        )
        self._assert(
            c.fetchone()[0] == 0,
            150,
            "First column of view %s should contain " "unique values" % table_name,
        )
    self._assert(
        z in (0, 1, 2),
        27,
        ("z value of %s is %d. " + "Expected 0, 1 or 2") % (table_name, z),
    )
    self._assert(
        m in (0, 1, 2),
        27,
        ("m value of %s is %d. " + "Expected 0, 1 or 2") % (table_name, m),
    )
    if geometry_type_name in GPKGChecker.EXT_GEOM_TYPES:
        # Req 68: non-linear declared type needs its extension entry.
        c.execute(
            "SELECT 1 FROM gpkg_extensions WHERE "
            "extension_name = 'gpkg_geom_%s' AND "
            "table_name = ? AND column_name = ? AND "
            "scope = 'read-write'" % geometry_type_name,
            (table_name, geom_column_name),
        )
        self._assert(
            c.fetchone() is not None,
            68,
            "gpkg_geom_%s extension should be declared for "
            "table %s" % (geometry_type_name, table_name),
        )
    # WKB geometry type code modulo 1000 indexes into this tuple.
    wkb_geometries = GPKGChecker.BASE_GEOM_TYPES + GPKGChecker.EXT_GEOM_TYPES
    c.execute(
        "SELECT %s, %s FROM %s "
        % (
            _esc_id(pkid_column_name) if pkid_column_name else -1,
            _esc_id(geom_column_name),
            _esc_id(table_name),
        )
    )
    found_geom_types = set()
    warning_messages = set()
    for (rowid, blob) in c.fetchall():
        if blob is None:
            continue
        # GeoPackageBinary header: 'G','P', version byte, flags byte,
        # then a 4-byte srs_id and an optional envelope.
        self._assert(len(blob) >= 8, 19, "Invalid geometry")
        max_size_needed = min(len(blob), 8 + 4 * 2 * 8 + 5)
        blob_ar = struct.unpack("B" * max_size_needed, blob[0:max_size_needed])
        self._assert(blob_ar[0] == ord("G"), 19, "Invalid geometry")
        self._assert(blob_ar[1] == ord("P"), 19, "Invalid geometry")
        self._assert(blob_ar[2] == 0, 19, "Invalid geometry")
        flags = blob_ar[3]
        # Flags byte: bit 0 = byte order, bits 1-3 = envelope indicator,
        # bit 3 also marks empty geometries, bit 5 = extended encoding.
        empty_flag = ((flags >> 3) & 1) == 1
        big_endian = (flags & 1) == 0
        env_ind = (flags >> 1) & 7
        self._assert(
            ((flags >> 5) & 1) == 0,
            19,
            "Invalid geometry: ExtendedGeoPackageBinary not " "allowed",
        )
        self._assert(
            env_ind <= 4, 19, "Invalid geometry: invalid envelope indicator code"
        )
        endian_prefix = ">" if big_endian else "<"
        geom_srs_id = struct.unpack((endian_prefix + "i") * 1, blob[4:8])[0]
        if srs_id != geom_srs_id:
            warning_msg = (
                "table %s has geometries with SRID %d, "
                + "whereas only %d is expected"
            ) % (table_name, geom_srs_id, srs_id)
            # Only report a given srs mismatch once per table.
            if warning_msg not in warning_messages:
                warning_messages.add(warning_msg)
                self._assert(False, 33, warning_msg)
        self._assert(
            not (empty_flag and env_ind != 0), 152, "Invalid empty geometry"
        )
        # Envelope indicator determines how many doubles follow the header.
        if env_ind == 0:
            coord_dim = 0
        elif env_ind == 1:
            coord_dim = 2
        elif env_ind == 2 or env_ind == 3:
            coord_dim = 3
        else:
            coord_dim = 4
        # if env_ind == 2 or env_ind == 4:
        #    self._assert(z > 0, 19,
        #                 'z found in geometry, but not in gpkg_geometry_columns')
        # if env_ind == 3 or env_ind == 4:
        #    self._assert(m > 0, 19,
        #                 'm found in geometry, but not in gpkg_geometry_columns')
        header_len = 8 + coord_dim * 2 * 8
        self._assert(len(blob) >= header_len, 19, "Invalid geometry")
        # The WKB payload starts right after the GP header + envelope.
        wkb_endianness = blob_ar[header_len]
        wkb_big_endian = wkb_endianness == 0
        wkb_endian_prefix = ">" if wkb_big_endian else "<"
        wkb_geom_type = struct.unpack(
            (wkb_endian_prefix + "I") * 1, blob[header_len + 1 : header_len + 5]
        )[0]
        self._assert(
            wkb_geom_type >= 0 and (wkb_geom_type % 1000) < len(wkb_geometries),
            19,
            "Invalid WKB geometry type",
        )
        # ISO WKB encodes dimensionality in the thousands digit:
        # 0 = XY, 1 = Z, 2 = M, 3 = ZM.
        wkb_dim = int(wkb_geom_type / 1000)
        if z == 1:
            self._assert(
                wkb_dim == 1 or wkb_dim == 3, 19, "geometry without Z found"
            )
        if m == 1:
            self._assert(
                wkb_dim == 2 or wkb_dim == 3, 19, "geometry without M found"
            )
        if wkb_dim == 1 or wkb_dim == 3:  # Z or ZM
            self._assert(
                z > 0,
                19,
                "z found in geometry, but not in " "gpkg_geometry_columns",
            )
        if wkb_dim == 2 or wkb_dim == 3:  # M or ZM
            self._assert(
                m > 0,
                19,
                "m found in geometry, but not in " "gpkg_geometry_columns",
            )
        found_geom_types.add(wkb_geometries[wkb_geom_type % 1000])
        if has_gdal:
            # Let OGR fully parse the WKB payload when available.
            geom = ogr.CreateGeometryFromWkb(blob[header_len:])
            self._assert(
                geom is not None,
                19,
                f"Invalid geometry for fid {rowid} of " f"table {table_name}",
            )
            if geom is not None:
                self._assert(
                    (geom.IsEmpty() and empty_flag)
                    or (not geom.IsEmpty() and not empty_flag),
                    152,
                    "Inconsistent empty_flag vs geometry content",
                )
    # Req 32: actual geometry types must be compatible with the declared one.
    if geometry_type_name in (
        "POINT",
        "LINESTRING",
        "POLYGON",
        "MULTIPOINT",
        "MULTILINESTRING",
        "MULTIPOLYGON",
    ):
        self._assert(
            not found_geom_types or found_geom_types == set([geometry_type_name]),
            32,
            "in table %s, found geometry types %s whereas %s was expected"
            % (table_name, str(found_geom_types), geometry_type_name),
        )
    elif geometry_type_name == "GEOMETRYCOLLECTION":
        self._assert(
            not found_geom_types
            or not found_geom_types.difference(
                set(
                    [
                        "GEOMETRYCOLLECTION",
                        "MULTIPOINT",
                        "MULTILINESTRING",
                        "MULTIPOLYGON",
                        "MULTICURVE",
                        "MULTISURFACE",
                    ]
                )
            ),
            32,
            "in table %s, found geometry types %s"
            % (table_name, str(found_geom_types)),
        )
    elif geometry_type_name in ("CURVEPOLYGON", "SURFACE"):
        self._assert(
            not found_geom_types
            or not found_geom_types.difference(set(["POLYGON", "CURVEPOLYGON"])),
            32,
            "in table %s, found geometry types %s"
            % (table_name, str(found_geom_types)),
        )
    elif geometry_type_name == "MULTICURVE":
        self._assert(
            not found_geom_types
            or not found_geom_types.difference(
                set(["MULTILINESTRING", "MULTICURVE"])
            ),
            32,
            "in table %s, found geometry types %s"
            % (table_name, str(found_geom_types)),
        )
    elif geometry_type_name == "MULTISURFACE":
        self._assert(
            not found_geom_types
            or not found_geom_types.difference(
                set(["MULTIPOLYGON", "MULTISURFACE"])
            ),
            32,
            "in table %s, found geometry types %s"
            % (table_name, str(found_geom_types)),
        )
    elif geometry_type_name == "CURVE":
        self._assert(
            not found_geom_types
            or not found_geom_types.difference(
                set(["LINESTRING", "CIRCULARSTRING", "COMPOUNDCURVE"])
            ),
            32,
            "in table %s, found geometry types %s"
            % (table_name, str(found_geom_types)),
        )
    # Req 68 again, this time for the types actually stored.
    for geom_type in found_geom_types:
        if geom_type in GPKGChecker.EXT_GEOM_TYPES:
            c.execute(
                "SELECT 1 FROM gpkg_extensions WHERE "
                "extension_name = 'gpkg_geom_%s' AND "
                "table_name = ? AND column_name = ? AND "
                "scope = 'read-write'" % geom_type,
                (table_name, geom_column_name),
            )
            self._assert(
                c.fetchone() is not None,
                68,
                "gpkg_geom_%s extension should be declared for "
                "table %s" % (geom_type, table_name),
            )
    # RTree spatial index checks (Reqs 75/77/78), only when an index exists.
    rtree_name = "rtree_%s_%s" % (table_name, geom_column_name)
    c.execute("SELECT 1 FROM sqlite_master WHERE name = ?", (rtree_name,))
    has_rtree = c.fetchone() is not None
    if has_rtree:
        c.execute(
            "SELECT 1 FROM gpkg_extensions WHERE "
            "extension_name = 'gpkg_rtree_index' AND "
            "table_name=? AND column_name=? AND "
            "scope='write-only'",
            (table_name, geom_column_name),
        )
        self._assert(
            c.fetchone() is not None,
            78,
            ("Table %s has a RTree, but not declared in " + "gpkg_extensions")
            % table_name,
        )
        c.execute("PRAGMA table_info(%s)" % _esc_id(rtree_name))
        columns = c.fetchall()
        expected_columns = [
            (0, "id", ["", "INT"], 0, None, 0),
            (1, "minx", ["", "NUM", "REAL"], 0, None, 0),
            (2, "maxx", ["", "NUM", "REAL"], 0, None, 0),
            (3, "miny", ["", "NUM", "REAL"], 0, None, 0),
            (4, "maxy", ["", "NUM", "REAL"], 0, None, 0),
        ]
        self._check_structure(columns, expected_columns, 77, rtree_name)
        c.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'trigger' "
            + "AND name = '%s_insert'" % _esc_literal(rtree_name)
        )
        self._assert(
            c.fetchone() is not None, 75, "%s_insert trigger missing" % rtree_name
        )
        # The expected set of _updateN triggers depends on the GeoPackage
        # version. NOTE(review): self.version is set outside this excerpt
        # (not in __init__); confirm it is assigned before this runs.
        if self.version < (1, 4):
            expected_update_triggers_numbers = (1, 2, 3, 4)
        else:
            expected_update_triggers_numbers = (2, 4, 5, 6, 7)
        c.execute(
            "SELECT name FROM sqlite_master WHERE "
            + "type = 'trigger' "
            + "AND name LIKE '%s_update%%'" % (_esc_literal(rtree_name))
        )
        trigger_names = c.fetchall()
        found_numbers = set()
        for (name,) in trigger_names:
            number = int(name[len(_esc_literal(rtree_name)) + len("_update") :])
            if number not in expected_update_triggers_numbers:
                self._assert(False, 75, f"{name} trigger unexpected")
            found_numbers.add(number)
        for expected_number in expected_update_triggers_numbers:
            self._assert(
                expected_number in found_numbers,
                75,
                "%s_update%d trigger missing" % (rtree_name, expected_number),
            )
        c.execute(
            "SELECT 1 FROM sqlite_master WHERE type = 'trigger' "
            + "AND name = '%s_delete'" % _esc_literal(rtree_name)
        )
        self._assert(
            c.fetchone() is not None, 75, "%s_delete trigger missing" % rtree_name
        )
def _check_features(self, c):
    """Check gpkg_geometry_columns (Req 21) and every features table, and
    verify cross-consistency with gpkg_contents (Reqs 22/23/146)."""
    self._log("Checking features")
    c.execute("SELECT 1 FROM gpkg_contents WHERE data_type = 'features'")
    if c.fetchone() is None:
        self._log("... No features table")
        return
    self._log("Checking gpkg_geometry_columns")
    c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_geometry_columns'")
    self._assert(
        c.fetchone() is not None, 21, "gpkg_geometry_columns table missing"
    )
    c.execute("PRAGMA table_info(gpkg_geometry_columns)")
    columns = c.fetchall()
    expected_columns = [
        (0, "table_name", "TEXT", 1, None, 1),
        (1, "column_name", "TEXT", 1, None, 2),
        (2, "geometry_type_name", "TEXT", 1, None, 0),
        (3, "srs_id", "INTEGER", 1, None, 0),
        (4, "z", "TINYINT", 1, None, 0),
        (5, "m", "TINYINT", 1, None, 0),
    ]
    self._check_structure(columns, expected_columns, 21, "gpkg_geometry_columns")
    c.execute(
        "SELECT table_name FROM gpkg_contents WHERE " "data_type = 'features'"
    )
    rows = c.fetchall()
    for (table_name,) in rows:
        self._check_vector_user_table(c, table_name)
    c.execute("SELECT table_name, srs_id FROM gpkg_geometry_columns")
    rows = c.fetchall()
    for (table_name, srs_id) in rows:
        # Every gpkg_geometry_columns row must be backed by a
        # gpkg_contents 'features' row and a known srs.
        c.execute(
            "SELECT 1 FROM gpkg_contents WHERE table_name = ? "
            + "AND data_type='features'",
            (table_name,),
        )
        ret = c.fetchall()
        self._assert(
            len(ret) == 1,
            23,
            (
                "table_name = %s is registered in "
                + "gpkg_geometry_columns, but not in gpkg_contents"
            )
            % table_name,
        )
        c.execute(
            "SELECT 1 FROM gpkg_spatial_ref_sys WHERE " + "srs_id = ?", (srs_id,)
        )
        self._assert(
            c.fetchone() is not None,
            14,
            (
                "table_name=%s has srs_id=%d in "
                + "gpkg_geometry_columns which isn't found in "
                + "gpkg_spatial_ref_sys"
            )
            % (table_name, srs_id),
        )
    # Req 146: srs_id must agree between the two metadata tables.
    c.execute(
        "SELECT a.table_name, a.srs_id, b.srs_id FROM "
        + "gpkg_geometry_columns a, gpkg_contents b "
        + "WHERE a.table_name = b.table_name AND a.srs_id != b.srs_id"
    )
    rows = c.fetchall()
    for (table_name, a_srs_id, b_srs_id) in rows:
        self._assert(
            False,
            146,
            "Table %s is declared with srs_id %d in "
            "gpkg_geometry_columns and %d in gpkg_contents"
            % (table_name, a_srs_id, b_srs_id),
        )
def _check_attribute_user_table(self, c, table_name):
    """Validate one attributes table/view: column data types (Req 5),
    INTEGER PRIMARY KEY for tables (Req 119) or a unique INTEGER first
    column for views (Req 151), plus optional content checks."""
    self._log("Checking attributes table " + table_name)
    c.execute("PRAGMA table_info(%s)" % _esc_id(table_name))
    cols = c.fetchall()
    count_pkid = 0
    for (_, name, typ, _, _, pk) in cols:
        if pk == 1:
            count_pkid += 1
            self._assert(
                typ == "INTEGER",
                119,
                ("table %s has a PRIMARY KEY of type %s " + "instead of INTEGER")
                % (table_name, typ),
            )
        else:
            self._assert(
                _is_valid_data_type(typ),
                5,
                "table %s has column %s of unexpected type %s"
                % (table_name, name, typ),
            )
    c.execute(
        "SELECT 1 FROM sqlite_master WHERE " "type = 'table' AND name = ?",
        (table_name,),
    )
    if c.fetchone():
        # Real table: exactly one INTEGER PRIMARY KEY required.
        self._assert(
            count_pkid == 1, 119, "table %s has no INTEGER PRIMARY KEY" % table_name
        )
        self._check_user_table_content(c, table_name, cols)
    else:
        # View: the first column stands in for the primary key.
        self._assert(
            len(cols) > 0 and cols[0][2] == "INTEGER",
            151,
            "view %s has no INTEGER first column" % table_name,
        )
        c.execute(
            "SELECT COUNT(*) - COUNT(DISTINCT %s) FROM %s"
            % (_esc_id(cols[0][1]), _esc_id(table_name))
        )
        self._assert(
            c.fetchone()[0] == 0,
            151,
            "First column of view %s should contain " "unique values" % table_name,
        )
def _check_attributes(self, c):
self._log("Checking attributes")
c.execute(
"SELECT table_name FROM gpkg_contents WHERE " "data_type = 'attributes'"
)
rows = c.fetchall()
if not rows:
self._log("... No attributes table")
for (table_name,) in rows:
self._check_attribute_user_table(c, table_name)
def _check_tile_user_table(self, c, table_name, data_type):
    """Validate a tile pyramid user table (Req 19, 35-57, 88, 91).

    Checks the table structure, consistency of its zoom levels and
    pixel sizes with gpkg_tile_matrix / gpkg_tile_matrix_set, the
    tile_column / tile_row value ranges, and the image format of each
    tile blob (with extra constraints depending on data_type, which is
    'tiles' or '2d-gridded-coverage').
    """
    self._log("Checking tile pyramid user table " + table_name)
    c.execute("PRAGMA table_info(%s)" % _esc_id(table_name))
    columns = c.fetchall()
    # Expected PRAGMA table_info layout: (cid, name, type, notnull, default, pk)
    expected_columns = [
        (0, "id", "INTEGER", 0, None, 1),
        (1, "zoom_level", "INTEGER", 1, None, 0),
        (2, "tile_column", "INTEGER", 1, None, 0),
        (3, "tile_row", "INTEGER", 1, None, 0),
        (4, "tile_data", "BLOB", 1, None, 0),
    ]
    # Fixed: report structure failures against the tile user table
    # itself; the previous label 'gpkg_tile_matrix_set' was a
    # copy-paste error.
    self._check_structure(columns, expected_columns, 54, table_name)

    # Every zoom_level present in the data must have a matching row in
    # gpkg_tile_matrix (Req 44).
    c.execute("SELECT DISTINCT zoom_level FROM %s" % _esc_id(table_name))
    rows = c.fetchall()
    for (zoom_level,) in rows:
        c.execute(
            "SELECT 1 FROM gpkg_tile_matrix WHERE table_name = ? "
            "AND zoom_level = ?",
            (table_name, zoom_level),
        )
        self._assert(
            c.fetchone() is not None,
            44,
            (
                "Table %s has data for zoom_level = %d, but no "
                + "corresponding row in gpkg_tile_matrix"
            )
            % (table_name, zoom_level),
        )

    # The gpkg_zoom_other extension relaxes the factor-of-2 relation
    # between consecutive zoom levels (Req 88).
    zoom_other_levels = False
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    if c.fetchone() is not None:
        c.execute(
            "SELECT column_name FROM gpkg_extensions WHERE "
            "table_name = ? "
            "AND extension_name = 'gpkg_zoom_other'",
            (table_name,),
        )
        row = c.fetchone()
        if row is not None:
            (column_name,) = row
            self._assert(
                column_name == "tile_data",
                88,
                "Wrong column_name in gpkg_extensions for " "gpkg_zoom_other",
            )
            zoom_other_levels = True

    c.execute(
        "SELECT zoom_level, pixel_x_size, pixel_y_size "
        "FROM gpkg_tile_matrix "
        "WHERE table_name = ? ORDER BY zoom_level",
        (table_name,),
    )
    rows = c.fetchall()
    prev_zoom_level = None
    prev_pixel_x_size = None
    prev_pixel_y_size = None
    for (zoom_level, pixel_x_size, pixel_y_size) in rows:
        if prev_pixel_x_size is not None:
            # Pixel sizes must strictly decrease with zoom level (Req 53).
            self._assert(
                pixel_x_size < prev_pixel_x_size
                and pixel_y_size < prev_pixel_y_size,
                53,
                ("For table %s, pixel size are not consistent " + "with zoom_level")
                % table_name,
            )
        if (
            prev_zoom_level is not None
            and zoom_level == prev_zoom_level + 1
            and not zoom_other_levels
        ):
            # Without gpkg_zoom_other, each successive zoom level must
            # halve the pixel size (Req 35), within a relative tolerance.
            self._assert(
                abs((pixel_x_size - prev_pixel_x_size / 2) / prev_pixel_x_size)
                < 1e-5,
                35,
                "Expected pixel_x_size=%f for zoom_level=%d. Got %f"
                % (prev_pixel_x_size / 2, zoom_level, pixel_x_size),
            )
            self._assert(
                abs((pixel_y_size - prev_pixel_y_size / 2) / prev_pixel_y_size)
                < 1e-5,
                35,
                "Expected pixel_y_size=%f for zoom_level=%d. Got %f"
                % (prev_pixel_y_size / 2, zoom_level, pixel_y_size),
            )
        prev_pixel_x_size = pixel_x_size
        prev_pixel_y_size = pixel_y_size
        prev_zoom_level = zoom_level

    # The extent implied by each gpkg_tile_matrix row
    # (matrix size * tile size * pixel size) must match the extent
    # declared in gpkg_tile_matrix_set (Req 45), within 0.1%.
    c.execute(
        "SELECT max_x - min_x, "
        " MIN(matrix_width * tile_width * pixel_x_size), "
        " MAX(matrix_width * tile_width * pixel_x_size), "
        " max_y - min_y, "
        " MIN(matrix_height * tile_height * pixel_y_size), "
        " MAX(matrix_height * tile_height * pixel_y_size) "
        "FROM gpkg_tile_matrix tm JOIN gpkg_tile_matrix_set tms "
        "ON tm.table_name = tms.table_name WHERE tm.table_name = ?",
        (table_name,),
    )
    rows = c.fetchall()
    if rows:
        (dx, min_dx, max_dx, dy, min_dy, max_dy) = rows[0]
        self._assert(
            abs((min_dx - dx) / dx) < 1e-3
            and abs((max_dx - dx) / dx) < 1e-3
            and abs((min_dy - dy) / dy) < 1e-3
            and abs((max_dy - dy) / dy) < 1e-3,
            45,
            (
                "Inconsistent values in gpkg_tile_matrix and "
                + "gpkg_tile_matrix_set for table %s"
            )
            % table_name,
        )

    # tile_column / tile_row must fall inside the matrix dimensions
    # declared for each zoom level (Req 55-57).
    c.execute("SELECT DISTINCT zoom_level FROM %s" % _esc_id(table_name))
    rows = c.fetchall()
    for (zoom_level,) in rows:
        c.execute(
            (
                "SELECT MIN(tile_column), MAX(tile_column), "
                + "MIN(tile_row), MAX(tile_row) FROM %s "
                + "WHERE zoom_level = %d"
            )
            % (_esc_id(table_name), zoom_level)
        )
        min_col, max_col, min_row, max_row = c.fetchone()

        c.execute(
            "SELECT matrix_width, matrix_height FROM "
            "gpkg_tile_matrix "
            "WHERE table_name = ? AND zoom_level = ?",
            (table_name, zoom_level),
        )
        rows2 = c.fetchall()
        if not rows2:
            self._assert(False, 55, "Invalid zoom_level in %s" % table_name)
        else:
            matrix_width, matrix_height = rows2[0]
            # Fixed: the upper bound must be tested against
            # MAX(tile_column)/MAX(tile_row); the previous code tested
            # only the minimum, so out-of-range tiles at the high end
            # of the matrix went undetected.
            self._assert(
                min_col >= 0 and max_col < matrix_width,
                56,
                "Invalid tile_col in %s" % table_name,
            )
            self._assert(
                min_row >= 0 and max_row < matrix_height,
                57,
                "Invalid tile_row in %s" % table_name,
            )

    # Identify each blob's image format from its magic bytes (Req 36).
    c.execute("SELECT tile_data FROM %s" % _esc_id(table_name))
    found_webp = False
    for (blob,) in c.fetchall():
        self._assert(blob is not None and len(blob) >= 12, 19, "Invalid blob")
        max_size_needed = 12
        blob_ar = struct.unpack("B" * max_size_needed, blob[0:max_size_needed])
        is_jpeg = blob_ar[0:3] == (0xFF, 0xD8, 0xFF)
        is_png = blob_ar[0:4] == (0x89, 0x50, 0x4E, 0x47)
        # WebP is a RIFF container whose fourth chunk id is 'WEBP'.
        is_webp = blob_ar[0:4] == (
            ord("R"),
            ord("I"),
            ord("F"),
            ord("F"),
        ) and blob_ar[8:12] == (ord("W"), ord("E"), ord("B"), ord("P"))
        # TIFF: little-endian 'II*\0' or big-endian 'MM\0*'.
        is_tiff = blob_ar[0:4] == (0x49, 0x49, 0x2A, 0x00) or blob_ar[0:4] == (
            0x4D,
            0x4D,
            0x00,
            0x2A,
        )
        self._assert(
            is_jpeg or is_png or is_webp or is_tiff,
            36,
            "Unrecognized image mime type",
        )
        if data_type == "tiles":
            self._assert(
                is_jpeg or is_png or is_webp, 36, "Unrecognized image mime type"
            )
        elif data_type == "2d-gridded-coverage":
            self._assert(is_png or is_tiff, 36, "Unrecognized image mime type")
        if is_webp:
            found_webp = True

    # WebP tiles require the gpkg_webp extension to be declared (Req 91).
    if found_webp:
        c.execute(
            "SELECT 1 FROM gpkg_extensions WHERE "
            "table_name = ? AND column_name = 'tile_data' AND "
            "extension_name = 'gpkg_webp' AND "
            "scope = 'read-write'",
            (table_name,),
        )
        self._assert(
            c.fetchone() is not None,
            91,
            (
                "Table %s has webp content, but not registered "
                "in gpkg_extensions" % table_name
            ),
        )
def _check_tiles(self, c):
    """Check tile-related metadata tables (Req 38-52).

    Validates the structure and content of gpkg_tile_matrix_set and
    gpkg_tile_matrix, then delegates per-table checks of each tiles /
    2d-gridded-coverage table to _check_tile_user_table().
    """
    self._log("Checking tiles")

    # Nothing to do if no tiles or gridded coverage tables are declared.
    c.execute(
        "SELECT 1 FROM gpkg_contents WHERE data_type IN "
        "('tiles', '2d-gridded-coverage')"
    )
    if c.fetchone() is None:
        self._log("... No tiles table")
        return

    self._log("Checking gpkg_tile_matrix_set ")
    c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_tile_matrix_set'")
    self._assert(c.fetchone() is not None, 38, "gpkg_tile_matrix_set table missing")
    c.execute("PRAGMA table_info(gpkg_tile_matrix_set)")
    columns = c.fetchall()
    # Expected PRAGMA table_info layout: (cid, name, type, notnull, default, pk)
    expected_columns = [
        (0, "table_name", "TEXT", 1, None, 1),
        (1, "srs_id", "INTEGER", 1, None, 0),
        (2, "min_x", "DOUBLE", 1, None, 0),
        (3, "min_y", "DOUBLE", 1, None, 0),
        (4, "max_x", "DOUBLE", 1, None, 0),
        (5, "max_y", "DOUBLE", 1, None, 0),
    ]
    self._check_structure(columns, expected_columns, 38, "gpkg_tile_matrix_set")

    c.execute("SELECT table_name, srs_id FROM gpkg_tile_matrix_set")
    rows = c.fetchall()
    for (table_name, srs_id) in rows:
        # Each gpkg_tile_matrix_set row must reference a tiles /
        # 2d-gridded-coverage entry of gpkg_contents (Req 39).
        c.execute(
            "SELECT 1 FROM gpkg_contents WHERE table_name = ? "
            + "AND data_type IN ('tiles', '2d-gridded-coverage')",
            (table_name,),
        )
        ret = c.fetchall()
        self._assert(
            len(ret) == 1,
            39,
            (
                "table_name = %s is registered in "
                + "gpkg_tile_matrix_set, but not in gpkg_contents"
            )
            % table_name,
        )
        # ... and a known SRS (Req 41).
        c.execute("SELECT 1 FROM gpkg_spatial_ref_sys WHERE srs_id = ?", (srs_id,))
        self._assert(
            c.fetchone() is not None,
            41,
            (
                "table_name=%s has srs_id=%d in "
                + "gpkg_tile_matrix_set which isn't found in "
                + "gpkg_spatial_ref_sys"
            )
            % (table_name, srs_id),
        )

    self._log("Checking gpkg_tile_matrix")
    c.execute("SELECT 1 FROM sqlite_master WHERE " "name = 'gpkg_tile_matrix'")
    self._assert(c.fetchone() is not None, 42, "gpkg_tile_matrix table missing")
    c.execute("PRAGMA table_info(gpkg_tile_matrix)")
    columns = c.fetchall()
    # (table_name, zoom_level) form a composite primary key (pk = 1, 2).
    expected_columns = [
        (0, "table_name", "TEXT", 1, None, 1),
        (1, "zoom_level", "INTEGER", 1, None, 2),
        (2, "matrix_width", "INTEGER", 1, None, 0),
        (3, "matrix_height", "INTEGER", 1, None, 0),
        (4, "tile_width", "INTEGER", 1, None, 0),
        (5, "tile_height", "INTEGER", 1, None, 0),
        (6, "pixel_x_size", "DOUBLE", 1, None, 0),
        (7, "pixel_y_size", "DOUBLE", 1, None, 0),
    ]
    self._check_structure(columns, expected_columns, 42, "gpkg_tile_matrix")

    c.execute(
        "SELECT table_name, zoom_level, matrix_width, "
        "matrix_height, tile_width, tile_height, pixel_x_size, "
        "pixel_y_size FROM gpkg_tile_matrix"
    )
    rows = c.fetchall()
    for (
        table_name,
        zoom_level,
        matrix_width,
        matrix_height,
        tile_width,
        tile_height,
        pixel_x_size,
        pixel_y_size,
    ) in rows:
        # Referential check (Req 43), then per-column value checks
        # (Req 46-52): non-negative zoom, strictly positive dimensions
        # and pixel sizes.
        c.execute(
            "SELECT 1 FROM gpkg_contents WHERE table_name = ? "
            "AND data_type IN ('tiles', '2d-gridded-coverage')",
            (table_name,),
        )
        ret = c.fetchall()
        self._assert(
            len(ret) == 1,
            43,
            (
                "table_name = %s is registered in "
                + "gpkg_tile_matrix, but not in gpkg_contents"
            )
            % table_name,
        )
        self._assert(
            zoom_level >= 0,
            46,
            "Invalid zoom_level = %d for table %s" % (zoom_level, table_name),
        )
        self._assert(
            matrix_width > 0,
            47,
            "Invalid matrix_width = %d for table %s" % (matrix_width, table_name),
        )
        self._assert(
            matrix_height > 0,
            48,
            "Invalid matrix_height = %d for table %s" % (matrix_height, table_name),
        )
        self._assert(
            tile_width > 0,
            49,
            "Invalid tile_width = %d for table %s" % (tile_width, table_name),
        )
        self._assert(
            tile_height > 0,
            50,
            "Invalid tile_height = %d for table %s" % (tile_height, table_name),
        )
        self._assert(
            pixel_x_size > 0,
            51,
            "Invalid pixel_x_size = %f for table %s" % (pixel_x_size, table_name),
        )
        self._assert(
            pixel_y_size > 0,
            52,
            "Invalid pixel_y_size = %f for table %s" % (pixel_y_size, table_name),
        )

    # Finally check each tile user table itself.
    c.execute(
        "SELECT table_name, data_type FROM gpkg_contents WHERE "
        "data_type IN ('tiles', '2d-gridded-coverage')"
    )
    rows = c.fetchall()
    for (table_name, data_type) in rows:
        self._check_tile_user_table(c, table_name, data_type)
def _check_tiled_gridded_coverage_data(self, c):
    """Check the tiled gridded coverage (elevation) extension.

    Validates the gpkg_2d_gridded_coverage_ancillary and
    gpkg_2d_gridded_tile_ancillary tables, their registration in
    gpkg_extensions, cross-references with the coverage tables, and
    (when GDAL is available) the content of each PNG/TIFF tile.
    Requirement identifiers refer to OGC 17-066r1.
    """
    self._log("Checking tiled gridded elevation data")
    c.execute(
        "SELECT table_name FROM gpkg_contents WHERE "
        "data_type = '2d-gridded-coverage'"
    )
    tables = c.fetchall()
    if not tables:
        self._log("... No tiled gridded coverage table")
        return
    # Flatten the fetched 1-tuples into a plain list of table names.
    tables = [tables[i][0] for i in range(len(tables))]

    c.execute(
        "SELECT 1 FROM sqlite_master WHERE "
        "name = 'gpkg_2d_gridded_coverage_ancillary'"
    )
    self._assert(
        c.fetchone() is not None,
        "gpkg_2d_gridded_coverage#1",
        "gpkg_2d_gridded_coverage_ancillary table is missing",
    )

    c.execute("PRAGMA table_info(gpkg_2d_gridded_coverage_ancillary)")
    columns = c.fetchall()
    # Expected PRAGMA table_info layout: (cid, name, type, notnull, default, pk)
    expected_columns = [
        (0, "id", "INTEGER", 1, None, 1),
        (1, "tile_matrix_set_name", "TEXT", 1, None, 0),
        (2, "datatype", "TEXT", 1, "'integer'", 0),
        (3, "scale", "REAL", 1, "1.0", 0),
        (4, "offset", "REAL", 1, "0.0", 0),
        (5, "precision", "REAL", 0, "1.0", 0),
        (6, "data_null", "REAL", 0, None, 0),
        (7, "grid_cell_encoding", "TEXT", 0, "'grid-value-is-center'", 0),
        (8, "uom", "TEXT", 0, None, 0),
        (9, "field_name", "TEXT", 0, "'Height'", 0),
        (10, "quantity_definition", "TEXT", 0, "'Height'", 0),
    ]
    self._check_structure(
        columns,
        expected_columns,
        "gpkg_2d_gridded_coverage#1",
        "gpkg_2d_gridded_coverage_ancillary",
    )

    c.execute(
        "SELECT 1 FROM sqlite_master WHERE "
        "name = 'gpkg_2d_gridded_tile_ancillary'"
    )
    self._assert(
        c.fetchone() is not None,
        "gpkg_2d_gridded_coverage#2",
        "gpkg_2d_gridded_tile_ancillary table is missing",
    )

    c.execute("PRAGMA table_info(gpkg_2d_gridded_tile_ancillary)")
    columns = c.fetchall()
    expected_columns = [
        (0, "id", "INTEGER", 0, None, 1),
        (1, "tpudt_name", "TEXT", 1, None, 0),
        (2, "tpudt_id", "INTEGER", 1, None, 0),
        (3, "scale", "REAL", 1, "1.0", 0),
        (4, "offset", "REAL", 1, "0.0", 0),
        (5, "min", "REAL", 0, "NULL", 0),
        (6, "max", "REAL", 0, "NULL", 0),
        (7, "mean", "REAL", 0, "NULL", 0),
        (8, "std_dev", "REAL", 0, "NULL", 0),
    ]
    self._check_structure(
        columns,
        expected_columns,
        "gpkg_2d_gridded_coverage#2",
        "gpkg_2d_gridded_tile_ancillary",
    )

    # The extension mandates an EPSG:4979 entry in gpkg_spatial_ref_sys.
    c.execute(
        "SELECT srs_id, organization, organization_coordsys_id, "
        "definition FROM gpkg_spatial_ref_sys "
        "WHERE srs_id = 4979"
    )
    ret = c.fetchall()
    self._assert(
        len(ret) == 1,
        "gpkg_2d_gridded_coverage#3",
        "gpkg_spatial_ref_sys shall have a row for srs_id=4979",
    )
    if len(ret) == 1:
        self._assert(
            ret[0][1].lower() == "epsg",
            "gpkg_2d_gridded_coverage#3",
            "wrong value for organization for srs_id = 4979: %s" % ret[0][1],
        )
        self._assert(
            ret[0][2] == 4979,
            "gpkg_2d_gridded_coverage#3",
            ("wrong value for organization_coordsys_id for " + "srs_id = 4979: %s")
            % ret[0][2],
        )

    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    self._assert(
        c.fetchone() is not None,
        "gpkg_2d_gridded_coverage#6",
        "gpkg_extensions does not exist",
    )
    c.execute(
        "SELECT table_name, column_name, definition, scope FROM "
        "gpkg_extensions WHERE "
        "extension_name = 'gpkg_2d_gridded_coverage'"
    )
    rows = c.fetchall()
    # One row per coverage table plus one for each of the two
    # ancillary tables.
    self._assert(
        len(rows) == 2 + len(tables),
        "gpkg_2d_gridded_coverage#6",
        "Wrong number of entries in gpkg_extensions with "
        "2d_gridded_coverage extension name",
    )
    found_gpkg_2d_gridded_coverage_ancillary = False
    found_gpkg_2d_gridded_tile_ancillary = False
    expected_def = "http://docs.opengeospatial.org/is/17-066r1/17-066r1.html"
    for (table_name, column_name, definition, scope) in rows:
        if table_name == "gpkg_2d_gridded_coverage_ancillary":
            # Ancillary tables are registered table-wide (NULL column).
            found_gpkg_2d_gridded_coverage_ancillary = True
            self._assert(
                column_name is None,
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry for "
                "gpkg_2d_gridded_coverage_ancillary "
                "in gpkg_extensions",
            )
            self._assert(
                definition == expected_def,
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry (definition) for "
                "gpkg_2d_gridded_coverage_ancillary "
                "in gpkg_extensions",
            )
            self._assert(
                scope == "read-write",
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry for "
                "gpkg_2d_gridded_coverage_ancillary "
                "in gpkg_extensions",
            )
        elif table_name == "gpkg_2d_gridded_tile_ancillary":
            found_gpkg_2d_gridded_tile_ancillary = True
            self._assert(
                column_name is None,
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry for "
                "gpkg_2d_gridded_tile_ancillary "
                "in gpkg_extensions",
            )
            self._assert(
                definition == expected_def,
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry (definition) for "
                "gpkg_2d_gridded_tile_ancillary "
                "in gpkg_extensions",
            )
            self._assert(
                scope == "read-write",
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry for "
                "gpkg_2d_gridded_tile_ancillary "
                "in gpkg_extensions",
            )
        else:
            # Coverage tables are registered on their tile_data column.
            self._assert(
                table_name in tables,
                "gpkg_2d_gridded_coverage#6",
                "Unexpected table_name registered for "
                + "2d_gridded_coverage: %s" % table_name,
            )
            self._assert(
                column_name == "tile_data",
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry for %s " % table_name + "in gpkg_extensions",
            )
            self._assert(
                definition == expected_def,
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry (definition) for %s " % table_name
                + "in gpkg_extensions",
            )
            self._assert(
                scope == "read-write",
                "gpkg_2d_gridded_coverage#6",
                "Wrong entry for %s " % table_name + "in gpkg_extensions",
            )
    self._assert(
        found_gpkg_2d_gridded_coverage_ancillary,
        "gpkg_2d_gridded_coverage#6",
        "gpkg_2d_gridded_coverage_ancillary not registered "
        "for 2d_gridded_coverage",
    )
    self._assert(
        found_gpkg_2d_gridded_tile_ancillary,
        "gpkg_2d_gridded_coverage#6",
        "gpkg_2d_gridded_tile_ancillary not registered " "for 2d_gridded_coverage",
    )

    c.execute(
        "SELECT tile_matrix_set_name, datatype FROM "
        "gpkg_2d_gridded_coverage_ancillary"
    )
    rows = c.fetchall()
    self._assert(
        len(rows) == len(tables),
        "gpkg_2d_gridded_coverage#7",
        "Wrong number of entries in " "gpkg_2d_gridded_coverage_ancillary",
    )
    for (tile_matrix_set_name, datatype) in rows:
        self._assert(
            tile_matrix_set_name in tables,
            "gpkg_2d_gridded_coverage#7",
            "Table %s has a row in " % tile_matrix_set_name
            + "gpkg_2d_gridded_coverage_ancillary, but not in "
            "gpkg_contents",
        )
        c.execute(
            "SELECT 1 FROM gpkg_tile_matrix_set WHERE " "table_name = ?",
            (tile_matrix_set_name,),
        )
        self._assert(
            c.fetchone() is not None,
            "gpkg_2d_gridded_coverage#8",
            "missing entry in gpkg_tile_matrix_set "
            + "for %s" % tile_matrix_set_name,
        )
        self._assert(
            datatype in ("integer", "float"),
            "gpkg_2d_gridded_coverage#9",
            "Unexpected datatype = %s" % datatype,
        )

    # Each coverage table must have exactly one ancillary row per tile.
    for table in tables:
        c.execute("SELECT COUNT(*) FROM %s" % _esc_id(table))
        count_tpudt = c.fetchone()
        c.execute(
            "SELECT COUNT(*) FROM gpkg_2d_gridded_tile_ancillary "
            "WHERE tpudt_name = ?",
            (table,),
        )
        count_tile_ancillary = c.fetchone()
        # Both values are 1-tuples; tuple equality compares the counts.
        self._assert(
            count_tpudt == count_tile_ancillary,
            "gpkg_2d_gridded_coverage#10",
            (
                "Inconsistent number of rows in "
                + "gpkg_2d_gridded_tile_ancillary for %s"
            )
            % table,
        )

    c.execute("SELECT DISTINCT tpudt_name FROM " "gpkg_2d_gridded_tile_ancillary")
    rows = c.fetchall()
    for (tpudt_name,) in rows:
        self._assert(
            tpudt_name in tables,
            "gpkg_2d_gridded_coverage#11",
            "tpudt_name = %s is invalid" % tpudt_name,
        )

    # For float coverages, per-tile scale/offset must be the identity.
    c.execute(
        "SELECT tile_matrix_set_name FROM "
        "gpkg_2d_gridded_coverage_ancillary WHERE "
        "datatype = 'float'"
    )
    rows = c.fetchall()
    for (tile_matrix_set_name,) in rows:
        c.execute(
            "SELECT 1 FROM gpkg_2d_gridded_tile_ancillary WHERE "
            "tpudt_name = ? AND "
            "NOT (offset == 0.0 AND scale == 1.0)",
            (tile_matrix_set_name,),
        )
        self._assert(
            len(c.fetchall()) == 0,
            "gpkg_2d_gridded_coverage#9",
            "Wrong scale and offset values "
            + "for %s " % tile_matrix_set_name
            + "in gpkg_2d_gridded_coverage_ancillary",
        )

    for table in tables:
        # NOTE(review): the table name is interpolated unescaped into
        # the subquery here (unlike the _esc_id() usage elsewhere) —
        # assumes coverage table names contain no special characters.
        c.execute(
            "SELECT 1 FROM gpkg_2d_gridded_tile_ancillary WHERE "
            + "tpudt_name = ? AND tpudt_id NOT IN (SELECT id FROM "
            + "%s)" % table,
            (table,),
        )
        self._assert(
            len(c.fetchall()) == 0,
            "gpkg_2d_gridded_coverage#12",
            "tpudt_id in gpkg_2d_gridded_coverage_ancillary "
            + "not referencing an id from %s" % table,
        )

    # Per-tile content checks: integer coverages are 16-bit PNG,
    # float coverages are single-band Float32 TIFF. These need GDAL.
    c.execute(
        "SELECT tile_matrix_set_name, datatype FROM "
        "gpkg_2d_gridded_coverage_ancillary"
    )
    rows = c.fetchall()
    warn_gdal_not_available = False
    for (table_name, datatype) in rows:
        c.execute("SELECT id, tile_data FROM %s" % _esc_id(table_name))
        for (ident, blob) in c.fetchall():
            self._assert(blob is not None and len(blob) >= 12, 19, "Invalid blob")
            max_size_needed = 12
            blob_ar = struct.unpack("B" * max_size_needed, blob[0:max_size_needed])
            is_png = blob_ar[0:4] == (0x89, 0x50, 0x4E, 0x47)
            # TIFF: little-endian 'II*\0' or big-endian 'MM\0*'.
            is_tiff = blob_ar[0:4] == (0x49, 0x49, 0x2A, 0x00) or blob_ar[0:4] == (
                0x4D,
                0x4D,
                0x00,
                0x2A,
            )
            if datatype == "integer":
                self._assert(
                    is_png,
                    "gpkg_2d_gridded_coverage#13",
                    "Tile for %s should be PNG" % table_name,
                )
                if has_gdal:
                    # Open the blob through a GDAL in-memory file to
                    # inspect band count and data type.
                    tmp_file = "/vsimem/temp_validate_gpkg.tif"
                    gdal.FileFromMemBuffer(tmp_file, bytes(blob))
                    ds = gdal.Open(tmp_file)
                    try:
                        self._assert(
                            ds is not None,
                            "gpkg_2d_gridded_coverage#13",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        self._assert(
                            ds.RasterCount == 1,
                            "gpkg_2d_gridded_coverage#13",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        self._assert(
                            ds.GetRasterBand(1).DataType == gdal.GDT_UInt16,
                            "gpkg_2d_gridded_coverage#13",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                    finally:
                        gdal.Unlink(tmp_file)
                else:
                    if not warn_gdal_not_available:
                        warn_gdal_not_available = True
                        self._log(
                            "GDAL not available. Req gpkg_2d_gridded_coverage#13 not tested"
                        )
            elif datatype == "float":
                self._assert(
                    is_tiff,
                    "gpkg_2d_gridded_coverage#14",
                    "Tile for %s should be TIFF" % table_name,
                )
                if has_gdal:
                    tmp_file = "/vsimem/temp_validate_gpkg.tif"
                    gdal.FileFromMemBuffer(tmp_file, bytes(blob))
                    ds = gdal.Open(tmp_file)
                    try:
                        self._assert(
                            ds is not None,
                            "gpkg_2d_gridded_coverage#15",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        self._assert(
                            ds.RasterCount == 1,
                            "gpkg_2d_gridded_coverage#16",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        self._assert(
                            ds.GetRasterBand(1).DataType == gdal.GDT_Float32,
                            "gpkg_2d_gridded_coverage#17",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        # Only uncompressed or LZW-compressed TIFF allowed.
                        compression = ds.GetMetadataItem(
                            "COMPRESSION", "IMAGE_STRUCTURE"
                        )
                        self._assert(
                            compression is None or compression == "LZW",
                            "gpkg_2d_gridded_coverage#18",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        # No subdatasets or overviews allowed in the TIFF.
                        ovr_count = ds.GetRasterBand(1).GetOverviewCount()
                        self._assert(
                            not ds.GetSubDatasets() and ovr_count == 0,
                            "gpkg_2d_gridded_coverage#19",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                        # Strip organization: blocks span the full width.
                        (blockxsize, _) = ds.GetRasterBand(1).GetBlockSize()
                        self._assert(
                            blockxsize == ds.RasterXSize,
                            "gpkg_2d_gridded_coverage#20",
                            "Invalid tile %d in %s" % (ident, table_name),
                        )
                    finally:
                        gdal.Unlink(tmp_file)
                else:
                    if not warn_gdal_not_available:
                        warn_gdal_not_available = True
                        self._log(
                            "GDAL not available. "
                            "Req gpkg_2d_gridded_coverage#15 to gpkg_2d_gridded_coverage#19 not tested"
                        )
def _check_column_exists(self, c, table_name, col_name):
    """Return True if table_name has a column named col_name.

    An ORDER BY probe (with LIMIT 0, so no rows are fetched) is used
    instead of 'SELECT "col_name"', because a quoted name matching no
    column would be silently interpreted as a string literal.
    """
    probe = "SELECT 1 FROM %s t ORDER BY t.%s LIMIT 0" % (
        _esc_id(table_name),
        _esc_id(col_name),
    )
    try:
        c.execute(probe)
    except sqlite3.OperationalError:
        # Unknown column (or table): SQLite rejects the ORDER BY.
        return False
    return True
def _check_gpkg_extensions(self, c):
    """Check the gpkg_extensions table (Req 58-64, 75).

    Validates table structure, referenced columns, extension_name
    syntax ('gpkg_' prefix or '<author>_<name>'), scope values, and
    gpkg_rtree_index registrations.
    """
    self._log("Checking gpkg_extensions")
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    if c.fetchone() is None:
        self._log("... No extensions")
        return

    c.execute("PRAGMA table_info(gpkg_extensions)")
    columns = c.fetchall()
    # Expected PRAGMA table_info layout: (cid, name, type, notnull, default, pk)
    expected_columns = [
        (0, "table_name", "TEXT", 0, None, 0),
        (1, "column_name", "TEXT", 0, None, 0),
        (2, "extension_name", "TEXT", 1, None, 0),
        (3, "definition", "TEXT", 1, None, 0),
        (4, "scope", "TEXT", 1, None, 0),
    ]
    self._check_structure(columns, expected_columns, 58, "gpkg_extensions")

    c.execute(
        "SELECT table_name, column_name FROM gpkg_extensions WHERE "
        "table_name IS NOT NULL"
    )
    rows = c.fetchall()
    for (table_name, column_name) in rows:
        # Doesn't work for gpkg_2d_gridded_coverage_ancillary
        # c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ?", \
        #     (table_name,) )
        # ret = c.fetchall()
        # self._assert(len(ret) == 1, \
        #     60, ('table_name = %s is registered in ' +\
        #     'gpkg_extensions, but not in gpkg_contents') % table_name)

        # A non-NULL column_name must reference a real column (Req 61).
        if column_name is not None:
            self._assert(
                self._check_column_exists(c, table_name, column_name),
                61,
                (
                    "Column %s of table %s mentioned in "
                    + "gpkg_extensions doesn't exist"
                )
                % (column_name, table_name),
            )

    c.execute("SELECT extension_name FROM gpkg_extensions")
    rows = c.fetchall()
    # Extensions with the reserved 'gpkg_' prefix must come from this
    # closed list (Req 62).
    KNOWN_EXTENSIONS = [
        "gpkg_rtree_index",
        "gpkg_zoom_other",
        "gpkg_webp",
        "gpkg_metadata",
        "gpkg_schema",
        "gpkg_crs_wkt",
        "gpkg_crs_wkt_1_1",
        "gpkg_elevation_tiles",  # deprecated one
        "gpkg_2d_gridded_coverage",
        "gpkg_related_tables",
        "related_tables",
    ]
    for geom_name in GPKGChecker.EXT_GEOM_TYPES:
        KNOWN_EXTENSIONS += ["gpkg_geom_" + geom_name]
    # These trigger-based extensions were dropped in GeoPackage 1.2.
    if self.version < (1, 2):
        KNOWN_EXTENSIONS += ["gpkg_geometry_type_trigger", "gpkg_srs_id_trigger"]

    for (extension_name,) in rows:
        if extension_name.startswith("gpkg_"):
            self._assert(
                extension_name in KNOWN_EXTENSIONS,
                62,
                "extension_name %s not valid" % extension_name,
            )
        else:
            # Third-party names must be '<author>_<extension>' with an
            # alphanumeric author and an alphanumeric/underscore name.
            self._assert(
                "_" in extension_name,
                62,
                "extension_name %s not valid" % extension_name,
            )
            author = extension_name[0 : extension_name.find("_")]
            ext_name = extension_name[extension_name.find("_") + 1 :]
            for x in author:
                self._assert(
                    (x >= "a" and x <= "z")
                    or (x >= "A" and x <= "Z")
                    or (x >= "0" and x <= "9"),
                    62,
                    "extension_name %s not valid" % extension_name,
                )
            for x in ext_name:
                self._assert(
                    (x >= "a" and x <= "z")
                    or (x >= "A" and x <= "Z")
                    or (x >= "0" and x <= "9")
                    or x == "_",
                    62,
                    "extension_name %s not valid" % extension_name,
                )

    # c.execute("SELECT extension_name, definition FROM gpkg_extensions "
    #     "WHERE definition NOT LIKE 'Annex %' AND "
    #     "definition NOT LIKE 'http%' AND "
    #     "definition NOT LIKE 'mailto:%' AND "
    #     "definition NOT LIKE 'Extension Title%' ")
    # rows = c.fetchall()
    # for (extension_name, definition) in rows:
    #     self._assert(False, 63,
    #         "extension_name %s has invalid definition %s" %
    #         (extension_name, definition))

    # scope must be 'read-write' or 'write-only' (Req 64).
    c.execute(
        "SELECT extension_name, scope FROM gpkg_extensions "
        "WHERE scope NOT IN ('read-write', 'write-only')"
    )
    rows = c.fetchall()
    for (extension_name, scope) in rows:
        self._assert(
            False,
            64,
            "extension_name %s has invalid scope %s" % (extension_name, scope),
        )

    # gpkg_rtree_index only applies to features tables and is
    # write-only (Req 75).
    c.execute(
        "SELECT table_name, scope FROM gpkg_extensions "
        "WHERE extension_name = 'gpkg_rtree_index' "
    )
    rows = c.fetchall()
    for (table_name, scope) in rows:
        c.execute(
            "SELECT 1 FROM gpkg_contents WHERE lower(table_name) = lower(?) "
            "AND data_type = 'features'",
            (table_name,),
        )
        self._assert(
            c.fetchone() is not None,
            75,
            (
                "gpkg_extensions declares gpkg_rtree_index for %s,"
                + " but this is not a features table"
            )
            % table_name,
        )
        self._assert(
            scope == "write-only",
            75,
            "Invalid scope %s for gpkg_rtree_index" % scope,
        )
def _check_metadata(self, c):
    """Check gpkg_metadata and gpkg_metadata_reference (Req 93-102, 140).

    Validates table structures, md_scope / reference_scope vocabularies,
    the table_name / column_name / row_id_value constraints implied by
    each reference_scope, timestamp format, and the md_file_id /
    md_parent_id foreign keys.
    """
    self._log("Checking gpkg_metadata")

    # If the gpkg_metadata extension is declared, the tables must exist
    # and the declared scope must be 'read-write' (Req 140).
    must_have_gpkg_metadata = False
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    if c.fetchone() is not None:
        c.execute(
            "SELECT scope FROM gpkg_extensions WHERE "
            "extension_name = 'gpkg_metadata'"
        )
        row = c.fetchone()
        if row is not None:
            must_have_gpkg_metadata = True
            (scope,) = row
            self._assert(
                scope == "read-write",
                140,
                "Wrong scope for gpkg_metadata in " "gpkg_extensions",
            )

    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_metadata'")
    if c.fetchone() is None:
        if must_have_gpkg_metadata:
            self._assert(False, 140, "gpkg_metadata table missing")
        else:
            self._log("... No metadata")
        return

    c.execute("PRAGMA table_info(gpkg_metadata)")
    columns = c.fetchall()
    # Expected PRAGMA table_info layout: (cid, name, type, notnull, default, pk)
    expected_columns = [
        (0, "id", "INTEGER", 1, None, 1),
        (1, "md_scope", "TEXT", 1, "'dataset'", 0),
        (2, "md_standard_uri", "TEXT", 1, None, 0),
        (3, "mime_type", "TEXT", 1, "'text/xml'", 0),
        (4, "metadata", "TEXT", 1, "''", 0),
    ]
    self._check_structure(columns, expected_columns, 93, "gpkg_metadata")

    c.execute(
        "SELECT 1 FROM sqlite_master " "WHERE name = 'gpkg_metadata_reference'"
    )
    self._assert(c.fetchone() is not None, 95, "gpkg_metadata_reference is missing")

    c.execute("PRAGMA table_info(gpkg_metadata_reference)")
    columns = c.fetchall()
    expected_columns = [
        (0, "reference_scope", "TEXT", 1, None, 0),
        (1, "table_name", "TEXT", 0, None, 0),
        (2, "column_name", "TEXT", 0, None, 0),
        (3, "row_id_value", "INTEGER", 0, None, 0),
        (4, "timestamp", "DATETIME", 1, "strftime('%Y-%m-%dT%H:%M:%fZ','now')", 0),
        (5, "md_file_id", "INTEGER", 1, None, 0),
        (6, "md_parent_id", "INTEGER", 0, None, 0),
    ]
    self._check_structure(columns, expected_columns, 95, "gpkg_metadata_reference")

    # md_scope must come from the closed vocabulary (Req 94).
    c.execute(
        "SELECT DISTINCT md_scope FROM gpkg_metadata WHERE "
        "md_scope NOT IN ('undefined', 'fieldSession', "
        "'collectionSession', 'series', 'dataset', 'featureType', "
        "'feature', 'attributeType', 'attribute', 'tile', "
        "'model', 'catalog', 'schema', 'taxonomy', 'software', "
        "'service', 'collectionHardware', 'nonGeographicDataset', "
        "'dimensionGroup')"
    )
    rows = c.fetchall()
    for (md_scope,) in rows:
        self._assert(False, 94, "Invalid md_scope %s found" % md_scope)

    # reference_scope must come from its closed vocabulary (Req 96).
    c.execute(
        "SELECT DISTINCT reference_scope FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope NOT IN ('geopackage', 'table', "
        "'column', 'row', 'row/col')"
    )
    rows = c.fetchall()
    for (md_scope,) in rows:
        self._assert(False, 96, "Invalid reference_scope %s found" % md_scope)

    # 'geopackage' scope forbids table_name; other scopes require it
    # and it must reference gpkg_contents (Req 97).
    c.execute(
        "SELECT table_name FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope = 'geopackage' AND table_name is NOT NULL"
    )
    rows = c.fetchall()
    for (table_name,) in rows:
        self._assert(
            False,
            97,
            "row in gpkg_metadata_reference with table_name "
            + "not null (%s)" % table_name
            + "but reference_scope = geopackage",
        )
    c.execute(
        "SELECT table_name FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope != 'geopackage'"
    )
    rows = c.fetchall()
    for (table_name,) in rows:
        self._assert(
            table_name is not None,
            97,
            "row in gpkg_metadata_reference with null table_name",
        )
        c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ?", (table_name,))
        self._assert(
            c.fetchone() is not None,
            97,
            "row in gpkg_metadata_reference with table_name "
            + "not null (%s) with no reference in " % table_name
            + "gpkg_contents but reference_scope != geopackage",
        )

    # column_name is only allowed for 'column' and 'row/col' scopes,
    # and must then reference an existing column (Req 98).
    c.execute(
        "SELECT table_name FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope IN ('geopackage', 'table', 'row') "
        "AND column_name is NOT NULL"
    )
    rows = c.fetchall()
    for (table_name,) in rows:
        self._assert(
            False,
            98,
            "row in gpkg_metadata_reference with column_name "
            + "not null (table=%s)" % table_name
            + "but reference_scope = geopackage, table or row",
        )
    c.execute(
        "SELECT table_name, column_name FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope NOT IN ('geopackage', 'table', 'row')"
    )
    rows = c.fetchall()
    for (table_name, column_name) in rows:
        self._assert(
            column_name is not None,
            98,
            "row in gpkg_metadata_reference with null " "column_name",
        )
        self._assert(
            self._check_column_exists(c, table_name, column_name),
            98,
            "column %s of %s does not exist" % (column_name, table_name),
        )

    # row_id_value is only allowed for 'row' and 'row/col' scopes, and
    # must then reference an existing ROWID (Req 99).
    c.execute(
        "SELECT table_name FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope IN ('geopackage', 'table', 'column') "
        "AND row_id_value is NOT NULL"
    )
    rows = c.fetchall()
    for (table_name,) in rows:
        self._assert(
            False,
            99,
            "row in gpkg_metadata_reference with row_id_value "
            + "not null (table=%s)" % table_name
            + "but reference_scope = geopackage, table or column",
        )
    c.execute(
        "SELECT table_name, row_id_value FROM "
        "gpkg_metadata_reference WHERE "
        "reference_scope NOT IN ('geopackage', 'table', 'column')"
    )
    rows = c.fetchall()
    for (table_name, row_id_value) in rows:
        self._assert(
            row_id_value is not None,
            99,
            "row in gpkg_metadata_reference with null " "row_id_value",
        )
        c.execute(
            "SELECT 1 FROM %s WHERE ROWID = ?" % _esc_id(table_name),
            (row_id_value,),
        )
        self._assert(
            c.fetchone() is not None,
            99,
            "row %s of %s does not exist" % (str(row_id_value), table_name),
        )

    # timestamps must be ISO-8601 with milliseconds and 'Z' (Req 100).
    c.execute("SELECT timestamp FROM gpkg_metadata_reference")
    rows = c.fetchall()
    for (timestamp,) in rows:
        try:
            datetime.datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%S.%fZ")
        except ValueError:
            self._assert(
                False,
                100,
                (
                    "timestamp = %s in gpkg_metadata_reference"
                    + "is invalid datetime"
                )
                % (timestamp),
            )

    # md_file_id and md_parent_id must reference gpkg_metadata.id, and
    # a row may not be its own parent (Req 101-102).
    c.execute("SELECT md_file_id FROM gpkg_metadata_reference")
    rows = c.fetchall()
    for (md_file_id,) in rows:
        c.execute("SELECT 1 FROM gpkg_metadata WHERE id = ?", (md_file_id,))
        self._assert(
            c.fetchone() is not None,
            101,
            "md_file_id = %s " % str(md_file_id)
            + "does not have a row in gpkg_metadata",
        )
    c.execute(
        "SELECT md_parent_id FROM gpkg_metadata_reference "
        "WHERE md_parent_id IS NOT NULL"
    )
    rows = c.fetchall()
    for (md_parent_id,) in rows:
        c.execute("SELECT 1 FROM gpkg_metadata WHERE id = ?", (md_parent_id,))
        self._assert(
            c.fetchone() is not None,
            102,
            "md_parent_id = %s " % str(md_parent_id)
            + "does not have a row in gpkg_metadata",
        )
    c.execute(
        "SELECT md_file_id FROM "
        "gpkg_metadata_reference WHERE md_parent_id IS NOT NULL "
        "AND md_file_id = md_parent_id"
    )
    rows = c.fetchall()
    for (md_file_id,) in rows:
        self._assert(
            False, 102, "Row with md_file_id = md_parent_id = %s " % str(md_file_id)
        )
def _check_schema(self, c):
    """Check the 'schema' extension tables (gpkg_data_columns and
    gpkg_data_column_constraints).

    Covers requirements 103 to 114 and 141 of the GeoPackage
    specification.

    c: sqlite3 cursor on the GeoPackage database.
    """
    self._log("Checking gpkg_schema")
    # The schema tables are only mandatory when the gpkg_schema
    # extension is declared in gpkg_extensions.
    must_have_gpkg_schema = False
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    if c.fetchone() is not None:
        c.execute(
            "SELECT scope FROM gpkg_extensions WHERE "
            "extension_name = 'gpkg_schema'"
        )
        row = c.fetchone()
        if row is not None:
            must_have_gpkg_schema = True
            (scope,) = row
            self._assert(
                scope == "read-write",
                141,
                "Wrong scope for gpkg_schema in " "gpkg_extensions",
            )
            if self.version >= (1, 2, 1):
                # Since GPKG 1.2.1 one registration row per extension
                # table is expected, i.e. exactly two rows in total.
                # The cursor still holds the remaining rows of the
                # SELECT above, hence the two extra fetchone() calls.
                self._assert(
                    c.fetchone() is not None,
                    141,
                    "There should be exactly 2 rows with "
                    + "extension_name = "
                    + "'gpkg_schema' in gpkg_extensions",
                )
                self._assert(
                    c.fetchone() is None,
                    141,
                    "There should be exactly 2 rows with "
                    + "extension_name = "
                    + "'gpkg_schema' in gpkg_extensions",
                )
                # gpkg_schema registrations are table-level only:
                # column_name must be NULL.
                c.execute(
                    "SELECT 1 FROM gpkg_extensions WHERE "
                    "extension_name = 'gpkg_schema' AND "
                    "column_name IS NOT NULL"
                )
                row = c.fetchone()
                if row is not None:
                    self._assert(
                        False,
                        141,
                        "gpkg_extensions contains row(s) with "
                        + "gpkg_schema and a not-NULL column_name",
                    )

    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_data_columns'")
    if c.fetchone() is None:
        if must_have_gpkg_schema:
            self._assert(False, 141, "gpkg_data_columns table missing.")
        else:
            self._log("... No schema")
        return

    # Req 103: expected structure of gpkg_data_columns
    c.execute("PRAGMA table_info(gpkg_data_columns)")
    columns = c.fetchall()
    expected_columns = [
        (0, "table_name", "TEXT", 1, None, 1),
        (1, "column_name", "TEXT", 1, None, 2),
        (2, "name", "TEXT", 0, None, 0),
        (3, "title", "TEXT", 0, None, 0),
        (4, "description", "TEXT", 0, None, 0),
        (5, "mime_type", "TEXT", 0, None, 0),
        (6, "constraint_name", "TEXT", 0, None, 0),
    ]
    self._check_structure(columns, expected_columns, 103, "gpkg_data_columns")

    # Req 104/105: each gpkg_data_columns row must reference an existing
    # table (in gpkg_contents or gpkg_extensions) and an existing column.
    c.execute("SELECT table_name, column_name FROM gpkg_data_columns")
    rows = c.fetchall()
    for (table_name, column_name) in rows:
        c.execute("SELECT 1 FROM gpkg_contents WHERE table_name = ?", (table_name,))
        if c.fetchone() is None:
            c.execute(
                "SELECT 1 FROM gpkg_extensions WHERE table_name = ?", (table_name,)
            )
            self._assert(
                c.fetchone(),
                104,
                (
                    "table_name = %s " % table_name
                    + "in gpkg_data_columns refer to non-existing "
                    + "table/view in gpkg_contents or gpkg_extensions"
                ),
            )
        self._assert(
            self._check_column_exists(c, table_name, column_name),
            105,
            (
                "table_name = %s, " % table_name
                + "column_name = %s " % column_name
                + "in gpkg_data_columns refer to non-existing "
                + "column"
            ),
        )

    c.execute(
        "SELECT 1 FROM sqlite_master WHERE name = 'gpkg_data_column_constraints'"
    )
    if c.fetchone() is None:
        self._assert(False, 141, "gpkg_data_column_constraints table missing.")

    # Req 107: expected structure of gpkg_data_column_constraints
    c.execute("PRAGMA table_info(gpkg_data_column_constraints)")
    columns = c.fetchall()
    # GPKG 1.1 uses min_is_inclusive/max_is_inclusive but GPKG 1.0 had
    # minIsInclusive/maxIsInclusive
    min_is_inclusive = (
        "min_is_inclusive" if self.version >= (1, 1) else "minIsInclusive"
    )
    max_is_inclusive = (
        "max_is_inclusive" if self.version >= (1, 1) else "maxIsInclusive"
    )
    expected_columns = [
        (0, "constraint_name", "TEXT", 1, None, 0),
        (1, "constraint_type", "TEXT", 1, None, 0),
        (2, "value", "TEXT", 0, None, 0),
        (3, "min", "NUMERIC", 0, None, 0),
        (4, min_is_inclusive, "BOOLEAN", 0, None, 0),
        (5, "max", "NUMERIC", 0, None, 0),
        (6, max_is_inclusive, "BOOLEAN", 0, None, 0),
        (7, "description", "TEXT", 0, None, 0),
    ]
    self._check_structure(
        columns, expected_columns, 107, "gpkg_data_column_constraints"
    )

    # Req 108: constraint_type restricted to range/enum/glob
    c.execute(
        "SELECT DISTINCT constraint_type FROM "
        + "gpkg_data_column_constraints WHERE constraint_type "
        + "NOT IN ('range', 'enum', 'glob')"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            108,
            "gpkg_data_column_constraints.constraint_type "
            + "contains value other than range, enum, glob",
        )

    # Req 109: constraint_name must be unique for range/glob constraints
    c.execute(
        "SELECT 1 FROM (SELECT COUNT(constraint_name) AS c FROM "
        + "gpkg_data_column_constraints WHERE constraint_type "
        + "IN ('range', 'glob') GROUP BY constraint_name) u "
        + "WHERE u.c != 1"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            109,
            "gpkg_data_column_constraints contains non unique "
            + "constraint_name for constraints of type range/glob",
        )

    # Req 110: 'value' must be NULL for range constraints
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + "constraint_type = 'range' AND value IS NOT NULL"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            110,
            "gpkg_data_column_constraints contains constraint "
            + "of type range whose 'value' column is not null",
        )

    # Req 111: range constraints need non-NULL min and max with min < max
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + "constraint_type = 'range' AND min IS NULL"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            111,
            "gpkg_data_column_constraints contains constraint "
            + "of type range whose min value is NULL",
        )
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + "constraint_type = 'range' AND max IS NULL"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            111,
            "gpkg_data_column_constraints contains constraint "
            + "of type range whose max value is NULL",
        )
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + "constraint_type = 'range' AND min >= max"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            111,
            "gpkg_data_column_constraints contains constraint "
            + "of type range whose min value is not less than max",
        )

    # Req 112: inclusiveness flags of range constraints must be 0 or 1
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + f"constraint_type = 'range' AND {min_is_inclusive} NOT IN (0,1)"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            112,
            "gpkg_data_column_constraints contains constraint "
            + "of type range whose min_is_inclusive value is "
            + "not 0 or 1",
        )
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + f"constraint_type = 'range' AND {max_is_inclusive} NOT IN (0,1)"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            112,
            "gpkg_data_column_constraints contains constraint "
            + "of type range whose max_is_inclusive value is "
            + "not 0 or 1",
        )

    # Req 113: min/max/inclusive columns must be NULL for enum/glob
    for col_name in ("min", min_is_inclusive, "max", max_is_inclusive):
        c.execute(
            "SELECT 1 FROM gpkg_data_column_constraints WHERE "
            + "constraint_type IN ('enum', 'glob') AND "
            + col_name
            + " IS NOT NULL"
        )
        if c.fetchone() is not None:
            self._assert(
                False,
                113,
                "gpkg_data_column_constraints contains constraint "
                + "of type enum or glob whose "
                + col_name
                + " column is not NULL",
            )

    # Req 114: enum constraints need a non-NULL value
    c.execute(
        "SELECT 1 FROM gpkg_data_column_constraints WHERE "
        + "constraint_type = 'enum' AND value IS NULL"
    )
    if c.fetchone() is not None:
        self._assert(
            False,
            114,
            "gpkg_data_column_constraints contains constraint "
            + "of type enum whose value column is NULL",
        )
def _check_relations(self, c):
    """Check the Related Tables extension (gpkgext_relations and its
    registrations in gpkg_extensions).

    Requirement numbers are passed as None because these checks come
    from the separate Related Tables extension specification, not from
    the numbered requirements of the core GeoPackage specification.

    c: sqlite3 cursor on the GeoPackage database.
    """
    self._log("Checking relations")
    # gpkgext_relations is only mandatory when the extension is
    # declared in gpkg_extensions.
    must_have_gpkgext_relations = False
    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkg_extensions'")
    if c.fetchone() is not None:
        # Both historical spellings of the extension name are accepted.
        c.execute(
            "SELECT table_name, scope FROM gpkg_extensions WHERE "
            "extension_name IN ('related_tables', "
            "'gpkg_related_tables')"
        )
        rows = c.fetchall()
        gpkgext_relations_found = False
        if rows:
            must_have_gpkgext_relations = True
            for table_name, scope in rows:
                if table_name == "gpkgext_relations":
                    gpkgext_relations_found = True
                self._assert(
                    scope == "read-write",
                    None,
                    "Wrong scope for gpkgext_relations in "
                    "gpkg_extensions for table " + table_name,
                )
            self._assert(
                gpkgext_relations_found,
                None,
                "gpkg_extensions should have an entry for "
                "extension_name = related_tables and "
                "table_name = gpkgext_relations",
            )
            # Besides gpkgext_relations itself, at least one mapping
            # table must also be registered.
            self._assert(
                len(rows) > 1,
                None,
                "gpkg_extensions should have at least one entry for "
                "extension_name = related_tables and "
                "table_name != gpkgext_relations",
            )

    c.execute("SELECT 1 FROM sqlite_master WHERE name = 'gpkgext_relations'")
    if c.fetchone() is None:
        if must_have_gpkgext_relations:
            self._assert(False, None, "gpkgext_relations table missing")
        else:
            self._log("... No relations")
        return

    # Expected structure of gpkgext_relations
    c.execute("PRAGMA table_info(gpkgext_relations)")
    columns = c.fetchall()
    expected_columns = [
        (0, "id", "INTEGER", 0, None, 1),
        (1, "base_table_name", "TEXT", 1, None, 0),
        (2, "base_primary_column", "TEXT", 1, "'id'", 0),
        (3, "related_table_name", "TEXT", 1, None, 0),
        (4, "related_primary_column", "TEXT", 1, "'id'", 0),
        (5, "relation_name", "TEXT", 1, None, 0),
        (6, "mapping_table_name", "TEXT", 1, None, 0),
    ]
    self._check_structure(columns, expected_columns, None, "gpkgext_relations")

    # Each relation must reference existing base/related/mapping tables
    # and existing key columns.
    c.execute(
        "SELECT base_table_name, base_primary_column, "
        "related_table_name, related_primary_column, "
        "mapping_table_name FROM gpkgext_relations"
    )
    for btn, btc, rtn, rpc, mtn in c.fetchall():
        c.execute("SELECT 1 FROM sqlite_master WHERE name = ?", (btn,))
        ok = c.fetchone() is not None
        self._assert(
            ok,
            None,
            "gpkgext_relations refers to a non-existing "
            "base_table_name = " + btn,
        )
        # Only check the column when the table exists, to avoid a
        # misleading secondary error.
        if ok:
            self._assert(
                self._check_column_exists(c, btn, btc),
                None,
                "gpkgext_relations refers to a non-existing "
                "base_primary_column = " + btc,
            )
        c.execute("SELECT 1 FROM sqlite_master WHERE name = ?", (rtn,))
        ok = c.fetchone() is not None
        self._assert(
            ok,
            None,
            "gpkgext_relations refers to a non-existing "
            "related_table_name = " + rtn,
        )
        if ok:
            self._assert(
                self._check_column_exists(c, rtn, rpc),
                None,
                "gpkgext_relations refers to a non-existing "
                "related_primary_column = " + rpc,
            )
        c.execute("SELECT 1 FROM sqlite_master WHERE name = ?", (mtn,))
        ok = c.fetchone() is not None
        self._assert(
            ok,
            None,
            "gpkgext_relations refers to a non-existing "
            "mapping_table_name = " + mtn,
        )
        # The mapping table must itself be registered in gpkg_extensions
        c.execute(
            "SELECT 1 FROM gpkg_extensions WHERE table_name = ? "
            + "AND extension_name IN "
            + "('related_tables', 'gpkg_related_tables')",
            (mtn,),
        )
        ok = c.fetchone() is not None
        self._assert(
            ok,
            None,
            "Missing entry in gpkg_extensions for "
            "mapping_table_name = " + mtn + " and "
            "extension_name = 'related_tables'",
        )
        # Mapping tables must expose base_id / related_id columns
        self._assert(
            self._check_column_exists(c, mtn, "base_id"),
            None,
            "Table %s should have a %s column" % (mtn, "base_id"),
        )
        self._assert(
            self._check_column_exists(c, mtn, "related_id"),
            None,
            "Table %s should have a %s column" % (mtn, "related_id"),
        )
def check(self):
    """Run all validation checks against the GeoPackage file.

    First validates the SQLite file header (application_id bytes 68-71,
    and for 'GPKG' files the big-endian user_version at bytes 60-63),
    setting self.version accordingly.  Then opens the database and runs
    each per-topic checker.  All failures are routed through
    self._assert() (behavior on failure depends on the
    abort_at_first_error setting given to the constructor).
    """
    self._assert(
        os.path.exists(self.filename), None, "%s does not exist" % self.filename
    )
    # Req 3: file extension must be .gpkg
    # (fixed: the message previously carried a stray trailing apostrophe)
    self._assert(
        self.filename.lower().endswith(".gpkg"),
        3,
        "filename extension isn't .gpkg",
    )
    with open(self.filename, "rb") as f:
        # Req 2: bytes 68-71 of the SQLite header hold the
        # application_id, which must identify a GeoPackage.
        f.seek(68, 0)
        application_id = struct.unpack("B" * 4, f.read(4))
        gp10 = struct.unpack("B" * 4, "GP10".encode("ASCII"))
        gp11 = struct.unpack("B" * 4, "GP11".encode("ASCII"))
        gpkg = struct.unpack("B" * 4, "GPKG".encode("ASCII"))
        self._assert(
            application_id in (gp10, gp11, gpkg),
            2,
            ("Wrong application_id: %s. " + "Expected one of GP10, GP11, GPKG")
            % str(application_id),
        )
        if application_id == gp10:
            self.version = (1, 0)
        elif application_id == gp11:
            self.version = (1, 1)
        elif application_id == gpkg:
            # GPKG >= 1.2 encodes the version in user_version
            # (bytes 60-63, big-endian) as MMmmPP, e.g. 10200 = 1.2.0.
            f.seek(60, 0)
            user_version = f.read(4)
            expected_version = 10200
            user_version = struct.unpack(">I", user_version)[0]
            self._assert(
                user_version >= expected_version,
                2,
                "Wrong user_version: %d. Expected >= %d"
                % (user_version, expected_version),
            )
            self.version = (
                user_version // 10000,
                (user_version % 10000) // 100,
                user_version % 100,
            )
            if self.version >= (1, 5, 0):
                self._warn(f"Version {self.version} not handled by this script")
        else:
            # Unknown but accepted id; use a sentinel so version
            # comparisons elsewhere behave as "newest".
            self.version = (99, 99)

    # Probe whether this sqlite3 build reports the rank of a column
    # within a composite primary key in PRAGMA table_info (field 5 is
    # 2 for the second PK column on recent builds).
    conn = sqlite3.connect(":memory:")
    c = conn.cursor()
    c.execute(
        "CREATE TABLE foo(one TEXT, two TEXT, "
        "CONSTRAINT pk PRIMARY KEY (one, two))"
    )
    c.execute("PRAGMA table_info(foo)")
    rows = c.fetchall()
    if rows[1][5] == 2:
        self.extended_pragma_info = True
    c.close()
    conn.close()

    conn = sqlite3.connect(self.filename)
    self.conn = conn
    c = conn.cursor()
    try:
        # Req 1: must be readable as a SQLite database
        try:
            c.execute("SELECT 1 FROM sqlite_master")
            c.fetchone()
        except Exception:
            self._assert(False, 1, "not a sqlite3 database")

        # Database-wide integrity checks (Req 6 and 7)
        c.execute("PRAGMA foreign_key_check")
        ret = c.fetchall()
        self._assert(len(ret) == 0, 7, "foreign_key_check failed: %s" % str(ret))
        c.execute("PRAGMA integrity_check")
        self._assert(c.fetchone()[0] == "ok", 6, "integrity_check failed")

        # Per-topic checks
        self._check_gpkg_spatial_ref_sys(c)
        self._check_gpkg_contents(c)
        self._check_features(c)
        self._check_tiles(c)
        self._check_attributes(c)
        self._check_tiled_gridded_coverage_data(c)
        self._check_gpkg_extensions(c)
        self._check_metadata(c)
        self._check_schema(c)
        self._check_relations(c)
    finally:
        # Always release the connection, even if a check aborted.
        c.close()
        conn.close()
def check(
    filename,
    abort_at_first_error=True,
    verbose=None,  # deprecated
    extra_checks=False,
    log_msg=False,
    warning_msg=True,
    warning_as_error=False,
):
    """Validate a GeoPackage file and return the collected errors.

    Convenience wrapper around GPKGChecker.  The deprecated ``verbose``
    flag, when not None, overrides both ``log_msg`` and ``warning_msg``
    with its truthiness.
    """
    if verbose is not None:
        # Legacy switch: verbose=True enables both message streams,
        # verbose=False silences both.
        log_msg = warning_msg = bool(verbose)
    checker = GPKGChecker(
        filename,
        abort_at_first_error=abort_at_first_error,
        extra_checks=extra_checks,
        log_msg=log_msg,
        warning_msg=warning_msg,
        warning_as_error=warning_as_error,
    )
    checker.check()
    return checker.errors
def Usage():
    """Print command-line help and return the usage-error exit code (2)."""
    help_lines = (
        "Usage: validate_gpkg.py [[-v]|[-q]] [-k] [--extra] [--warning-as-error] my.gpkg",
        "",
        "-v: verbose mode",
        "-q: quiet mode",
        "-k: (try to) keep going when error is encountered",
        "--extra: run extra checks, potentially going beyond strict requirements of specification",
        "--warning-as-error: turn warnings as errors",
    )
    for line in help_lines:
        print(line)
    return 2
def main(argv=sys.argv):
    """Command-line entry point.

    Parses flags (-v, -q, -k, --extra, --warning-as-error) and a single
    filename, then runs check().  Returns 0 on success, 1 when errors
    were collected in keep-going mode, 2 on usage error.
    """
    filename = None
    log_msg = False
    warning_msg = True
    abort_at_first_error = True
    warning_as_error = False
    extra_checks = False
    if len(argv) == 1:
        return Usage()
    for arg in argv[1:]:
        if arg == "-k":
            abort_at_first_error = False
        elif arg == "-q":
            warning_msg = False
        elif arg == "-v":
            log_msg = True
        elif arg == "--extra":
            extra_checks = True
        elif arg == "--warning-as-error":
            warning_as_error = True
        elif arg.startswith("-"):
            # Unknown switch. startswith() also avoids the IndexError
            # that the previous arg[0] == "-" raised for an
            # empty-string argument.
            return Usage()
        else:
            filename = arg
    if filename is None:
        return Usage()
    ret = check(
        filename,
        abort_at_first_error=abort_at_first_error,
        extra_checks=extra_checks,
        log_msg=log_msg,
        warning_msg=warning_msg,
        warning_as_error=warning_as_error,
    )
    if not abort_at_first_error:
        if not ret:
            return 0
        # Print the accumulated (requirement, message) pairs.
        for (req, msg) in ret:
            if req:
                print("Req %s: %s" % (str(req), msg))
            else:
                print(msg)
        return 1
    return 0
# Script entry point: the process exit code is main()'s return value
# (0 = valid, 1 = errors found in keep-going mode, 2 = usage error).
if __name__ == "__main__":
    sys.exit(main(sys.argv))