KazV2/dockers/apikaz/source/app.py

2089 lines
73 KiB
Python

import os
import re
import requests
import subprocess
import logging
import tempfile
import ldap
import imaplib
import random
import string
import json
from flask import Flask, jsonify, send_from_directory, request, abort, json, Response, render_template
from flask_mail import Mail, Message
from flasgger import Swagger
from flask_restful import Api, Resource
from flask_jwt_extended import JWTManager, create_access_token, jwt_required
from passlib.hash import sha512_crypt
from unidecode import unidecode
from email_validator import validate_email, EmailNotValidError
from time import sleep
from glob import glob
from bs4 import BeautifulSoup
from datetime import datetime
app = Flask(__name__)
jwt = JWTManager(app)
api = Api(app)
app.logger.setLevel(logging.DEBUG)
swagger = Swagger(app, template={
"swagger": "2.0",
"info": {
"title": "L'API Kaz de la mort qui tue",
"version": "0.2.0",
"description": "Permettre des opérations de gestion des services kaz avec des écrans Ouaib"
},
"tags": [
{"name": "Authentication", "description": "Auth related operations"},
{"name": "Test", "description": "pour tester des conneries"},
{"name": "Password", "description": "Gestion Mdp"},
{"name": "Paheko", "description": "Gestion Paheko"},
{"name": "Mattermost", "description": "Gestion Mattermost Authent"},
{"name": "Mattermost User", "description": "Gestion Mattermost User"},
{"name": "Mattermost Team", "description": "Gestion Mattermost Team"},
{"name": "Ldap", "description": "Gestion Ldap"},
{"name": "Cloud", "description": "Gestion Cloud Général"},
{"name": "Sympa", "description": "Gestion Sympa"},
{"name": "Quota", "description": "Gestion Quota"},
{"name": "Dns", "description": "Gestion Dns"},
{"name": "Kaz User", "description": "Gestion Kaz User"}
],
"securityDefinitions": {
"basicAuth": {
"type": "basic",
"description": "Basic Authentication with username and password"
},
"Bearer": {
"type": "apiKey",
"name": "Authorization",
"in": "header",
"description": "JWT Authorization header using the Bearer scheme. Example: 'Bearer {token}'"
}
}
})
#TODO:
# check variables
# fail2ban (ou alors sur traefik)
# découper app.py en service
# quels scripts bash garder ?
#*************************************************
# Configuration du logger
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__) # Création de l'objet logger
#*************************************************
#Filtrer les IP qui peuvent accéder à l'api
#TODO: au lieu d'avoir les IP en dur, prendre le fichier allow_ip'
trusted_ips = [
"82.64.20.246",
"31.39.14.228",
"51.75.112.172",
"80.11.47.59",
"90.121.138.71",
"109.190.2.75",
"89.234.177.115",
"80.215.140.40",
"80.67.176.91",
"89.234.177.119",
"78.127.1.19",
"80.215.236.243"
]
#*************************************************
#variables globales
#*************************************************
#le secret pour générer les tokens
#app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY')
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'your_jwt_secret_key')
#le paheko de kaz
paheko_ident=os.environ.get('paheko_API_USER')
paheko_pass=os.environ.get('paheko_API_PASSWORD')
paheko_url=os.environ.get('paheko_url')
mattermost_user=os.environ.get('mattermost_user')
mattermost_pass=os.environ.get('mattermost_pass')
mattermost_url=os.environ.get('mattermost_url')
ldap_admin=os.environ.get('ldap_LDAP_ADMIN_USERNAME')
ldap_pass=os.environ.get('ldap_LDAP_ADMIN_PASSWORD')
ldap_root=os.environ.get('ldap_root')
ldap_host="ldapServ.ldapNet"
#cloud général
cloud_ident=os.environ.get('nextcloud_NEXTCLOUD_ADMIN_USER')
cloud_pass=os.environ.get('nextcloud_NEXTCLOUD_ADMIN_PASSWORD')
cloud_url=os.environ.get('cloud_url')
gandi_key=os.environ.get('gandi_GANDI_KEY')
gandi_url_api=os.environ.get('gandi_GANDI_API')
site_url=os.environ.get('site_url')
#pour webmail_url et mdp_url, ça renvoie des tuples et non des str, bizarre, du coup, je mets en dur
#webmail_url=os.environ.get('webmail_url'),
#mdp_url=os.environ.get('mdp_url'),
webmail_url='https://webmail.kaz.bzh',
mdp_url='https://mdp.kazkouil.fr',
#pour le mail
app.config['MAIL_SERVER']= os.environ.get('apikaz_MAIL_SERVER')
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = os.environ.get('apikaz_MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('apikaz_MAIL_PASSWORD')
app.config['MAIL_REPLY_TO'] = os.environ.get('apikaz_MAIL_REPLY_TO')
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USE_SSL'] = False
mail = Mail(app)
#sympa
sympa_ident=os.environ.get('sympa_SOAP_USER')
sympa_pass=os.environ.get('sympa_SOAP_PASSWORD')
sympa_url=os.environ.get('sympa_url')
sympa_liste_info=os.environ.get('sympa_liste_info')
MAIL_USERNAME=app.config['MAIL_USERNAME']
#pour QUOTA (à virer ensuite)
serveur_imap = os.environ.get('serveur_imap')
mot_de_passe_mail=os.environ.get('mot_de_passe_mail')
#*************************************************
@app.before_request
def limit_remote_addr():
if request.environ['HTTP_X_FORWARDED_FOR'] not in trusted_ips:
abort(jsonify(message="Et pis quoi encore ?"), 400)
#*************************************************
#authent mdp/pass basique
def check_auth(username, password):
return username == os.environ.get('apikaz_doc_user') and password == os.environ.get('apikaz_doc_password')
def authenticate():
return Response('tssssss.\n', 401, {'WWW-Authenticate': 'Basic realm="Login Required"'})
@app.before_request
def require_basic_auth():
if request.path.startswith('/apidocs') or request.path.startswith('/print_env'):
#if request.path.startswith('/'):
auth = request.authorization
if not auth or not check_auth(auth.username, auth.password):
return authenticate()
#*************************************************
#DANGER: ne jamais mettre print_env en PROD
@app.route('/print_env')
def print_environment():
# Crée une chaîne de caractères pour stocker les variables d'environnement
env_string = ""
# Itère sur les variables d'environnement et les ajoute à la chaîne de caractères
for key, value in os.environ.items():
env_string += f"{key}: {value}\n" + "<br>"
# Retourne la chaîne de caractères contenant les variables d'environnement
return env_string
#*************************************************
#***** DEBUT Quelques fonctions utiles ***********
#*************************************************
#pour injecter la date dans dans le contexte des template
@app.context_processor
def inject_now():
return {'now': datetime.now}
#*************************************************
#***** FIN Quelques fonctions utiles ***********
#*************************************************
#*************************************************
@app.route('/favicon.ico')
def favicon():
# return send_from_directory(os.path.join(app.root_path, 'static'),'favicon.ico')
return '', 204
#*************************************************
#la page d'accueil est vide
@app.route('/')
def silence():
return ""
#*************************************************
# obtenir un token
@app.route('/get_token', methods=['GET'])
def get_token():
"""
Get JWT token with basic auth
---
tags:
- Authentication
security:
- basicAuth: []
responses:
200:
description: Token generated successfully
schema:
type: object
properties:
access_token:
type: string
description: JWT access token
401:
description: Unauthorized
"""
auth = request.authorization
if auth and check_auth(auth.username, auth.password):
# Créez un token JWT après une authentification réussie
access_token = create_access_token(identity=auth.username)
return jsonify(access_token=access_token)
else:
return authenticate()
#*************************************************
#*******MDP***************************************
#*************************************************
class Password_create(Resource):
@jwt_required()
def get(self):
"""
créer un password qui colle avec les appli kaz
---
tags:
- Password
security:
- Bearer: []
parameters: []
responses:
200:
description: le password
404:
description: oops
"""
global new_password
cmd="apg -n 1 -m 10 -M NCL -d"
try:
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
new_password="_"+output.decode("utf-8")+"_"
return new_password,200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
api.add_resource(Password_create, '/password/create')
#*************************************************
#*******PAHEKO************************************
#*************************************************
class Paheko_categories(Resource):
@jwt_required()
def get(self):
"""
Récupérer les catégories Paheko avec le compteur associé
---
tags:
- Paheko
security:
- Bearer: []
parameters: []
responses:
200:
description: Liste des catégories Paheko
404:
description: oops
"""
global paheko_ident, paheko_pass, paheko_url
auth = (paheko_ident, paheko_pass)
api_url = paheko_url + '/api/user/categories'
response = requests.get(api_url, auth=auth)
if response.status_code == 200:
data = response.json()
return jsonify(data)
else:
return jsonify({'error': 'La requête a échoué'}), response.status_code
api.add_resource(Paheko_categories, '/paheko/user/categories')
#*************************************************
class Paheko_users(Resource):
@jwt_required()
def get(self,categorie):
"""
Afficher les membres d'une catégorie Paheko
---
tags:
- Paheko
security:
- Bearer: []
parameters:
- in: path
name: categorie
type: string
required: true
responses:
200:
description: Liste des membres une catégorie Paheko
404:
description: oops
"""
global paheko_ident, paheko_pass, paheko_url
auth = (paheko_ident, paheko_pass)
if not categorie.isdigit():
return 'Id de category non valide', 400
api_url = paheko_url + '/api/user/category/'+categorie+'.json'
response = requests.get(api_url, auth=auth)
if response.status_code == 200:
data = response.json()
return jsonify(data)
else:
return jsonify({'error': 'La requête a échoué'}), response.status_code
api.add_resource(Paheko_users, '/paheko/user/category/<categorie>')
#*************************************************
class Paheko_user(Resource):
def __init__(self):
global paheko_ident, paheko_pass, paheko_url
self.paheko_ident = paheko_ident
self.paheko_pass = paheko_pass
self.paheko_url = paheko_url
self.auth = (self.paheko_ident, self.paheko_pass)
@jwt_required()
def get(self,ident):
"""
Afficher un membre de Paheko par son email kaz ou son numéro ou le non court de l'orga
---
tags:
- Paheko
security:
- Bearer: []
parameters:
- in: path
name: ident
type: string
required: true
description: possible d'entrer un numéro, un email, le nom court de l'orga
responses:
200:
description: Existe et affiche
404:
description: N'existe pas
"""
emailmatchregexp = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
if emailmatchregexp.match(ident):
data = { "sql": f"select * from users where email='{ident}' or alias = '{ident}'" }
api_url = self.paheko_url + '/api/sql/'
response = requests.post(api_url, auth=self.auth, data=data)
#TODO: if faut Rechercher count et vérifier que = 1 et supprimer le count=1 dans la réponse
elif ident.isdigit():
api_url = self.paheko_url + '/api/user/'+ident
response = requests.get(api_url, auth=self.auth)
else:
nomorga = re.sub(r'\W+', '', ident) # on vire les caractères non alphanumérique
data = { "sql": f"select * from users where admin_orga=1 and nom_orga='{nomorga}'" }
api_url = self.paheko_url + '/api/sql/'
response = requests.post(api_url, auth=self.auth, data=data)
#TODO:if faut Rechercher count et vérifier que = 1 et supprimer le count=1 dans la réponse
if response.status_code == 200:
data = response.json()
if data["count"] == 1:
return jsonify(data["results"][0])
elif data["count"] == 0:
return "pas de résultat", 400
else:
return "Plusieurs utilisateurs correspondent ?!", 400
else:
#return jsonify({'error': 'La requête a échoué'}), response.status_code
return "pas de résultat", response.status_code
#*************************************************
@jwt_required()
def put(self,ident,field,new_value):
"""
Modifie la valeur d'un champ d'un membre paheko (ident= numéro paheko ou email kaz)
---
tags:
- Paheko
security:
- Bearer: []
parameters:
- in: path
name: ident
type: string
required: true
description: possible d'entrer le numéro paheko, un email kaz
- in: path
name: field
type: string
required: true
description: un champ de la table users de la base paheko
- in: path
name: new_value
type: string
required: true
description: la nouvelle valeur à remplacer
responses:
200:
description: Modification effectuée avec succès
400:
description: Oops, ident non trouvé ou incohérent
404:
description: Oops, modification du champ KO
"""
#récupérer le numero paheko si on fournit un email kaz
emailmatchregexp = re.compile(r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$")
if emailmatchregexp.match(ident):
data = { "sql": f"select id from users where email='{ident}'" }
api_url = self.paheko_url + '/api/sql/'
response = requests.post(api_url, auth=self.auth, data=data)
if response.status_code == 200:
#on extrait l'id de la réponse
data = response.json()
if data['count'] == 0:
return "email non trouvé", 400
elif data['count'] > 1:
return "trop de résultat", 400
else:
#OK
ident = data['results'][0]['id']
else:
return "pas de résultat", response.status_code
elif not ident.isdigit():
return "Identifiant utilisateur invalide", response.status_code
regexp = re.compile("[^a-zA-Z0-9 \\r\\n\\t" + re.escape(string.punctuation) + "]")
valeur = regexp.sub('',new_value) # mouais, il faudrait être beaucoup plus précis ici en fonction des champs qu'on accepte...
champ = re.sub(r'\W+','',field) # pas de caractères non alphanumériques ici, dans l'idéal, c'est à choisir dans une liste plutot
api_url = self.paheko_url + '/api/user/'+str(ident)
payload = {champ: valeur}
response = requests.post(api_url, auth=self.auth, data=payload)
return response.json(),response.status_code
#*************************************************
api.add_resource(Paheko_user, '/paheko/user/<ident>', endpoint='paheko_get_user', methods=['GET'])
api.add_resource(Paheko_user, '/paheko/user/<ident>/<string:field>/<string:new_value>', endpoint='paheko_maj_user', methods=['PUT'])
#*************************************************
class Paheko_users_action(Resource):
def __init__(self):
global paheko_ident, paheko_pass, paheko_url
self.paheko_ident = paheko_ident
self.paheko_pass = paheko_pass
self.paheko_url = paheko_url
@jwt_required()
def get(self, action):
"""
retourne tous les membres de paheko avec une action à mener (création du compte kaz / modification...)
---
tags:
- Paheko
security:
- Bearer: []
parameters:
- in: path
name: action
type: string
required: true
enum: ['A créer','A modifier','En attente','Aucune']
responses:
200:
description: liste des nouveaux kaznautes à créer
404:
description: aucun nouveau kaznaute à créer
"""
auth = (self.paheko_ident, self.paheko_pass)
api_url = self.paheko_url + '/api/sql/'
payload = { "sql": f"select * from users where action_auto='{action}'" }
response = requests.post(api_url, auth=auth, data=payload)
if response.status_code == 200:
return response.json(),200
else:
return "pas de résultat", response.status_code
api.add_resource(Paheko_users_action, '/paheko/users/<string:action>')
#*************************************************
#*******MATTERMOST********************************
#*************************************************
# on utilise mmctl et pas l'apiv4 de MM
# pourquoi ? passe que mmctl déjà utilisé dans les scripts kaz.
#*************************************************
def Mattermost_authenticate():
# Authentification sur MM
global mattermost_url, mattermost_user, mattermost_pass
cmd = f"/mm/mmctl auth login {mattermost_url} --name local-server --username {mattermost_user} --password {mattermost_pass}"
subprocess.run(cmd, shell=True, stderr=subprocess.STDOUT, check=True)
#*************************************************
class Mattermost_message(Resource):
@jwt_required()
def post(self,message,equipe="kaz",canal="creation-comptes"):
"""
Envoyer un message dans une Equipe/Canal de MM
---
tags:
- Mattermost
security:
- Bearer: []
parameters:
- in: path
name: equipe
type: string
required: true
- in: path
name: canal
type: string
required: true
- in: path
name: message
type: string
required: true
responses:
200:
description: Affiche un message dans un canal d'une équipe
500:
description: oops
"""
Mattermost_authenticate()
try:
cmd="/mm/mmctl post create "+equipe+":"+canal+" --message "+ "\"" + message + "\""
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return "Message envoyé", 200
except subprocess.CalledProcessError:
return "Message non envoyé", 500
api.add_resource(Mattermost_message, '/mattermost/message/<equipe>/<canal>/<message>')
#*************************************************
class Mattermost_user(Resource):
def __init__(self):
Mattermost_authenticate()
#*************************************************
@jwt_required()
def get(self,user):
"""
Le user existe t-il sur MM ?
---
tags:
- Mattermost User
security:
- Bearer: []
parameters:
- in: path
name: user
type: string
required: true
description: possible d'entrer un username, un email
responses:
200:
description: Existe
404:
description: N'existe pas
"""
try:
cmd = f"/mm/mmctl user search {user} --json"
user_list_output = subprocess.check_output(cmd, shell=True)
return 200 # Le nom d'utilisateur existe
except subprocess.CalledProcessError:
return 404 # Le nom d'utilisateur n'existe pas
#*************************************************
@jwt_required()
def post(self,user,email,password):
"""
Créer un utilisateur sur MM
---
tags:
- Mattermost User
security:
- Bearer: []
parameters:
- in: path
name: user
type: string
required: true
- in: path
name: email
type: string
required: true
- in: path
name: password
type: string
required: true
responses:
200:
description: Utilisateur créé
400:
description: oops, Utilisateur non créé
"""
# Création de l'utilisateur
try:
cmd = f"/mm/mmctl user create --email {email} --username {user} --password {password}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
@jwt_required()
def delete(self,email):
"""
Supprimer un utilisateur sur MM
---
tags:
- Mattermost User
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Utilisateur supprimé
400:
description: oops, Utilisateur non supprimé
"""
try:
cmd = f"/mm/mmctl user delete {email} --confirm"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
@jwt_required()
def put(self,email,new_password):
"""
Changer un password pour un utilisateur de MM
---
tags:
- Mattermost User
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: new_password
type: string
required: true
responses:
200:
description: Mot de passe de l'Utilisateur changé
400:
description: oops, Mot de passe de l'Utilisateur inchangé
"""
try:
cmd = f"/mm/mmctl user change-password {email} --password {new_password}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
api.add_resource(Mattermost_user, '/mattermost/user/<string:user>', endpoint='mattermost_get_user', methods=['GET'])
api.add_resource(Mattermost_user, '/mattermost/user/create/<string:user>/<string:email>/<string:password>', endpoint='mattermost_create_user', methods=['POST'])
api.add_resource(Mattermost_user, '/mattermost/user/delete/<string:email>', endpoint='mattermost_delete_user', methods=['DELETE'])
api.add_resource(Mattermost_user, '/mattermost/user/change/password/<string:email>/<string:new_password>', endpoint='mattermost_change_user_password', methods=['PUT'])
#*************************************************
class Mattermost_user_team(Resource):
@jwt_required()
def post(self,email,equipe):
"""
Affecte un utilisateur à une équipe MM
---
tags:
- Mattermost Team
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: equipe
type: string
required: true
responses:
200:
description: l'utilisateur a bien été affecté à l'équipe
400:
description: oops, Utilisateur non affecté
"""
Mattermost_authenticate()
try:
cmd = f"/mm/mmctl team users add {equipe} {email}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
api.add_resource(Mattermost_user_team, '/mattermost/user/team/<string:email>/<string:equipe>')
#*************************************************
class Mattermost_user_channel(Resource):
@jwt_required()
def post(self,email,equipe,canal):
"""
Affecte un utilisateur à un canal MM
---
tags:
- Mattermost
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: equipe
type: string
required: true
- in: path
name: canal
type: string
required: true
responses:
200:
description: l'utilisateur a bien été affecté au canal
400:
description: oops, Utilisateur non affecté
"""
Mattermost_authenticate()
try:
cmd = f'/mm/mmctl channel users add {equipe}:{canal} {email}'
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
api.add_resource(Mattermost_user_channel, '/mattermost/user/channel/<string:email>/<string:equipe>/<string:canal>')
#*************************************************
class Mattermost_team(Resource):
def __init__(self):
Mattermost_authenticate()
#*************************************************
@jwt_required()
def get(self):
"""
Lister les équipes sur MM
---
tags:
- Mattermost Team
security:
- Bearer: []
parameters: []
responses:
200:
description: liste des équipes
400:
description: oops, Equipe non supprimée
"""
Mattermost_authenticate()
try:
cmd = f"/mm/mmctl team list --disable-pager"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
data_list = output.decode("utf-8").strip().split('\n')
data_list.pop()
return data_list, 200
#return jsonify(data_list),200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
@jwt_required()
def post(self,equipe,email):
"""
Créer une équipe sur MM et affecter un admin si email est renseigné (set admin marche pô)
---
tags:
- Mattermost Team
security:
- Bearer: []
parameters:
- in: path
name: equipe
type: string
required: true
- in: path
name: email
type: string
required: true
description: admin de l'équipe
responses:
200:
description: Equipe créée
400:
description: oops, Equipe non créée
"""
try:
#DANGER: l'option --email ne rend pas le user admin de l'équipe comme c'est indiqué dans la doc :(
cmd = f"/mm/mmctl team create --name {equipe} --display-name {equipe} --private --email {email}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
#Workaround: on récup l'id du user et de l'équipe pour affecter le rôle "scheme_admin": true, "scheme_user": true avec l'api MM classique.
#TODO:
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
@jwt_required()
def delete(self,equipe):
"""
Supprimer une équipe sur MM
---
tags:
- Mattermost Team
security:
- Bearer: []
parameters:
- in: path
name: equipe
type: string
required: true
responses:
200:
description: Equipe supprimée
400:
description: oops, Equipe non supprimée
"""
Mattermost_authenticate()
try:
cmd = f"/mm/mmctl team delete {equipe} --confirm"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
api.add_resource(Mattermost_team, '/mattermost/team/list',endpoint='mattermost_team_list', methods=['GET'])
api.add_resource(Mattermost_team, '/mattermost/team/create/<equipe>/<email>',endpoint='mattermost_team_create', methods=['POST'])
api.add_resource(Mattermost_team, '/mattermost/team/delete/<equipe>',endpoint='mattermost_team_delete', methods=['DELETE'])
#*************************************************
#***** LDAP **************************************
#*************************************************
class Ldap_user(Resource):
def __init__(self):
global ldap_admin, ldap_pass, ldap_root, ldap_host
self.ldap_admin = ldap_admin
self.ldap_pass = ldap_pass
self.ldap_root = ldap_root
self.ldap_host = f"ldap://{ldap_host}"
def _connect_ldap(self):
ldap_connection = ldap.initialize(self.ldap_host)
ldap_connection.simple_bind_s("cn={},{}".format(self.ldap_admin, self.ldap_root), self.ldap_pass)
return ldap_connection
@classmethod
def is_valid_field(cls, field):
allowed_fields = ['mailDeSecours', 'mailEnabled', 'nextcloudEnabled', 'mobilizonEnabled', 'agoraEnabled', 'userPassword', 'identifiantKaz', 'mailAlias', 'quota']
return field in allowed_fields
@jwt_required()
def get(self, email):
"""
Vérifier si un utilisateur avec cet email existe dans le LDAP soit comme mail principal soit comme alias
---
tags:
- Ldap
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Existe
400:
description: N'existe pas
401:
description: oops, email invalide
402:
description: oops, autre erreur
"""
try:
if not validate_email(email):
return "Adresse e-mail non valide", 400
ldap_connection = self._connect_ldap()
#result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
# Créer une chaîne de filtre pour rechercher dans les champs "cn" et "mailAlias"
filter_str = "(|(cn={})(mailAlias={}))".format(email, email)
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, filter_str)
ldap_connection.unbind_s()
if result:
return True, 200
else:
return False, 400
except EmailNotValidError as e:
return str(e), 401
except ldap.LDAPError as e:
return str(e), 402
#*************************************************
@jwt_required()
def delete(self, email):
"""
Supprimer un utilisateur du LDAP par son adresse e-mail
---
tags:
- Ldap
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Utilisateur supprimé avec succès
404:
description: Utilisateur non trouvé dans le LDAP
400:
description: Erreur lors de la suppression de l'utilisateur
"""
try:
if not validate_email(email):
return "Adresse e-mail non valide", 400
ldap_connection = self._connect_ldap()
# Recherche de l'utilisateur
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
if not result:
return False, 404 # Utilisateur non trouvé
# Récupération du DN de l'utilisateur
dn = result[0][0]
# Suppression de l'utilisateur
ldap_connection.delete_s(dn)
ldap_connection.unbind_s()
return True, 200 # Utilisateur supprimé avec succès
except ldap.NO_SUCH_OBJECT:
return False, 404 # Utilisateur non trouvé
except ldap.LDAPError as e:
return str(e), 400 # Erreur lors de la suppression
except EmailNotValidError as e:
return str(e), 400
#*************************************************
@jwt_required()
def post(self, email):
"""
Ajouter, supprimer ou modifier un champ pour l'utilisateur LDAP
---
tags:
- Ldap
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: query
name: action
type: string
required: true
enum: ['add', 'delete', 'modify']
- in: body
name: data
required: true
schema:
type: object
properties:
field:
type: string
enum: ['mailDeSecours', 'mailEnabled', 'nextcloudEnabled', 'mobilizonEnabled', 'agoraEnabled', 'userPassword', 'identifiantKaz', 'mailAlias', 'quota']
description: Le champ à ajouter, supprimer ou modifier (par exemple, mailDeSecours, mailAlias, etc.)
value:
type: string
description: La valeur à ajouter, supprimer ou modifier pour le champ spécifié
responses:
200:
description: Opération réussie
404:
description: Utilisateur non trouvé dans le LDAP
400:
description: Erreur lors de l'opération
"""
try:
if not validate_email(email):
return "Adresse e-mail non valide", 400
action = request.args.get('action')
field = request.json.get('field')
value = request.json.get('value')
if not action or not field or not value:
return "Action, champ ou valeur manquant", 400
if not self.is_valid_field(field):
return "Champ non autorisé", 400
ldap_connection = self._connect_ldap()
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
if not result:
return False, 404
dn = result[0][0]
if field == 'userPassword' and (action == 'add' or action == 'modify'):
password_chiffre = sha512_crypt.hash(value)
value = "{{CRYPT}}{}".format(password_chiffre)
if action == 'add':
mod_attrs = [(ldap.MOD_ADD, field, value.encode('utf-8'))]
elif action == 'delete':
mod_attrs = [(ldap.MOD_DELETE, field, value.encode('utf-8'))]
elif action == 'modify':
if field == 'quota':
mail_quota_value = value + 'G'
nextcloud_quota_value = value + " GB"
mod_attrs = [
(ldap.MOD_REPLACE, 'quota', value.encode('utf-8')),
(ldap.MOD_REPLACE, 'mailQuota', mail_quota_value.encode('utf-8')),
(ldap.MOD_REPLACE, 'nextcloudQuota', nextcloud_quota_value.encode('utf-8'))
]
else:
mod_attrs = [(ldap.MOD_REPLACE, field, value.encode('utf-8'))]
else:
return "Action non valide", 400
ldap_connection.modify_s(dn, mod_attrs)
ldap_connection.unbind_s()
return True, 200
except ldap.NO_SUCH_OBJECT:
return False, 404
except ldap.LDAPError as e:
return str(e), 400
except EmailNotValidError as e:
return str(e), 400
#*************************************************
@jwt_required()
def put(self, email, **kwargs):
"""
Créer une nouvelle entrée dans le LDAP pour un nouvel utilisateur. QUESTION: A QUOI SERVENT PRENOM/NOM/IDENT_KAZ DANS LE LDAP ? POURQUOI 3 QUOTA ?
---
tags:
- Ldap
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: body
name: data
required: true
schema:
type: object
properties:
prenom:
type: string
description: Prénom de l'utilisateur
nom:
type: string
description: Nom de l'utilisateur
password:
type: string
description: Mot de passe de l'utilisateur
email_secours:
type: string
description: Adresse e-mail de secours
quota:
type: string
description: Quota de l'utilisateur
responses:
200:
description: Utilisateur ajouté avec succès
400:
description: Erreur lors de l'ajout de l'utilisateur
406:
description: Erreur utilisateur déjà existant
"""
try:
if kwargs: # appel depuis une autre api
email_secours = kwargs.get('email_secours')
prenom = kwargs.get('prenom')
nom = kwargs.get('nom')
password = kwargs.get('password')
quota = kwargs.get('quota')
else: # appel depuis swagger
email_secours = request.json.get('email_secours')
nom = request.json.get('nom')
prenom = request.json.get('prenom')
password = request.json.get('password')
quota = request.json.get('quota')
password_chiffre = sha512_crypt.hash(password)
if not validate_email(email) or not validate_email(email_secours):
return "Adresse e-mail ou secours non valide", 400
#le user existe t-il déjà ?
ldap_connection = self._connect_ldap()
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
if result:
return "User déjà existant", 406
# Construire le DN
dn = f"cn={email},ou=users,{ldap_root}"
mod_attrs = [
('objectClass', [b'inetOrgPerson', b'PostfixBookMailAccount', b'nextcloudAccount', b'kaznaute']),
('sn', f'{prenom} {nom}'.encode('utf-8')),
('mail', email.encode('utf-8')),
('mailEnabled', b'TRUE'),
('mailGidNumber', b'5000'),
('mailHomeDirectory', f"/var/mail/{email.split('@')[1]}/{email.split('@')[0]}/".encode('utf-8')),
('mailQuota', f'{quota}G'.encode('utf-8')),
('mailStorageDirectory', f"maildir:/var/mail/{email.split('@')[1]}/{email.split('@')[0]}/".encode('utf-8')),
('mailUidNumber', b'5000'),
('mailDeSecours', email_secours.encode('utf-8')),
('identifiantKaz', f'{prenom.lower()}.{nom.lower()}'.encode('utf-8')),
('quota', str(quota).encode('utf-8')),
('nextcloudEnabled', b'TRUE'),
('nextcloudQuota', f'{quota} GB'.encode('utf-8')),
('mobilizonEnabled', b'TRUE'),
('agoraEnabled', b'TRUE'),
('userPassword', f'{{CRYPT}}{password_chiffre}'.encode('utf-8')),
('cn', email.encode('utf-8'))
]
ldap_connection.add_s(dn, mod_attrs)
ldap_connection.unbind_s()
return "Utilisateur créé dand le ldap", 200
except ldap.LDAPError as e:
return str(e), 400
except EmailNotValidError as e:
return str(e), 400
#*************************************************
# Définition des routes avec les méthodes correspondantes
api.add_resource(Ldap_user, '/ldap/user/<string:email>', endpoint='ldap_user_get', methods=['GET'])
api.add_resource(Ldap_user, '/ldap/user/delete/<string:email>', endpoint='ldap_user_delete', methods=['DELETE'])
api.add_resource(Ldap_user, '/ldap/user/change/<string:email>', endpoint='ldap_user_change', methods=['POST'])
api.add_resource(Ldap_user, '/ldap/user/add/<string:email>', endpoint='ldap_user_add', methods=['PUT'])
#*************************************************
#***** CLOUD **************************************
#*************************************************
#TODO: pas réussi à faire une seule classe Cloud_user avec 2 méthodes get/delete
class Cloud_user(Resource):
@jwt_required()
def get(self, email):
"""
Existe dans le cloud général ?
---
tags:
- Cloud
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Succès
404:
description: L'utilisateur n'existe pas
500:
description: Erreur interne du serveur
"""
global cloud_ident, cloud_pass, cloud_url
try:
auth = (cloud_ident, cloud_pass)
api_url = f"{cloud_url}/ocs/v1.php/cloud/users?search={email}"
headers = {"OCS-APIRequest": "true"}
response = requests.get(api_url, auth=auth, headers=headers)
if response.status_code == 200:
if re.search(r'<element>.*</element>', response.text):
return 200
else:
return 404
except Exception as e:
return jsonify({'error': str(e)}), 500
api.add_resource(Cloud_user, '/cloud/user/<string:email>')
#*************************************************
class Cloud_user_delete(Resource):
@jwt_required()
def delete(self, email):
"""
Supprime le compte dans le cloud général
QUESTION: A PRIORI INUTILE CAR LIE AU LDAP
---
tags:
- Cloud
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Succès, l'utilisateur a été supprimé du cloud général
404:
description: Oops, l'utilisateur n'a pas été supprimé du cloud général
500:
description: Erreur interne du serveur
"""
global cloud_ident, cloud_pass, cloud_url
try:
auth = (cloud_ident, cloud_pass)
api_url = f"{cloud_url}/ocs/v1.php/cloud/users?search={email}"
headers = {"OCS-APIRequest": "true"}
response = requests.delete(api_url, auth=auth, headers=headers)
if response.status_code == 200:
if re.search(r'<element>.*</element>', response.text):
return 200
else:
return 404
except Exception as e:
return jsonify({'error': str(e)}), 500
api.add_resource(Cloud_user_delete, '/cloud/user/delete/<string:email>')
#*************************************************
# class Cloud_user_change(Resource):
# @jwt_required()
# def put(self, email, new_password):
# """
# Modifie le mot de passe d'un Utilisateur dans le cloud général: QUESTION: A PRIORI INUTILE CAR LIE AU LDAP
# ---
# tags:
# - Cloud
# security:
# - Bearer: []
# parameters:
# - in: path
# name: email
# type: string
# required: true
# - in: path
# name: new_password
# type: string
# required: true
# responses:
# 200:
# description: Succès, mot de passe changé
# 404:
# description: Oops, mot de passe NON changé
# 500:
# description: Erreur interne du serveur
# """
#
# global cloud_ident, cloud_pass, cloud_url
#
# try:
# auth = (cloud_ident, cloud_pass)
# api_url = f"{cloud_url}/ocs/v1.php/cloud/users?search={email}"
# headers = {"OCS-APIRequest": "true"}
# data = {
# "key": "password",
# "value": new_password
# }
# response = requests.put(api_url, auth=auth, headers=headers)
#
# if response.status_code == 200:
# if re.search(r'<element>.*</element>', response.text):
# return 200
# else:
# return 404
#
# except Exception as e:
# return jsonify({'error': str(e)}), 500
#
# api.add_resource(Cloud_user_change, '/cloud/user/change/<string:email>/<string:new_password>')
#*************************************************
#***** SYMPA **************************************
#*************************************************
class Sympa_user(Resource):
def __init__(self):
global sympa_ident, sympa_pass, sympa_url,MAIL_USERNAME
self.sympa_ident = sympa_ident
self.sympa_pass = sympa_pass
self.sympa_url = sympa_url
def _execute_sympa_command(self, email, liste, service):
try:
if validate_email(email) and validate_email(liste):
cmd = f'export PERL5LIB=/usr/src/app/:$PERL5LIB && /usr/src/app/Sympa/sympa_soap_client.pl --soap_url={self.sympa_url}/sympasoap --trusted_application={self.sympa_ident} --trusted_application_password={self.sympa_pass} --proxy_vars=USER_EMAIL={MAIL_USERNAME} --service={service} --service_parameters="{liste},{email}" && echo $?'
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200 # Retourne la sortie et un code de succès
except EmailNotValidError as e:
return str(e), 400 # Retourne le message d'erreur et un code d'erreur 400
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400 # Retourne la sortie de la commande et un code d'erreur 400
@jwt_required()
def post(self, email, liste):
"""
Ajouter un email dans une liste sympa
---
tags:
- Sympa
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: liste
type: string
required: true
responses:
200:
description: Succès, email ajouté dans la liste
400:
description: Oops, email non ajouté dans la liste
"""
output, status_code = self._execute_sympa_command(email, liste, 'add')
return output, status_code
@jwt_required()
def delete(self, email, liste):
"""
Supprimer un email dans une liste sympa
---
tags:
- Sympa
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: liste
type: string
required: true
responses:
200:
description: Succès, email supprimé dans la liste
400:
description: Oops, email non supprimé dans la liste
"""
output, status_code = self._execute_sympa_command(email, liste, 'del')
return output, status_code
api.add_resource(Sympa_user, '/sympa/user/<string:email>/<string:liste>')
#*************************************************
#***** QUOTA **************************************
#*************************************************
class Quota(Resource):
#https://doc.dovecot.org/configuration_manual/authentication/master_users/
#https://blog.debugo.fr/serveur-messagerie-dovecot/
# sur kazkouil.fr, j'ai modifié /etc/dovecot/conf.d/20-lmtp.conf
#mail_plugins = $mail_plugins sieve quota
@jwt_required()
def get(self, email):
"""
Récupérer la place prise par une BAL (EN COURS)
---
tags:
- Quota
security:
- Bearer: []
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Succès, taille d'une BAL'
400:
description: Oops, échec dans l'affichage de la taille d'une BAL
"""
global serveur_imap, mot_de_passe_mail
try:
if validate_email(email):
# Connexion au serveur IMAP
mail = imaplib.IMAP4_SSL(serveur_imap)
mail.login(email, mot_de_passe_mail)
#res, data = mail.select("INBOX")
#return data[0].decode("utf-8"), 200
# Requête pour obtenir le quota de la boîte aux lettres
# res, data = mail.getquota("INBOX")
# return str(data[0]).split()
res, data = mail.getquotaroot("INBOX")
return str(data[1]).split()[3]+" KB"
# Fermeture de la connexion
mail.close()
mail.logout()
else:
return "Email non valide", 400
except imaplib.IMAP4.error as e:
return str(e), 400 # Retourne le message d'erreur et un code d'erreur 400
except EmailNotValidError as e:
return str(e), 400 # Retourne le message d'erreur et un code d'erreur 400
api.add_resource(Quota, '/quota/<string:email>')
#*************************************************
#***** DNS **************************************
#*************************************************
class Dns_serveurs(Resource):
def __init__(self):
global gandi_key, gandi_url_api
self.gandi_key = gandi_key
self.gandi_url_api = gandi_url_api
@jwt_required()
def get(self):
"""
Renvoie tous les serveurs kaz de la zone dns
---
tags:
- Dns
security:
- Bearer: []
responses:
200:
description: Succès, liste des serveurs
404:
description: Oops, soucis...
"""
url = f"{self.gandi_url_api}/records/srv/TXT"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
response = requests.get(url, headers=headers)
if response.status_code != 200:
return response.json(),response.status_code
# Extraire la liste des serveurs de la réponse JSON
rrset_values = response.json()["rrset_values"]
# Nettoyer la liste
serveurs = [serveur.strip('"') for serveur in rrset_values[0].split(";")]
return serveurs, 200
api.add_resource(Dns_serveurs, '/dns/')
#*************************************************
class Dns(Resource):
def __init__(self):
global gandi_key, gandi_url_api
self.gandi_key = gandi_key
self.gandi_url_api = gandi_url_api
self.dns_serveurs_resource = Dns_serveurs()
#*************************************************
@jwt_required()
def get(self,sdomaine):
"""
Le sous-domaine existe t-il dans la zone dns avec un enreg CNAME ?
---
tags:
- Dns
security:
- Bearer: []
parameters:
- in: path
name: sdomaine
type: string
required: true
responses:
200:
description: Succès, sdomaine existe déjà avec un CNAME
404:
description: Oops, sdomaine non trouvé
"""
url = f"{self.gandi_url_api}/records/{sdomaine}/CNAME"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
response = requests.get(url, headers=headers)
return response.json(),response.status_code
#*************************************************
@jwt_required()
def delete(self,sdomaine):
"""
suppression du sdomaine
---
tags:
- Dns
security:
- Bearer: []
parameters:
- in: path
name: sdomaine
type: string
required: true
responses:
204:
description: Succès, sdomaine supprimé
404:
description: Oops,
"""
url = f"{self.gandi_url_api}/records/{sdomaine}"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
response = requests.delete(url, headers=headers)
return response.text,response.status_code
#*************************************************
@jwt_required()
def post(self,sdomaine,serveur):
"""
Créé le sous-domaine de type CNAME qui pointe sur serveur
---
tags:
- Dns
security:
- Bearer: []
parameters:
- in: path
name: sdomaine
type: string
required: true
- in: path
name: serveur
type: string
required: true
description: Le serveur doit être l'un des serveurs disponibles dans la zone DNS
responses:
201:
description: Succès, sdomaine créé comme CNAME
400:
description: Oops, serveur inconnu, sdomaine non créé
404:
description: Oops, sdomaine non créé
200,409:
description: Oops, sdomaine déjà créé
"""
# Récupérer la liste des serveurs disponibles
serveurs_disponibles, status_code = self.dns_serveurs_resource.get()
if status_code != 200:
return serveurs_disponibles, status_code
if serveur not in serveurs_disponibles:
return f"Erreur: Le serveur {serveur} n'est pas disponible", 400
url = f"{self.gandi_url_api}/records/{sdomaine}/CNAME"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
payload = f'{{"rrset_values":["{serveur}"]}}'
response = requests.post(url, data=payload, headers=headers)
return response.json(),response.status_code
#*************************************************
api.add_resource(Dns, '/dns/<string:sdomaine>', endpoint='dns_get', methods=['GET'])
api.add_resource(Dns, '/dns/<string:sdomaine>', endpoint='dns_delete', methods=['DELETE'])
api.add_resource(Dns, '/dns/<string:sdomaine>/<string:serveur>', endpoint='dns_post', methods=['POST'])
#*************************************************
#*************************************************
#***** KAZ **************************************
#*************************************************
class Kaz_user(Resource):
def __init__(self):
global sympa_liste_info
self.paheko_users_action_resource = Paheko_users_action()
self.paheko_user_resource=Paheko_user()
self.ldap_user_resource = Ldap_user()
self.password_create_resource = Password_create()
self.mattermost_message_resource=Mattermost_message()
self.mattermost_user_resource=Mattermost_user()
self.mattermost_user_team_resource=Mattermost_user_team()
self.mattermost_user_channel_resource=Mattermost_user_channel()
self.mattermost_team_resource=Mattermost_team()
self.sympa_user_resource=Sympa_user()
#********************************************************************************************
@jwt_required()
def delete(self):
"""
Utile pour les tests de createUser. Avant le POST de /kaz/create/users. Ça permet de supprimer/maj les comptes.
---
tags:
- Kaz User
security:
- Bearer: []
parameters: []
responses:
201:
description: Succès
401:
description: Oops, un soucis quelconque
"""
#verrou pour empêcher de lancer en même temps la même api
try:
prefixe="del_user_lock_"
if glob(f"{tempfile.gettempdir()}/{prefixe}*"): raise ValueError("ERREUR 0 : api déjà en cours d'utilisation !")
lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
#TODO à remplir à la main
liste_emails=["",""]
email_secours=""
liste_sympa=""
for email in liste_emails:
res, status_code = self.ldap_user_resource.delete(email)
res, status_code = self.mattermost_user_resource.delete(email)
nom_orga=''.join(random.choice(string.ascii_lowercase) for _ in range(8))
res, status_code = self.paheko_user_resource.put(email,"nom_orga",nom_orga)
res, status_code = self.paheko_user_resource.put(email,"action_auto","A créer")
res, status_code = self.paheko_user_resource.put(email,"email_secours",email_secours)
res, status_code = self.sympa_user_resource.delete(email,liste_sympa)
res, status_code = self.sympa_user_resource.delete(email_secours,liste_sympa)
msg=f"**POST AUTO** suppression de {email} ok"
self.mattermost_message_resource.post(message=msg)
return "OK", 200
except ValueError as e:
msg=f"(classe: {__class__.__name__} : {e}"
self.mattermost_message_resource.post(message=msg)
return str(msg), 401
#********************************************************************************************
@jwt_required()
def post(self):
"""
Créé un nouveau kaznaute: inscription sur MM / Cloud / email + msg sur MM + email à partir de action="a créer" sur paheko
---
tags:
- Kaz User
security:
- Bearer: []
parameters: []
responses:
201:
description: Succès, kaznaute créé
400:
description: Oops, rien à créer
401:
description: Oops, un soucis quelconque
"""
try:
#verrou pour empêcher de lancer en même temps la même api
prefixe="create_user_lock_"
if glob(f"{tempfile.gettempdir()}/{prefixe}*"): raise ValueError("ERREUR 0 : api déjà en cours d'utilisation !")
lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
#qui sont les kaznautes à créer ?
liste_kaznautes, status_code = self.paheko_users_action_resource.get("A créer")
if liste_kaznautes=="pas de résultat": return "ERREUR: paheko non joignable",401
count=liste_kaznautes['count']
if count==0: return "aucun nouveau kaznaute à créer",400
#au moins un kaznaute à créer
for tab in liste_kaznautes['results']:
email = tab['email'].lower()
# est-il déjà dans le ldap ? (mail ou alias)
res, status_code = self.ldap_user_resource.get(email)
if status_code != 400: raise ValueError(f"ERREUR 1: {email} déjà existant dans ldap. {res}, on arrête tout")
#test nom orga
if tab['admin_orga'] == 1:
if tab['nom_orga'] is None:
raise ValueError(f"ERREUR 0 sur paheko: {email} : nom_orga vide, on arrête tout")
if not bool(re.match(r'^[a-z0-9-]+$', tab['nom_orga'])):
raise ValueError(f"ERREUR 0 sur paheko: {email} : nom_orga ({tab['nom_orga']}) incohérent (minuscule/chiffre/-), on arrête tout")
#test email_secours
email_secours = tab['email_secours'].lower()
if not validate_email(email_secours): raise EmailNotValidError()
#test quota
quota = tab['quota_disque']
if not quota.isdigit(): raise ValueError(f"ERREUR 2: quota non numérique : {quota}, on arrête tout")
#quel type de test ?
#"nom": "ROUSSEAU Mickael",
nom, prenom = tab['nom'].split(maxsplit=1)
#on génère un password
password,status_code = self.password_create_resource.get()
#on créé dans le ldap
#à quoi servent prenom/nom dans le ldap ?
data = {
"prenom": prenom,
"nom": nom,
"password": password,
"email_secours": email_secours,
"quota": quota
}
res, status_code = self.ldap_user_resource.put(email, **data)
if status_code != 200: raise ValueError(f"ERREUR 3 sur ldap: {email} : {res}, on arrête tout ")
#on créé dans MM
user = email.split('@')[0]
res, status_code = self.mattermost_user_resource.post(user,email,password)
if status_code != 200: raise ValueError(f"ERREUR 4 sur MM: {email} : {res}, on arrête tout ")
# et on affecte à l'équipe kaz
res, status_code = self.mattermost_user_team_resource.post(email,"kaz")
if status_code != 200: raise ValueError(f"ERREUR 5 sur MM: {email} : {res}, on arrête tout ")
#et aux 2 canaux de base
res, status_code = self.mattermost_user_channel_resource.post(email,"kaz","une-question--un-soucis")
if status_code != 200: raise ValueError(f"ERREUR 6 sur MM: {email} : {res}, on arrête tout ")
res, status_code = self.mattermost_user_channel_resource.post(email,"kaz","cafe-du-commerce--ouvert-2424h")
if status_code != 200: raise ValueError(f"ERREUR 7 sur MM: {email} : {res}, on arrête tout ")
#on créé une nouvelle équipe ds MM si besoin
if tab['admin_orga'] == 1:
res, status_code = self.mattermost_team_resource.post(tab['nom_orga'],email)
if status_code != 200: raise ValueError(f"ERREUR 8 sur MM: {email} : {res}, on arrête tout ")
#BUG: créer la nouvelle équipe n'a pas rendu l'email admin, on le rajoute comme membre simple
res, status_code = self.mattermost_user_team_resource.post(email,tab['nom_orga'])
if status_code != 200: raise ValueError(f"ERREUR 8.1 sur MM: {email} : {res}, on arrête tout ")
#on créé dans le cloud genéral
#inutile car tous les user du ldap sont user du cloud général.
#on inscrit email et email_secours à la nl sympa_liste_info
res, status_code = self.sympa_user_resource.post(email,sympa_liste_info)
if status_code != 200: raise ValueError(f"ERREUR 9 sur Sympa: {email} : {res}, on arrête tout ")
res, status_code = self.sympa_user_resource.post(email_secours,sympa_liste_info)
if status_code != 200: raise ValueError(f"ERREUR 10 sur Sympa: {email_secours} : {res}, on arrête tout ")
#on construit/envoie le mail
context = {
'ADMIN_ORGA': tab['admin_orga'],
'NOM': tab['nom'],
'EMAIL_SOUHAITE': email,
'PASSWORD': password,
'QUOTA': tab['quota_disque'],
'URL_WEBMAIL': webmail_url,
'URL_AGORA': mattermost_url,
'URL_MDP': mdp_url,
'URL_LISTE': sympa_url,
'URL_SITE': site_url,
'URL_CLOUD': cloud_url
}
subject="KAZ: confirmation d'inscription !"
sender=app.config['MAIL_USERNAME']
reply_to = app.config['MAIL_REPLY_TO']
msg = Message(subject=subject, sender=sender, reply_to=reply_to, recipients=[email,email_secours])
msg.html = render_template('email_inscription.html', **context)
mail.send(msg)
#on met le flag paheko action à Aucune
res, status_code = self.paheko_user_resource.put(email,"action_auto","Aucune")
if status_code != 200: raise ValueError(f"ERREUR 12 sur paheko: {email} : {res}, on arrête tout ")
#on post sur MM pour dire ok
msg=f"**POST AUTO** Inscription réussie pour {email} avec le secours {email_secours} Bisou!"
self.mattermost_message_resource.post(message=msg)
return "fin des inscriptions", 201
except EmailNotValidError as e:
msg=f"classe: {__class__.__name__} : ERREUR 13 : email_secours : {email_secours} " + str(e) +", on arrête tout"
self.mattermost_message_resource.post(message=msg)
return msg, 401
except ValueError as e:
msg=f"(classe: {__class__.__name__} : {e}"
self.mattermost_message_resource.post(message=msg)
return str(msg), 401
#*************************************************
api.add_resource(Kaz_user, '/kaz/create/users', endpoint='kaz_create_user', methods=['POST'])
api.add_resource(Kaz_user, '/kaz/delete/user', endpoint='kaz_delete_user', methods=['DELETE'])
#*************************************************
#**********TEST***********************************
#*************************************************
class Test(Resource):
def __init__(self):
toto="toto"
#self.mattermost_team_resource=Mattermost_team()
#global mattermost_url, sympa_url, webmail_url, mdp_url, site_url, nc_url
#********************************************************************************************
@jwt_required()
def get(self):
"""
Pour tester des conneries: # test lançement de cmde ssh sur des serveurs distants:
---
tags:
- Test
security:
- Bearer: []
parameters: []
responses:
201:
description: OK
401:
description: KO
"""
#********************************************************************************************
# test lançcement de cmde ssh sur des serveurs distants:
# il faut au préalable que la clé publique de root du conteneur apikaz soit dans authorized key du user fabricer de la machine 163.172.94.54
# clé à créer dans le Dockerfile
# risque sécu ?
cmd="ssh -p 2201 fabricer@163.172.94.54 mkdir -p /tmp/toto"
try:
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return "ok",200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#********************************************************************************************
# #***** test suppression de toutes les équipes de MM sauf KAZ
# res,status=self.mattermost_team_resource=Mattermost_team().get()
# for equipe in res:
# if equipe!="kaz": res,status=self.mattermost_team_resource=Mattermost_team().delete(equipe)
# return "fin"
#********************************************************************************************
#**** test messagerie
# NOM="toto"
# EMAIL_SOUHAITE='f@kaz.bzh'
# PASSWORD="toto"
# QUOTA="1"
# ADMIN_ORGA="0"
#
# context = {
# 'ADMIN_ORGA': ADMIN_ORGA,
# 'NOM': NOM,
# 'EMAIL_SOUHAITE': EMAIL_SOUHAITE,
# 'PASSWORD': PASSWORD,
# 'QUOTA': QUOTA,
# 'URL_WEBMAIL': webmail_url,
# 'URL_AGORA': mattermost_url,
# 'URL_MDP': mdp_url,
# 'URL_LISTE': sympa_url,
# 'URL_SITE': site_url,
# 'URL_CLOUD': cloud_url
# }
#
# subject = "KAZ: confirmation d'inscription !"
# sender=app.config['MAIL_USERNAME']
# reply_to = app.config['MAIL_REPLY_TO']
#
# msg = Message(subject=subject, sender=sender, reply_to=reply_to, recipients=[EMAIL_SOUHAITE])
# msg.html = render_template('email_inscription.html', **context)
#
# # Parsez le contenu HTML avec BeautifulSoup
# soup = BeautifulSoup(msg.html, 'html.parser')
# msg.body = soup.get_text()
#
# mail.send(msg)
# return "Message envoyé!"
#********************************************************************************************
# #**** test ms erreur
# email_secours="toto"
#
# msg=f"classe: {__class__.__name__} : ERREUR 8 : email_secours : {email_secours} " +", on arrête tout"
# #return __class__.__name__
# return msg
#********************************************************************************************
#**** test vérou
# prefixe="toto_"
# if glob(f"{tempfile.gettempdir()}/{prefixe}*"):
# return "ERREUR : api déjà en cours d'utilisation !", 400
# else:
# lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
#
# sleep(20)
# return str(lock_file), 201
api.add_resource(Test, '/test', endpoint='test', methods=['GET'])
#*************************************************
#*************************************************
#*************************************************
if __name__ == '__main__':
app.run(host='0.0.0.0', port=os.getenv('PORT'))