Why Gemfury? Push, build, and install RubyGems, npm packages, Python packages, Maven artifacts, PHP packages, Go Modules, Debian packages, RPM packages, and NuGet packages.

Repository URL to install this package:

Details    
sincpro-siat-soap / infrastructure / pkcs12_parser.py
Size: Mime:
import base64
import binascii

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs12
from cryptography.x509.oid import ExtensionOID

from sincpro_siat_soap.exceptions import SincproValidationError

# Short text labels for distinguished-name attributes, keyed by the attribute's
# dotted OID string. _format_distinguished_name looks attributes up here and
# falls back to the dotted OID itself for anything not listed.
_OID_NAME_MAP = {
    "2.5.4.3": "CN",  # commonName
    "2.5.4.5": "serialNumber",
    "2.5.4.6": "C",  # countryName
    "2.5.4.10": "O",  # organizationName
    "2.5.4.12": "title",
    "2.5.4.13": "description",
    "2.5.4.46": "dnQualifier",
}


def _decode_p12_payload(p12_payload: bytes | str) -> bytes:
    if isinstance(p12_payload, bytes):
        return p12_payload

    if not isinstance(p12_payload, str) or p12_payload.strip() == "":
        raise SincproValidationError("P12 payload must be bytes or a non-empty base64 string")

    try:
        return base64.b64decode(p12_payload, validate=True)
    except (binascii.Error, ValueError) as exc:
        raise SincproValidationError("Invalid base64 payload for P12") from exc


def _format_local_key_id_from_cert(certificate) -> str:
    """Render a key identifier for ``certificate`` as space-separated hex bytes.

    The identifier is the digest of the certificate's Subject Key Identifier
    extension. If reading that extension fails for any reason, the SHA-1 hash
    of the certificate's DER encoding is used instead.

    Returns:
        Uppercase two-digit hex values joined by single spaces, e.g. ``"0A FF"``.
    """
    try:
        ski_extension = certificate.extensions.get_extension_for_oid(
            ExtensionOID.SUBJECT_KEY_IDENTIFIER
        )
        key_id = ski_extension.value.digest
    except Exception:
        # Best-effort fallback: any failure above switches to the SHA-1
        # fingerprint of the DER-encoded certificate.
        sha1 = hashes.Hash(hashes.SHA1())
        sha1.update(certificate.public_bytes(serialization.Encoding.DER))
        key_id = sha1.finalize()

    return key_id.hex(" ").upper()


def _format_distinguished_name(name) -> str:
    """Render ``name`` as a slash-delimited string such as ``/C=..../O=....``.

    Every attribute of every RDN in ``name.rdns`` is emitted in iteration order
    as ``/<label>=<value>``. The label comes from ``_OID_NAME_MAP``; attributes
    whose OID is not in that table are labelled with the dotted OID string.
    """

    def _render(attribute) -> str:
        dotted = attribute.oid.dotted_string
        return f"/{_OID_NAME_MAP.get(dotted, dotted)}={attribute.value}"

    return "".join(_render(attribute) for rdn in name.rdns for attribute in rdn)


def extract_private_key_and_certificate_from_p12(
    p12_payload: bytes | str,
    password: str,
) -> tuple[str, str]:
    """Extract the private key and certificate of a PKCS#12 bundle as annotated PEM text.

    Both PEM blocks are prefixed with "Bag Attributes" header lines (localKeyID
    and friendlyName); the certificate block additionally carries ``subject=``
    and ``issuer=`` lines.
    NOTE(review): this layout looks like the text printed by ``openssl pkcs12``;
    confirm with the consumers before changing any header line.

    Args:
        p12_payload: The P12 content as raw bytes or as a base64 string.
        password: Password protecting the P12. It is also used to encrypt the
            returned private key (PKCS#8 PEM, best available encryption).

    Returns:
        A ``(private_key_pem, certificate_pem)`` tuple of strings.

    Raises:
        SincproValidationError: If the password is missing, the payload cannot
            be decoded or parsed (including a wrong password), or the bundle
            lacks a private key or a certificate.
    """
    # Truthiness check so a missing (None) password is reported as a validation
    # error instead of failing later with AttributeError on ``.encode``.
    if not password:
        raise SincproValidationError("Password is required to extract credentials from P12")

    p12_bytes = _decode_p12_payload(p12_payload)

    try:
        # Encoding stays inside the try: UnicodeEncodeError is a ValueError, so an
        # unencodable password is reported the same way as an unparsable bundle.
        parsed = pkcs12.load_pkcs12(p12_bytes, password.encode("utf-8"))
    except ValueError as exc:
        raise SincproValidationError(
            "Unable to parse P12 payload or invalid password"
        ) from exc

    if parsed.key is None:
        raise SincproValidationError("P12 payload does not contain an encrypted private key")
    if parsed.cert is None:
        raise SincproValidationError("P12 payload does not contain a certificate")

    certificate = parsed.cert.certificate
    friendly_name_raw = parsed.cert.friendly_name or b""
    friendly_name = friendly_name_raw.decode("utf-8", errors="replace")
    local_key_id = _format_local_key_id_from_cert(certificate)

    # Header shared by both PEM blocks. The trailing space after the key id is
    # part of the emitted format and is kept on purpose.
    bag_attributes = [
        "Bag Attributes",
        f"    localKeyID: {local_key_id} ",
        f"    friendlyName: {friendly_name}",
    ]

    cert_body = certificate.public_bytes(serialization.Encoding.PEM).decode("utf-8").rstrip()
    certificate_pem = "\n".join(
        [
            *bag_attributes,
            f"subject={_format_distinguished_name(certificate.subject)}",
            f"issuer={_format_distinguished_name(certificate.issuer)}",
            cert_body,
        ]
    )

    # The key is re-serialized encrypted with the same password as the bundle.
    encrypted_key_body = (
        parsed.key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.BestAvailableEncryption(
                password.encode("utf-8")
            ),
        )
        .decode("utf-8")
        .rstrip()
    )
    private_key_pem = "\n".join(
        [
            *bag_attributes,
            "Key Attributes: <No Attributes>",
            encrypted_key_body,
        ]
    )

    # Defensive sanity checks on the serialized output before handing it back.
    if "BEGIN CERTIFICATE" not in certificate_pem:
        raise SincproValidationError("P12 payload does not contain a certificate")
    if "BEGIN ENCRYPTED PRIVATE KEY" not in private_key_pem:
        raise SincproValidationError("P12 payload does not contain an encrypted private key")

    return private_key_pem, certificate_pem