KazV2/dockers/apikaz/source/app.py
2024-06-03 18:43:35 +02:00

1852 lines
66 KiB
Python

import os
import re
import requests
import subprocess
import logging
import tempfile
import ldap
import imaplib
import random
import string
import json
from flask import Flask, jsonify, send_from_directory, request, abort, json, Response, render_template
from flask_mail import Mail, Message
from flasgger import Swagger
from flask_restful import Api, Resource
from passlib.hash import sha512_crypt
from unidecode import unidecode
from email_validator import validate_email, EmailNotValidError
from time import sleep
from glob import glob
from bs4 import BeautifulSoup
from datetime import datetime
app = Flask(__name__)
api = Api(app)
app.logger.setLevel(logging.DEBUG)
swagger = Swagger(app, template={
"swagger": "2.0",
"info": {
"title": "L'API Kaz de la mort qui tue",
"version": "0.2.0",
"description": "Permettre des opérations de gestion des services kaz avec des écrans Ouaib"
}
})
#*************************************************
# Configuration du logger
# logging.basicConfig(level=logging.INFO)
# logger = logging.getLogger(__name__) # Création de l'objet logger
#*************************************************
#Filtrer les IP qui peuvent accéder à l'api
trusted_ips = [
"217.108.155.85",
"82.64.20.246",
"31.39.14.228",
"51.75.112.172",
"80.11.47.59",
"90.121.138.71",
"109.190.2.75",
"89.234.177.115",
"80.215.140.40",
"80.67.176.91",
"89.234.177.119",
"78.127.1.19",
"80.215.236.243",
"78.117.86.68",
"80.215.236.168"
]
@app.before_request
def limit_remote_addr():
if request.environ['HTTP_X_FORWARDED_FOR'] not in trusted_ips:
abort(jsonify(message="Et pis quoi encore "+request.environ['HTTP_X_FORWARDED_FOR']), 400)
#*************************************************
@app.route('/print_env')
def print_environment():
# Crée une chaîne de caractères pour stocker les variables d'environnement
env_string = ""
# Itère sur les variables d'environnement et les ajoute à la chaîne de caractères
for key, value in os.environ.items():
env_string += f"{key}: {value}\n" + "<br>"
# Retourne la chaîne de caractères contenant les variables d'environnement
return env_string
#*************************************************
#***** DEBUT Quelques fonctions utiles ***********
#*************************************************
#pour injecter la date dans dans le contexte des template
@app.context_processor
def inject_now():
return {'now': datetime.now}
#*************************************************
#***** FIN Quelques fonctions utiles ***********
#*************************************************
#variables globales
#le paheko de kaz
paheko_ident=os.environ.get('paheko_API_USER')
paheko_pass=os.environ.get('paheko_API_PASSWORD')
paheko_url=os.environ.get('paheko_url')
mattermost_user=os.environ.get('mattermost_user')
mattermost_pass=os.environ.get('mattermost_pass')
mattermost_url=os.environ.get('mattermost_url')
ldap_admin=os.environ.get('ldap_LDAP_ADMIN_USERNAME')
ldap_pass=os.environ.get('ldap_LDAP_ADMIN_PASSWORD')
ldap_root=os.environ.get('ldap_root')
ldap_host="ldapServ.ldapNet"
#cloud général
cloud_ident=os.environ.get('nextcloud_NEXTCLOUD_ADMIN_USER')
cloud_pass=os.environ.get('nextcloud_NEXTCLOUD_ADMIN_PASSWORD')
cloud_url=os.environ.get('cloud_url')
sympa_ident=os.environ.get('sympa_SOAP_USER')
sympa_pass=os.environ.get('sympa_SOAP_PASSWORD')
sympa_url=os.environ.get('sympa_url')
gandi_key=os.environ.get('gandi_GANDI_KEY')
gandi_url_api=os.environ.get('gandi_GANDI_API')
site_url=os.environ.get('site_url')
#pour webmail_url et mdp_url, ça renvoie des tuples et non des str, bizarre, du coup, je mets en dur
#webmail_url=os.environ.get('webmail_url'),
#mdp_url=os.environ.get('mdp_url'),
webmail_url="https://webmail.kazkouil.fr"
mdp_url="https://mdp.kazkouil.fr"
#pour le mail
app.config['MAIL_SERVER']= os.environ.get('apikaz_MAIL_SERVER')
app.config['MAIL_PORT'] = 587
app.config['MAIL_USERNAME'] = os.environ.get('apikaz_MAIL_USERNAME')
app.config['MAIL_PASSWORD'] = os.environ.get('apikaz_MAIL_PASSWORD')
app.config['MAIL_USE_TLS'] = True
app.config['MAIL_USE_SSL'] = False
mail = Mail(app)
#pour QUOTA (à virer ensuite)
serveur_imap = os.environ.get('serveur_imap')
mot_de_passe_mail=os.environ.get('mot_de_passe_mail')
#*************************************************
@app.route('/favicon.ico')
def favicon():
# return send_from_directory(os.path.join(app.root_path, 'static'),'favicon.ico')
return '', 204
#*************************************************
#la page d'accueil est vide
@app.route('/')
def silence():
return ""
#*************************************************
#*******MDP***************************************
#*************************************************
class Password_create(Resource):
def get(self):
"""
créer un password qui colle avec les appli kaz
---
tags:
- Password
parameters: []
responses:
200:
description: le password
404:
description: oops
"""
global new_password
cmd="apg -n 1 -m 10 -M NCL -d"
try:
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
new_password="_"+output.decode("utf-8")+"_"
return new_password,200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
api.add_resource(Password_create, '/password/create')
#*************************************************
#*******PAHEKO************************************
#*************************************************
class Paheko_categories(Resource):
def get(self):
"""
Récupérer les catégories Paheko avec le compteur associé
---
tags:
- Paheko
parameters: []
responses:
200:
description: Liste des catégories Paheko
404:
description: oops
"""
global paheko_ident, paheko_pass, paheko_url
auth = (paheko_ident, paheko_pass)
api_url = paheko_url + '/api/user/categories'
response = requests.get(api_url, auth=auth)
if response.status_code == 200:
data = response.json()
return jsonify(data)
else:
return jsonify({'error': 'La requête a échoué'}), response.status_code
api.add_resource(Paheko_categories, '/paheko/user/categories')
#*************************************************
class Paheko_users(Resource):
def get(self,categorie):
"""
Afficher les membres d'une catégorie Paheko
---
tags:
- Paheko
parameters:
- in: path
name: categorie
type: string
required: true
responses:
200:
description: Liste des membres une catégorie Paheko
404:
description: oops
"""
global paheko_ident, paheko_pass, paheko_url
auth = (paheko_ident, paheko_pass)
api_url = paheko_url + '/api/user/category/'+categorie+'.json'
response = requests.get(api_url, auth=auth)
if response.status_code == 200:
data = response.json()
return jsonify(data)
else:
return jsonify({'error': 'La requête a échoué'}), response.status_code
api.add_resource(Paheko_users, '/paheko/user/category/<categorie>')
#*************************************************
class Paheko_user(Resource):
def __init__(self):
global paheko_ident, paheko_pass, paheko_url
self.paheko_ident = paheko_ident
self.paheko_pass = paheko_pass
self.paheko_url = paheko_url
self.auth = (self.paheko_ident, self.paheko_pass)
def get(self,ident):
"""
Afficher un membre de Paheko par son email kaz ou son numéro ou le non court de l'orga
---
tags:
- Paheko
parameters:
- in: path
name: ident
type: string
required: true
description: possible d'entrer un numéro, un email, le nom court de l'orga
responses:
200:
description: Existe et affiche
404:
description: N'existe pas
"""
if '@' in ident:
data = { "sql": f"select * from users where email='{ident}'" }
api_url = self.paheko_url + '/api/sql/'
response = requests.post(api_url, auth=self.auth, data=data)
#TODO: if faut Rechercher count et vérifier que = 1 et supprimer le count=1 dans la réponse
elif ident.isdigit():
api_url = self.paheko_url + '/api/user/'+ident
response = requests.get(api_url, auth=self.auth)
else:
data = { "sql": f"select * from users where admin_orga=1 and nom_orga='{ident}'" }
api_url = self.paheko_url + '/api/sql/'
response = requests.post(api_url, auth=self.auth, data=data)
#TODO:if faut Rechercher count et vérifier que = 1 et supprimer le count=1 dans la réponse
if response.status_code == 200:
data = response.json()
return jsonify(data)
else:
#return jsonify({'error': 'La requête a échoué'}), response.status_code
return "pas de résultat", response.status_code
#*************************************************
def put(self,ident,field,new_value):
"""
Modifie la valeur d'un champ d'un membre paheko (ident= numéro paheko ou email kaz)
---
tags:
- Paheko
parameters:
- in: path
name: ident
type: string
required: true
description: possible d'entrer le numéro paheko, un email kaz
- in: path
name: field
type: string
required: true
description: un champ de la table users de la base paheko
- in: path
name: new_value
type: string
required: true
description: la nouvelle valeur à remplacer
responses:
200:
description: Modification effectuée avec succès
400:
description: Oops, ident non trouvé ou incohérent
404:
description: Oops, modification du champ KO
"""
#récupérer le numero paheko si on fournit un email kaz
if '@' in ident:
data = { "sql": f"select id from users where email='{ident}'" }
api_url = self.paheko_url + '/api/sql/'
response = requests.post(api_url, auth=self.auth, data=data)
if response.status_code == 200:
#on extrait l'id de la réponse
data = response.json()
if data['count'] == 0:
return "email non trouvé", 400
elif data['count'] > 1:
return "trop de résultat", 400
else:
#OK
ident = data['results'][0]['id']
else:
return "pas de résultat", response.status_code
api_url = self.paheko_url + '/api/user/'+str(ident)
payload = {field: new_value}
response = requests.post(api_url, auth=self.auth, data=payload)
return response.json(),response.status_code
#*************************************************
api.add_resource(Paheko_user, '/paheko/user/<ident>', endpoint='paheko_get_user', methods=['GET'])
api.add_resource(Paheko_user, '/paheko/user/<ident>/<field>/<new_value>', endpoint='paheko_maj_user', methods=['PUT'])
#*************************************************
class Paheko_users_action(Resource):
def __init__(self):
global paheko_ident, paheko_pass, paheko_url
self.paheko_ident = paheko_ident
self.paheko_pass = paheko_pass
self.paheko_url = paheko_url
def get(self, action):
"""
retourne tous les membres de paheko avec une action à mener (création du compte kaz / modification...)
---
tags:
- Paheko
parameters:
- in: path
name: action
type: string
required: true
enum: ['A créer','A modifier','En attente','Aucune']
responses:
200:
description: liste des nouveaux kaznautes à créer
404:
description: aucun nouveau kaznaute à créer
"""
auth = (self.paheko_ident, self.paheko_pass)
api_url = self.paheko_url + '/api/sql/'
payload = { "sql": f"select * from users where action_auto='{action}'" }
response = requests.post(api_url, auth=auth, data=payload)
if response.status_code == 200:
return response.json(),200
else:
return "pas de résultat", response.status_code
api.add_resource(Paheko_users_action, '/paheko/users/<string:action>')
#*************************************************
#*******MATTERMOST********************************
#*************************************************
# on utilise mmctl et pas l'apiv4 de MM
# pourquoi ? passe que mmctl déjà utilisé dans les scripts kaz.
#*************************************************
def Mattermost_authenticate():
# Authentification sur MM
global mattermost_url, mattermost_user, mattermost_pass
cmd = f"/mm/mmctl auth login {mattermost_url} --name local-server --username {mattermost_user} --password {mattermost_pass}"
subprocess.run(cmd, shell=True, stderr=subprocess.STDOUT, check=True)
#*************************************************
class Mattermost_message(Resource):
def post(self,message,equipe="kaz",canal="creation-comptes"):
"""
Envoyer un message dans une Equipe/Canal de MM
---
tags:
- Mattermost
parameters:
- in: path
name: equipe
type: string
required: true
- in: path
name: canal
type: string
required: true
- in: path
name: message
type: string
required: true
responses:
200:
description: Affiche un message dans un canal d'une équipe
500:
description: oops
"""
Mattermost_authenticate()
try:
cmd="/mm/mmctl post create "+equipe+":"+canal+" --message "+ "\"" + message + "\""
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return "Message envoyé", 200
except subprocess.CalledProcessError:
return "Message non envoyé", 500
api.add_resource(Mattermost_message, '/mattermost/message/<equipe>/<canal>/<message>')
#*************************************************
class Mattermost_user(Resource):
def __init__(self):
Mattermost_authenticate()
#*************************************************
def get(self,user):
"""
Le user existe t-il sur MM ?
---
tags:
- Mattermost User
parameters:
- in: path
name: user
type: string
required: true
description: possible d'entrer un username, un email
responses:
200:
description: Existe
404:
description: N'existe pas
"""
try:
cmd = f"/mm/mmctl user search {user} --json"
user_list_output = subprocess.check_output(cmd, shell=True)
return 200 # Le nom d'utilisateur existe
except subprocess.CalledProcessError:
return 404 # Le nom d'utilisateur n'existe pas
#*************************************************
def post(self,user,email,password):
"""
Créer un utilisateur sur MM
---
tags:
- Mattermost User
parameters:
- in: path
name: user
type: string
required: true
- in: path
name: email
type: string
required: true
- in: path
name: password
type: string
required: true
responses:
200:
description: Utilisateur créé
400:
description: oops, Utilisateur non créé
"""
# Création de l'utilisateur
try:
cmd = f"/mm/mmctl user create --email {email} --username {user} --password {password}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
def delete(self,email):
"""
Supprimer un utilisateur sur MM
---
tags:
- Mattermost User
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Utilisateur supprimé
400:
description: oops, Utilisateur non supprimé
"""
try:
cmd = f"/mm/mmctl user delete {email} --confirm"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
def put(self,email,new_password):
"""
Changer un password pour un utilisateur de MM
---
tags:
- Mattermost User
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: new_password
type: string
required: true
responses:
200:
description: Mot de passe de l'Utilisateur changé
400:
description: oops, Mot de passe de l'Utilisateur inchangé
"""
try:
cmd = f"/mm/mmctl user change-password {email} --password {new_password}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
api.add_resource(Mattermost_user, '/mattermost/user/<string:user>', endpoint='mattermost_get_user', methods=['GET'])
api.add_resource(Mattermost_user, '/mattermost/user/create/<string:user>/<string:email>/<string:password>', endpoint='mattermost_create_user', methods=['POST'])
api.add_resource(Mattermost_user, '/mattermost/user/delete/<string:email>', endpoint='mattermost_delete_user', methods=['DELETE'])
api.add_resource(Mattermost_user, '/mattermost/user/change/password/<string:email>/<string:new_password>', endpoint='mattermost_change_user_password', methods=['PUT'])
#*************************************************
class Mattermost_user_team(Resource):
def post(self,email,equipe):
"""
Affecte un utilisateur à une équipe MM
---
tags:
- Mattermost Team
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: equipe
type: string
required: true
responses:
200:
description: l'utilisateur a bien été affecté à l'équipe
400:
description: oops, Utilisateur non affecté
"""
Mattermost_authenticate()
try:
cmd = f"/mm/mmctl team users add {equipe} {email}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
api.add_resource(Mattermost_user_team, '/mattermost/user/team/<string:email>/<string:equipe>')
#*************************************************
class Mattermost_user_channel(Resource):
def post(self,email,equipe,canal):
"""
Affecte un utilisateur à un canal MM
---
tags:
- Mattermost
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: equipe
type: string
required: true
- in: path
name: canal
type: string
required: true
responses:
200:
description: l'utilisateur a bien été affecté au canal
400:
description: oops, Utilisateur non affecté
"""
Mattermost_authenticate()
try:
cmd = f'/mm/mmctl channel users add {equipe}:{canal} {email}'
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
api.add_resource(Mattermost_user_channel, '/mattermost/user/channel/<string:email>/<string:equipe>/<string:canal>')
#*************************************************
class Mattermost_team(Resource):
def __init__(self):
Mattermost_authenticate()
#*************************************************
def get(self):
"""
Lister les équipes sur MM
---
tags:
- Mattermost Team
parameters: []
responses:
200:
description: liste des équipes
400:
description: oops, Equipe non supprimée
"""
Mattermost_authenticate()
try:
cmd = f"/mm/mmctl team list --disable-pager"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
data_list = output.decode("utf-8").strip().split('\n')
data_list.pop()
return data_list, 200
#return jsonify(data_list),200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
def post(self,equipe,email):
"""
Créer une équipe sur MM et affecter un admin si email est renseigné (set admin marche pô)
---
tags:
- Mattermost Team
parameters:
- in: path
name: equipe
type: string
required: true
- in: path
name: email
type: string
required: true
description: admin de l'équipe
responses:
200:
description: Equipe créée
400:
description: oops, Equipe non créée
"""
try:
#DANGER: l'option --email ne rend pas le user admin de l'équipe comme c'est indiqué dans la doc :(
cmd = f"/mm/mmctl team create --name {equipe} --display-name {equipe} --private --email {email}"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
#Workaround: on récup l'id du user et de l'équipe pour affecter le rôle "scheme_admin": true, "scheme_user": true avec l'api MM classique.
#TODO:
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
def delete(self,equipe):
"""
Supprimer une équipe sur MM
---
tags:
- Mattermost Team
parameters:
- in: path
name: equipe
type: string
required: true
responses:
200:
description: Equipe supprimée
400:
description: oops, Equipe non supprimée
"""
Mattermost_authenticate()
try:
cmd = f"/mm/mmctl team delete {equipe} --confirm"
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400
#*************************************************
api.add_resource(Mattermost_team, '/mattermost/team/list',endpoint='mattermost_team_list', methods=['GET'])
api.add_resource(Mattermost_team, '/mattermost/team/create/<equipe>/<email>',endpoint='mattermost_team_create', methods=['POST'])
api.add_resource(Mattermost_team, '/mattermost/team/delete/<equipe>',endpoint='mattermost_team_delete', methods=['DELETE'])
#*************************************************
#***** LDAP **************************************
#*************************************************
class Ldap_user(Resource):
def __init__(self):
global ldap_admin, ldap_pass, ldap_root, ldap_host
self.ldap_admin = ldap_admin
self.ldap_pass = ldap_pass
self.ldap_root = ldap_root
self.ldap_host = f"ldap://{ldap_host}"
def _connect_ldap(self):
ldap_connection = ldap.initialize(self.ldap_host)
ldap_connection.simple_bind_s("cn={},{}".format(self.ldap_admin, self.ldap_root), self.ldap_pass)
return ldap_connection
@classmethod
def is_valid_field(cls, field):
allowed_fields = ['mailDeSecours', 'mailEnabled', 'nextcloudEnabled', 'mobilizonEnabled', 'agoraEnabled', 'userPassword', 'identifiantKaz', 'mailAlias', 'quota']
return field in allowed_fields
def get(self, email):
"""
Vérifier si un utilisateur avec cet email existe dans le LDAP soit comme mail principal soit comme alias
---
tags:
- Ldap
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Existe
400:
description: N'existe pas
401:
description: oops, email invalide
402:
description: oops, autre erreur
"""
try:
if not validate_email(email):
return "Adresse e-mail non valide", 400
ldap_connection = self._connect_ldap()
#result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
# Créer une chaîne de filtre pour rechercher dans les champs "cn" et "mailAlias"
filter_str = "(|(cn={})(mailAlias={}))".format(email, email)
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, filter_str)
ldap_connection.unbind_s()
if result:
return True, 200
else:
return False, 400
except EmailNotValidError as e:
return str(e), 401
except ldap.LDAPError as e:
return str(e), 402
#*************************************************
def delete(self, email):
"""
Supprimer un utilisateur du LDAP par son adresse e-mail
---
tags:
- Ldap
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Utilisateur supprimé avec succès
404:
description: Utilisateur non trouvé dans le LDAP
400:
description: Erreur lors de la suppression de l'utilisateur
"""
try:
if not validate_email(email):
return "Adresse e-mail non valide", 400
ldap_connection = self._connect_ldap()
# Recherche de l'utilisateur
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
if not result:
return False, 404 # Utilisateur non trouvé
# Récupération du DN de l'utilisateur
dn = result[0][0]
# Suppression de l'utilisateur
ldap_connection.delete_s(dn)
ldap_connection.unbind_s()
return True, 200 # Utilisateur supprimé avec succès
except ldap.NO_SUCH_OBJECT:
return False, 404 # Utilisateur non trouvé
except ldap.LDAPError as e:
return str(e), 400 # Erreur lors de la suppression
except EmailNotValidError as e:
return str(e), 400
#*************************************************
def post(self, email):
"""
Ajouter, supprimer ou modifier un champ pour l'utilisateur LDAP
---
tags:
- Ldap
parameters:
- in: path
name: email
type: string
required: true
- in: query
name: action
type: string
required: true
enum: ['add', 'delete', 'modify']
- in: body
name: data
required: true
schema:
type: object
properties:
field:
type: string
enum: ['mailDeSecours', 'mailEnabled', 'nextcloudEnabled', 'mobilizonEnabled', 'agoraEnabled', 'userPassword', 'identifiantKaz', 'mailAlias', 'quota']
description: Le champ à ajouter, supprimer ou modifier (par exemple, mailDeSecours, mailAlias, etc.)
value:
type: string
description: La valeur à ajouter, supprimer ou modifier pour le champ spécifié
responses:
200:
description: Opération réussie
404:
description: Utilisateur non trouvé dans le LDAP
400:
description: Erreur lors de l'opération
"""
try:
if not validate_email(email):
return "Adresse e-mail non valide", 400
action = request.args.get('action')
field = request.json.get('field')
value = request.json.get('value')
if not action or not field or not value:
return "Action, champ ou valeur manquant", 400
if not self.is_valid_field(field):
return "Champ non autorisé", 400
ldap_connection = self._connect_ldap()
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
if not result:
return False, 404
dn = result[0][0]
if field == 'userPassword' and (action == 'add' or action == 'modify'):
password_chiffre = sha512_crypt.hash(value)
value = "{{CRYPT}}{}".format(password_chiffre)
if action == 'add':
mod_attrs = [(ldap.MOD_ADD, field, value.encode('utf-8'))]
elif action == 'delete':
mod_attrs = [(ldap.MOD_DELETE, field, value.encode('utf-8'))]
elif action == 'modify':
if field == 'quota':
mail_quota_value = value + 'G'
nextcloud_quota_value = value + " GB"
mod_attrs = [
(ldap.MOD_REPLACE, 'quota', value.encode('utf-8')),
(ldap.MOD_REPLACE, 'mailQuota', mail_quota_value.encode('utf-8')),
(ldap.MOD_REPLACE, 'nextcloudQuota', nextcloud_quota_value.encode('utf-8'))
]
else:
mod_attrs = [(ldap.MOD_REPLACE, field, value.encode('utf-8'))]
else:
return "Action non valide", 400
ldap_connection.modify_s(dn, mod_attrs)
ldap_connection.unbind_s()
return True, 200
except ldap.NO_SUCH_OBJECT:
return False, 404
except ldap.LDAPError as e:
return str(e), 400
except EmailNotValidError as e:
return str(e), 400
#*************************************************
def put(self, email, **kwargs):
"""
Créer une nouvelle entrée dans le LDAP pour un nouvel utilisateur. QUESTION: A QUOI SERVENT PRENOM/NOM/IDENT_KAZ DANS LE LDAP ? POURQUOI 3 QUOTA ?
---
tags:
- Ldap
parameters:
- in: path
name: email
type: string
required: true
- in: body
name: data
required: true
schema:
type: object
properties:
prenom:
type: string
description: Prénom de l'utilisateur
nom:
type: string
description: Nom de l'utilisateur
password:
type: string
description: Mot de passe de l'utilisateur
email_secours:
type: string
description: Adresse e-mail de secours
quota:
type: string
description: Quota de l'utilisateur
responses:
200:
description: Utilisateur ajouté avec succès
400:
description: Erreur lors de l'ajout de l'utilisateur
406:
description: Erreur utilisateur déjà existant
"""
try:
if kwargs: # appel depuis une autre api
email_secours = kwargs.get('email_secours')
prenom = kwargs.get('prenom')
nom = kwargs.get('nom')
password = kwargs.get('password')
quota = kwargs.get('quota')
else: # appel depuis swagger
email_secours = request.json.get('email_secours')
nom = request.json.get('nom')
prenom = request.json.get('prenom')
password = request.json.get('password')
quota = request.json.get('quota')
password_chiffre = sha512_crypt.hash(password)
if not validate_email(email) or not validate_email(email_secours):
return "Adresse e-mail ou secours non valide", 400
#le user existe t-il déjà ?
ldap_connection = self._connect_ldap()
result = ldap_connection.search_s("ou=users,{}".format(self.ldap_root), ldap.SCOPE_SUBTREE, "(cn={})".format(email))
if result:
return "User déjà existant", 406
# Construire le DN
dn = f"cn={email},ou=users,{ldap_root}"
mod_attrs = [
('objectClass', [b'inetOrgPerson', b'PostfixBookMailAccount', b'nextcloudAccount', b'kaznaute']),
('sn', f'{prenom} {nom}'.encode('utf-8')),
('mail', email.encode('utf-8')),
('mailEnabled', b'TRUE'),
('mailGidNumber', b'5000'),
('mailHomeDirectory', f"/var/mail/kazkouil.fr/{email.split('@')[0]}/".encode('utf-8')),
('mailQuota', f'{quota}G'.encode('utf-8')),
('mailStorageDirectory', f"maildir:/var/mail/kazkouil.fr/{email.split('@')[0]}/".encode('utf-8')),
('mailUidNumber', b'5000'),
('mailDeSecours', email_secours.encode('utf-8')),
('identifiantKaz', f'{prenom.lower()}.{nom.lower()}'.encode('utf-8')),
('quota', str(quota).encode('utf-8')),
('nextcloudEnabled', b'TRUE'),
('nextcloudQuota', f'{quota} GB'.encode('utf-8')),
('mobilizonEnabled', b'TRUE'),
('agoraEnabled', b'TRUE'),
('userPassword', f'{{CRYPT}}{password_chiffre}'.encode('utf-8')),
('cn', email.encode('utf-8'))
]
ldap_connection.add_s(dn, mod_attrs)
ldap_connection.unbind_s()
return "Utilisateur créé dand le ldap", 200
except ldap.LDAPError as e:
return str(e), 400
except EmailNotValidError as e:
return str(e), 400
#*************************************************
# Définition des routes avec les méthodes correspondantes
api.add_resource(Ldap_user, '/ldap/user/<string:email>', endpoint='ldap_user_get', methods=['GET'])
api.add_resource(Ldap_user, '/ldap/user/delete/<string:email>', endpoint='ldap_user_delete', methods=['DELETE'])
api.add_resource(Ldap_user, '/ldap/user/change/<string:email>', endpoint='ldap_user_change', methods=['POST'])
api.add_resource(Ldap_user, '/ldap/user/add/<string:email>', endpoint='ldap_user_add', methods=['PUT'])
#*************************************************
#***** CLOUD **************************************
#*************************************************
#TODO: pas réussi à faire une seule classe Cloud_user avec 2 méthodes get/delete
class Cloud_user(Resource):
def get(self, email):
"""
Existe dans le cloud général ?
---
tags:
- Cloud
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Succès
404:
description: L'utilisateur n'existe pas
500:
description: Erreur interne du serveur
"""
global cloud_ident, cloud_pass, cloud_url
try:
auth = (cloud_ident, cloud_pass)
api_url = f"{cloud_url}/ocs/v1.php/cloud/users?search={email}"
headers = {"OCS-APIRequest": "true"}
response = requests.get(api_url, auth=auth, headers=headers)
if response.status_code == 200:
if re.search(r'<element>.*</element>', response.text):
return 200
else:
return 404
except Exception as e:
return jsonify({'error': str(e)}), 500
api.add_resource(Cloud_user, '/cloud/user/<string:email>')
#*************************************************
class Cloud_user_delete(Resource):
def delete(self, email):
"""
Supprime le compte dans le cloud général
QUESTION: A PRIORI INUTILE CAR LIE AU LDAP
---
tags:
- Cloud
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Succès, l'utilisateur a été supprimé du cloud général
404:
description: Oops, l'utilisateur n'a pas été supprimé du cloud général
500:
description: Erreur interne du serveur
"""
global cloud_ident, cloud_pass, cloud_url
try:
auth = (cloud_ident, cloud_pass)
api_url = f"{cloud_url}/ocs/v1.php/cloud/users?search={email}"
headers = {"OCS-APIRequest": "true"}
response = requests.delete(api_url, auth=auth, headers=headers)
if response.status_code == 200:
if re.search(r'<element>.*</element>', response.text):
return 200
else:
return 404
except Exception as e:
return jsonify({'error': str(e)}), 500
api.add_resource(Cloud_user_delete, '/cloud/user/delete/<string:email>')
#*************************************************
# class Cloud_user_change(Resource):
# def put(self, email, new_password):
# """
# Modifie le mot de passe d'un Utilisateur dans le cloud général: QUESTION: A PRIORI INUTILE CAR LIE AU LDAP
# ---
# tags:
# - Cloud
# parameters:
# - in: path
# name: email
# type: string
# required: true
# - in: path
# name: new_password
# type: string
# required: true
# responses:
# 200:
# description: Succès, mot de passe changé
# 404:
# description: Oops, mot de passe NON changé
# 500:
# description: Erreur interne du serveur
# """
#
# global cloud_ident, cloud_pass, cloud_url
#
# try:
# auth = (cloud_ident, cloud_pass)
# api_url = f"{cloud_url}/ocs/v1.php/cloud/users?search={email}"
# headers = {"OCS-APIRequest": "true"}
# data = {
# "key": "password",
# "value": new_password
# }
# response = requests.put(api_url, auth=auth, headers=headers)
#
# if response.status_code == 200:
# if re.search(r'<element>.*</element>', response.text):
# return 200
# else:
# return 404
#
# except Exception as e:
# return jsonify({'error': str(e)}), 500
#
# api.add_resource(Cloud_user_change, '/cloud/user/change/<string:email>/<string:new_password>')
#*************************************************
#***** SYMPA **************************************
#*************************************************
class Sympa_user(Resource):
def __init__(self):
global sympa_ident, sympa_pass, sympa_url
self.sympa_ident = sympa_ident
self.sympa_pass = sympa_pass
self.sympa_url = sympa_url
def _execute_sympa_command(self, email, liste, service):
try:
if validate_email(email) and validate_email(liste):
cmd = f'export PERL5LIB=/usr/src/app/:$PERL5LIB && /usr/src/app/Sympa/sympa_soap_client.pl --soap_url={self.sympa_url}/sympasoap --trusted_application={self.sympa_ident} --trusted_application_password={self.sympa_pass} --proxy_vars=USER_EMAIL=admin@kaz.bzh --service={service} --service_parameters="{liste},{email}" && echo $?'
output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
return output.decode("utf-8"), 200 # Retourne la sortie et un code de succès
except EmailNotValidError as e:
return str(e), 400 # Retourne le message d'erreur et un code d'erreur 400
except subprocess.CalledProcessError as e:
return e.output.decode("utf-8"), 400 # Retourne la sortie de la commande et un code d'erreur 400
def post(self, email, liste):
"""
Ajouter un email dans une liste sympa
---
tags:
- Sympa
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: liste
type: string
required: true
responses:
200:
description: Succès, email ajouté dans la liste
400:
description: Oops, email non ajouté dans la liste
"""
output, status_code = self._execute_sympa_command(email, liste, 'add')
return output, status_code
def delete(self, email, liste):
"""
Supprimer un email dans une liste sympa
---
tags:
- Sympa
parameters:
- in: path
name: email
type: string
required: true
- in: path
name: liste
type: string
required: true
responses:
200:
description: Succès, email supprimé dans la liste
400:
description: Oops, email non supprimé dans la liste
"""
output, status_code = self._execute_sympa_command(email, liste, 'del')
return output, status_code
api.add_resource(Sympa_user, '/sympa/user/<string:email>/<string:liste>')
#*************************************************
#***** QUOTA **************************************
#*************************************************
class Quota(Resource):
#https://doc.dovecot.org/configuration_manual/authentication/master_users/
#https://blog.debugo.fr/serveur-messagerie-dovecot/
# sur kazkouil.fr, j'ai modifié /etc/dovecot/conf.d/20-lmtp.conf
#mail_plugins = $mail_plugins sieve quota
def get(self, email):
"""
Récupérer la place prise par une BAL
---
tags:
- Quota (EN COURS)
parameters:
- in: path
name: email
type: string
required: true
responses:
200:
description: Succès, taille d'une BAL'
400:
description: Oops, échec dans l'affichage de la taille d'une BAL
"""
global serveur_imap, mot_de_passe_mail
try:
if validate_email(email):
# Connexion au serveur IMAP
mail = imaplib.IMAP4_SSL(serveur_imap)
mail.login(email, mot_de_passe_mail)
#res, data = mail.select("INBOX")
#return data[0].decode("utf-8"), 200
# Requête pour obtenir le quota de la boîte aux lettres
# res, data = mail.getquota("INBOX")
# return str(data[0]).split()
res, data = mail.getquotaroot("INBOX")
return str(data[1]).split()[3]+" KB"
# Fermeture de la connexion
mail.close()
mail.logout()
else:
return "Email non valide", 400
except imaplib.IMAP4.error as e:
return str(e), 400 # Retourne le message d'erreur et un code d'erreur 400
except EmailNotValidError as e:
return str(e), 400 # Retourne le message d'erreur et un code d'erreur 400
api.add_resource(Quota, '/quota/<string:email>')
#*************************************************
#***** DNS **************************************
#*************************************************
class Dns_serveurs(Resource):
def __init__(self):
global gandi_key, gandi_url_api
self.gandi_key = gandi_key
self.gandi_url_api = gandi_url_api
def get(self):
"""
Renvoie tous les serveurs kaz de la zone dns
---
tags:
- Dns
responses:
200:
description: Succès, liste des serveurs
404:
description: Oops, soucis...
"""
url = f"{self.gandi_url_api}/records/srv/TXT"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
response = requests.get(url, headers=headers)
if response.status_code != 200:
return response.json(),response.status_code
# Extraire la liste des serveurs de la réponse JSON
rrset_values = response.json()["rrset_values"]
# Nettoyer la liste
serveurs = [serveur.strip('"') for serveur in rrset_values[0].split(";")]
return serveurs, 200
api.add_resource(Dns_serveurs, '/dns/')
#*************************************************
class Dns(Resource):
def __init__(self):
global gandi_key, gandi_url_api
self.gandi_key = gandi_key
self.gandi_url_api = gandi_url_api
self.dns_serveurs_resource = Dns_serveurs()
#*************************************************
def get(self,sdomaine):
"""
Le sous-domaine existe t-il dans la zone dns avec un enreg CNAME ?
---
tags:
- Dns
parameters:
- in: path
name: sdomaine
type: string
required: true
responses:
200:
description: Succès, sdomaine existe déjà avec un CNAME
404:
description: Oops, sdomaine non trouvé
"""
url = f"{self.gandi_url_api}/records/{sdomaine}/CNAME"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
response = requests.get(url, headers=headers)
return response.json(),response.status_code
#*************************************************
def delete(self,sdomaine):
"""
suppression du sdomaine
---
tags:
- Dns
parameters:
- in: path
name: sdomaine
type: string
required: true
responses:
204:
description: Succès, sdomaine supprimé
404:
description: Oops,
"""
url = f"{self.gandi_url_api}/records/{sdomaine}"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
response = requests.delete(url, headers=headers)
return response.text,response.status_code
#*************************************************
def post(self,sdomaine,serveur):
"""
Créé le sous-domaine de type CNAME qui pointe sur serveur
---
tags:
- Dns
parameters:
- in: path
name: sdomaine
type: string
required: true
- in: path
name: serveur
type: string
required: true
description: Le serveur doit être l'un des serveurs disponibles dans la zone DNS
responses:
201:
description: Succès, sdomaine créé comme CNAME
400:
description: Oops, serveur inconnu, sdomaine non créé
404:
description: Oops, sdomaine non créé
200,409:
description: Oops, sdomaine déjà créé
"""
# Récupérer la liste des serveurs disponibles
serveurs_disponibles, status_code = self.dns_serveurs_resource.get()
if status_code != 200:
return serveurs_disponibles, status_code
if serveur not in serveurs_disponibles:
return f"Erreur: Le serveur {serveur} n'est pas disponible", 400
url = f"{self.gandi_url_api}/records/{sdomaine}/CNAME"
headers = { "Authorization": f"Apikey {self.gandi_key}" }
payload = f'{{"rrset_values":["{serveur}"]}}'
response = requests.post(url, data=payload, headers=headers)
return response.json(),response.status_code
#*************************************************
api.add_resource(Dns, '/dns/<string:sdomaine>', endpoint='dns_get', methods=['GET'])
api.add_resource(Dns, '/dns/<string:sdomaine>', endpoint='dns_delete', methods=['DELETE'])
api.add_resource(Dns, '/dns/<string:sdomaine>/<string:serveur>', endpoint='dns_post', methods=['POST'])
#*************************************************
#*************************************************
#***** KAZ **************************************
#*************************************************
class Kaz_user(Resource):
def __init__(self):
self.paheko_users_action_resource = Paheko_users_action()
self.paheko_user_resource=Paheko_user()
self.ldap_user_resource = Ldap_user()
self.password_create_resource = Password_create()
self.mattermost_message_resource=Mattermost_message()
self.mattermost_user_resource=Mattermost_user()
self.mattermost_user_team_resource=Mattermost_user_team()
self.mattermost_user_channel_resource=Mattermost_user_channel()
self.mattermost_team_resource=Mattermost_team()
self.sympa_user_resource=Sympa_user()
#********************************************************************************************
def delete(self):
"""
Utile pour les tests de createUser. Avant le POST de /kaz/create/users. Ça permet de supprimer/maj les comptes.
---
tags:
- Kaz
parameters: []
responses:
201:
description: Succès
401:
description: Oops, un soucis quelconque
"""
#verrou pour empêcher de lancer en même temps la même api
try:
prefixe="del_user_lock_"
if glob(f"{tempfile.gettempdir()}/{prefixe}*"): raise ValueError("ERREUR 0 : api déjà en cours d'utilisation !")
lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
liste_emails=["0a@kazkouil.fr","0b@kazkouil.fr"]
for email in liste_emails:
res, status_code = self.ldap_user_resource.delete(email)
res, status_code = self.mattermost_user_resource.delete(email)
nom_orga=''.join(random.choice(string.ascii_lowercase) for _ in range(8))
res, status_code = self.paheko_user_resource.put(email,"nom_orga",nom_orga)
res, status_code = self.paheko_user_resource.put(email,"action_auto","A créer")
res, status_code = self.paheko_user_resource.put(email,"email_secours","fab@kazkouil.fr")
res, status_code = self.sympa_user_resource.delete(email,"infos@listes.kaz.bzh")
res, status_code = self.sympa_user_resource.delete("fab@kazkouil.fr","infos@listes.kaz.bzh")
msg=f"**POST AUTO** suppression de {email} ok"
self.mattermost_message_resource.post(message=msg)
return "OK", 200
except ValueError as e:
msg=f"(classe: {__class__.__name__} : {e}"
self.mattermost_message_resource.post(message=msg)
return str(msg), 401
#********************************************************************************************
def post(self):
"""
Créé un nouveau kaznaute: inscription sur MM / Cloud / email + msg sur MM + email à partir de action="a créer" sur paheko
---
tags:
- Kaz
parameters: []
responses:
201:
description: Succès, kaznaute créé
400:
description: Oops, rien à créer
401:
description: Oops, un soucis quelconque
"""
try:
#verrou pour empêcher de lancer en même temps la même api
prefixe="create_user_lock_"
if glob(f"{tempfile.gettempdir()}/{prefixe}*"): raise ValueError("ERREUR 0 : api déjà en cours d'utilisation !")
lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
#qui sont les kaznautes à créer ?
liste_kaznautes, status_code = self.paheko_users_action_resource.get("A créer")
if liste_kaznautes=="pas de résultat": return "ERREUR: paheko non joignable",401
count=liste_kaznautes['count']
if count==0: return "aucun nouveau kaznaute à créer",400
#au moins un kaznaute à créer
for tab in liste_kaznautes['results']:
email = tab['email'].lower()
# est-il déjà dans le ldap ? (mail ou alias)
res, status_code = self.ldap_user_resource.get(email)
if status_code != 400: raise ValueError(f"ERREUR 1: {email} déjà existant dans ldap. {res}, on arrête tout")
#test nom orga
if tab['admin_orga'] == 1:
if tab['nom_orga'] is None:
raise ValueError(f"ERREUR 0 sur paheko: {email} : nom_orga vide, on arrête tout")
if not bool(re.match(r'^[a-z0-9-]+$', tab['nom_orga'])):
raise ValueError(f"ERREUR 0 sur paheko: {email} : nom_orga ({tab['nom_orga']}) incohérent (minuscule/chiffre/-), on arrête tout")
#test email_secours
email_secours = tab['email_secours'].lower()
if not validate_email(email_secours): raise EmailNotValidError()
#test quota
quota = tab['quota_disque']
if not quota.isdigit(): raise ValueError(f"ERREUR 2: quota non numérique : {quota}, on arrête tout")
#quel type de test ?
#"nom": "ROUSSEAU Mickael",
nom, prenom = tab['nom'].split(maxsplit=1)
#on génère un password
password,status_code = self.password_create_resource.get()
#on créé dans le ldap
#à quoi servent prenom/nom dans le ldap ?
data = {
"prenom": prenom,
"nom": nom,
"password": password,
"email_secours": email_secours,
"quota": quota
}
res, status_code = self.ldap_user_resource.put(email, **data)
if status_code != 200: raise ValueError(f"ERREUR 3 sur ldap: {email} : {res}, on arrête tout ")
#on créé dans MM
user = email.split('@')[0]
res, status_code = self.mattermost_user_resource.post(user,email,password)
if status_code != 200: raise ValueError(f"ERREUR 4 sur MM: {email} : {res}, on arrête tout ")
# et on affecte à l'équipe kaz
res, status_code = self.mattermost_user_team_resource.post(email,"kaz")
if status_code != 200: raise ValueError(f"ERREUR 5 sur MM: {email} : {res}, on arrête tout ")
#et aux 2 canaux de base
res, status_code = self.mattermost_user_channel_resource.post(email,"kaz","une-question--un-soucis")
if status_code != 200: raise ValueError(f"ERREUR 6 sur MM: {email} : {res}, on arrête tout ")
res, status_code = self.mattermost_user_channel_resource.post(email,"kaz","cafe-du-commerce--ouvert-2424h")
if status_code != 200: raise ValueError(f"ERREUR 7 sur MM: {email} : {res}, on arrête tout ")
#on créé une nouvelle équipe ds MM si besoin
if tab['admin_orga'] == 1:
res, status_code = self.mattermost_team_resource.post(tab['nom_orga'],email)
if status_code != 200: raise ValueError(f"ERREUR 8 sur MM: {email} : {res}, on arrête tout ")
#BUG: créer la nouvelle équipe n'a pas rendu l'email admin, on le rajoute comme membre simple
res, status_code = self.mattermost_user_team_resource.post(email,tab['nom_orga'])
if status_code != 200: raise ValueError(f"ERREUR 8.1 sur MM: {email} : {res}, on arrête tout ")
#on créé dans le cloud genéral
#inutile car tous les user du ldap sont user du cloud général.
#on inscrit email et email_secours à la nl infos@listes.kaz.bzh
res, status_code = self.sympa_user_resource.post(email,"infos@listes.kaz.bzh")
if status_code != 200: raise ValueError(f"ERREUR 9 sur Sympa: {email} : {res}, on arrête tout ")
res, status_code = self.sympa_user_resource.post(email_secours,"infos@listes.kaz.bzh")
if status_code != 200: raise ValueError(f"ERREUR 10 sur Sympa: {email_secours} : {res}, on arrête tout ")
#on construit/envoie le mail
context = {
'ADMIN_ORGA': tab['admin_orga'],
'NOM': tab['nom'],
'EMAIL_SOUHAITE': email,
'PASSWORD': password,
'QUOTA': tab['quota_disque'],
'URL_WEBMAIL': webmail_url,
'URL_AGORA': mattermost_url,
'URL_MDP': mdp_url,
'URL_LISTE': sympa_url,
'URL_SITE': site_url,
'URL_CLOUD': cloud_url
}
subject="KAZ: confirmation d'inscription !"
sender=app.config['MAIL_USERNAME']
reply_to = "contact@kaz.bzh"
msg = Message(subject=subject, sender=sender, reply_to=reply_to, recipients=[email,email_secours])
msg.html = render_template('email_inscription.html', **context)
mail.send(msg)
#on met le flag paheko action à Aucune
res, status_code = self.paheko_user_resource.put(email,"action_auto","Aucune")
if status_code != 200: raise ValueError(f"ERREUR 12 sur paheko: {email} : {res}, on arrête tout ")
#on post sur MM pour dire ok
msg=f"**POST AUTO** Inscription réussie pour {email} avec le secours {email_secours} Bisou!"
self.mattermost_message_resource.post(message=msg)
return "fin des inscriptions", 201
except EmailNotValidError as e:
msg=f"classe: {__class__.__name__} : ERREUR 13 : email_secours : {email_secours} " + str(e) +", on arrête tout"
self.mattermost_message_resource.post(message=msg)
return msg, 401
except ValueError as e:
msg=f"(classe: {__class__.__name__} : {e}"
self.mattermost_message_resource.post(message=msg)
return str(msg), 401
#*************************************************
api.add_resource(Kaz_user, '/kaz/create/users', endpoint='kaz_create_user', methods=['POST'])
api.add_resource(Kaz_user, '/kaz/delete/user', endpoint='kaz_delete_user', methods=['DELETE'])
#*************************************************
#**********TEST***********************************
#*************************************************
class Test(Resource):
def __init__(self):
toto="toto"
#self.mattermost_team_resource=Mattermost_team()
#global mattermost_url, sympa_url, webmail_url, mdp_url, site_url, nc_url
#********************************************************************************************
def get(self):
"""
Pour tester des conneries
---
tags:
- a simple test
parameters: []
responses:
201:
description: OK
401:
description: KO
"""
#********************************************************************************************
# #***** test suppression de toutes les équipes de MM sauf KAZ
# res,status=self.mattermost_team_resource=Mattermost_team().get()
# for equipe in res:
# if equipe!="kaz": res,status=self.mattermost_team_resource=Mattermost_team().delete(equipe)
# return "fin"
#********************************************************************************************
#**** test messagerie
NOM="fab"
#EMAIL_SOUHAITE='fab@kazkouil.fr'
#EMAIL_SOUHAITE='fab@kaz.bzh'
EMAIL_SOUHAITE='sysadmin@listes.kaz.bzh'
PASSWORD="toto"
QUOTA="1"
ADMIN_ORGA="0"
context = {
'ADMIN_ORGA': ADMIN_ORGA,
'NOM': NOM,
'EMAIL_SOUHAITE': EMAIL_SOUHAITE,
'PASSWORD': PASSWORD,
'QUOTA': QUOTA,
'URL_WEBMAIL': webmail_url,
'URL_AGORA': mattermost_url,
'URL_MDP': mdp_url,
'URL_LISTE': sympa_url,
'URL_SITE': site_url,
'URL_CLOUD': cloud_url
}
subject = "KAZ: confirmation d'inscription !"
sender=app.config['MAIL_USERNAME']
reply_to = "contact@kaz.bzh"
msg = Message(subject=subject, sender=sender, reply_to=reply_to, recipients=[EMAIL_SOUHAITE])
msg.html = render_template('email_inscription.html', **context)
# Parsez le contenu HTML avec BeautifulSoup
soup = BeautifulSoup(msg.html, 'html.parser')
msg.body = soup.get_text()
mail.send(msg)
return "Message envoyé!"
#********************************************************************************************
# #**** test ms erreur
# email_secours="toto"
#
# msg=f"classe: {__class__.__name__} : ERREUR 8 : email_secours : {email_secours} " +", on arrête tout"
# #return __class__.__name__
# return msg
#********************************************************************************************
#**** test vérou
# prefixe="toto_"
# if glob(f"{tempfile.gettempdir()}/{prefixe}*"):
# return "ERREUR : api déjà en cours d'utilisation !", 400
# else:
# lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
#
# sleep(20)
# return str(lock_file), 201
api.add_resource(Test, '/atest', endpoint='atest', methods=['GET'])
#*************************************************
#*************************************************
#*************************************************
if __name__ == '__main__':
app.run(host='0.0.0.0', port=os.getenv('PORT'))