"""
This module implements classes that represents Certification Authorities (CA).
"""
import shutil
import re
import json
import os
import python_freeipa
from cryptography import x509
from hashlib import sha256
from pathlib import Path, PosixPath
from python_freeipa import exceptions
from python_freeipa.client_meta import ClientMeta
from shutil import rmtree, copy2
from socket import gethostname
from SCAutolib import TEMPLATES_DIR, logger, run, LIB_DIR, LIB_DUMP_CAS, \
LIB_BACKUP
from SCAutolib.exceptions import SCAutolibException
from SCAutolib.models.file import OpensslCnf
from SCAutolib.enums import CAType
[docs]class BaseCA:
dump_file: Path = None
ca_type: str = None
_ca_cert: Path = None
_ca_key: Path = None
_ca_pki_db: Path = Path("/etc/sssd/pki/sssd_auth_ca_db.pem")
_ca_original_path: Path = LIB_BACKUP.joinpath("ca-db-original.backup")
@property
def cert(self):
return self._ca_cert
[docs] def request_cert(self, csr, username: str, cert_out: Path):
"""
Request certificate from CA for given username
:param csr: path to CSR
:type csr: str
:param username: subject for the certificate
:type username: str
:param cert_out: path where the certificate should be duplicated.
Default None
:type cert_out: pathlib.Path
:return: None or path where the certificate is stored.
:rtype: None or pathlib.Path
"""
...
[docs] def setup(self):
"""
Configure the CA
"""
...
[docs] def update_ca_db(self):
"""
Update /etc/sssd/pki/sssd_auth_ca_db.pem with certificate defined in CA
object.
"""
with self._ca_cert.open("r") as f_cert:
root_cert = f_cert.read()
if self._ca_pki_db.exists():
with self._ca_pki_db.open() as f:
with self._ca_original_path.open('w') as backup:
backup.write(f.read())
# Check if current CA cert is already present in the sssd auth db
with self._ca_pki_db.open("a+") as f:
f.seek(0)
data = f.read()
if root_cert not in data:
f.write(root_cert)
else:
# Create /etc/sssd/pki directory if it doesn't exist
self._ca_pki_db.parents[0].mkdir(exist_ok=True)
with self._ca_pki_db.open("w") as f:
f.write(root_cert)
logger.debug(
f"CA certificate {self._ca_cert} is copied to {self._ca_pki_db}")
# Restoring SELinux context on the sssd auth db
run(f"restorecon -v {self._ca_pki_db}")
logger.info("Local CA is updated")
[docs] def restore_ca_db(self):
"""
restores /etc/sssd/pki/sssd_auth_ca_db.pem to the state it was before.
"""
if self._ca_original_path.exists():
logger.debug("Found original version of sssd_auth_ca_db.pem")
with self._ca_original_path.open() as backup, \
self._ca_pki_db.open("w") as f:
f.write(backup.read())
self._ca_original_path.unlink()
else:
logger.debug("Original version of sssd_auth_ca_db.pem not found")
if self._ca_pki_db.exists():
self._ca_pki_db.unlink()
logger.info(f"Restored {self._ca_pki_db} to original version")
[docs] def sign_cert(self):
"""
Sign the certificate
"""
...
[docs] def revoke_cert(self, cert: Path):
"""
Revoke the certificate
:param cert:
:type cert: pathlib.Path
"""
...
[docs] @staticmethod
def load(json_file):
"""
Load CA from JSON file.
:return: CA object
"""
with json_file.open("r") as f:
cnt = json.load(f)
if cnt["ca_type"] == CAType.ipa:
ca = IPAServerCA(ip_addr=cnt["_ipa_server_ip"],
server_hostname=cnt["_ipa_server_hostname"],
root_passwd=cnt["_ipa_server_root_passwd"],
admin_passwd=cnt["_ipa_server_admin_passwd"],
client_hostname=cnt["_ipa_client_hostname"],
domain=cnt["_ipa_server_domain"],
realm=cnt["_ipa_server_realm"])
elif cnt["ca_type"] == CAType.custom:
ca = CustomCA(cnt)
elif cnt["ca_type"] == CAType.local:
ca = LocalCA(root_dir=cnt["root_dir"])
else:
raise SCAutolibException("CA object has unknown type. Only ipa, "
"custom and local types are supported. CA "
"object not loaded")
logger.debug(f"CA {cnt['name']} is loaded from file {json_file}")
return ca
[docs]class LocalCA(BaseCA):
"""
Represents local CA that is created as CA for virtual cards.
"""
template = Path(TEMPLATES_DIR, "ca.cnf")
ca_type = CAType.local
ca_name = "local_ca"
dump_file = LIB_DUMP_CAS.joinpath(f"{ca_name}.json")
def __init__(self, root_dir: Path = None, cnf: OpensslCnf = None):
"""
Class for local CA. Initialize required attributes, real setup is made
by LocalCA.setup() method
:param root_dir: Path to root directory of the CA. By default, is in
/etc/SCAutolib/ca
:type: Path
:param cnf: object representing openssl cnf file
:type cnf: OpensslCnf
"""
self.name = LocalCA.ca_name
self.ca_type = LocalCA.ca_type
self.root_dir: Path = Path("/etc/SCAutolib/ca") if root_dir is None \
else Path(root_dir)
if not self.root_dir.exists():
raise FileNotFoundError("Root directory of CA does not exist.")
self._conf_dir: Path = self.root_dir.joinpath("conf")
self._newcerts: Path = self.root_dir.joinpath("newcerts")
self._certs: Path = self.root_dir.joinpath("certs")
self._crl: Path = self.root_dir.joinpath("crl", "root.pem")
self._ca_pki_db = BaseCA._ca_pki_db
self._ca_cnf: OpensslCnf = cnf if cnf else OpensslCnf(
conf_type="CA",
filepath=self.root_dir.joinpath("ca.cnf"),
replace=str(self.root_dir))
self._ca_cert = self.root_dir.joinpath("rootCA.pem")
self._ca_key = self.root_dir.joinpath("rootCA.key")
self._serial: Path = self.root_dir.joinpath("serial")
self._index: Path = self.root_dir.joinpath("index.txt")
@property
def cnf(self):
return self._ca_cnf
@cnf.setter
def cnf(self, cnf: OpensslCnf):
if not cnf.path.exists():
raise SCAutolibException("CNF file does not exist")
self._ca_cnf = cnf
[docs] def to_dict(self):
"""
Customising default property for better serialisation for storing to
JSON format.
:return: dictionary with all values. Path objects are typed to string.
:rtype: dict
"""
dict_ = {k: str(v) if type(v) is PosixPath else v
for k, v in super().__dict__.items()}
if self._ca_cnf:
dict_["_ca_cnf"] = str(self._ca_cnf.path)
return dict_
@property
def is_installed(self):
"""
Check if the local CA is installed
"""
try:
result = all([self.root_dir.exists(),
self._ca_cert.exists(),
self._ca_key.exists(),
self._ca_cnf.path.exists(),
self._conf_dir.exists(),
self._newcerts.exists(),
self._certs.exists(),
self._crl.exists(),
self._serial.exists(),
self._index.exists()])
if result and self._ca_pki_db.exists():
with self._ca_pki_db.open("r") as f:
with self._ca_cert.open("r") as cert:
result &= cert.read() in f.read()
except Exception as e:
logger.error(e)
return False
return result
[docs] def setup(self):
"""
Creates directory and file structure needed by local CA.
"""
if self._ca_cnf is None:
raise SCAutolibException("CA CNF file is not set")
elif not self._ca_cnf.path.exists():
raise SCAutolibException("CA CNF does not exist")
self.root_dir.mkdir(parents=True, exist_ok=True)
self._newcerts.mkdir(exist_ok=True)
self._certs.mkdir(exist_ok=True)
self._crl.parent.mkdir(exist_ok=True)
with self._serial.open("w") as f:
f.write("01")
self._index.touch()
# Generate self-signed certificate
cmd = ['openssl', 'req', '-batch', '-config', self._ca_cnf.path,
'-x509', '-new', '-nodes', '-newkey', 'rsa:2048', '-keyout',
self._ca_key, '-sha256', '-set_serial', '0',
'-extensions', 'v3_ca', '-out', self._ca_cert]
run(cmd, check=True)
logger.info(f"CA private key is generated into {self._ca_key}")
logger.info(
f"CA self-signed certificate is generated into {self._ca_cert}")
# Configuring CRL
run(['openssl', 'ca', '-config', self._ca_cnf.path, '-gencrl',
'-out', self._crl], check=True)
logger.info("Local CA files are prepared")
[docs] def request_cert(self, csr: Path, username: str,
cert_out: Path = None) -> Path:
"""
Create the certificate from CSR and sign it. Certificate is stored
in the <root ca directory>/ca/newcerts directory with name username.pem
:param csr: path to CSR
:type csr: pathlib.Path
:param username: subject in the CSR
:type username: str
:param cert_out: path where the certificate should be duplicated.
Can be a directory or a file. If a file, .pem extension
would be set to the filename. If not specified,
certificate would be created in default directory and
filename <root ca directory>/certs/<username>.pem
:type cert_out: pathlib.Path
:return: returns path to the signed certificate
:rtype: pathlib.Path
"""
if cert_out is not None:
if cert_out.is_dir():
cert_out = cert_out.joinpath(f"{username}.pem")
elif cert_out.is_file() and cert_out.suffixes[-1] != ".pem":
cert_out = cert_out.with_suffix(".pem")
else:
cert_out = self._certs.joinpath(f"{username}.pem")
cmd = ["openssl", "ca", "-config", self._ca_cnf.path,
"-batch", "-keyfile", str(self._ca_key), "-in", str(csr),
"-notext", "-days", "365", "-extensions", "usr_cert",
"-out", str(cert_out)]
run(cmd, check=True)
return cert_out
[docs] def revoke_cert(self, cert: Path):
"""
Revoke given certificate
:param cert: path to the certificate
:type cert: pathlib.Path
"""
cmd = ['openssl', 'ca', '-config', self._ca_cnf.path, '-revoke', cert]
run(cmd, check=True)
cmd = ['openssl', 'ca', '-config', self._ca_cnf.path, '-gencrl',
'-out', self._crl]
run(cmd, check=True)
logger.info("Certificate is revoked")
[docs] def cleanup(self):
"""
Remove the root directory of the CA
"""
logger.warning(f"Removing local CA {self.root_dir}")
for file in self.root_dir.iterdir():
if file.is_file():
file.unlink()
elif file.is_dir():
shutil.rmtree(file)
if self.dump_file.exists():
self.dump_file.unlink()
logger.debug(f"Removed {self.dump_file} dump file")
logger.info(f"Local CA from {self.root_dir} is removed")
[docs]class CustomCA(BaseCA):
"""
:TODO: CustomCA is not tested yet and it's not functional until physical
cards testing with removinator is implemented
Represents CA for physical cards. Physical cards are often read-only and
rootCA certs or bundles are provided with a card. This class provides
methods for manipulation with rootCA certs of physical cards.
"""
ca_type = CAType.custom
def __init__(self, card: dict):
"""
Initialize required attributes
"""
self.ca_type = CustomCA.ca_type
self.name = card["ca_name"]
self.ca_cert = card["ca_cert"]
self.dump_file = LIB_DUMP_CAS.joinpath(f"{self.name}.json")
self.root_dir: Path = LIB_DIR.joinpath(self.name)
self._ca_cert = self.root_dir.joinpath(f"{self.name}.pem")
self._ca_pki_db: Path = BaseCA._ca_pki_db
[docs] def setup(self):
"""
Create rootCA file. Actually, copy cert from conf.json
"""
self.root_dir.mkdir(parents=True, exist_ok=True)
if self.ca_cert is None:
raise SCAutolibException(
f"CA cerf for {self.name} not found")
with self._ca_cert.open('w') as newcert:
newcert.write(self.ca_cert)
logger.info("Local CA files are prepared")
[docs] def to_dict(self):
"""
Customising default property for better serialisation for storing to
JSON format.
:return: dictionary with all values. Path objects are typed to string.
:rtype: dict
"""
dict_ = {k: str(v) if type(v) is PosixPath else v
for k, v in super().__dict__.items()}
return dict_
[docs]class IPAServerCA(BaseCA):
"""
Class represents IPA server with integrated CA. Through this class
communication with IPA server is made primarily using
``python_freeipa.client_meta.ClientMeta`` class.
During setup of the IPA client
on the system, script generated on IPA server side has to be executed for
setting up the client for smart card authentication. For this reason SSH
connection is made to the server and the script is fetched in frame of
``IPAServerCA.create()`` method.
"""
ca_type = CAType.ipa
ca_name = "IPA"
_ca_cert: Path = Path("/etc/ipa/ca.crt")
_ipa_server_ip: str = None
_ipa_server_hostname: str = None
_ipa_server_domain: str = None
_ipa_server_admin_passwd: str = None
_ipa_server_realm: str = None
_ipa_client_hostname: str = None
_ipa_server_root_passwd: str = None
_ipa_client_script = Path(LIB_DIR, "ipa-client-sc.sh")
meta_client: ClientMeta = None
dump_file = LIB_DUMP_CAS.joinpath("ipa-server.json")
def __init__(self, ip_addr: str, server_hostname: str, domain: str,
admin_passwd: str, root_passwd: str, client_hostname: str,
realm: str = None):
"""
Initialize object for IPA client for given IPA server. Also, creates
meta client (python_freeipa.client_meta.ClientMeta) logged in to the
server and ready-to-use.
:param ip_addr: IP address of the IPA server
:type ip_addr: str
:param server_hostname: Hostname of the IPA server
:type server_hostname: str
:param domain: Domain name of the IPA server
:type domain: str
:param admin_passwd: Password for admin user on the IPA server
:type admin_passwd: str
:param root_passwd: Password for root user on the IPA server
(system user)
:type root_passwd: str
:param client_hostname: Hostname for the client. This name would be set
on the client host
:type client_hostname: str
:param realm: Kerberos realm. If not set, domain in upper cases would
be used instead
:type realm: str
"""
self.ca_type = IPAServerCA.ca_type
self.name = IPAServerCA.ca_name
self._ipa_server_ip = ip_addr
self._ipa_server_hostname = server_hostname
self._add_to_hosts() # So we can log in to the IPA before setup
self._ipa_server_domain = domain
self._ipa_server_admin_passwd = admin_passwd
self._ipa_server_realm = realm if realm is not None else domain.upper()
self._ipa_client_hostname = client_hostname
self._ipa_server_root_passwd = root_passwd
self._meta_client_login()
@property
def is_installed(self):
"""
:return: True, if IPA client is installed on the system (/etc/ipa
directory contains ca.crt file from IPA server), otherwise False
:rtype: bool
"""
d = Path("/etc/ipa")
result = d.exists()
if result:
result = d.joinpath("ca.crt")
return result
@property
def domain(self):
return self._ipa_server_domain
[docs] def to_dict(self):
"""
Customising default property for better serialisation for storing to
JSON format.
:return: dictionary with all values. Path objects are typed to string.
:rtype: dict
"""
dict_: dict = super().__dict__.copy()
dict_.pop("meta_client")
return dict_
[docs] def setup(self):
"""
Setup IPA client for IPA server. After IPA client is installed, system
would be configured for smart card login with IPA using script from
IPA server obtained via SSH.
"""
logger.info(f"Start setup of IPA client on the system for "
f"{self._ipa_server_hostname} IPA server.")
self._add_to_resolv()
self._set_hostname()
logger.info("Installing IPA client")
try:
run(["ipa-client-install", "-p", "admin",
"--password", self._ipa_server_admin_passwd,
"--server", self._ipa_server_hostname,
"--domain", self._ipa_server_domain,
"--realm", self._ipa_server_realm,
"--hostname", self._ipa_client_hostname,
"--force", "--force-join", "--no-ntp",
"--no-dns-sshfp", "--mkhomedir", "--unattended"],
input="yes")
except:
logger.critical("Installation of IPA client is failed")
rmtree("/etc/ipa/*")
logger.debug("Directory /etc/ipa is removed")
raise
logger.debug("IPA client is installed")
try:
copy2("/tmp/cont-data/config-client-for-smart-card-auth.sh",
self._ipa_client_script)
logger.info("Script for setting up IPA client for smart cards was "
f"found and copied to {self._ipa_client_script}")
except FileNotFoundError:
logger.info("Script for setting up IPA client for smart cards was "
"not found. It will be generated on IPA server and "
"fetched")
self._get_sc_setup_script()
run("kinit admin", input=self._ipa_server_admin_passwd)
run(f'bash {self._ipa_client_script} /etc/ipa/ca.crt', check=True)
logger.debug("Setup of IPA client for smart card is finished")
self._meta_client_login()
policy = self.meta_client.pwpolicy_show(a_cn="global_policy")["result"]
if ["0"] != policy["krbminpwdlife"]:
self.meta_client.pwpolicy_mod(a_cn="global_policy",
o_krbminpwdlife=0)
logger.debug("Minimal kerberos password lifetime is set to 0 days")
if "365" not in policy["krbmaxpwdlife"]:
self.meta_client.pwpolicy_mod(a_cn="global_policy",
o_krbmaxpwdlife=365)
logger.debug(
"Maximum kerberos password lifetime is set to 365 days")
# TODO: add to restore client host name
logger.info("IPA client is configured on the system.")
[docs] def _set_hostname(self):
"""
Set hostname for specified IPA client hostname
"""
run(f"hostnamectl set-hostname {self._ipa_client_hostname} --static")
logger.debug(f"Hostname is set to {self._ipa_client_hostname}")
[docs] def _add_to_resolv(self):
"""
Add new nameserver (IPA) to /etc/resolv.conf and lock this file for
editing
"""
nameserver = f"nameserver {self._ipa_server_ip}"
pattern = rf"^nameserver\s+{self._ipa_server_ip}\s*"
with open("/etc/resolv.conf", "r") as f:
cnt = f.read()
logger.debug(f"Original resolv.conf:\n{cnt}")
if re.match(pattern, cnt) is None:
logger.warning(f"Nameserver {self._ipa_server_ip} is not "
"present in /etc/resolv.conf. Adding...")
cnt = (nameserver + "\n" + cnt)
with open("/etc/resolv.conf", "w") as f:
f.write(cnt)
logger.info(
"IPA server is added to /etc/resolv.conf as first nameserver")
with open("/etc/resolv.conf", "r") as f:
logger.debug(f"New resolv.conf\n{f.read()}")
[docs] def _add_to_hosts(self):
"""
Add IPA server (IP address and hostname) to /etc/hosts
"""
entry = f"{self._ipa_server_ip} {self._ipa_server_hostname}"
with open("/etc/hosts", "r+") as f:
cnt = f.read()
if entry not in cnt:
f.write(f"\n{entry}\n")
logger.warning(
f"New entry {entry} for IPA server is added to /etc/hosts")
logger.info(
f"Entry for IPA server {entry} presents in the /etc/hosts")
[docs] def _get_sc_setup_script(self):
"""
Fetch script for smart card setup of IPA client and place it to
predefined location. Script is generated only on IPA server.
Fetching is done by connecting to the host via SSH.
"""
import paramiko
from invoke import Responder
from fabric.connection import Connection
kinitpass = Responder(
pattern=f"Password for admin@{self._ipa_server_realm}: ",
response=f"{self._ipa_server_admin_passwd}\n")
logger.debug("Start receiving client script for setting up smart card "
"on IPA client")
with Connection(self._ipa_server_ip, user="root",
connect_kwargs={
"password": self._ipa_server_root_passwd}) as c:
# TODO Delete this block when PR in paramiko will be accepted
# https://github.com/paramiko/paramiko/issues/396
#### noqa:E266
paramiko.PKey.get_fingerprint = \
lambda x: sha256(x.asbytes()).digest()
c.client = paramiko.SSHClient()
c.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#### noqa:E266
c.open()
# in_stream = False is required because while testing with pytest
# it collision appears with capturing of the output.
logger.debug("Running kinit on the IPA server")
c.run("kinit admin", pty=True,
watchers=[kinitpass], in_stream=False)
result = c.run("ipa-advise config-client-for-smart-card-auth",
hide=True, in_stream=False)
logger.debug("Script is generated on server side")
with open(self._ipa_client_script, "w") as f:
f.write(result.stdout)
if os.stat(self._ipa_client_script).st_size == 0:
msg = "Script for IPA client smart card setup is not correctly " \
"copied to the host"
logger.error(result.stdout)
logger.error(result.stderr)
raise SCAutolibException(msg)
logger.debug("File for setting up IPA client for smart cards is "
f"copied to {self._ipa_client_script}")
[docs] def request_cert(self, csr: Path, username: str, cert_out: Path):
"""
Request certificate using CSR from IPA CA for given username. It is
a wrapper around the python_freeipa.client_meta.ClientMeta.cert_request
method. It works with a file, extracts CSR data from it and then
stores in PEM format adding required prefix and suffix as in normal
certificate and. If cert_out is a directory, then certificate would be
stored in this directory with name <username>.pem. If it is a file,
then check if it has PEM extension. If not, append .pem suffix to the
name.
:param csr: path to CSR
:type csr: patlib.Path
:param username: subject for the certificate
:type username: str
:param cert_out: path where the certificate is stored. Can be a
directory or a file.
:type cert_out: patlib.Path
:return: Path to the PEM certificate.
:rtype: patlib.Path
"""
with csr.open() as f:
csr_content = f.read()
r = self.meta_client.cert_request(a_csr=csr_content,
o_principal=username)
logger.debug(r)
cert = r["result"]["certificate"]
if cert_out.is_dir():
cert_out = cert_out.joinpath(f"{username}.pem")
else:
cert_out = cert_out.with_suffix(".pem")
with cert_out.open("w") as f:
f.write("-----BEGIN CERTIFICATE-----\n"
f"{cert}\n"
f"-----END CERTIFICATE-----")
return cert_out
[docs] def add_user(self, user):
"""
Add given user to IPA server. It is a wrapper on the
python_freeipa.client_meta.ClientMeta.user_add method. Just extracts
necessary fields from IPAUser object and pass them to the method. As a
result, o_givenname == o_uid == o_sn == o_cn for simplicity.
:param user: User to be added to the IPA server.
"""
r = self.meta_client.user_add(user.username, user.username,
user.username, user.username,
o_userpassword=user.password)
logger.debug(r)
logger.info(f"User {user.username} is added to the IPA server")
[docs] def del_user(self, user):
"""
Remove user from IPA server.
:param user: User to be deleted
:raise AssertionError: If operation is failed. Fail is detected in
return value from request to IPA server.
"""
r = self.meta_client.user_del(user.username)["result"]
logger.debug(r)
logger.info(f"User {user.username} is removed from the IPA server")
[docs] def revoke_cert(self, cert_path: Path):
"""
Revoke given certificate on the IPA server. It is a wrapper on the
python_freeipa.client_meta.ClientMeta.revoke_cert method. It extracts
serial number of the certificate from the file
:param cert_path: Path to the certificate in PEM format
"""
with cert_path.open("rb") as f:
cert = x509.load_pem_x509_certificate(f.read())
r = self.meta_client.cert_revoke(cert.serial_number)
logger.debug(r)
logger.info(f"Certificate {cert.serial_number} is revoked")
return cert.serial_number
[docs] def cleanup(self):
"""
Remove IPA client from the system and from the IPA server
:raise subprocess.CalledProcessError: by run function
"""
logger.warning("Removing IPA client from the host "
f"{gethostname()}")
try:
r = self.meta_client.host_del(
a_fqdn=gethostname(), o_updatedns=True)["result"]
logger.debug(r)
assert r["failed"] == [], "Deleting of the host is failed"
except exceptions.NotFound:
logger.error(f"Current hostname ({gethostname()}) is not found "
f"on the IPA server")
# Return code 2 means that the IPA client is not configured
run(["ipa-client-install", "--uninstall", "-U"], return_code=[0, 2])
logger.info("IPA client is removed.")
@property
def ipa_server_hostname(self):
return self._ipa_server_hostname