Repository URL to install this package:
|
Version:
3.10.0 ▾
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ******************************************************************************
# $Id$
#
# Project: GDAL
# Purpose: Build a JPEG2000 file from the XML structure dumped by dump_jp2.py
# Mostly useful to build non-conformant files
# Author: Even Rouault, <even dot rouault at spatialys dot com>
#
# ******************************************************************************
# Copyright (c) 2015, European Union (European Environment Agency)
#
# SPDX-License-Identifier: MIT
# ******************************************************************************
import os
import struct
import sys
from osgeo import gdal
XML_TYPE_IDX = 0
XML_VALUE_IDX = 1
XML_FIRST_CHILD_IDX = 2
def Usage():
print("Usage: build_jp2_from_xml in.xml out.jp2")
return 2
def find_xml_node(ar, element_name, immediate_child=False, only_attributes=False):
# type = ar[XML_TYPE_IDX]
value = ar[XML_VALUE_IDX]
if not immediate_child and value == element_name:
return ar
for child_idx in range(XML_FIRST_CHILD_IDX, len(ar)):
child = ar[child_idx]
if only_attributes and child[XML_TYPE_IDX] != gdal.CXT_Attribute:
continue
if immediate_child:
if child[XML_VALUE_IDX] == element_name:
return child
else:
found = find_xml_node(child, element_name)
if found is not None:
return found
return None
def get_attribute_val(ar, attr_name):
node = find_xml_node(ar, attr_name, True)
if node is None or node[XML_TYPE_IDX] != gdal.CXT_Attribute:
return None
if (
len(ar) > XML_FIRST_CHILD_IDX
and node[XML_FIRST_CHILD_IDX][XML_TYPE_IDX] == gdal.CXT_Text
):
return node[XML_FIRST_CHILD_IDX][XML_VALUE_IDX]
return None
def get_node_content(node):
if node is None:
return None
for child_idx in range(XML_FIRST_CHILD_IDX, len(node)):
child = node[child_idx]
if child[XML_TYPE_IDX] == gdal.CXT_Text:
return child[XML_VALUE_IDX]
return None
def hex_letter_to_number(ch):
val = 0
if ch >= "0" and ch <= "9":
val = ord(ch) - ord("0")
elif ch >= "a" and ch <= "f":
val = (ord(ch) - ord("a")) + 10
elif ch >= "A" and ch <= "F":
val = (ord(ch) - ord("A")) + 10
return val
def write_hexstring_as_binary(hex_binary_content, out_f):
for i in range(int(len(hex_binary_content) / 2)):
val = hex_letter_to_number(
hex_binary_content[2 * i]
) * 16 + hex_letter_to_number(hex_binary_content[2 * i + 1])
out_f.write(chr(val).encode("latin1"))
def parse_field(xml_tree, out_f, src_jp2file):
# pylint: disable=unused-argument
if not (
xml_tree[XML_TYPE_IDX] == gdal.CXT_Element
and xml_tree[XML_VALUE_IDX] == "Field"
):
print("Not a Field element")
return False
field_name = get_attribute_val(xml_tree, "name")
if field_name is None:
print("Cannot find Field.name attribute")
# return False
field_type = get_attribute_val(xml_tree, "type")
if field_type is None:
print("Cannot find Field.type attribute")
return False
val = get_node_content(xml_tree)
if val is None:
print("Cannot find Field content")
return False
if field_type == "uint8":
out_f.write(struct.pack(">B" * 1, int(val)))
elif field_type == "uint16":
out_f.write(struct.pack(">H" * 1, int(val)))
elif field_type == "uint32":
out_f.write(struct.pack(">I" * 1, int(val)))
elif field_type == "string":
field_size = get_attribute_val(xml_tree, "size")
if field_size is not None:
assert len(val) == int(field_size)
out_f.write(val.encode("latin1"))
elif field_type == "hexint":
field_size = get_attribute_val(xml_tree, "size")
if field_size is not None:
assert len(val) == 2 + 2 * int(field_size)
write_hexstring_as_binary(val[2:], out_f)
else:
print("Unhandled type %s" % field_type)
return False
return True
marker_map = {
"SOC": 0xFF4F,
"EOC": 0xFFD9,
"SOT": 0xFF90,
"SIZ": 0xFF51,
"COD": 0xFF52,
"COC": 0xFF53,
"TLM": 0xFF55,
"PLM": 0xFF57,
"PLT": 0xFF58,
"QCD": 0xFF5C,
"QCC": 0xFF5D,
"RGN": 0xFF5E,
"POC": 0xFF5F,
"PPM": 0xFF60,
"PPT": 0xFF61,
"CRG": 0xFF63,
"COM": 0xFF64,
}
def parse_jpc_marker(xml_tree, out_f, src_jp2file):
if not (
xml_tree[XML_TYPE_IDX] == gdal.CXT_Element
and xml_tree[XML_VALUE_IDX] == "Marker"
):
print("Not a Marker element")
return False
marker_name = get_attribute_val(xml_tree, "name")
if marker_name is None:
print("Cannot find Marker.name attribute")
return False
if find_xml_node(xml_tree, "Field", immediate_child=True):
if marker_name not in marker_map:
print("Cannot find marker signature for %s" % marker_name)
return False
marker_signature = marker_map[marker_name]
out_f.write(struct.pack(">H" * 1, marker_signature))
pos = out_f.tell()
out_f.write(struct.pack(">H" * 1, 0))
for child_idx in range(XML_FIRST_CHILD_IDX, len(xml_tree)):
child = xml_tree[child_idx]
if child[XML_TYPE_IDX] != gdal.CXT_Element:
continue
if not parse_field(child, out_f, src_jp2file):
return False
new_pos = out_f.tell()
out_f.seek(pos, 0)
out_f.write(struct.pack(">H" * 1, new_pos - pos))
out_f.seek(new_pos, 0)
else:
offset = get_attribute_val(xml_tree, "offset")
if offset is None:
if marker_name == "SOC" or marker_name == "EOC":
marker_signature = marker_map[marker_name]
out_f.write(struct.pack(">H" * 1, marker_signature))
return True
print("Cannot find Marker.offset attribute")
return False
offset = int(offset)
length = get_attribute_val(xml_tree, "length")
if length is None:
print("Cannot find Marker.length attribute")
return False
length = int(length)
src_jp2file.seek(offset, 0)
data = src_jp2file.read(length)
out_f.write(data)
return True
def parse_jp2codestream(inpath, xml_tree, out_f, src_jp2file=None):
if src_jp2file is None:
src_jp2filename = get_attribute_val(xml_tree, "filename")
if src_jp2filename is None:
print("Cannot find JP2KCodeStream.filename attribute")
return False
if os.path.exists(src_jp2filename):
src_jp2file = open(src_jp2filename, "rb")
else:
src_jp2file = open(os.path.join(inpath, src_jp2filename), "rb")
for child_idx in range(XML_FIRST_CHILD_IDX, len(xml_tree)):
child = xml_tree[child_idx]
if child[XML_TYPE_IDX] != gdal.CXT_Element:
continue
if not parse_jpc_marker(child, out_f, src_jp2file):
return False
return True
def parse_jp2_box(xml_tree, out_f, src_jp2file):
if not (
xml_tree[XML_TYPE_IDX] == gdal.CXT_Element
and xml_tree[XML_VALUE_IDX] == "JP2Box"
):
print("Not a JP2Box element")
return False
jp2box_name = get_attribute_val(xml_tree, "name")
if jp2box_name is None:
print("Cannot find JP2Box.name attribute")
return False
if len(jp2box_name) != 4:
print("Invalid JP2Box.name : %s" % jp2box_name)
return False
hex_binary_content = get_node_content(
find_xml_node(xml_tree, "BinaryContent", immediate_child=True)
)
decoded_content = find_xml_node(xml_tree, "DecodedContent", immediate_child=True)
decoded_geotiff = find_xml_node(xml_tree, "DecodedGeoTIFF", immediate_child=True)
text_content = get_node_content(
find_xml_node(xml_tree, "TextContent", immediate_child=True)
)
xml_content = find_xml_node(xml_tree, "XMLContent", immediate_child=True)
jp2box = find_xml_node(xml_tree, "JP2Box", immediate_child=True)
jp2codestream = find_xml_node(xml_tree, "JP2KCodeStream", immediate_child=True)
if hex_binary_content:
if decoded_content or decoded_geotiff or text_content or xml_content or jp2box:
print(
"BinaryContent found, and one of DecodedContent/DecodedGeoTIFF/TextContent/XMLContent/JP2Box. The latter will be ignored"
)
if jp2box_name == "uuid":
uuid = get_node_content(
find_xml_node(xml_tree, "UUID", immediate_child=True)
)
if uuid is None:
print("Cannot find JP2Box.UUID element")
return False
else:
uuid = ""
out_f.write(
struct.pack(
">I" * 1, 8 + int(len(hex_binary_content) / 2) + int(len(uuid) / 2)
)
)
out_f.write(jp2box_name.encode("ascii"))
write_hexstring_as_binary(uuid, out_f)
write_hexstring_as_binary(hex_binary_content, out_f)
elif decoded_content:
if decoded_geotiff or text_content or xml_content or jp2box:
print(
"DecodedContent found, and one of DecodedGeoTIFF/TextContent/XMLContent/JP2Box. The latter will be ignored"
)
pos = out_f.tell()
out_f.write(struct.pack(">I" * 1, 0))
out_f.write(jp2box_name.encode("ascii"))
for child_idx in range(XML_FIRST_CHILD_IDX, len(decoded_content)):
child = decoded_content[child_idx]
if (
child[XML_TYPE_IDX] == gdal.CXT_Element
and child[XML_VALUE_IDX] == "Field"
):
if not parse_field(child, out_f, src_jp2file):
return False
new_pos = out_f.tell()
out_f.seek(pos, 0)
out_f.write(struct.pack(">I" * 1, new_pos - pos))
out_f.seek(new_pos, 0)
elif text_content:
if decoded_geotiff or xml_content or jp2box:
print(
"TextContent found, and one of DecodedGeoTIFF/XMLContent/JP2Box. The latter will be ignored"
)
out_f.write(struct.pack(">I" * 1, 8 + len(text_content)))
out_f.write(jp2box_name.encode("ascii"))
out_f.write(text_content.encode("latin1"))
elif xml_content:
if decoded_geotiff or jp2box:
print(
"XMLContent found, and one of DecodedGeoTIFF/JP2Box. The latter will be ignored"
)
serialized_xml_content = gdal.SerializeXMLTree(xml_content[XML_FIRST_CHILD_IDX])
out_f.write(struct.pack(">I" * 1, 8 + len(serialized_xml_content)))
out_f.write(jp2box_name.encode("ascii"))
out_f.write(serialized_xml_content.encode("latin1"))
elif jp2box:
if decoded_geotiff:
print("JP2Box found, and one of DecodedGeoTIFF. The latter will be ignored")
pos = out_f.tell()
out_f.write(struct.pack(">I" * 1, 0))
out_f.write(jp2box_name.encode("ascii"))
for child_idx in range(XML_FIRST_CHILD_IDX, len(xml_tree)):
child = xml_tree[child_idx]
if (
child[XML_TYPE_IDX] == gdal.CXT_Element
and child[XML_VALUE_IDX] == "JP2Box"
):
if not parse_jp2_box(child, out_f, src_jp2file):
return False
new_pos = out_f.tell()
out_f.seek(pos, 0)
out_f.write(struct.pack(">I" * 1, new_pos - pos))
out_f.seek(new_pos, 0)
elif decoded_geotiff:
serialized_xml_content = gdal.SerializeXMLTree(
decoded_geotiff[XML_FIRST_CHILD_IDX]
)
vrt_ds = gdal.Open(serialized_xml_content)
if vrt_ds is None:
print("Cannot decode VRTDataset. Outputting empty content")
binary_content = ""
else:
tmpfilename = "/vsimem/build_jp2_from_xml_tmp.tif"
gdal.GetDriverByName("GTiff").CreateCopy(tmpfilename, vrt_ds)
tif_f = gdal.VSIFOpenL(tmpfilename, "rb")
binary_content = gdal.VSIFReadL(1, 10000, tif_f)
gdal.VSIFCloseL(tif_f)
gdal.Unlink(tmpfilename)
uuid = get_node_content(find_xml_node(xml_tree, "UUID", immediate_child=True))
if uuid is None:
uuid = "B14BF8BD083D4B43A5AE8CD7D5A6CE03"
out_f.write(struct.pack(">I" * 1, 8 + len(binary_content) + int(len(uuid) / 2)))
out_f.write(jp2box_name.encode("ascii"))
write_hexstring_as_binary(uuid, out_f)
out_f.write(binary_content)
elif jp2codestream:
pos = out_f.tell()
out_f.write(struct.pack(">I" * 1, 0))
out_f.write(jp2box_name.encode("ascii"))
if not parse_jp2codestream(None, jp2codestream, out_f, src_jp2file):
return False
new_pos = out_f.tell()
out_f.seek(pos, 0)
out_f.write(struct.pack(">I" * 1, new_pos - pos))
out_f.seek(new_pos, 0)
else:
data_offset = get_attribute_val(xml_tree, "data_offset")
if data_offset is None:
print("Cannot find JP2Box.data_offset attribute")
return False
data_offset = int(data_offset)
data_length = get_attribute_val(xml_tree, "data_length")
if data_length is None:
print("Cannot find JP2Box.data_length attribute")
return False
data_length = int(data_length)
src_jp2file.seek(data_offset, 0)
data = src_jp2file.read(data_length)
out_f.write(struct.pack(">I" * 1, 8 + data_length))
out_f.write(jp2box_name.encode("ascii"))
out_f.write(data)
return True
def parse_jp2file(inpath, xml_tree, out_f):
src_jp2filename = get_attribute_val(xml_tree, "filename")
if src_jp2filename is None:
print("Cannot find JP2File.filename attribute")
return False
if os.path.exists(src_jp2filename):
src_jp2file = open(src_jp2filename, "rb")
else:
src_jp2file = open(os.path.join(inpath, src_jp2filename), "rb")
for child_idx in range(XML_FIRST_CHILD_IDX, len(xml_tree)):
child = xml_tree[child_idx]
if child[XML_TYPE_IDX] != gdal.CXT_Element:
continue
if not parse_jp2_box(child, out_f, src_jp2file):
return False
return True
# Wrapper class for GDAL VSI*L API with class Python file interface
class VSILFile(object):
def __init__(self, filename, access):
self.f = gdal.VSIFOpenL(filename, access)
def write(self, data):
gdal.VSIFWriteL(data, 1, len(data), self.f)
def tell(self):
return gdal.VSIFTellL(self.f)
def seek(self, pos, ref):
gdal.VSIFSeekL(self.f, pos, ref)
def close(self):
gdal.VSIFCloseL(self.f)
self.f = None
def build_file(inname, outname):
inpath = os.path.dirname(inname)
xml_tree = gdal.ParseXMLString(open(inname).read())
if xml_tree is None:
print("Cannot parse %s" % inname)
return False
# out_f = open(outname, 'wb+')
out_f = VSILFile(outname, "wb+")
if (
xml_tree[XML_TYPE_IDX] == gdal.CXT_Element
and xml_tree[XML_VALUE_IDX] == "JP2File"
):
ret = parse_jp2file(inpath, xml_tree, out_f)
elif (
xml_tree[XML_TYPE_IDX] == gdal.CXT_Element
and xml_tree[XML_VALUE_IDX] == "JP2KCodeStream"
):
ret = parse_jp2codestream(inpath, xml_tree, out_f)
else:
print("Unexpected node: %s" % xml_tree[XML_VALUE_IDX])
ret = False
out_f.close()
return ret
def main(argv=sys.argv):
i = 1
inname = None
outname = None
while i < len(argv):
if sys.argv[i][0] == "-":
return Usage()
elif inname is None:
inname = sys.argv[i]
elif outname is None:
outname = sys.argv[i]
else:
return Usage()
i = i + 1
if inname is None or outname is None:
return Usage()
if build_file(inname, outname):
return 0
return 1
if __name__ == "__main__":
sys.exit(main(sys.argv))