2024-09-05 11:17:21 +02:00

158 lines
5.7 KiB
Python

from resources.common_imports import *
from resources.ldap import Ldap_user
#les variables globales minimum
from resources.config import *
class Test(Resource):
def __init__(self):
global paheko_ident, paheko_pass, paheko_url
self.paheko_ident = paheko_ident
self.paheko_pass = paheko_pass
self.paheko_url = paheko_url
self.ldap_user_resource = Ldap_user()
self.client = app.test_client()
@jwt_required()
def get(self):
"""
Dans le ldap, mettre le numeroMembre de paheko pour toutes les entrées
---
tags:
- Test
security:
- Bearer: []
parameters: []
responses:
201:
description: OK
401:
description: KO
"""
auth = (self.paheko_ident, self.paheko_pass)
api_url = self.paheko_url + '/api/sql/'
payload = { "sql": f"select * from users where id_category <> 13 and email='fab@kazkouil.fr'" }
#payload = { "sql": f"select * from users where id_category <> 13 " }
response = requests.post(api_url, auth=auth, data=payload)
if response.status_code == 200:
#pour chaque fiche membre, on récup le numeroMembre et l'email
data = response.json()
for item in data['results']:
# Afficher les valeurs de 'id' et 'email'
print(f"ID: {item['id']}, Email: {item['email']}")
#j'ajoute le champ numeroMembre dans le ldap'
data = {
"field": "numeroMembre",
"value": item['id']
}
email=item['email']
# Utiliser le client de test pour appeler la méthode post de Ldap_user
response = self.client.post(
f'/ldap/user/{email}?action=add',
data=json.dumps(data),
content_type='application/json'
)
return response.json, response.status_code
#if status_code != 200: raise ValueError(f"ERREUR 3 sur ldap: {email} : {res}, on arrête tout ")
else:
return "pas de résultat", response.status_code
#********************************************************************************************
#dans le ldap, mettre le numeroMembre de paheko pour toutes les entrées
#********************************************************************************************
# test lançcement de cmde ssh sur des serveurs distants:
# il faut au préalable que la clé publique de root du conteneur apikaz soit dans authorized key du user fabricer de la machine 163.172.94.54
# clé à créer dans le Dockerfile
# risque sécu ?
# cmd="ssh -p 2201 fabricer@163.172.94.54 mkdir -p /tmp/toto"
# try:
# output = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT)
# return "ok",200
#
# except subprocess.CalledProcessError as e:
# return e.output.decode("utf-8"), 400
#********************************************************************************************
# #***** test suppression de toutes les équipes de MM sauf KAZ
# res,status=self.mattermost_team_resource=Mattermost_team().get()
# for equipe in res:
# if equipe!="kaz": res,status=self.mattermost_team_resource=Mattermost_team().delete(equipe)
# return "fin"
#********************************************************************************************
#**** test messagerie
# NOM="toto"
# EMAIL_SOUHAITE='f@kaz.bzh'
# PASSWORD="toto"
# QUOTA="1"
# ADMIN_ORGA="0"
#
# context = {
# 'ADMIN_ORGA': ADMIN_ORGA,
# 'NOM': NOM,
# 'EMAIL_SOUHAITE': EMAIL_SOUHAITE,
# 'PASSWORD': PASSWORD,
# 'QUOTA': QUOTA,
# 'URL_WEBMAIL': webmail_url,
# 'URL_AGORA': mattermost_url,
# 'URL_MDP': mdp_url,
# 'URL_LISTE': sympa_url,
# 'URL_SITE': site_url,
# 'URL_CLOUD': cloud_url
# }
#
# subject = "KAZ: confirmation d'inscription !"
# sender=app.config['MAIL_USERNAME']
# reply_to = app.config['MAIL_REPLY_TO']
#
# msg = Message(subject=subject, sender=sender, reply_to=reply_to, recipients=[EMAIL_SOUHAITE])
# msg.html = render_template('email_inscription.html', **context)
#
# # Parsez le contenu HTML avec BeautifulSoup
# soup = BeautifulSoup(msg.html, 'html.parser')
# msg.body = soup.get_text()
#
# mail.send(msg)
# return "Message envoyé!"
#********************************************************************************************
# #**** test ms erreur
# email_secours="toto"
#
# msg=f"classe: {__class__.__name__} : ERREUR 8 : email_secours : {email_secours} " +", on arrête tout"
# #return __class__.__name__
# return msg
#********************************************************************************************
#**** test vérou
# prefixe="toto_"
# if glob(f"{tempfile.gettempdir()}/{prefixe}*"):
# return "ERREUR : api déjà en cours d'utilisation !", 400
# else:
# lock_file = tempfile.NamedTemporaryFile(prefix=prefixe,delete=True)
#
# sleep(20)
# return str(lock_file), 201