Source code for autossl.ca_manager.acme_v2_http01

import logging

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization

from acme import challenges
from acme import client
from acme import errors
from acme import messages
import josepy as jose

from .. import ssl, util, exception
from . import base


# User-Agent value handed to the ACME client network layer
# (see client.ClientNetwork(..., user_agent=USER_AGENT) below).
USER_AGENT = 'python-autossl'

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class AcmeHttp01(base.CaManager):
    """CA manager requesting certificates over ACME v2 with HTTP-01 challenges.

    The CA account key is retrieved from the storage API, the account is
    registered (or looked up when it already exists), challenge tokens are
    deployed through the given server APIs and the signed certificate chain
    is returned once the order is finalized.
    """

    def __init__(self, ca_config, staging=True, storage_api=None, **kwargs):
        """Initialize the manager from the CA configuration.

        :param ca_config: CA configuration object; must provide ``get_acme_api``
            and a ``ca_config`` mapping
        :param staging: forwarded to the base class and used to select the ACME API
        :param storage_api: storage backend used to retrieve the CA account key
        :param kwargs: accepted for interface compatibility, unused here
        """
        super(AcmeHttp01, self).__init__(ca_config, staging=staging, storage_api=storage_api)
        self.ca_api = ca_config.get_acme_api(staging=self.staging)
        self.contact_email = ca_config.ca_config.get('contact_email')
        # path of the CA account key, only set while a certificate is being requested
        self.account_key = None
        # server APIs on which challenge tokens get deployed
        self.servers_api = []

    def get_signed_certificate(self, ssl_blueprint=None, csr_path=None, servers_api=None):
        """Request a signed certificate for the specified CSR.

        :param ssl_blueprint: unused by this implementation
        :param csr_path: path-like object whose ``read_text()`` returns the PEM CSR;
            required in practice even though it defaults to ``None``
        :param servers_api: iterable of server APIs exposing ``create_acme_challenge``
            and ``delete_acme_challenge``; ``None`` is treated as "no server"
        :return: PEM certificate chain
        :rtype: bytes
        """
        # Normalize None to an empty list: the challenge deployment and cleanup
        # loops iterate over self.servers_api and would otherwise raise TypeError
        # (including from the cleanup paths, masking the original error).
        self.servers_api = servers_api if servers_api is not None else []

        with util.TempDir() as temp_folder:
            # retrieve CA account key from storage
            ca_account_key_content = self.storage_api.retrieve_data(
                name=self.ca_config.ca_config['storage']['name'],
                data_type=ssl.DataType.PrivateKey,
            )
            self.account_key = temp_folder.path / 'ca_account.key'
            self.account_key.write_bytes(ca_account_key_content)

            client_acme = self._get_or_create_account()

            # Request new certificate order for specified CSR
            order_resource = client_acme.new_order(csr_pem=csr_path.read_text())

            # signed certificate ready to be used
            pem_signed_certificate = self._perform_http01(client_acme, order_resource)

            # return PEM certificate (bytes)
            return pem_signed_certificate.encode('utf-8')

    @property
    def is_automated_renewal_supported(self):
        """Always ``True`` for this CA manager."""
        return True

    def _get_or_create_account(self):
        """Register account if not already done, else just return ACME client updated with existing account

        :return: ACME client whose network layer carries the account registration
        """
        logger.info("Registering account...")
        ca_key = serialization.load_pem_private_key(
            data=self.account_key.read_bytes(),
            password=None,
            backend=default_backend(),
        )
        acc_key = jose.JWKRSA(key=ca_key)

        # Register account and accept TOS
        client_network = client.ClientNetwork(acc_key, user_agent=USER_AGENT)
        directory = messages.Directory.from_json(client_network.get(self.ca_api + '/directory').json())
        client_acme = client.ClientV2(directory=directory, net=client_network)

        try:
            # Creates account with contact information.
            # Terms of Service URL is in client_acme.directory.meta.terms_of_service
            client_acme.new_account(
                messages.NewRegistration.from_data(
                    email=self.contact_email,
                    terms_of_service_agreed=True,
                )
            )
            logger.info("New account successfully created!")
        except errors.ConflictError:
            logger.info("Account already registered!")
            # retrieve existing account information
            account_registration = messages.NewRegistration(key=acc_key.public_key(), only_return_existing=True)
            # TODO migrate to public API when available in ACME module
            # reproduce code of client_acme.new_account but without throwing exception when account already exists
            response = client_acme._post(directory['newAccount'], account_registration)
            account_info = client_acme._regr_from_response(response)
            # assign registration information to existing client
            client_acme.net.account = account_info

        return client_acme

    @staticmethod
    def _get_http01_challenge(authz):
        """Extract HTTP01 challenge from authorization resource.

        :param authz: authorization resource exposing ``body.challenges``
        :return: first challenge body whose ``chall`` is an HTTP01 challenge
        :raise exception.AutoSslException: if no HTTP-01 challenge is offered
        """
        # authz.body.challenges is a set of ChallengeBody objects.
        for challenge_body_item in authz.body.challenges:
            # Find HTTP01 challenge for each domain to validate
            if isinstance(challenge_body_item.chall, challenges.HTTP01):
                return challenge_body_item

        raise exception.AutoSslException('HTTP-01 challenge was not offered by the CA server.')

    def _deploy_challenge(self, client_acme, authz):
        """Deploy challenge token on all servers and warn CA server that it can start validation

        :param client_acme: ACME client used to answer the challenge
        :param authz: authorization resource to validate
        :return: token deployed on the servers
        """
        challenge_body = self._get_http01_challenge(authz)
        response, validation = challenge_body.response_and_validation(client_acme.net.key)
        token = challenge_body.chall.encode('token')

        # lazy %-style arguments: the message is only built if the record is emitted
        logger.info("Deploy challenge for domain %s", authz.body.identifier.value)
        try:
            # deploy challenge on servers
            for server_api in self.servers_api:
                server_api.create_acme_challenge(
                    token=token,
                    key_authorization=validation,
                )

            # Let the CA server know that we are ready for this challenge validation
            client_acme.answer_challenge(challenge_body, response)
        except Exception:
            # cleanup challenge in case of any exception, to avoid leaking tokens on servers
            self._cleanup_challenges(token=token)
            raise

        return token

    def _cleanup_challenges(self, token):
        """Remove specified challenge token from all servers"""
        for server_api in self.servers_api:
            server_api.delete_acme_challenge(token=token)

    def _perform_http01(self, client_acme, order_resource):
        """Set up challenge on servers and perform HTTP-01 challenge verification.

        :param client_acme: ACME client with a registered account
        :param order_resource: order whose authorizations must be validated
        :return: ``fullchain_pem`` of the finalized order
        """
        # keep track of all tokens successfully deployed to be able to clean them up at the end
        tokens = []
        try:
            # Deploy token/validation for HTTP-01 challenge
            for authz in order_resource.authorizations:
                tokens.append(self._deploy_challenge(client_acme, authz))

            # Wait for challenge status and then issue a certificate.
            finalized_order_resource = client_acme.poll_and_finalize(order_resource)
        finally:
            # cleanup all challenges from servers once all validation is done
            for token in tokens:
                self._cleanup_challenges(token=token)

        # return certificate
        return finalized_order_resource.fullchain_pem