Code de base pour la détection d anomalies
Aller à la navigation
Aller à la recherche
Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs ainsi que pour l'affichage des données.
Code du producer
## Importation des bibliothèques import serial from kafka import KafkaProducer import time import pickle ## Création d'un fichier pour sauvegarder les données issues du capteur (optionel) listeDonnee = [] pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) ) ## Paramètre de la carte Arduino et de l'ip du serveur Kafka portArduino = "COM3" adresseKafka = 'localhost:9092' ##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend des données envoyées par la carte Arduino) def separateur_serial(chaine, separateur ="!"): indice = -1 for i in range (len(chaine) -1, -1 , -1): if chaine[i] == separateur: indice = i break topic = chaine[:indice] valeur = chaine[indice +1:] return topic,valeur return valeur ## Initialisation de la connection avec la carte Arduino serial_port = serial.Serial(port = portArduino, baudrate = 9600) ## Initialisation de la connection avec le sreveur Kafka producer = KafkaProducer(bootstrap_servers=adresseKafka) ## On boucle à l'infini pour transmettre continuellement des données while True: # Récupération de la donnée issue de la carte Arduino msg = serial_port.readline() # convertit le msg de l'ARDUINO en chaine de caractères msg = msg.decode('utf-8')[:-2] # enlève les \r\n # Récupération du temps pour la sauvegarde dans un fichier externe (optionel) temp = time.time() donnee = (msg, temp) print(msg) # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) ) listeDonnee.append(donnee) pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) ) topic,message = separateur_serial(msg) # print(topic,message) # convertit le message en bytes message_bytes = bytes(message, 'utf-8') #envoie de la donnée sur le serveur Kafka producer.send(topic, message_bytes) ## code pour fermeture du producer ## producer.close()
Code important
Importation de la bibliothèquue Kafka Python
from kafka import KafkaProducer
Permet l'envoie d'un message à un topic nommé, il est conseillé de d'abord convertir le message en bytes ou en JSON
message = 'Bonjour' message_bytes = bytes(message, 'utf-8') producer.send(topic, message_bytes )
Permet la fermeture du producer
producer.close()
Code du consumer
## Bibliotheque from kafka import KafkaConsumer from kafka.structs import TopicPartition from sklearn.ensemble import IsolationForest import pandas as pd from sklearn.cluster import DBSCAN import time import pickle listeDonnee = [] pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) ) ## Fonction pour convertir une liste en DataFrame def convert_list_database(liste): tableau = [] for i in range(len(liste)-1): tableau.append(liste[i]) # return tableau return pd.DataFrame(tableau, columns=['c1']) ## Variable nomTopic = 'humidite' bootstrap_servers = 'localhost:9092' topic = TopicPartition(nomTopic, 0) ## Programme ## Création d'un consumer consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers) ## Assignation au topic que l'on va lire consumer.assign([topic]) ## Permet d'aller à la fin de la partition consumer.poll() consumer.seek_to_end(topic) ## Création de la FILE taille_File = 20 +1 file = [] ## Boucle des messages for msg in consumer: ## Décodage du message, ici en UTF-8 msg = msg.value.decode("utf-8") ##Partie detection ## Partie de détection d'erreur if len(file) > taille_File: file.append(msg) file.pop(0) print("Erreur ?") #etat allant de -1 à 1 ## on met ci-dessous la fonction de détection d'anomalies etat = detectionAnomaly(file, taille_File) ## Partie d'initialisation else: print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ? file.append(msg) # etat 2 correspond à l'initialisation etat = -2 ## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel) temps = time.time() valeur = msg donnee = (valeur,temps,etat) # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) ) listeDonnee.append(donnee) pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) ) ## Aller à la fin de la partition consumer.poll() consumer.seek_to_end(topic)
Code important
Importation de la bibliothèquue Kafka Python
from kafka import KafkaProducer
Code pour affichage du résultat de notre code
Attention cette affichage est pour ce projet. Adapté le pour votre projet selon vos bases de données.
## Importation des bibliothèques import matplotlib.pyplot as plt import pickle ##Fonction pour mise en forme des données def separateur_serial(chaine, separateur ="!"): indice = -1 for i in range (len(chaine) -1, -1 , -1): if chaine[i] == separateur: indice = i break topic = chaine[:indice] valeur = chaine[indice +1:] return valeur ## variable contenant tous les fichiers avec les données fichier_reference = "saveLISTEDONNEE.txt" fichier1 = "saveLISTEDONNEE_DBSCAN.txt" fichier2 = "saveLISTEDONNEE_ISOLATION.txt" ##imporation des fichiers précédants data_ref = pickle.load( open( fichier_reference, "rb" ) ) data_DBSCAN = pickle.load( open( fichier1, "rb" ) ) data_ISOLATION = pickle.load( open( fichier2, "rb" ) ) ##Affichage des nombres de mesures contenues dans les fichiers respectifs print("DATA ref = ",len(data_ref)) print("DATA DBSCAN = ",len(data_DBSCAN)) print("DATA ISOLATION = ",len(data_ISOLATION)) def modification_dataREF(data): """ Remise en forme de notre base de référence """ dataBis = [] for elt in data: valeur = separateur_serial(elt[0], separateur ="!") donnee = (valeur,elt[1]) dataBis.append(donnee) return dataBis ## Affichage avec plt des données data_ref = modification_dataREF(data_ref) # base de référence y = [float(elt[0]) for elt in data_ref] x = [float(elt[1]) for elt in data_ref] plt.plot(x,y,color = "blue") #DBSCAN for elt in data_DBSCAN: if elt[2] == 2: plt.axvline(x=elt[1],color="k") elif elt[2] == -1: plt.axvline(x=elt[1],color="red") #Isolation Forest for elt in data_ISOLATION: if elt[2] == 2: plt.axvline(x=elt[1],color="k") elif elt[2] == -1: plt.axvline(x=elt[1],color="green") # Remarque # si elt[1] == 2 alors on est en itialisation # si elt[1] == -1 alors on est en anomalie ## Affichage des données plt.show()