Code de base pour la détection d anomalies
Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs ainsi que pour l'affichage des données.
Code du producer
## Importation des bibliothèques
import serial
from kafka import KafkaProducer
import time
import pickle
## Création d'un fichier pour sauvegarder les données issues du capteur (optionel)
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )
## Paramètre de la carte Arduino et de l'ip du serveur Kafka
portArduino = "COM3"
adresseKafka = 'localhost:9092'
##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend des données envoyées par la carte Arduino)
def separateur_serial(chaine, separateur ="!"):
indice = -1
for i in range (len(chaine) -1, -1 , -1):
if chaine[i] == separateur:
indice = i
break
topic = chaine[:indice]
valeur = chaine[indice +1:]
return topic,valeur
return valeur
## Initialisation de la connection avec la carte Arduino
serial_port = serial.Serial(port = portArduino, baudrate = 9600)
## Initialisation de la connection avec le sreveur Kafka
producer = KafkaProducer(bootstrap_servers=adresseKafka)
## On boucle à l'infini pour transmettre continuellement des données
while True:
# Récupération de la donnée issue de la carte Arduino
msg = serial_port.readline()
# convertit le msg de l'ARDUINO en chaine de caractères
msg = msg.decode('utf-8')[:-2] # enlève les \r\n
# Récupération du temps pour la sauvegarde dans un fichier externe (optionel)
temp = time.time()
donnee = (msg, temp)
print(msg)
# sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) )
listeDonnee.append(donnee)
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )
topic,message = separateur_serial(msg)
# print(topic,message)
# convertit le message en bytes
message_bytes = bytes(message, 'utf-8')
#envoie de la donnée sur le serveur Kafka
producer.send(topic, message_bytes)
## code pour fermeture du producer
## producer.close()
Code important
Importation de la bibliothèquue Kafka Python
from kafka import KafkaProducer
Permet l'envoie d'un message à un topic nommé, il est conseillé de d'abord convertir le message en bytes ou en JSON
message = 'Bonjour' topic = 'NomPartition' message_bytes = bytes(message, 'utf-8') producer.send(topic, message_bytes )
Permet la fermeture du producer
producer.close()
Code du consumer
## Bibliotheque
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from sklearn.ensemble import IsolationForest
import pandas as pd
from sklearn.cluster import DBSCAN
import time
import pickle
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
## Fonction pour convertir une liste en DataFrame
def convert_list_database(liste):
tableau = []
for i in range(len(liste)-1):
tableau.append(liste[i])
# return tableau
return pd.DataFrame(tableau, columns=['c1'])
## Variable
nomTopic = 'humidite'
bootstrap_servers = 'localhost:9092'
topic = TopicPartition(nomTopic, 0)
## Programme
## Création d'un consumer
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers)
## Assignation au topic que l'on va lire
consumer.assign([topic])
## Permet d'aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)
## Création de la FILE
taille_File = 20 +1
file = []
## Boucle des messages
for msg in consumer:
## Décodage du message, ici en UTF-8
msg = msg.value.decode("utf-8")
##Partie detection
## Partie de détection d'erreur
if len(file) > taille_File:
file.append(msg)
file.pop(0)
print("Erreur ?")
#etat allant de -1 à 1
## on met ci-dessous la fonction de détection d'anomalies
etat = detectionAnomaly(file, taille_File)
## Partie d'initialisation
else:
print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ?
file.append(msg)
# etat 2 correspond à l'initialisation
etat = -2
## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel)
temps = time.time()
valeur = msg
donnee = (valeur,temps,etat)
# sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) )
listeDonnee.append(donnee)
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
## Aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)
Code important
Importation des bibliothèques Kafka Python
from kafka import KafkaConsumer from kafka.structs import TopicPartition
Création d'un consumer ainsi que son asignation à sa partition
nomTopic = UnNomDeTopic bootstrap_servers = AdresseIP:Port numPartition = LeNuméroDeLaPartition partition= TopicPartition(nomTopic, numPartition) consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers) consumer.assign([partition])
Se rendre à la fin de la partition
consumer.poll() consumer.seek_to_end(numPartition)
Lecture de la partition, attention on a une boucle for mais elle a le comportement d'une boucle while. Si vous avez encodé votre message alors la première chose à faire est de le décoder.
for msg in consumer:
msg = msg.value.decode("utf-8")
Fermeture d'un consumer
consumer.close()
Code pour affichage du résultat de notre code
Attention cette affichage est pour ce projet. Adapté le pour votre projet selon vos bases de données.
## Importation des bibliothèques
import matplotlib.pyplot as plt
import pickle
##Fonction pour mise en forme des données
def separateur_serial(chaine, separateur ="!"):
indice = -1
for i in range (len(chaine) -1, -1 , -1):
if chaine[i] == separateur:
indice = i
break
topic = chaine[:indice]
valeur = chaine[indice +1:]
return valeur
## variable contenant tous les fichiers avec les données
fichier_reference = "saveLISTEDONNEE.txt"
fichier1 = "saveLISTEDONNEE_DBSCAN.txt"
fichier2 = "saveLISTEDONNEE_ISOLATION.txt"
##imporation des fichiers précédants
data_ref = pickle.load( open( fichier_reference, "rb" ) )
data_DBSCAN = pickle.load( open( fichier1, "rb" ) )
data_ISOLATION = pickle.load( open( fichier2, "rb" ) )
##Affichage des nombres de mesures contenues dans les fichiers respectifs
print("DATA ref = ",len(data_ref))
print("DATA DBSCAN = ",len(data_DBSCAN))
print("DATA ISOLATION = ",len(data_ISOLATION))
def modification_dataREF(data):
"""
Remise en forme de notre base de référence
"""
dataBis = []
for elt in data:
valeur = separateur_serial(elt[0], separateur ="!")
donnee = (valeur,elt[1])
dataBis.append(donnee)
return dataBis
## Affichage avec plt des données
data_ref = modification_dataREF(data_ref)
# base de référence
y = [float(elt[0]) for elt in data_ref]
x = [float(elt[1]) for elt in data_ref]
plt.plot(x,y,color = "blue")
#DBSCAN
for elt in data_DBSCAN:
if elt[2] == 2:
plt.axvline(x=elt[1],color="k")
elif elt[2] == -1:
plt.axvline(x=elt[1],color="red")
#Isolation Forest
for elt in data_ISOLATION:
if elt[2] == 2:
plt.axvline(x=elt[1],color="k")
elif elt[2] == -1:
plt.axvline(x=elt[1],color="green")
# Remarque
# si elt[1] == 2 alors on est en itialisation
# si elt[1] == -1 alors on est en anomalie
## Affichage des données
plt.show()