Code de base pour la détection d anomalies

De Wiki du LAMA (UMR 5127)
Aller à la navigation Aller à la recherche

Dans cette page vous trouverez le code de base pour le producer, et le consumer de détection d'erreurs ainsi que pour l'affichage des données.

Code du producer

Code en pdf

## Importation des bibliothèques
import serial
from kafka import KafkaProducer
import time
import pickle

## Création d'un fichier pour sauvegarder les données issues du capteur (optionel)
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )

## Paramètre de la carte Arduino et de  l'ip du serveur Kafka
portArduino = "COM3"
adresseKafka = 'localhost:9092'

##Fonction pour permettre et récupérer le nom du topic ainsi que la valeur (!dépend des données envoyées par la carte Arduino)
def separateur_serial(chaine, separateur ="!"):
    indice = -1

    for i in range (len(chaine) -1, -1 , -1):
        if chaine[i] == separateur:
            indice = i
            break
    topic = chaine[:indice]
    valeur = chaine[indice +1:]
    return topic,valeur
    return valeur


## Initialisation de la connection avec la carte Arduino
serial_port = serial.Serial(port = portArduino, baudrate = 9600)

## Initialisation de la connection avec le sreveur Kafka
producer = KafkaProducer(bootstrap_servers=adresseKafka)

## On boucle à l'infini pour transmettre continuellement des données
while True:
    # Récupération de la donnée issue de la carte Arduino
    msg = serial_port.readline()

    # convertit le msg de l'ARDUINO en chaine de caractères
    msg = msg.decode('utf-8')[:-2] # enlève les \r\n
    # Récupération du temps pour la sauvegarde dans un fichier externe (optionel)
    temp = time.time()
    donnee = (msg, temp)
    print(msg)

    # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
    listeDonnee = pickle.load( open( "saveLISTEDONNEE.txt", "rb" ) )
    listeDonnee.append(donnee)
    pickle.dump( listeDonnee, open( "saveLISTEDONNEE.txt", "wb" ) )

    topic,message = separateur_serial(msg)
    # print(topic,message)

    # convertit le message en bytes
    message_bytes = bytes(message, 'utf-8')

    #envoie de la donnée sur le serveur Kafka
    producer.send(topic, message_bytes)

## code pour fermeture du producer
## producer.close()

Code important

Importation de la bibliothèquue Kafka Python

from kafka import KafkaProducer

Permet l'envoie d'un message à un topic nommé, il est conseillé de d'abord convertir le message en bytes ou en JSON

message = 'Bonjour'

message_bytes = bytes(message, 'utf-8')

producer.send(topic, message_bytes )


Permet la fermeture du producer

producer.close()

Code du consumer

Code en pdf

## Bibliotheque
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

from sklearn.ensemble import IsolationForest
import pandas as pd

from sklearn.cluster import DBSCAN
import time

import pickle
listeDonnee = []
pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )

## Fonction pour convertir une liste en DataFrame
def convert_list_database(liste):
    tableau = []
    for i in range(len(liste)-1):
        tableau.append(liste[i])
    # return tableau
    return pd.DataFrame(tableau, columns=['c1'])

## Variable
nomTopic = 'humidite'
bootstrap_servers = 'localhost:9092'
topic = TopicPartition(nomTopic, 0)

## Programme
## Création d'un consumer
consumer = KafkaConsumer(bootstrap_servers = bootstrap_servers)

## Assignation au topic que l'on va lire
consumer.assign([topic])

## Permet d'aller à la fin de la partition
consumer.poll()
consumer.seek_to_end(topic)

## Création de la FILE
taille_File = 20 +1
file = []

## Boucle des messages
for msg in consumer:
    ## Décodage du message, ici en UTF-8
    msg = msg.value.decode("utf-8")

    ##Partie detection

    ## Partie de détection d'erreur
    if len(file) > taille_File:
        file.append(msg)
        file.pop(0)

        print("Erreur ?")
        #etat allant de -1 à 1

	## on met ci-dessous la fonction de détection d'anomalies
        etat = detectionAnomaly(file, taille_File)

    ## Partie d'initialisation
    else:
        print("Initialisation", round(len(file)/taille_File,2)) # Pourcentage ?
        file.append(msg)
        # etat 2 correspond à l'initialisation
        etat = -2

    ## Partie pour sauvegarder les erreurs dans un fichiers externes (optionnel)
    temps = time.time()
    valeur = msg

    donnee = (valeur,temps,etat)
    # sauvegarde des données dans un fichier de sauvegarde !consomme du temps si le fichier est trop grand
    listeDonnee = pickle.load( open( "saveLISTEDONNEE_DBSCAN.txt", "rb" ) )
    listeDonnee.append(donnee)
    pickle.dump( listeDonnee, open( "saveLISTEDONNEE_DBSCAN.txt", "wb" ) )

    ## Aller à la fin de la partition
    consumer.poll()
    consumer.seek_to_end(topic)

Code important

Importation de la bibliothèquue Kafka Python

from kafka import KafkaProducer

Code pour affichage du résultat de notre code

Attention cette affichage est pour ce projet. Adapté le pour votre projet selon vos bases de données.

Code en pdf

## Importation des bibliothèques
import matplotlib.pyplot as plt
import pickle

##Fonction pour mise en forme des données
def separateur_serial(chaine, separateur ="!"):
    indice = -1

    for i in range (len(chaine) -1, -1 , -1):
        if chaine[i] == separateur:
            indice = i
            break
    topic = chaine[:indice]
    valeur = chaine[indice +1:]
    return valeur

## variable contenant tous les fichiers avec les données
fichier_reference = "saveLISTEDONNEE.txt"
fichier1 = "saveLISTEDONNEE_DBSCAN.txt"
fichier2 = "saveLISTEDONNEE_ISOLATION.txt"

##imporation des fichiers précédants
data_ref = pickle.load( open( fichier_reference, "rb" ) )
data_DBSCAN = pickle.load( open( fichier1, "rb" ) )
data_ISOLATION = pickle.load( open( fichier2, "rb" ) )

##Affichage des nombres de mesures contenues dans les fichiers respectifs
print("DATA ref = ",len(data_ref))
print("DATA DBSCAN = ",len(data_DBSCAN))
print("DATA ISOLATION = ",len(data_ISOLATION))

def modification_dataREF(data):
    """
    Remise en forme de notre base de référence
    """
    dataBis = []
    for elt in data:
        valeur = separateur_serial(elt[0], separateur ="!")
        donnee = (valeur,elt[1])
        dataBis.append(donnee)
    return dataBis

## Affichage avec plt des données
data_ref = modification_dataREF(data_ref)
# base de référence
y = [float(elt[0]) for elt in data_ref]
x = [float(elt[1]) for elt in data_ref]
plt.plot(x,y,color = "blue")

#DBSCAN
for elt in data_DBSCAN:
    if elt[2] == 2:
        plt.axvline(x=elt[1],color="k")
    elif elt[2] == -1:
        plt.axvline(x=elt[1],color="red")

#Isolation Forest
for elt in data_ISOLATION:
    if elt[2] == 2:
        plt.axvline(x=elt[1],color="k")
    elif elt[2] == -1:
        plt.axvline(x=elt[1],color="green")
# Remarque
# si elt[1] == 2 alors on est en itialisation
# si elt[1] == -1 alors on est en anomalie

## Affichage des données
plt.show()