Code: Alles auswählen
#!/usr/bin/env python3
import os
import cv2
import numpy as np
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
# Portnummer für den HTTP-Server
PORT = 8000
# Flag, um den Status des Videostreams zu verfolgen
running = False
# IP-Adresse des Hosts
HOST_IP = '192.168.178.21'
# Pfad zur Haar-Cascade-Datei für die Gesichtserkennung
cascadePath = 'Cascades/haarcascade/haarcascade_frontalface_default.xml'
class Gesichtserkennung:
def __init__(self, cascadePath):
self.cascadePath = cascadePath # Setzt den Pfad zur Cascade-Datei
self.recognizer = cv2.face.LBPHFaceRecognizer_create() # Erstellt einen LBPH-Face-Recognizer
self.faceCascade = cv2.CascadeClassifier(self.cascadePath) # Lädt die Haar-Cascade
self.names = [None] # Liste der Personen (wird beim Training gefüllt)
# Methode zum Drucken von Informationsnachrichten
def Infoprint(self, inhalt):
print(f'\033[93m{"\n[INFO] "+inhalt+". ..."}\033[0m')
# Methode zum Drucken von Warnungen
def Warnungprint(self, inhalt):
print(f'\033[33m{"\n[Warnung] "+inhalt+". ..."}\033[0m')
# Methode zum Drucken von Fehlermeldungen
def Errorprint(self, inhalt):
print(f'\033[31m{"\n[ERROR] "+inhalt+"."}\033[0m')
# Methode zum Drucken von Aktionen
def Aktionprint(self, inhalt):
print(f'\033[32m{"\n[Aktion] "+inhalt+"."}\033[0m')
# Methode zum Drucken von Eingabeaufforderungen
def Eingabeprint(self, inhalt):
print(f'\033[34m{"\n[Eingabe] "+inhalt+". ..."}\033[0m')
# Methode zum Abrufen von Gesichtern und IDs für das Training
def Get_Images_And_Labels(self):
faces = []
ids = []
# Durchsuche den Ordner 'Personen' nach Unterordnern
for root, dirs, files in os.walk("Personen"):
for dir_name in dirs:
label_str, person_name = dir_name.split('_', 1)
label = int(label_str)
self.names.append(person_name)
dir_path = os.path.join(root, dir_name)
for file_name in os.listdir(dir_path):
if file_name.endswith('.jpg') or file_name.endswith('.png'):
image_path = os.path.join(dir_path, file_name)
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
faces.append(image)
ids.append(label)
return faces, ids
# Methode zum Trainieren des Gesichtserkennungsmodells
def Trainer(self):
self.Aktionprint("Initialisiere Trainer")
if not os.path.exists('./Trainer/'):
try:
os.makedirs('./Trainer/') # Erstellt den Ordner 'Trainer', falls nicht vorhanden
self.Aktionprint("Ordner 'Trainer' erfolgreich erstellt")
except:
self.Errorprint("Ordner 'Trainer' konnte nicht erstellt werden")
return
self.Infoprint("Trainiere Gesichter. Dies wird einige Sekunden dauern. Bitte warten...")
faces, ids = self.Get_Images_And_Labels() # Ruft die Gesichter und IDs ab
if faces and ids:
self.recognizer.train(faces, np.array(ids)) # Trainiert den Recognizer
self.Aktionprint("Speichere Trainermodel")
try:
self.recognizer.write('Trainer/trainer.yml') # Speichert das Modell
self.Infoprint(f"{len(np.unique(ids))} Gesichter trainiert") # Informiert über die Anzahl
except:
self.Errorprint("Speichern des Trainermodels nicht möglich")
else:
self.Warnungprint("Keine Trainingsdaten gefunden")
# Methode zur Gesichtserkennung in einem Frame
def Face_Recognition(self, frame):
# Lädt das trainierte Modell, falls noch nicht geladen
if not os.path.exists('Trainer/trainer.yml'):
self.Warnungprint("Trainermodel nicht gefunden. Bitte trainieren Sie erst das Modell.")
return frame
self.recognizer.read('Trainer/trainer.yml')
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # Konvertiert in Graustufen
faces = self.faceCascade.detectMultiScale(
gray,
scaleFactor=1.2,
minNeighbors=5,
minSize=(int(0.15 * frame.shape[1]), int(0.15 * frame.shape[0]))
)
for (x, y, w, h) in faces:
roi_gray = gray[y:y+h, x:x+w]
id, confidence = self.recognizer.predict(roi_gray)
if id < len(self.names) and confidence < 100:
name = self.names[id]
confidence_text = f" {100 - round(confidence)}%"
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.putText(frame, name, (x + 5, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.putText(frame, confidence_text, (x + 5, y + h - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1)
else:
name = "Unbekannt"
confidence_text = f" {100 - round(confidence)}%"
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
cv2.putText(frame, name, (x + 5, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.putText(frame, confidence_text, (x + 5, y + h - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
return frame
# HTTP-Handler-Klasse zur Verwaltung der HTTP-Anfragen
class VIDEOSTREAMHANDLER(BaseHTTPRequestHandler):
# Instanz von Gesichtserkennung auf Klassenebene
erkennung = Gesichtserkennung(cascadePath)
def do_GET(self):
global running
try:
# Hauptseite aufrufen
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
# HTML-Inhalt der Webseite
self.wfile.write(b'''
<html>
<head>
<title>Gesichtserkennung</title>
<script>
// Funktion zum Starten des Streams
function startStream() {
var xhr = new XMLHttpRequest();
xhr.open("GET", "/start", true);
xhr.send();
document.getElementById('video').src = '/video';
}
// Funktion zum Stoppen des Streams
function stopStream() {
var xhr = new XMLHttpRequest();
xhr.open("GET", "/stop", true);
xhr.send();
document.getElementById('video').src = '';
}
// Funktion zum Trainieren des Modells
function trainModel() {
var xhr = new XMLHttpRequest();
xhr.open("GET", "/train", true);
xhr.send();
alert('Training gestartet. Bitte warten Sie einige Sekunden.');
}
</script>
</head>
<body>
<h1>Gesichtserkennung</h1>
<button onclick="startStream()">Start Stream</button>
<button onclick="stopStream()">Stop Stream</button>
<button onclick="trainModel()">Train Model</button> <!-- Neuer Train-Button -->
<img id="video" src="" style="position: absolute; top: 0; right: 0;" />
</body>
</html>
''')
# Videostream anfordern
elif self.path == '/video':
if running:
self.send_response(200)
self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
self.end_headers()
cam = cv2.VideoCapture(0)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
# Funktion zum Streamen des Videos
def Stream_Video():
try:
while running:
ret, frame = cam.read()
if not ret:
break
frame = self.erkennung.Face_Recognition(frame)
_, jpeg = cv2.imencode('.jpg', frame)
try:
self.wfile.write(b'--frame\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', str(len(jpeg)))
self.end_headers()
self.wfile.write(jpeg.tobytes())
self.wfile.write(b'\r\n')
except OSError as e:
print(f"Socket-Fehler: {e}")
break
time.sleep(0.1)
except (BrokenPipeError, ConnectionResetError):
pass
finally:
cam.release()
Stream_Video()
else:
self.send_response(404)
self.end_headers()
# Stream starten
elif self.path == '/start':
running = True
self.send_response(200)
self.end_headers()
# Stream stoppen
elif self.path == '/stop':
running = False
self.send_response(200)
self.end_headers()
# Trainingsprozess starten
elif self.path == '/train':
self.send_response(200)
self.end_headers()
# Training in separatem Thread starten
training_thread = threading.Thread(target=self.erkennung.Trainer)
training_thread.start()
except Exception as e:
print(f"Ausnahme in do_GET: {e}")
class VIDEOSTREAMSERVER:
def __init__(self, port, ip):
self.port = port
self.ip = ip
self.httpd = None
self.server_thread = None
# Methode zum Starten des Servers
def Start_Server(self):
self.server_thread = threading.Thread(target=self.Run_Server)
self.server_thread.start()
# Methode zum Ausführen des Servers
def Run_Server(self):
self.httpd = HTTPServer((self.ip, self.port), VIDEOSTREAMHANDLER)
print(f"Server läuft auf {self.ip}:{self.port}")
self.httpd.serve_forever()
if __name__ == "__main__":
server = VIDEOSTREAMSERVER(PORT, HOST_IP)
server.Start_Server()