135 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			135 lines
		
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import subprocess
 | 
						|
 | 
						|
from .config import getDockersConfig, getSecretConfig
 | 
						|
 | 
						|
# Admin credentials used by mmctl to log in; read from the secret
# configuration of the "mattermostServ" service at import time.
mattermost_user = getSecretConfig("mattermostServ", "MM_ADMIN_USER")
mattermost_pass = getSecretConfig("mattermostServ", "MM_ADMIN_PASSWORD")

# Public HTTPS URL of the Mattermost server: https://<matterHost>.<domain>
mattermost_url = "https://{}.{}".format(
    getDockersConfig('matterHost'),
    getDockersConfig('domain'),
)

# Command prefix that runs mmctl inside the "mattermostServ" container.
mmctl = "docker exec -i mattermostServ bin/mmctl"
 | 
						|
 | 
						|
class Mattermost:
    """Thin wrapper around the ``mmctl`` command-line client.

    Every operation runs the module-level ``mmctl`` command prefix followed
    by the relevant mmctl sub-command. Arguments are handed to the process
    as a list (no shell), so values containing spaces, quotes, ``$``, ``;``
    and similar characters are passed through literally instead of being
    interpreted by a shell.

    The class is a context manager: entering it logs in with the configured
    admin credentials, leaving it clears the stored mmctl credentials::

        with Mattermost() as mm:
            mm.post_message("hello")
    """

    def __init__(self):
        pass

    def __enter__(self):
        """Log in and return this instance."""
        self.authenticate()
        return self

    def __exit__(self, tp, e, traceback):
        """Log out; exceptions raised inside the ``with`` body are not suppressed."""
        self.logout()

    @staticmethod
    def _command(*args):
        """Return the argv list: the ``mmctl`` prefix followed by *args*."""
        import shlex

        # str() mirrors the f-string interpolation used before, so callers
        # may still pass non-string values.
        return [*shlex.split(mmctl), *map(str, args)]

    def _run(self, *args):
        """Run an mmctl command without capturing output; raise on failure."""
        subprocess.run(self._command(*args), stderr=subprocess.STDOUT, check=True)

    def _output(self, *args):
        """Run an mmctl command and return its combined stdout/stderr, decoded.

        Raises:
            subprocess.CalledProcessError: if the command exits non-zero.
        """
        raw = subprocess.check_output(self._command(*args), stderr=subprocess.STDOUT)
        return raw.decode()

    def authenticate(self):
        """Log in to Mattermost with the configured admin credentials.

        Raises:
            subprocess.CalledProcessError: if the login command fails.
        """
        # NOTE(review): the password is passed as a command-line argument and
        # may therefore be visible in the process list.
        self._run(
            "auth", "login", mattermost_url,
            "--name", "local-server",
            "--username", mattermost_user,
            "--password", mattermost_pass,
        )

    def logout(self):
        """Clear the credentials stored by mmctl (``auth clean``).

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        self._run("auth", "clean")

    def post_message(self, message, equipe="kaz", canal="creation-comptes"):
        """Post *message* to channel *canal* of team *equipe*.

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output("post", "create", f"{equipe}:{canal}", "--message", message)

    def get_user(self, user):
        """Return whether ``mmctl user search`` succeeds for *user*.

        Returns:
            ``True`` when the command exits with status 0, ``False`` when it
            exits non-zero.
        """
        # NOTE(review): the result relies solely on the exit status; confirm
        # that mmctl exits non-zero when no user matches.
        try:
            # stderr is deliberately left untouched here, as before.
            subprocess.check_output(self._command("user", "search", user, "--json"))
        except subprocess.CalledProcessError:
            return False
        return True

    def create_user(self, user, email, password):
        """Create a Mattermost user.

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output(
            "user", "create",
            "--email", email,
            "--username", user,
            "--password", password,
        )

    def delete_user(self, email):
        """Delete the Mattermost user identified by *email* (no confirmation prompt).

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output("user", "delete", email, "--confirm")

    def update_password(self, email, new_password):
        """Change the password of the Mattermost user identified by *email*.

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output("user", "change-password", email, "--password", new_password)

    def add_user_to_team(self, email, equipe):
        """Add the user identified by *email* to team *equipe*.

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output("team", "users", "add", equipe, email)

    def add_user_to_channel(self, email, equipe, canal):
        """Add the user identified by *email* to channel *canal* of team *equipe*.

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output("channel", "users", "add", f"{equipe}:{canal}", email)

    def get_teams(self):
        """List the Mattermost teams.

        Returns:
            The lines printed by ``mmctl team list``, without the last one.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        lines = self._output("team", "list", "--disable-pager").strip().split('\n')
        # The final line is dropped; presumably it is mmctl's summary line
        # rather than a team name -- TODO confirm.
        return lines[:-1]

    def create_team(self, equipe, email):
        """Create the private team *equipe*, passing *email* as its ``--email``.

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        # WARNING: contrary to what the documentation states, the --email
        # option does not make that user an admin of the team.
        output = self._output(
            "team", "create",
            "--name", equipe,
            "--display-name", equipe,
            "--private",
            "--email", email,
        )

        # Workaround: fetch the user id and the team id, then set
        # "scheme_admin": true, "scheme_user": true through the regular
        # Mattermost API.
        # TODO: not implemented yet.

        return output

    def delete_team(self, equipe):
        """Delete team *equipe* (no confirmation prompt).

        Returns:
            The decoded output of the mmctl command.

        Raises:
            subprocess.CalledProcessError: if the command fails.
        """
        return self._output("team", "delete", equipe, "--confirm")
 | 
						|
 |