Code de base pour la détection d anomalies
Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs ainsi que pour l'affichage des données.
Code du producer
## Importation des bibliothèques import serial from kafka import KafkaProducer import time import pickle ## Création d'un fichier pour sauvegarder les données issues du capteur (optionel) listeDonnee = [] pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) ) ## Paramètre de la carte Arduino et de l'ip du serveur Kafka portArduino = "COM3" adresseKafka = 'localhost:9092' ##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend des données envoyées par la carte Arduino) def separateur_serial(chaine, separateur ="!"): indice = -1 for i in range (len(chaine) -1, -1 , -1): if chaine[i] == separateur: indice = i break topic = chaine[:indice] valeur = chaine[indice +1:] return topic,valeur return valeur ## Initialisation de la connection avec la carte Arduino serial_port = serial.Serial(port = portArduino, baudrate = 9600) ## Initialisation de la connection avec le sreveur Kafka producer = KafkaProducer(bootstrap_servers=adresseKafka) ## On boucle à l'infini pour transmettre continuellement des données while True: # Récupération de la donnée issue de la carte Arduino msg = serial_port.readline() # convertit le msg de l'ARDUINO en chaine de caractères msg = msg.decode('utf-8')[:-2] # enlève les \r\n # Récupération du temps pour la sauvegarde dans un fichier externe (optionel) temp = time.time() donnee = (msg, temp) print(msg) # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) ) listeDonnee.append(donnee) pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) ) topic,message = separateur_serial(msg) # print(topic,message) # convertit le message en bytes message_bytes = bytes(message, 'utf-8') #envoie de la donnée sur le serveur Kafka producer.send(topic, message_bytes) ## code pour fermeture du producer ## producer.close()
Code important
Importation de la bibliothèquue Kafka Python
from kafka import KafkaProducer
Permet l'envoie d'un message à un topic nommé, il est conseillé de d'abord convertir le message en bytes ou en JSON
message = 'Bonjour' message_bytes = bytes(message, 'utf-8') producer.send(topic, message_bytes )
Permet la fermeture du producer
producer.close()
Code du consumer
## Bibliotheque from kafka import KafkaConsumer from kafka.structs import TopicPartition from sklearn.ensemble import IsolationForest import pandas as pd from sklearn.cluster import DBSCAN import time import pickle listeDonnee = [] pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) ) ## Fonction pour convertir une liste en DataFrame def convert_list_database(liste): tableau = [] for i in range(len(liste)-1): tableau.append(liste[i]) # return tableau return pd.DataFrame(tableau, columns=['c1']) ## Variable nomTopic = 'humidite' bootstrap_servers = 'localhost:9092' topic = TopicPartition(nomTopic, 0) ## Programme ## Création d'un consumer consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers) ## Assignation au topic que l'on va lire consumer.assign([topic]) ## Permet d'aller à la fin de la partition consumer.poll() consumer.seek_to_end(topic) ## Création de la FILE taille_File = 20 +1 file = [] ## Boucle des messages for msg in consumer: ## Décodage du message, ici en UTF-8 msg = msg.value.decode("utf-8") ##Partie detection ## Partie de détection d'erreur if len(file) > taille_File: file.append(msg) file.pop(0) print("Erreur ?") #etat allant de -1 à 1 ## on met ci-dessous la fonction de détection d'anomalies etat = detectionAnomaly(file, taille_File) ## Partie d'initialisation else: print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ? file.append(msg) # etat 2 correspond à l'initialisation etat = -2 ## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel) temps = time.time() valeur = msg donnee = (valeur,temps,etat) # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) ) listeDonnee.append(donnee) pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) ) ## Aller à la fin de la partition consumer.poll() consumer.seek_to_end(topic)
Code important
Importation des bibliothèques Kafka Python
from kafka import KafkaConsumer from kafka.structs import TopicPartition
Création d'un consumer ainsi que son asignation à sa partition
nomTopic = UnNomDeTopic bootstrap_servers = AdresseIP:Port numPartition = LeNuméroDeLaPartition partition= TopicPartition(nomTopic, numPartition) consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers) consumer.assign([partition])
Se rendre à la fin de la partition
consumer.poll() consumer.seek_to_end(numPartition)
Lecture de la partition, attention on a une boucle for mais elle a le comportement d'une boucle while. Si vous avez encodé votre message alors la première chose à faire est de le décoder.
for msg in consumer: msg = msg.value.decode("utf-8")
Fermeture d'un consumer
consumer.close()
Code pour affichage du résultat de notre code
Attention cette affichage est pour ce projet. Adapté le pour votre projet selon vos bases de données.
## Importation des bibliothèques import matplotlib.pyplot as plt import pickle ##Fonction pour mise en forme des données def separateur_serial(chaine, separateur ="!"): indice = -1 for i in range (len(chaine) -1, -1 , -1): if chaine[i] == separateur: indice = i break topic = chaine[:indice] valeur = chaine[indice +1:] return valeur ## variable contenant tous les fichiers avec les données fichier_reference = "saveLISTEDONNEE.txt" fichier1 = "saveLISTEDONNEE_DBSCAN.txt" fichier2 = "saveLISTEDONNEE_ISOLATION.txt" ##imporation des fichiers précédants data_ref = pickle.load( open( fichier_reference, "rb" ) ) data_DBSCAN = pickle.load( open( fichier1, "rb" ) ) data_ISOLATION = pickle.load( open( fichier2, "rb" ) ) ##Affichage des nombres de mesures contenues dans les fichiers respectifs print("DATA ref = ",len(data_ref)) print("DATA DBSCAN = ",len(data_DBSCAN)) print("DATA ISOLATION = ",len(data_ISOLATION)) def modification_dataREF(data): """ Remise en forme de notre base de référence """ dataBis = [] for elt in data: valeur = separateur_serial(elt[0], separateur ="!") donnee = (valeur,elt[1]) dataBis.append(donnee) return dataBis ## Affichage avec plt des données data_ref = modification_dataREF(data_ref) # base de référence y = [float(elt[0]) for elt in data_ref] x = [float(elt[1]) for elt in data_ref] plt.plot(x,y,color = "blue") #DBSCAN for elt in data_DBSCAN: if elt[2] == 2: plt.axvline(x=elt[1],color="k") elif elt[2] == -1: plt.axvline(x=elt[1],color="red") #Isolation Forest for elt in data_ISOLATION: if elt[2] == 2: plt.axvline(x=elt[1],color="k") elif elt[2] == -1: plt.axvline(x=elt[1],color="green") # Remarque # si elt[1] == 2 alors on est en itialisation # si elt[1] == -1 alors on est en anomalie ## Affichage des données plt.show()