Repository URL to install this package:
|
Version:
8.0.0 ▾
|
import base64
import binascii
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509.oid import ExtensionOID
from sincpro_siat_soap.exceptions import SincproValidationError
_OID_NAME_MAP = {
"2.5.4.3": "CN",
"2.5.4.5": "serialNumber",
"2.5.4.6": "C",
"2.5.4.10": "O",
"2.5.4.12": "title",
"2.5.4.13": "description",
"2.5.4.46": "dnQualifier",
}
def _decode_p12_payload(p12_payload: bytes | str) -> bytes:
if isinstance(p12_payload, bytes):
return p12_payload
if not isinstance(p12_payload, str) or p12_payload.strip() == "":
raise SincproValidationError("P12 payload must be bytes or a non-empty base64 string")
try:
return base64.b64decode(p12_payload, validate=True)
except (binascii.Error, ValueError) as exc:
raise SincproValidationError("Invalid base64 payload for P12") from exc
def _format_local_key_id_from_cert(certificate) -> str:
try:
digest = certificate.extensions.get_extension_for_oid(
ExtensionOID.SUBJECT_KEY_IDENTIFIER
).value.digest
except Exception:
hasher = hashes.Hash(hashes.SHA1())
hasher.update(certificate.public_bytes(serialization.Encoding.DER))
digest = hasher.finalize()
return " ".join(f"{byte:02X}" for byte in digest)
def _format_distinguished_name(name) -> str:
chunks: list[str] = []
for rdn in name.rdns:
for attribute in rdn:
label = _OID_NAME_MAP.get(
attribute.oid.dotted_string, attribute.oid.dotted_string
)
chunks.append(f"/{label}={attribute.value}")
return "".join(chunks)
def extract_private_key_and_certificate_from_p12(
p12_payload: bytes | str,
password: str,
) -> tuple[str, str]:
if password == "":
raise SincproValidationError("Password is required to extract credentials from P12")
p12_bytes = _decode_p12_payload(p12_payload)
try:
parsed = pkcs12.load_pkcs12(p12_bytes, password.encode("utf-8"))
except ValueError:
raise SincproValidationError("Unable to parse P12 payload or invalid password")
if parsed.key is None:
raise SincproValidationError("P12 payload does not contain an encrypted private key")
if parsed.cert is None:
raise SincproValidationError("P12 payload does not contain a certificate")
certificate = parsed.cert.certificate
friendly_name_raw = parsed.cert.friendly_name or b""
friendly_name = friendly_name_raw.decode("utf-8", errors="replace")
local_key_id = _format_local_key_id_from_cert(certificate)
cert_body = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8").rstrip()
certificate_pem = "\n".join(
[
"Bag Attributes",
f" localKeyID: {local_key_id} ",
f" friendlyName: {friendly_name}",
f"subject={_format_distinguished_name(certificate.subject)}",
f"issuer={_format_distinguished_name(certificate.issuer)}",
cert_body,
]
)
encrypted_key_body = (
parsed.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(
password.encode("utf-8")
),
)
.decode("utf-8")
.rstrip()
)
private_key_pem = "\n".join(
[
"Bag Attributes",
f" localKeyID: {local_key_id} ",
f" friendlyName: {friendly_name}",
"Key Attributes: <No Attributes>",
encrypted_key_body,
]
)
if "BEGIN CERTIFICATE" not in certificate_pem:
raise SincproValidationError("P12 payload does not contain a certificate")
if "BEGIN ENCRYPTED PRIVATE KEY" not in private_key_pem:
raise SincproValidationError("P12 payload does not contain an encrypted private key")
return private_key_pem, certificate_pem