Code de base pour la détection d anomalies

De Wiki du LAMA (UMR 5127)
Aller à la navigation Aller à la recherche

Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs

Code du producer

Code en pdf

## Importation des bibliothèques
import serial
from kafka import KafkaProducer
import time
import pickle

## Création d'un fichier pour sauvegarder les données issues du capteur (optionel)
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )

## Paramètre de la carte Arduino et de  l'ip du serveur Kafka
portArduino = "COM3"
adresseKafka = 'localhost:9092'

##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend des données envoyées par la carte Arduino)
def separateur_serial(chaine, separateur ="!"):
    indice = -1

    for i in range (len(chaine) -1, -1 , -1):
        if chaine[i] == separateur:
            indice = i
            break
    topic = chaine[:indice]
    valeur = chaine[indice +1:]
    return topic,valeur
    return valeur


## Initialisation de la connection avec la carte Arduino
serial_port = serial.Serial(port = portArduino, baudrate = 9600)

## Initialisation de la connection avec le sreveur Kafka
producer = KafkaProducer(bootstrap_servers=adresseKafka)

## On boucle à l'infini pour transmettre continuellement des données
while True:
    # Récupération de la donnée issue de la carte Arduino
    msg = serial_port.readline()

    # convertit le msg de l'ARDUINO en chaine de caractères
    msg = msg.decode('utf-8')[:-2] # enlève les \r\n
    # Récupération du temps pour la sauvegarde dans un fichier externe (optionel)
    temp = time.time()
    donnee = (msg, temp)
    print(msg)

    # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
    listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) )
    listeDonnee.append(donnee)
    pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )

    topic,message = separateur_serial(msg)
    # print(topic,message)

    # convertit le message en bytes
    message_bytes = bytes(message, 'utf-8')

    #envoie de la donnée sur le serveur Kafka
    producer.send(topic, message_bytes)

## code pour fermeture du producer
## producer.close()

Code du consumer

[lien en pdf]

## Bibliotheque
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

from sklearn.ensemble import IsolationForest
import pandas as pd

from sklearn.cluster import DBSCAN
import time

import pickle
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )

## Fonction pour convertir une liste en DataFrame
def convert_list_database(liste):
    tableau = []
    for i in range(len(liste)-1):
        tableau.append(liste[i])
    # return tableau
    return pd.DataFrame(tableau, columns=['c1'])

## Variable
nomTopic = 'humidite'
bootstrap_servers = 'localhost:9092'
topic = TopicPartition(nomTopic, 0)

## Programme
## Création d'un consumer
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers)

## Assignation au topic que l'on va lire
consumer.assign([topic])

## Permet d'aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)

## Création de la FILE
taille_File = 20 +1
file = []

## Boucle des messages
for msg in consumer:
    ## Décodage du message, ici en UTF-8
    msg = msg.value.decode("utf-8")

    ##Partie detection

    ## Partie de détection d'erreur
    if len(file) > taille_File:
        file.append(msg)
        file.pop(0)

        print("Erreur ?")
        #etat allant de -1 à 1

	## on met ci-dessous la fonction de détection d'anomalies
        etat = detectionAnomaly(file, taille_File)

    ## Partie d'initialisation
    else:
        print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ?
        file.append(msg)
        # etat 2 correspond à l'initialisation
        etat = -2

    ## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel)
    temps = time.time()
    valeur = msg

    donnee = (valeur,temps,etat)
    # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
    listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) )
    listeDonnee.append(donnee)
    pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )

    ## Aller à la fin de la partition
    consumer.poll()
    consumer.seek_to_end(topic)

Code pour affichage du résultat de notre code

Attention cette affichage est pour ce projet. Adapté le pour votre projet selon vos bases de données.

Code en pdf

## Importation des bibliothèques
import matplotlib.pyplot as plt
import pickle

##Fonction pour mise en forme des données
def separateur_serial(chaine, separateur ="!"):
    indice = -1

    for i in range (len(chaine) -1, -1 , -1):
        if chaine[i] == separateur:
            indice = i
            break
    topic = chaine[:indice]
    valeur = chaine[indice +1:]
    return valeur

## variable contenant tous les fichiers avec les données
fichier_reference = "saveLISTEDONNEE.txt"
fichier1 = "saveLISTEDONNEE_DBSCAN.txt"
fichier2 = "saveLISTEDONNEE_ISOLATION.txt"

##imporation des fichiers précédants
data_ref = pickle.load( open( fichier_reference, "rb" ) )
data_DBSCAN = pickle.load( open( fichier1, "rb" ) )
data_ISOLATION = pickle.load( open( fichier2, "rb" ) )

##Affichage des nombres de mesures contenues dans les fichiers respectifs
print("DATA ref = ",len(data_ref))
print("DATA DBSCAN = ",len(data_DBSCAN))
print("DATA ISOLATION = ",len(data_ISOLATION))

def modification_dataREF(data):
    """
    Remise en forme de notre base de référence
    """
    dataBis = []
    for elt in data:
        valeur = separateur_serial(elt[0], separateur ="!")
        donnee = (valeur,elt[1])
        dataBis.append(donnee)
    return dataBis

## Affichage avec plt des données
data_ref = modification_dataREF(data_ref)
# base de référence
y = [float(elt[0]) for elt in data_ref]
x = [float(elt[1]) for elt in data_ref]
plt.plot(x,y,color = "blue")

#DBSCAN
for elt in data_DBSCAN:
    if elt[2] == 2:
        plt.axvline(x=elt[1],color="k")
    elif elt[2] == -1:
        plt.axvline(x=elt[1],color="red")

#Isolation Forest
for elt in data_ISOLATION:
    if elt[2] == 2:
        plt.axvline(x=elt[1],color="k")
    elif elt[2] == -1:
        plt.axvline(x=elt[1],color="green")
# Remarque
# si elt[1] == 2 alors on est en itialisation
# si elt[1] == -1 alors on est en anomalie

## Affichage des données
plt.show()