Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
cryptography / src / backend / hmac.rs
Size: Mime:
// This file is dual licensed under the terms of the Apache License, Version
// 2.0, and the BSD License. See the LICENSE file in the root of this repository
// for complete details.

use crate::backend::hashes::{already_finalized_error, message_digest_from_algorithm};
use crate::buf::CffiBuf;
use crate::error::{CryptographyError, CryptographyResult};
use crate::exceptions;
use pyo3::types::PyBytesMethods;

#[pyo3::pyclass(
    module = "cryptography.hazmat.bindings._rust.openssl.hmac",
    name = "HMAC"
)]
pub(crate) struct Hmac {
    #[pyo3(get)]
    algorithm: pyo3::Py<pyo3::PyAny>,
    ctx: Option<cryptography_openssl::hmac::Hmac>,
}

impl Hmac {
    pub(crate) fn new_bytes(
        py: pyo3::Python<'_>,
        key: &[u8],
        algorithm: &pyo3::Bound<'_, pyo3::PyAny>,
    ) -> CryptographyResult<Hmac> {
        let md = message_digest_from_algorithm(py, algorithm)?;
        let ctx = cryptography_openssl::hmac::Hmac::new(key, md).map_err(|_| {
            exceptions::UnsupportedAlgorithm::new_err((
                "Digest is not supported for HMAC",
                exceptions::Reasons::UNSUPPORTED_HASH,
            ))
        })?;

        Ok(Hmac {
            ctx: Some(ctx),
            algorithm: algorithm.clone().unbind(),
        })
    }

    pub(crate) fn update_bytes(&mut self, data: &[u8]) -> CryptographyResult<()> {
        self.get_mut_ctx()?.update(data)?;
        Ok(())
    }

    fn get_ctx(&self) -> CryptographyResult<&cryptography_openssl::hmac::Hmac> {
        if let Some(ctx) = self.ctx.as_ref() {
            return Ok(ctx);
        };
        Err(already_finalized_error())
    }

    fn get_mut_ctx(&mut self) -> CryptographyResult<&mut cryptography_openssl::hmac::Hmac> {
        if let Some(ctx) = self.ctx.as_mut() {
            return Ok(ctx);
        }
        Err(already_finalized_error())
    }
}

#[pyo3::pymethods]
impl Hmac {
    #[new]
    #[pyo3(signature = (key, algorithm, backend=None))]
    fn new(
        py: pyo3::Python<'_>,
        key: CffiBuf<'_>,
        algorithm: &pyo3::Bound<'_, pyo3::PyAny>,
        backend: Option<pyo3::Bound<'_, pyo3::PyAny>>,
    ) -> CryptographyResult<Hmac> {
        let _ = backend;

        Hmac::new_bytes(py, key.as_bytes(), algorithm)
    }

    fn update(&mut self, data: CffiBuf<'_>) -> CryptographyResult<()> {
        self.update_bytes(data.as_bytes())
    }

    pub(crate) fn finalize<'p>(
        &mut self,
        py: pyo3::Python<'p>,
    ) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
        let data = self.get_mut_ctx()?.finish()?;
        self.ctx = None;
        Ok(pyo3::types::PyBytes::new_bound(py, &data))
    }

    fn verify(&mut self, py: pyo3::Python<'_>, signature: &[u8]) -> CryptographyResult<()> {
        let actual_bound = self.finalize(py)?;
        let actual = actual_bound.as_bytes();
        if actual.len() != signature.len() || !openssl::memcmp::eq(actual, signature) {
            return Err(CryptographyError::from(
                exceptions::InvalidSignature::new_err("Signature did not match digest."),
            ));
        }

        Ok(())
    }

    fn copy(&self, py: pyo3::Python<'_>) -> CryptographyResult<Hmac> {
        Ok(Hmac {
            ctx: Some(self.get_ctx()?.copy()?),
            algorithm: self.algorithm.clone_ref(py),
        })
    }
}

#[pyo3::pymodule]
pub(crate) mod hmac {
    #[pymodule_export]
    use super::Hmac;
}