Source code for SCAutolib.controller

import json
import os
from pathlib import Path
from schema import Schema, Use
from shutil import rmtree
from typing import Union

from SCAutolib import exceptions, schema_cas, schema_user, schema_card
from SCAutolib import (logger, run, LIB_DIR, LIB_BACKUP, LIB_DUMP,
                       LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARDS,
                       LIB_DUMP_CONFS, TEMPLATES_DIR)
from SCAutolib.models import CA, file, user, card, authselect as auth
from SCAutolib.models.file import File, OpensslCnf
from SCAutolib.models.CA import BaseCA
from SCAutolib.enums import (CardType, UserType)
from SCAutolib.utils import (_check_selinux, _gen_private_key,
                             _install_packages, _check_packages,
                             dump_to_json, ca_factory)
from SCAutolib.isDistro import isDistro


[docs]class Controller: authselect: auth.Authselect = auth.Authselect() sssd_conf: file.SSSDConf = file.SSSDConf() lib_conf: dict = None _lib_conf_path: Path = None local_ca: CA.LocalCA = None ipa_ca: CA.IPAServerCA = None users: [user.User] = None dconf_file = File(filepath='/etc/dconf/db/local.d/gnome_disable_welcome', template=Path(TEMPLATES_DIR, 'gnome_disable_welcome')) @property def conf_path(self): return self._lib_conf_path def __init__(self, config: Union[Path, str] = None, params: {} = None): """ Constructor will parse and check input configuration file. If some required fields in the configuration are missing, CLI parameters would be checked if missing values are there. If not, an exception would be raised. After parsing the configuration file and filling internal values of the Controller object, other related objects (users, cards, CAs, Authselect, etc.) would be initialised, but any real action that would affect the system wouldn't be made. :param config: Path to configuration file with metadata for testing. :type config: pathlib.Path or str :param params: Parameters from CLI :type params: dict :return: """ # Check params # Parse config file self.lib_conf = None if config: self._lib_conf_path = config.absolute() if isinstance(config, Path) \ else Path(config).absolute() with self._lib_conf_path.open("r") as f: tmp_conf = json.load(f) if tmp_conf is None: raise exceptions.SCAutolibException( "Data are not loaded correctly.") self.lib_conf = self._validate_configuration(tmp_conf, params) self.users = [] for d in (LIB_DIR, LIB_BACKUP, LIB_DUMP, LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARDS, LIB_DUMP_CONFS): d.mkdir(exist_ok=True) if LIB_DUMP_CAS.joinpath("local_ca.json").exists(): self.local_ca = BaseCA.load(LIB_DUMP_CAS.joinpath("local_ca.json")) if LIB_DUMP_CAS.joinpath("ipa-server.json").exists(): self.ipa_ca = BaseCA.load(LIB_DUMP_CAS.joinpath("ipa-server.json"))
[docs] def prepare(self, force: bool, gdm: bool, install_missing: bool, graphical: bool): """ Prepare system for testing. This method provides complex configuration of system under test for testing including creation of CAs, users and smart cards in the system and objects that represents them in SCAutolib. Configuration is based on config file and CLI options. :param force: Defines if existing objects, files, users, services etc. should be erased or overwritten if they already exist. True stands for erase/overwrite. This parameter is forwarded to several methods and it can have slightly different meaning in each of them. For details see docstrings of the methods. :type force: bool :param gdm: If True, GDM package would be installed :type gdm: bool :param install_missing: If True, all missing packages would be installed :type install_missing: bool :param graphical: If True, GUI tests dependencies are installed :type graphical: bool """ self.setup_system(install_missing, gdm, graphical) # Prepare CAs: Virtual cards are populated by certificates that are: a) # created locally and signed by local CA configured on the system under # test, or b) created and signed using FreeIPA. try: self.setup_local_ca(force=force) except exceptions.SCAutolibWrongConfig as e: logger.info(e) try: self.setup_ipa_client(force=force) except exceptions.SCAutolibWrongConfig as e: logger.info(e) for usr in self.lib_conf["users"]: self.setup_user(usr, force=force) # Create cards defined in config. For physical cards only objects will # be created while for virtual cards tokens will be created and enrolled for token in self.lib_conf["cards"]: # prepare CA objects for physical cards if token["card_type"] == CardType.physical: self.setup_custom_ca(token) self.setup_card(token) elif token["card_type"] == CardType.virtual: c = self.setup_card(token) self.enroll_card(c)
[docs] def setup_system(self, install_missing: bool, gdm: bool, graphical: bool): """ Do general system setup meaning package installation based on specifications in the configuration file, SSSD configuration, configurations for virtual smart cards, etc. :param install_missing: If True, all missing packages would be installed :type install_missing: bool :param gdm: If True, GDM package would be installed :type gdm: bool :param graphical: If True, GUI tests dependencies are installed :type graphical: bool :return: """ for d in (LIB_DIR, LIB_BACKUP, LIB_DUMP, LIB_DUMP_USERS, LIB_DUMP_CAS, LIB_DUMP_CARDS): d.mkdir(exist_ok=True) packages = ["opensc", "httpd", "sssd", "sssd-tools", "gnutls-utils", "openssl", "nss-tools"] # Prepare for virtual cards if any(c["card_type"] == CardType.virtual for c in self.lib_conf["cards"]): packages += ["pcsc-lite-ccid", "pcsc-lite", "virt_cacard", "vpcd", "softhsm"] run("dnf -y copr --hub fedora enable jjelen/vsmartcard") # Add IPA packages if needed if any([u["user_type"] == UserType.ipa for u in self.lib_conf["users"]]): packages += self._general_steps_for_ipa() # Check for installed packages missing = _check_packages(packages) if install_missing and missing: _install_packages(missing) elif missing: msg = "Can't continue. Some packages are missing: " \ f"{', '.join(missing)}" logger.critical(msg) raise exceptions.SCAutolibException(msg) if graphical: self.setup_graphical(install_missing, gdm) if not isDistro('fedora'): run(['dnf', 'groupinstall', "Smart Card Support", '-y', '--allowerasing']) logger.debug("Smart Card Support group in installed.") else: # Fedora requires rsyslog as well run(['dnf', 'install', 'opensc', 'pcsc-lite-ccid', 'rsyslog', '-y']) run(['systemctl', 'start', 'rsyslog']) self.sssd_conf.create() self.sssd_conf.save() self._general_steps_for_virtual_sc() base_user = user.User("base-user", "redhat") base_user.add_user() dump_to_json(base_user) dump_to_json(user.User(username="root", password=self.lib_conf["root_passwd"]))
def setup_graphical(self, install_missing: bool, gdm: bool): packages = ["gcc", "tesseract", "ffmpeg-free"] if gdm: packages.append("gdm") missing = _check_packages(packages) if install_missing and missing: _install_packages(missing) elif missing: msg = "Can't continue with graphical. Some packages are missing: " \ f"{', '.join(missing)}" logger.critical(msg) raise exceptions.SCAutolibException(msg) if not isDistro('fedora'): run(['dnf', 'groupinstall', 'Server with GUI', '-y', '--allowerasing']) run(['pip', 'install', 'python-uinput']) else: # Fedora doesn't have server with GUI group so installed gdm # manually and also python3-uinput should be installed from RPM run(['dnf', 'install', 'gdm', 'python3-uinput', '-y']) # disable subscription message run(['systemctl', '--global', 'mask', 'org.gnome.SettingsDaemon.Subscription.target']) # disable welcome message if not self.dconf_file.exists(): self.dconf_file.create() self.dconf_file.save() run('dconf update')
[docs] def setup_local_ca(self, force: bool = False): """ Setup local CA based on configuration from the configuration file. All necessary files for this operation (e.g. CNF file for self-signed root certificate) would be created along the way. :param force: If local CA already exists in given directory, specifies if it should be overwritten :type force: bool :raises: SCAutolib.exceptions.SCAutolibWrongConfig """ if "local_ca" not in self.lib_conf["ca"]: msg = "Section for local CA is not found in the configuration file" raise exceptions.SCAutolibWrongConfig(msg) ca_dir: Path = self.lib_conf["ca"]["local_ca"]["dir"] ca_dir.mkdir(exist_ok=True, parents=True) cnf = OpensslCnf(ca_dir.joinpath("ca.cnf"), "CA", str(ca_dir)) self.local_ca = ca_factory(path=ca_dir, cnf=cnf, create=True) if force: logger.warning(f"Removing previous local CA from {ca_dir}") self.local_ca.cleanup() cnf.create() cnf.save() self.local_ca.setup() self.local_ca.update_ca_db() run(["systemctl", "restart", "sssd"], sleep=5) logger.info(f"Local CA is configured in {ca_dir}") dump_to_json(self.local_ca)
def setup_custom_ca(self, card_data: dict): if card_data["card_type"] == CardType.physical: ca = ca_factory(create=True, card_data=card_data) ca.setup() if not ca._ca_cert.is_file(): raise FileNotFoundError(f"File not found: {ca._ca_cert}") dump_to_json(ca)
[docs] def setup_ipa_client(self, force: bool = False): """ Configure IPA client for given IPA server on current host. IPA server should be already up and running for correct configuration of the IPA client :param force: If IPA Client is already configured on the system, specifies if it should be removed before configuring a new client. :type force: bool :raises: SCAutolib.exceptions.SCAutolibWrongConfig """ if "ipa" not in self.lib_conf["ca"]: msg = "Section for IPA is not found in the configuration file" raise exceptions.SCAutolibWrongConfig(msg) self.ipa_ca = CA.IPAServerCA(**self.lib_conf["ca"]["ipa"]) if self.ipa_ca.is_installed: logger.warning("IPA client is already configured on this system.") if not force: logger.info("Set force argument to True if you want to remove " "previous installation.") return self.ipa_ca.cleanup() else: logger.info("IPA client is not configured on the system") self.ipa_ca.setup() self.sssd_conf.update_default_content() self.sssd_conf.set(key="domains", value=f"shadowutils, {self.ipa_ca.domain}", section="sssd") dump_to_json(self.ipa_ca)
[docs] def setup_user(self, user_dict: dict, force: bool = False): """ Configure the user on the specified system (local machine/CA). :param force: specify if the user should be re-created with its card directory :type force: bool :param user_dict: set of values to initialise the user :type user_dict: dict :return: the user object """ new_user = None if user_dict["user_type"] == UserType.local: new_user = user.User(username=user_dict["name"], password=user_dict["passwd"]) if force: new_user.delete_user() new_user.add_user() else: if self.ipa_ca is None: msg = "Can't proceed in configuration of IPA user because no " \ "IPA Client is configured" raise exceptions.SCAutolibException(msg) new_user = user.IPAUser(ipa_server=self.ipa_ca, username=user_dict["name"], password=user_dict["passwd"]) if force: new_user.delete_user() new_user.add_user() self.users.append(new_user) dump_to_json(new_user) return new_user
[docs] def setup_card(self, card_dict: dict, force: bool = False): """ Create card object. Card object should contain its root CA cert as it represents general card (i.e. including physical read-only cards). :param card_dict: Dictionary containing card attributes :type card_dict: dict :param force: If its true and card directory exists it will be removed :type force: bool """ card_dir: Path = Path("/root/cards", card_dict["name"]) card_dir.mkdir(parents=True, exist_ok=True) if force and card_dir.exists(): rmtree(card_dir) if card_dict["card_type"] == CardType.physical: new_card = card.PhysicalCard(card_dict, card_dir=card_dir) elif card_dict["card_type"] == CardType.virtual: hsm_conf = self.prepare_softhsm_config(card_dir) new_card = card.VirtualCard(card_dict, softhsm2_conf=hsm_conf.path, card_dir=card_dir) # card needs to know some details of its user new_card.user = self.link_user_to_card(new_card) if new_card.user.user_type == UserType.local: new_card.cnf = self.prepare_user_cnf(new_card) if force: self.revoke_certs(new_card) new_card.create() else: raise NotImplementedError("Other card types than 'physical' and " "'virtual' are not supported") dump_to_json(new_card) return new_card
def link_user_to_card(self, card: card.VirtualCard): for card_user in self.users: if card_user.username == card.cardholder: return card_user
[docs] def prepare_softhsm_config(self, card_dir: Path = None): """Prepare SoftHSM2 config for virtual card""" filepath = card_dir.joinpath("sofhtsm2.conf") hsm_conf = file.SoftHSM2Conf(filepath, card_dir=card_dir) hsm_conf.create() hsm_conf.save() return hsm_conf
[docs] def prepare_user_cnf(self, card: card.VirtualCard): """Prepare user openssl cnf""" cnf_path = card.card_dir.joinpath(f"{card.cardholder}.cnf") cnf = file.OpensslCnf(filepath=cnf_path, conf_type="user", replace=[card.cardholder, card.CN]) cnf.create() cnf.save() return cnf.path
def revoke_certs(self, card: card.VirtualCard): if card.cert and card.cert.exists(): if card.user.user_type == UserType.local: self.local_ca.revoke_cert(card.cert) else: self.ipa_ca.revoke_cert(card.cert)
[docs] def enroll_card(self, card: card.VirtualCard): """ Enroll the card - i.e. upload keys and certs to card. If private key and/or the certificate do not exist, new one's would be requested from corresponding CA. :param card: card object :type card: card.VirtualCard """ logger.debug(f"Starting enrollment of the card {card.name}") if not card: raise exceptions.SCAutolibException( f"Card {card.name} is not initialized") if not card.key.exists(): _gen_private_key(card.key) if not card.cert.exists(): csr = card.gen_csr() ca = self.ipa_ca \ if isinstance(card.user, user.IPAUser) else self.local_ca card.cert = ca.request_cert(csr, card.cardholder, card.cert) card.enroll() dump_to_json(card)
[docs] def cleanup(self): """ Clean the system after setup. This method restores the SSSD config file, deletes created users with cards, remove CA's (local and/or IPA Client) """ users = {} for user_file in LIB_DUMP_USERS.iterdir(): usr = user.User.load(user_file, ipa_server=self.ipa_ca) users[usr.username] = usr if usr.username != "root": usr.delete_user() for card_file in LIB_DUMP_CARDS.iterdir(): if card_file.exists(): card_obj = card.Card.load(card_file) if card_obj.card_type == CardType.virtual: card_obj.user = users[card_obj.cardholder] self.revoke_certs(card_obj) card_obj.delete() if self.local_ca: self.local_ca.cleanup() self.local_ca.restore_ca_db() if self.ipa_ca: self.ipa_ca.cleanup() opensc_cache_dir = Path(os.path.expanduser('~') + "/.cache/opensc/") if opensc_cache_dir.exists(): for cache_file in opensc_cache_dir.iterdir(): cache_file.unlink() logger.debug("Removed opensc file cache") sssd_cache_dir = Path(os.path.expanduser('~sssd') + "/.cache/opensc/") if sssd_cache_dir.exists(): for cache_file in sssd_cache_dir.iterdir(): cache_file.unlink() logger.debug("Removed opensc file cache for sssd user") # file only created in graphical mode that is why it is removed. self.dconf_file.remove() self.sssd_conf.restore() pcscd_service = File("/usr/lib/systemd/system/pcscd.service") pcscd_service.restore() opensc_module = File("/usr/share/p11-kit/modules/opensc.module") opensc_module.restore()
[docs] @staticmethod def _validate_configuration(conf: dict, params: {} = None) -> dict: """ Validate schema of the configuration file. If some value is not present in the config file, this value would be looked in the CLI parameters :param conf: Configuration to be parsed (e.g. data loaded from JSON file) :type conf: dict :param params: CLI arguments :type params: dict :return: dictionary with parsed values from conf and params attributes. All values are retyped to specified type. :rtype: dict """ # FIXME: any schema requires all values to be in the config file, and # only IP address of IPA server is accepted from CLI arguments. # Add loading of the values from params dict # IP regex # Specify validation schema for CAs # Specify general schema for whole config file schema = Schema({"root_passwd": Use(str), "ca": schema_cas, "users": [schema_user], "cards": [schema_card]}) return schema.validate(conf)
[docs] @staticmethod def _general_steps_for_virtual_sc(): """ Prepare the system for virtual smart card. Preparation means to configure pcscd service and opensc module to work correctly with virtual smart card. Also, repository for installing virt_cacard package is added in this method. """ _check_selinux() pcscd_service = File("/usr/lib/systemd/system/pcscd.service") pcscd_service.backup() exec_start = pcscd_service.get(section="Service", key="ExecStart") if "--auto-exit" in exec_start: exec_start = exec_start.replace("--auto-exit", "") pcscd_service.set(section="Service", key="ExecStart", value=exec_start) pcscd_service.save() opensc_module = File("/usr/share/p11-kit/modules/opensc.module") opensc_module.backup() try: opensc_module.get("disable-in", separator=":") except exceptions.SCAutolibException: logger.warning("OpenSC module does not have option 'disable-in: " "virt_cacard' set") opensc_module.set(key="disable-in", value="virt_cacard", separator=": ") opensc_module.save() run(['systemctl', 'stop', 'pcscd.service', 'pcscd.socket', 'sssd']) rmtree("/var/lib/sss/mc/*", ignore_errors=True) rmtree("/var/lib/sss/db/*", ignore_errors=True) logger.debug( "Directories /var/lib/sss/mc/ and /var/lib/sss/db/ removed") run("systemctl daemon-reload") run("systemctl restart pcscd sssd") logger.debug("Copr repo for virt_cacard is enabled")
[docs] @staticmethod def _general_steps_for_ipa(): """ General system preparation for installing IPA client on RHEL/Fedora :return: name of the IPA client package for current Linux """ if isDistro(['rhel', 'centos'], version='8'): run("dnf module enable -y idm:DL1") run("dnf install @idm:DL1 -y") logger.debug("idm:DL1 module is installed") if isDistro('fedora'): return ["e2fsprogs", "freeipa-client"] else: return ["e2fsprogs", "ipa-client"]
[docs] def get_user_dict(self, name): """ Get user dictionary from the config file. :param name: name of the user :type name: str :return: user dictionary :rtype: dict """ for user_dict in self.lib_conf["users"]: if user_dict["name"] == name: return user_dict raise exceptions.SCAutolibMissingUserConfig(name)
[docs] def init_ca(self, local: bool = False): """ Initialize CA. :param local: if True, local CA is initialized, otherwise IPA :type local: bool """ if local: self.local_ca = CA.LocalCA(self.lib_conf["ca"]["local_ca"]["dir"]) if not self.local_ca.cert.exists(): raise exceptions.SCAutolibMissingCA( f"CA certificate not found in {str(self.local_ca.cert)}") else: self.ipa_ca = CA.IPAServerCA(self.lib_conf["ca"]["ipa"]) if not self.ipa_ca.is_installed: raise exceptions.SCAutolibMissingCA( "IPA server CA is not installed") logger.info("CA is initialized")