Code de base pour la détection d anomalies

De Wiki du LAMA (UMR 5127)
Aller à la navigation Aller à la recherche

Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs

Code du producer

Code du consumer

[lien en pdf]

Fihcier Python consumer pour détection d'érreur

    1. Bibliotheque

from kafka import KafkaConsumer from kafka.structs import TopicPartition

from sklearn.ensemble import IsolationForest import pandas as pd

from sklearn.cluster import DBSCAN import time

import pickle listeDonnee = [] pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )

    1. Fonction pour convertir une liste en DataFrame

def convert_list_database(liste):

   tableau = []
   for i in range(len(liste)-1):
       tableau.append(liste[i])
   # return tableau
   return pd.DataFrame(tableau, columns=['c1'])
    1. Variable

nomTopic = 'humidite' bootstrap_servers = 'localhost:9092' topic = TopicPartition(nomTopic, 0)

    1. Programme
    2. Création d'un consumer

consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers)

    1. Assignation au topic que l'on va lire

consumer.assign([topic])

    1. Permet d'aller à la fin de la partition

consumer.poll() consumer.seek_to_end(topic)

    1. Création de la FILE

taille_File = 20 +1 file = []

    1. Boucle des messages

for msg in consumer:

   ## Décodage du message, ici en UTF-8
   msg = msg.value.decode("utf-8")
   ##Partie detection
   ## Partie de détection d'erreur
   if len(file) > taille_File:
       file.append(msg)
       file.pop(0)
       print("Erreur ?")
       #etat allant de -1 à 1

## on met ci-dessous la fonction de détection d'anomalies

       etat = detectionAnomaly(file, taille_File)
   ## Partie d'initialisation
   else:
       print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ?
       file.append(msg)
       # etat 2 correspond à l'initialisation
       etat = -2
   ## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel)
   temps = time.time()
   valeur = msg
   donnee = (valeur,temps,etat)
   # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
   listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) )
   listeDonnee.append(donnee)
   pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )
   ## Aller à la fin de la partition
   consumer.poll()
   consumer.seek_to_end(topic)