Echtzeit-Gesichtserkennung: Ein Webbasierter Video-Stream-Server für Sicherheit und Identifikation
Verfasst: Sonntag 26. Januar 2025, 20:59
"Ich arbeite derzeit an einem Projekt zur Echtzeit-Gesichtserkennung über einen Videostream, aber das Bild wird sehr ruckelig übertragen. Da ich noch nicht viel Erfahrung im Programmieren habe, möchte ich wissen, welche Schritte ich unternehmen kann, um die Performance meiner Videoübertragung zu verbessern und ein flüssigeres Bild zu erhalten. Kann mir jemand dabei helfen, die Effizienz meines Codes zu optimieren?
Code: Alles auswählen
#!/usr/bin/env python3
import os
import cv2
import numpy as np
import threading
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
# Portnummer für den HTTP-Server
PORT = 8000
# Flag, um den Status des Videostreams zu verfolgen
running = False
# IP-Adresse des Hosts
HOST_IP = '192.168.178.21'
# Pfad zur Haar-Cascade-Datei für die Gesichtserkennung
cascadePath = 'Cascades/haarcascade/haarcascade_frontalface_default.xml'
class Gesichtserkennung:
def __init__(self, cascadePath):
self.cascadePath = cascadePath # Setzt den Pfad zur Cascade-Datei
self.recognizer = cv2.face.LBPHFaceRecognizer_create() # Erstellt einen LBPH-Face-Recognizer
self.faceCascade = cv2.CascadeClassifier(self.cascadePath) # Lädt die Haar-Cascade
self.names = [None] # Liste der Personen (wird beim Training gefüllt)
# Methode zum Drucken von Informationsnachrichten
def Infoprint(self, inhalt):
print(f'\033[93m{"\n[INFO] "+inhalt+". ..."}\033[0m')
# Methode zum Drucken von Warnungen
def Warnungprint(self, inhalt):
print(f'\033[33m{"\n[Warnung] "+inhalt+". ..."}\033[0m')
# Methode zum Drucken von Fehlermeldungen
def Errorprint(self, inhalt):
print(f'\033[31m{"\n[ERROR] "+inhalt+"."}\033[0m')
# Methode zum Drucken von Aktionen
def Aktionprint(self, inhalt):
print(f'\033[32m{"\n[Aktion] "+inhalt+"."}\033[0m')
# Methode zum Drucken von Eingabeaufforderungen
def Eingabeprint(self, inhalt):
print(f'\033[34m{"\n[Eingabe] "+inhalt+". ..."}\033[0m')
# Methode zum Abrufen von Gesichtern und IDs für das Training
def Get_Images_And_Labels(self):
faces = []
ids = []
# Durchsuche den Ordner 'Personen' nach Unterordnern
for root, dirs, files in os.walk("Personen"):
for dir_name in dirs:
label_str, person_name = dir_name.split('_', 1)
label = int(label_str)
self.names.append(person_name)
dir_path = os.path.join(root, dir_name)
for file_name in os.listdir(dir_path):
if file_name.endswith('.jpg') or file_name.endswith('.png'):
image_path = os.path.join(dir_path, file_name)
image = cv2.imread(image_path, cv2.IMREAD_GRAYSCALE)
faces.append(image)
ids.append(label)
return faces, ids
# Methode zum Trainieren des Gesichtserkennungsmodells
def Trainer(self):
self.Aktionprint("Initialisiere Trainer")
if not os.path.exists('./Trainer/'):
try:
os.makedirs('./Trainer/') # Erstellt den Ordner 'Trainer', falls nicht vorhanden
self.Aktionprint("Ordner 'Trainer' erfolgreich erstellt")
except:
self.Errorprint("Ordner 'Trainer' konnte nicht erstellt werden")
return
self.Infoprint("Trainiere Gesichter. Dies wird einige Sekunden dauern. Bitte warten...")
faces, ids = self.Get_Images_And_Labels() # Ruft die Gesichter und IDs ab
if faces and ids:
self.recognizer.train(faces, np.array(ids)) # Trainiert den Recognizer
self.Aktionprint("Speichere Trainermodel")
try:
self.recognizer.write('Trainer/trainer.yml') # Speichert das Modell
self.Infoprint(f"{len(np.unique(ids))} Gesichter trainiert") # Informiert über die Anzahl
except:
self.Errorprint("Speichern des Trainermodels nicht möglich")
else:
self.Warnungprint("Keine Trainingsdaten gefunden")
# Methode zur Gesichtserkennung in einem Frame
def Face_Recognition(self, frame):
# Lädt das trainierte Modell, falls noch nicht geladen
if not os.path.exists('Trainer/trainer.yml'):
self.Warnungprint("Trainermodel nicht gefunden. Bitte trainieren Sie erst das Modell.")
return frame
self.recognizer.read('Trainer/trainer.yml')
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) # Konvertiert in Graustufen
faces = self.faceCascade.detectMultiScale(
gray,
scaleFactor=1.2,
minNeighbors=5,
minSize=(int(0.15 * frame.shape[1]), int(0.15 * frame.shape[0]))
)
for (x, y, w, h) in faces:
roi_gray = gray[y:y+h, x:x+w]
id, confidence = self.recognizer.predict(roi_gray)
if id < len(self.names) and confidence < 100:
name = self.names[id]
confidence_text = f" {100 - round(confidence)}%"
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
cv2.putText(frame, name, (x + 5, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.putText(frame, confidence_text, (x + 5, y + h - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 1)
else:
name = "Unbekannt"
confidence_text = f" {100 - round(confidence)}%"
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 0, 255), 2)
cv2.putText(frame, name, (x + 5, y - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)
cv2.putText(frame, confidence_text, (x + 5, y + h - 5),
cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 1)
return frame
# HTTP-Handler-Klasse zur Verwaltung der HTTP-Anfragen
class VIDEOSTREAMHANDLER(BaseHTTPRequestHandler):
# Instanz von Gesichtserkennung auf Klassenebene
erkennung = Gesichtserkennung(cascadePath)
def do_GET(self):
global running
try:
# Hauptseite aufrufen
if self.path == '/':
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
# HTML-Inhalt der Webseite
self.wfile.write(b'''
<html>
<head>
<title>Gesichtserkennung</title>
<script>
// Funktion zum Starten des Streams
function startStream() {
var xhr = new XMLHttpRequest();
xhr.open("GET", "/start", true);
xhr.send();
document.getElementById('video').src = '/video';
}
// Funktion zum Stoppen des Streams
function stopStream() {
var xhr = new XMLHttpRequest();
xhr.open("GET", "/stop", true);
xhr.send();
document.getElementById('video').src = '';
}
// Funktion zum Trainieren des Modells
function trainModel() {
var xhr = new XMLHttpRequest();
xhr.open("GET", "/train", true);
xhr.send();
alert('Training gestartet. Bitte warten Sie einige Sekunden.');
}
</script>
</head>
<body>
<h1>Gesichtserkennung</h1>
<button onclick="startStream()">Start Stream</button>
<button onclick="stopStream()">Stop Stream</button>
<button onclick="trainModel()">Train Model</button> <!-- Neuer Train-Button -->
<img id="video" src="" style="position: absolute; top: 0; right: 0;" />
</body>
</html>
''')
# Videostream anfordern
elif self.path == '/video':
if running:
self.send_response(200)
self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=frame')
self.end_headers()
cam = cv2.VideoCapture(0)
cam.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cam.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
# Funktion zum Streamen des Videos
def Stream_Video():
try:
while running:
ret, frame = cam.read()
if not ret:
break
frame = self.erkennung.Face_Recognition(frame)
_, jpeg = cv2.imencode('.jpg', frame)
try:
self.wfile.write(b'--frame\r\n')
self.send_header('Content-Type', 'image/jpeg')
self.send_header('Content-Length', str(len(jpeg)))
self.end_headers()
self.wfile.write(jpeg.tobytes())
self.wfile.write(b'\r\n')
except OSError as e:
print(f"Socket-Fehler: {e}")
break
time.sleep(0.1)
except (BrokenPipeError, ConnectionResetError):
pass
finally:
cam.release()
Stream_Video()
else:
self.send_response(404)
self.end_headers()
# Stream starten
elif self.path == '/start':
running = True
self.send_response(200)
self.end_headers()
# Stream stoppen
elif self.path == '/stop':
running = False
self.send_response(200)
self.end_headers()
# Trainingsprozess starten
elif self.path == '/train':
self.send_response(200)
self.end_headers()
# Training in separatem Thread starten
training_thread = threading.Thread(target=self.erkennung.Trainer)
training_thread.start()
except Exception as e:
print(f"Ausnahme in do_GET: {e}")
class VIDEOSTREAMSERVER:
def __init__(self, port, ip):
self.port = port
self.ip = ip
self.httpd = None
self.server_thread = None
# Methode zum Starten des Servers
def Start_Server(self):
self.server_thread = threading.Thread(target=self.Run_Server)
self.server_thread.start()
# Methode zum Ausführen des Servers
def Run_Server(self):
self.httpd = HTTPServer((self.ip, self.port), VIDEOSTREAMHANDLER)
print(f"Server läuft auf {self.ip}:{self.port}")
self.httpd.serve_forever()
if __name__ == "__main__":
server = VIDEOSTREAMSERVER(PORT, HOST_IP)
server.Start_Server()