Code de base pour la détection d anomalies
Aller à la navigation
Aller à la recherche
Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs
Code du producer
[lien en pdf]
## Importation des bibliothèques
import serial
from kafka import KafkaProducer
import time
import pickle
## Création d'un fichier pour sauvegarder les données issues du capteur (optionel)
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )
## Paramètre de la carte Arduino et de l'ip du serveur Kafka
portArduino = "COM3"
adresseKafka = 'localhost:9092'
##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend )
def separateur_serial(chaine, separateur ="!"):
indice = -1
for i in range (len(chaine) -1, -1 , -1):
if chaine[i] == separateur:
indice = i
break
topic = chaine[:indice]
valeur = chaine[indice +1:]
return topic,valeur
return valeur
## Initialisation de la connection avec la carte Arduino
serial_port = serial.Serial(port = portArduino, baudrate = 9600)
## Initialisation de la connection avec le sreveur Kafka
producer = KafkaProducer(bootstrap_servers=adresseKafka)
## On boucle à l'infini pour transmettre continuellement des données
while True:
# Récupération de la donnée issue de la carte Arduino
msg = serial_port.readline()
# convertit le msg de l'ARDUINO en chaine de caractères
msg = msg.decode('utf-8')[:-2] # enlève les \r\n
# Récupération du temps pour la sauvegarde dans un fichier externe (optionel)
temp = time.time()
donnee = (msg, temp)
print(msg)
# sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) )
listeDonnee.append(donnee)
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )
topic,message = separateur_serial(msg)
# print(topic,message)
# convertit le message en bytes
message_bytes = bytes(message, 'utf-8')
#envoie de la donnée sur le serveur Kafka
producer.send(topic, message_bytes)
## code pour fermeture du producer
## producer.close()
Code du consumer
[lien en pdf]
## Bibliotheque
from kafka import KafkaConsumer
from kafka.structs import TopicPartition
from sklearn.ensemble import IsolationForest
import pandas as pd
from sklearn.cluster import DBSCAN
import time
import pickle
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
## Fonction pour convertir une liste en DataFrame
def convert_list_database(liste):
tableau = []
for i in range(len(liste)-1):
tableau.append(liste[i])
# return tableau
return pd.DataFrame(tableau, columns=['c1'])
## Variable
nomTopic = 'humidite'
bootstrap_servers = 'localhost:9092'
topic = TopicPartition(nomTopic, 0)
## Programme
## Création d'un consumer
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers)
## Assignation au topic que l'on va lire
consumer.assign([topic])
## Permet d'aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)
## Création de la FILE
taille_File = 20 +1
file = []
## Boucle des messages
for msg in consumer:
## Décodage du message, ici en UTF-8
msg = msg.value.decode("utf-8")
##Partie detection
## Partie de détection d'erreur
if len(file) > taille_File:
file.append(msg)
file.pop(0)
print("Erreur ?")
#etat allant de -1 à 1
## on met ci-dessous la fonction de détection d'anomalies
etat = detectionAnomaly(file, taille_File)
## Partie d'initialisation
else:
print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ?
file.append(msg)
# etat 2 correspond à l'initialisation
etat = -2
## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel)
temps = time.time()
valeur = msg
donnee = (valeur,temps,etat)
# sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) )
listeDonnee.append(donnee)
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
## Aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)