« Code de base pour la détection d anomalies » : différence entre les versions
Aller à la navigation
Aller à la recherche
Aucun résumé des modifications |
|||
| Ligne 177 : | Ligne 177 : | ||
consumer.poll() |
consumer.poll() |
||
consumer.seek_to_end(topic) |
consumer.seek_to_end(topic) |
||
</nowiki> |
|||
==Code important== |
|||
Importation de la bibliothèquue Kafka Python |
|||
<nowiki> |
|||
from kafka import KafkaProducer |
|||
</nowiki> |
</nowiki> |
||
Version du 26 mai 2023 à 13:06
Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs ainsi que pour l'affichage des données.
Code du producer
## Importation des bibliothèques
import serial
from kafka import KafkaProducer
import time
import pickle
## Création d'un fichier pour sauvegarder les données issues du capteur (optionel)
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )
## Paramètre de la carte Arduino et de l'ip du serveur Kafka
portArduino = "COM3"
adresseKafka = 'localhost:9092'
##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend des données envoyées par la carte Arduino)
def separateur_serial(chaine, separateur ="!"):
indice = -1
for i in range (len(chaine) -1, -1 , -1):
if chaine[i] == separateur:
indice = i
break
topic = chaine[:indice]
valeur = chaine[indice +1:]
return topic,valeur
return valeur
## Initialisation de la connection avec la carte Arduino
serial_port = serial.Serial(port = portArduino, baudrate = 9600)
## Initialisation de la connection avec le sreveur Kafka
producer = KafkaProducer(bootstrap_servers=adresseKafka)
## On boucle à l'infini pour transmettre continuellement des données
while True:
# Récupération de la donnée issue de la carte Arduino
msg = serial_port.readline()
# convertit le msg de l'ARDUINO en chaine de caractères
msg = msg.decode('utf-8')[:-2] # enlève les \r\n
# Récupération du temps pour la sauvegarde dans un fichier externe (optionel)
temp = time.time()
donnee = (msg, temp)
print(msg)
# sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) )
listeDonnee.append(donnee)
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )
topic,message = separateur_serial(msg)
# print(topic,message)
# convertit le message en bytes
message_bytes = bytes(message, 'utf-8')
#envoie de la donnée sur le serveur Kafka
producer.send(topic, message_bytes)
## code pour fermeture du producer
## producer.close()
Code important
Importation de la bibliothèquue Kafka Python
from kafka import KafkaProducer
Permet l'envoie d'un message à un topic nommé, il est conseillé de d'abord convertir le message en bytes ou en JSON
message = 'Bonjour' message_bytes = bytes(message, 'utf-8') producer.send(topic, message_bytes )
Permet la fermeture du producer
producer.close()
Code du consumer
## Bibliotheque
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from sklearn.ensemble import IsolationForest
import pandas as pd
from sklearn.cluster import DBSCAN
import time
import pickle
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
## Fonction pour convertir une liste en DataFrame
def convert_list_database(liste):
tableau = []
for i in range(len(liste)-1):
tableau.append(liste[i])
# return tableau
return pd.DataFrame(tableau, columns=['c1'])
## Variable
nomTopic = 'humidite'
bootstrap_servers = 'localhost:9092'
topic = TopicPartition(nomTopic, 0)
## Programme
## Création d'un consumer
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers)
## Assignation au topic que l'on va lire
consumer.assign([topic])
## Permet d'aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)
## Création de la FILE
taille_File = 20 +1
file = []
## Boucle des messages
for msg in consumer:
## Décodage du message, ici en UTF-8
msg = msg.value.decode("utf-8")
##Partie detection
## Partie de détection d'erreur
if len(file) > taille_File:
file.append(msg)
file.pop(0)
print("Erreur ?")
#etat allant de -1 à 1
## on met ci-dessous la fonction de détection d'anomalies
etat = detectionAnomaly(file, taille_File)
## Partie d'initialisation
else:
print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ?
file.append(msg)
# etat 2 correspond à l'initialisation
etat = -2
## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel)
temps = time.time()
valeur = msg
donnee = (valeur,temps,etat)
# sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) )
listeDonnee.append(donnee)
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
## Aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)
Code important
Importation de la bibliothèquue Kafka Python
from kafka import KafkaProducer
Code pour affichage du résultat de notre code
Attention cette affichage est pour ce projet. Adapté le pour votre projet selon vos bases de données.
## Importation des bibliothèques
import matplotlib.pyplot as plt
import pickle
##Fonction pour mise en forme des données
def separateur_serial(chaine, separateur ="!"):
indice = -1
for i in range (len(chaine) -1, -1 , -1):
if chaine[i] == separateur:
indice = i
break
topic = chaine[:indice]
valeur = chaine[indice +1:]
return valeur
## variable contenant tous les fichiers avec les données
fichier_reference = "saveLISTEDONNEE.txt"
fichier1 = "saveLISTEDONNEE_DBSCAN.txt"
fichier2 = "saveLISTEDONNEE_ISOLATION.txt"
##imporation des fichiers précédants
data_ref = pickle.load( open( fichier_reference, "rb" ) )
data_DBSCAN = pickle.load( open( fichier1, "rb" ) )
data_ISOLATION = pickle.load( open( fichier2, "rb" ) )
##Affichage des nombres de mesures contenues dans les fichiers respectifs
print("DATA ref = ",len(data_ref))
print("DATA DBSCAN = ",len(data_DBSCAN))
print("DATA ISOLATION = ",len(data_ISOLATION))
def modification_dataREF(data):
"""
Remise en forme de notre base de référence
"""
dataBis = []
for elt in data:
valeur = separateur_serial(elt[0], separateur ="!")
donnee = (valeur,elt[1])
dataBis.append(donnee)
return dataBis
## Affichage avec plt des données
data_ref = modification_dataREF(data_ref)
# base de référence
y = [float(elt[0]) for elt in data_ref]
x = [float(elt[1]) for elt in data_ref]
plt.plot(x,y,color = "blue")
#DBSCAN
for elt in data_DBSCAN:
if elt[2] == 2:
plt.axvline(x=elt[1],color="k")
elif elt[2] == -1:
plt.axvline(x=elt[1],color="red")
#Isolation Forest
for elt in data_ISOLATION:
if elt[2] == 2:
plt.axvline(x=elt[1],color="k")
elif elt[2] == -1:
plt.axvline(x=elt[1],color="green")
# Remarque
# si elt[1] == 2 alors on est en itialisation
# si elt[1] == -1 alors on est en anomalie
## Affichage des données
plt.show()