Machine Learning mit einer pyqt5 GUI. Wie "sauber" ist mein Code programmiert? Was kann man verbessern?
Verfasst: Mittwoch 26. Mai 2021, 18:55
Hallo alle zusammen,
ich habe die gleiche Frage vor einiger Zeit schon mal bei einem deutlich kleineren Programm gestellt und damals sehr gutes und hilfreiches Feedback bekommen. Deshalb wollte ich noch ein zweites Mal nachfragen. Es funktioniert soweit alles ganz gut, ich frage mich nur, wie "sauber" das alles programmiert ist und was man verbessern kann. Das Programm ist insgesamt doch relativ lang. Ich erwarte daher natürlich von niemandem, dass er das Programm bis ins kleinste Detail analysiert. Mir ist jedes Feedback sehr willkommen. Grundsätzlich bin ich natürlich an Feedback zum allgemeinen Aufbau oder der Struktur des Programms interessiert, aber auch zu Kleinigkeiten, wenn jemandem etwas auffällt.
Ich habe eine kleine GUI programmiert, über die man verschiedene Machine-Learning-Verfahren verwenden kann. Ich hänge mal ein Bild der GUI an, damit man sich das ein bisschen besser vorstellen kann, und dann natürlich noch den Code. Der Code ist auf zwei Module aufgeteilt: main.py und models.py. Vielen Dank schon mal an jeden, der sich die Zeit nimmt und mal rüberschaut.

main.py
models.py
ich habe die gleiche Frage vor einiger Zeit schon mal bei einem deutlich kleineren Programm gestellt und damals sehr gutes und hilfreiches Feedback bekommen. Deshalb wollte ich noch ein zweites Mal nachfragen. Es funktioniert soweit alles ganz gut, ich frage mich nur, wie "sauber" das alles programmiert ist und was man verbessern kann. Das Programm ist insgesamt doch relativ lang. Ich erwarte daher natürlich von niemandem, dass er das Programm bis ins kleinste Detail analysiert. Mir ist jedes Feedback sehr willkommen. Grundsätzlich bin ich natürlich an Feedback zum allgemeinen Aufbau oder der Struktur des Programms interessiert, aber auch zu Kleinigkeiten, wenn jemandem etwas auffällt.
Ich habe eine kleine GUI programmiert, über die man verschiedene Machine-Learning-Verfahren verwenden kann. Ich hänge mal ein Bild der GUI an, damit man sich das ein bisschen besser vorstellen kann, und dann natürlich noch den Code. Der Code ist auf zwei Module aufgeteilt: main.py und models.py. Vielen Dank schon mal an jeden, der sich die Zeit nimmt und mal rüberschaut.

main.py
Code: Alles auswählen
import sys
from pathlib import Path
from PyQt5.QtCore import Qt, QFile, QTextStream, QThread, pyqtSignal
from PyQt5 import uic, QtGui
from PyQt5.QtWidgets import QMessageBox, QMainWindow, QApplication, QFileDialog
import time
from models import classifier, Neural_Network
import numpy as np
from sklearn.tree import DecisionTreeClassifier
from sklearn.experimental import enable_hist_gradient_boosting
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier, AdaBoostClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
import pandas as pd
from sklearn.preprocessing import PowerTransformer
from sklearn.model_selection import train_test_split
import json
import math
class MainWindow:
def __init__(self):
    """Load the Designer form, wire every signal and reset the state.

    The train/auto-train/export buttons follow a strict naming scheme
    (``train_<suffix>``, ``train_auto_<suffix>``, ``export_<suffix>``), so
    they are connected in one loop driven by a suffix -> handler-name table.
    The last statement triggers the browse button, i.e. the file dialog is
    requested as soon as the window object has been created.
    """
    self.ui = uic.loadUi("gui.ui")
    ui = self.ui

    ui.browse_button.clicked.connect(self.open_file)
    ui.terminate_button.clicked.connect(self.terminate_training_thread)
    ui.number_hidden_layers_nn.valueChanged.connect(
        self.update_activation_neuron_layers
    )

    # widget suffix -> stem of the ``button_*`` / ``button_auto_*`` slots
    handler_stems = {
        "dt": "decision_tree",
        "rf": "random_forest",
        "svm": "support_vector_machine",
        "lr": "logistic_regression",
        "knn": "k_nearest_neighbor",
        "gb": "gradient_boost",
        "hgb": "hist_gradient_boost",
        "xg": "xg_boost",
        "ada": "ada_boost",
        "nn": "neural_network",
    }
    for suffix, stem in handler_stems.items():
        manual_button = getattr(ui, f"train_{suffix}")
        auto_button = getattr(ui, f"train_auto_{suffix}")
        export_button = getattr(ui, f"export_{suffix}")
        manual_button.clicked.connect(getattr(self, f"button_{stem}"))
        auto_button.clicked.connect(getattr(self, f"button_auto_{stem}"))
        export_button.clicked.connect(self.export_model)

    # Nothing is loaded or trained yet.
    self.model = self.params = None
    self.features = self.label = None
    self.filename = self.data = self.dataset = None
    self.bool_default = self.bool_kfold = None
    self.output_textfield = None

    ui.browse_button.animateClick()
def disable_tabs(self):
    """Grey out the browse button and both training tabs."""
    ui = self.ui
    for widget in (ui.browse_button, ui.manual_ML, ui.auto_ML):
        widget.setEnabled(False)
def enable_tabs(self):
    """Re-activate the browse button and both training tabs."""
    ui = self.ui
    for widget in (ui.browse_button, ui.manual_ML, ui.auto_ML):
        widget.setEnabled(True)
def update_prograss_bar(self, i):
    """Show the progress value *i* on the progress bar.

    The bar only stores integers, so value and maximum are scaled by 100
    to keep two decimal places; the label shows *i* with two decimals.
    """
    scale = 100
    bar = self.ui.progressBar
    bar.setMaximum(scale * 100)
    bar.setValue(int(i * scale))
    bar.setFormat("%.02f %%" % i)
def open_file(self):
    """Ask for a CSV file until one could be loaded, then display it.

    The file dialog is shown in a loop instead of through recursion, so
    repeated cancelling does not grow the call stack.  When the chosen
    path cannot be read as CSV (dialog cancelled, file missing, empty or
    malformed), ``self.filename`` is reset to ``None`` and ``check_file``
    asks the user to pick a file again.

    On success ``self.dataset`` holds a ``PandasTableModel`` of the file,
    the neural-network layer fields are refreshed and the table view is
    updated.
    """
    options = QFileDialog.Options()
    options |= QFileDialog.DontUseNativeDialog
    unreadable = (
        FileNotFoundError,
        UnicodeDecodeError,
        pd.errors.EmptyDataError,
        pd.errors.ParserError,
    )
    while True:
        self.filename, _ = QFileDialog.getOpenFileName(
            self.ui,
            "QFileDialog.getOpenFileName()",
            "",
            "CSV Files (*.csv)",
            options=options,
        )
        self.ui.lineEdit.setText(self.filename)
        try:
            frame = pd.read_csv(self.filename)
        except unreadable:
            # A cancelled dialog yields an empty path, which ends up here too.
            self.filename = None
            self.check_file()
            continue
        break
    self.dataset = PandasTableModel(frame)
    self.update_activation_neuron_layers()
    self.display_data()
def display_data(self):
    """Attach the currently loaded dataset model to the table view."""
    table_view = self.ui.dataset_view
    table_view.setModel(self.dataset)
def update_activation_neuron_layers(self):
    """Refresh the suggested neuron counts and activations of the NN tab.

    For ``h`` hidden layers the suggestion consists of ``h + 1`` layers
    with one neuron per feature and ``'relu'`` activation, followed by an
    output layer with one neuron per distinct label.  The output
    activation is ``'sigmoid'`` for two labels and ``'softmax'`` for three
    or more; with fewer than two labels no output activation is added.

    Does nothing while no dataset is loaded.  The "only one label" notice
    is written only if a result text field has already been selected,
    because ``self.output_textfield`` is ``None`` until the first training
    run was started.
    """
    if self.dataset is None:
        return
    layer_count = self.ui.number_hidden_layers_nn.value() + 1
    number_of_labels = len(self.dataset.get_labels().value_counts())
    number_of_features = len(self.dataset.get_features().columns)

    neurons = [number_of_features] * layer_count + [number_of_labels]
    activation = ['relu'] * layer_count
    if number_of_labels == 2:
        activation.append('sigmoid')
    elif number_of_labels >= 3:
        activation.append('softmax')
    elif number_of_labels == 1 and self.output_textfield is not None:
        self.output_textfield.setPlainText("Dataset has only one label")

    self.ui.number_of_neurons_nn.setText(str(neurons))
    self.ui.activation_functions_nn.setText(str(activation))
def check_file(self):
    """Return ``True`` when a file has been chosen, otherwise prompt.

    Without a file a question dialog asks the user to open a CSV file.
    Confirming with *Ok* returns ``False`` so the caller can react;
    any other answer terminates the application via ``sys.exit()``.
    """
    if self.filename is not None:
        return True
    reply = QMessageBox.question(
        self.ui,
        "Warning",
        "Please open a csv file",
        QMessageBox.Ok | QMessageBox.Close,
        QMessageBox.Ok,
    )
    if reply != QMessageBox.Ok:
        sys.exit()
    # No second, empty message box: the question dialog already informed
    # the user.
    return False
def button_decision_tree(self):
if self.check_file():
try:
self.ui.results_dt.setPlainText("Training process is running")
self.disable_tabs()
criterion = self.ui.criterion_dt.currentText()
max_depth = self.ui.max_depth_dt.value()
splits = self.ui.splits_dt.value()
repeats = self.ui.repeats_dt.value()
self.bool_kfold = self.ui.bool_kfold_dt.isChecked()
self.bool_default = self.ui.bool_default_parameters_dt.isChecked()
self.params = {"criterion" : criterion, "max_depth" : max_depth}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = DecisionTreeClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_dt
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_decision_tree(self):
if self.check_file():
try:
self.ui.results_auto_dt.setPlainText("Training process is running")
self.disable_tabs()
criterion = json.loads(self.ui.Criterion_auto_dt.text().replace('\'', '"'))
max_depth = json.loads(self.ui.max_depth_auto_dt.text())
iterations = self.ui.iterations_auto_dt.value()
splits = self.ui.splits_auto_dt.value()
repeats = self.ui.repeats_auto_dt.value()
self.bool_kfold = self.ui.bool_kfold_auto_dt.isChecked()
max_combinations = (len(criterion) * len(max_depth))
self.params = {"criterion": criterion, 'max_depth': max_depth, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = DecisionTreeClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_dt
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_random_forest(self):
if self.check_file():
try:
self.ui.results_rf.setPlainText("Training process is running")
self.disable_tabs()
n_estimators = self.ui.number_of_estimators_rf.value()
criterion = self.ui.criterion_rf.currentText()
max_depth = self.ui.max_depth_rf.value()
splits = self.ui.splits_rf.value()
repeats = self.ui.repeats_rf.value()
self.bool_kfold = self.ui.bool_kfold_rf.isChecked()
self.bool_default = self.ui.bool_default_parameters_rf.isChecked()
self.params = {"n_estimators" : n_estimators, "criterion" : criterion, "max_depth" : max_depth}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold, splits = splits,
repeats = repeats, modeltype = RandomForestClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_rf
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_random_forest(self):
if self.check_file():
try:
self.ui.results_auto_rf.setPlainText("Training process is running")
self.disable_tabs()
n_estimators = json.loads(self.ui.number_of_estimator_auto_rf.text())
criterion = json.loads(self.ui.Criterion_auto_rf.text().replace('\'', '"'))
max_depth = json.loads(self.ui.max_depth_auto_rf.text())
iterations = self.ui.iterations_auto_rf.value()
splits = self.ui.splits_auto_rf.value()
repeats = self.ui.repeats_auto_rf.value()
self.bool_kfold = self.ui.bool_kfold_auto_rf.isChecked()
max_combinations = (len(criterion) * len(max_depth) * len(n_estimators))
self.params = {"n_estimators": n_estimators, "criterion": criterion, 'max_depth': max_depth, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = RandomForestClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_rf
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_support_vector_machine(self):
if self.check_file():
try:
self.ui.results_svm.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_svm.currentText()
kernel = self.ui.kernel_svm.currentText()
C = self.ui.c_svm.value()
splits = self.ui.splits_svm.value()
repeats = self.ui.repeats_svm.value()
self.bool_kfold = self.ui.bool_kfold_svm.isChecked()
self.bool_default = self.ui.bool_default_parameters_svm.isChecked()
self.params = {"kernel" : kernel, "C" : C}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = SVC, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_svm
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_support_vector_machine(self):
if self.check_file():
try:
self.ui.results_auto_svm.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_auto_svm.currentText()
C = json.loads(self.ui.C_auto_svm.text())
kernel = json.loads(self.ui.kernel_auto_svm.text().replace('\'', '"'))
iterations = self.ui.iterations_auto_svm.value()
splits = self.ui.splits_auto_svm.value()
repeats = self.ui.repeats_auto_svm.value()
self.bool_kfold = self.ui.bool_kfold_auto_svm.isChecked()
max_combinations = (len(C) * len(kernel))
self.params = {"C": C, "kernel": kernel, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = SVC, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_svm
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_logistic_regression(self):
if self.check_file():
try:
self.ui.results_lr.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_lr.currentText()
C = self.ui.c_lr.value()
max_iter = self.ui.max_iter_lr.value()
splits = self.ui.splits_lr.value()
repeats = self.ui.repeats_lr.value()
modeltype = LogisticRegression
self.bool_kfold = self.ui.bool_kfold_lr.isChecked()
self.bool_default = self.ui.bool_default_parameters_lr.isChecked()
self.params = {"C" : C, "max_iter": max_iter}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = LogisticRegression, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_lr
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_logistic_regression(self):
if self.check_file():
try:
self.ui.results_auto_lr.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_auto_lr.currentText()
C = json.loads(self.ui.C_auto_lr.text())
max_iter = json.loads(self.ui.max_iter_auto_lr.text())
iterations = self.ui.iterations_auto_lr.value()
splits = self.ui.splits_auto_lr.value()
repeats = self.ui.repeats_auto_lr.value()
self.bool_kfold = self.ui.bool_kfold_auto_lr.isChecked()
max_combinations = (len(C) * len(max_iter))
self.params = {"C": C, "max_iter": max_iter, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = LogisticRegression, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_lr
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_k_nearest_neighbor(self):
if self.check_file():
try:
self.ui.results_knn.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_knn.currentText()
n_neighbors = self.ui.n_neighbors_knn.value()
algorithm = self.ui.algorithm_knn.currentText()
weights = self.ui.weights_knn.currentText()
power_parameter = self.ui.p_knn.value()
splits = self.ui.splits_knn.value()
repeats = self.ui.repeats_knn.value()
self.bool_kfold = self.ui.bool_kfold_knn.isChecked()
self.bool_default = self.ui.bool_default_parameters_knn.isChecked()
self.params = {"n_neighbors" : n_neighbors, "algorithm": algorithm, "p": power_parameter, "weights": weights}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = KNeighborsClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_knn
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_k_nearest_neighbor(self):
if self.check_file():
try:
self.ui.results_auto_knn.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_auto_knn.currentText()
n_neighbors = json.loads(self.ui.n_neighbors_auto_knn.text())
algorithm = json.loads(self.ui.algorithm_auto_knn.text().replace('\'', '"'))
weights = json.loads(self.ui.weights_auto_knn.text().replace('\'', '"'))
p = json.loads(self.ui.power_parameter_auto_knn.text())
iterations = self.ui.iterations_auto_knn.value()
splits = self.ui.splits_auto_knn.value()
repeats = self.ui.repeats_auto_knn.value()
self.bool_kfold = self.ui.bool_kfold_auto_knn.isChecked()
max_combinations = (len(n_neighbors) * len(algorithm) * len(weights) * len(p))
self.params = {"n_neighbors": n_neighbors, "algorithm": algorithm, "weights": weights, "p": p, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = KNeighborsClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_knn
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_ada_boost(self):
if self.check_file():
try:
self.ui.results_ada.setPlainText("Training process is running")
self.disable_tabs()
learning_rate = self.ui.learning_rate_ada.value()
n_estimators = self.ui.number_of_estimators_ada.value()
algorithm = self.ui.algorithm_ada.currentText()
splits = self.ui.splits_ada.value()
repeats = self.ui.repeats_ada.value()
self.bool_kfold = self.ui.bool_kfold_ada.isChecked()
self.bool_default = self.ui.bool_default_parameter_ada.isChecked()
self.params = {"learning_rate" : learning_rate, "n_estimators": n_estimators, "algorithm": algorithm}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = AdaBoostClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_ada
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_ada_boost(self):
if self.check_file():
try:
self.ui.results_auto_ada.setPlainText("Training process is running")
self.disable_tabs()
learning_rate = json.loads(self.ui.lr_auto_ada.text())
n_estimators = json.loads(self.ui.number_of_estimators_auto_ada.text())
algorithm = json.loads(self.ui.algorithm_auto_ada.text().replace('\'', '"'))
iterations = self.ui.iterations_auto_ada.value()
splits = self.ui.splits_auto_ada.value()
repeats = self.ui.repeats_auto_ada.value()
self.bool_kfold = self.ui.bool_kfold_auto_ada.isChecked()
max_combinations = (len(learning_rate) * len(n_estimators) * len(algorithm))
self.params = {"learning_rate": learning_rate, "n_estimators": n_estimators, "algorithm": algorithm, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = AdaBoostClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_ada
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_gradient_boost(self):
if self.check_file():
try:
self.ui.results_gb.setPlainText("Training process is running")
self.disable_tabs()
learning_rate = self.ui.learning_rate_gb.value()
n_estimators = self.ui.number_of_estimators_gb.value()
criterion = self.ui.criterion_gb.currentText()
max_depth = self.ui.max_depth_gb.value()
splits = self.ui.splits_gb.value()
repeats = self.ui.repeats_gb.value()
self.bool_kfold = self.ui.bool_kfold_gb.isChecked()
self.bool_default = self.ui.bool_default_parameters_gb.isChecked()
self.params = {"learning_rate" : learning_rate, "n_estimators": n_estimators, "criterion": criterion, "max_depth": max_depth}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold, splits = splits,
repeats = repeats, modeltype = GradientBoostingClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_gb
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_gradient_boost(self):
if self.check_file():
try:
self.ui.results_auto_gb.setPlainText("Training process is running")
self.disable_tabs()
learning_rate = json.loads(self.ui.lr_auto_gb.text())
n_estimators = json.loads(self.ui.number_of_estimators_auto_gb.text())
criterion = json.loads(self.ui.criterion_auto_gb.text().replace('\'', '"'))
max_depth = json.loads(self.ui.max_depth_auto_gb.text())
iterations = self.ui.iterations_auto_gb.value()
splits = self.ui.splits_auto_gb.value()
repeats = self.ui.repeats_auto_gb.value()
self.bool_kfold = self.ui.bool_kfold_auto_gb.isChecked()
max_combinations = (len(learning_rate) * len(n_estimators) * len(criterion) * len(max_depth))
self.params = {"learning_rate": learning_rate, "n_estimators": n_estimators, "criterion": criterion, "max_depth": max_depth, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = GradientBoostingClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_gb
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_hist_gradient_boost(self):
if self.check_file():
try:
self.ui.results_hgb.setPlainText("Training process is running")
self.disable_tabs()
learning_rate = self.ui.learning_rate_hgb.value()
max_iter = self.ui.max_iter_hgb.value()
max_depth = self.ui.max_depth_hgb.value()
splits = self.ui.splits_hgb.value()
repeats = self.ui.repeats_hgb.value()
self.bool_kfold = self.ui.bool_kfold_hgb.isChecked()
self.bool_default = self.ui.bool_default_parameters_hgb.isChecked()
self.params = {"learning_rate" : learning_rate, "max_iter": max_iter, "max_depth": max_depth}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold, splits = splits,
repeats = repeats, modeltype = HistGradientBoostingClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_hgb
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_hist_gradient_boost(self):
if self.check_file():
try:
self.ui.results_auto_hgb.setPlainText("Training process is running")
self.disable_tabs()
learning_rate = json.loads(self.ui.lr_auto_hgb.text())
max_iter = json.loads(self.ui.max_iter_auto_hgb.text())
max_depth = json.loads(self.ui.max_depth_auto_hgb.text())
iterations = self.ui.iterations_auto_hgb.value()
splits = self.ui.splits_auto_hgb.value()
repeats = self.ui.repeats_auto_hgb.value()
self.bool_kfold = self.ui.bool_kfold_auto_hgb.isChecked()
max_combinations = (len(learning_rate) * len(max_iter) * len(max_depth))
self.params = {"learning_rate": learning_rate, "max_iter": max_iter, "max_depth": max_depth, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = HistGradientBoostingClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_hgb
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_xg_boost(self):
if self.check_file():
try:
self.ui.results_xg.setPlainText("Training process is running")
self.disable_tabs()
eta = self.ui.learning_rate_XG.value()
Booster = self.ui.booster_xg.currentText()
max_depth = self.ui.max_depth_xg.value()
splits = self.ui.splits_xg.value()
repeats = self.ui.repeats_xg.value()
self.bool_kfold = self.ui.bool_kfold_xg.isChecked()
self.bool_default = self.ui.bool_default_parameters_xg.isChecked()
self.params = {"eta" : eta, "booster": Booster, "max_depth": max_depth}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold, splits = splits,
repeats = repeats, modeltype = XGBClassifier, params = self.params, iterations = None, bool_default = self.bool_default)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_xg
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_xg_boost(self):
if self.check_file():
try:
self.ui.results_auto_xg.setPlainText("Training process is running")
self.disable_tabs()
eta = json.loads(self.ui.lr_auto_xg.text())
Booster = json.loads(self.ui.booster_auto_xg.text().replace('\'', '"'))
max_depth = json.loads(self.ui.max_depth_auto_xg.text())
iterations = self.ui.iterations_auto_xg.value()
splits = self.ui.splits_auto_xg.value()
repeats = self.ui.repeats_auto_xg.value()
self.bool_kfold = self.ui.bool_kfold_auto_xg.isChecked()
max_combinations = (len(eta) * len(Booster) * len(max_depth))
self.params = {"eta": eta, "Booster": Booster, "max_depth": max_depth, "max_combinations": max_combinations}
self.model = classifier(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = None, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = XGBClassifier, params = self.params, iterations = iterations, bool_default = False)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_auto_xg
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_neural_network(self):
if self.check_file():
try:
self.ui.results_nn.setPlainText("Training process is running")
self.disable_tabs()
scaler = self.ui.scaler_nn.currentText()
learning_rate = self.ui.learning_rate_nn.value()
number_layers = self.ui.number_hidden_layers_nn.value()
dropout = self.ui.dropout_nn.value()
number_neurons = json.loads(self.ui.number_of_neurons_nn.text())
activations = json.loads(self.ui.activation_functions_nn.text().replace('\'', '"'))
optimizer = self.ui.optimizer_nn.currentText()
batch_size = self.ui.batch_size_nn.value()
epochs = self.ui.epochs_nn.value()
self.params = {"lr": learning_rate, 'batch_size': batch_size, 'number_layers': number_layers, 'dropout': dropout,
'number_neurons': number_neurons, 'activations': activations, 'optimizer': optimizer}
splits = self.ui.splits_nn.value()
repeats = self.ui.repeats_nn.value()
self.bool_kfold = self.ui.bool_kfold_nn.isChecked()
self.model = Neural_Network(features = self.dataset.get_features(), label = self.dataset.get_labels(), scaler = scaler, bool_kfold = self.bool_kfold,
splits = splits, repeats = repeats, modeltype = Neural_Network, params = self.params, iterations = None, epochs = epochs)
self.worker = WorkerThread(self.model)
self.worker.finished.connect(self.finish_training_thread)
self.worker.results.connect(self.get_params_and_accuracy)
self.worker.update_progress.connect(self.update_prograss_bar)
self.worker.start()
self.output_textfield = self.ui.results_nn
except json.decoder.JSONDecodeError:
self.format_exception_pop_up()
def button_auto_neural_network(self):
    '''
    Reads the candidate lists for the neural network parameter search from the
    GUI and starts the search in a worker thread.

    All free-text fields are parsed before disable_tabs() is called, so a
    malformed input only shows the format pop-up and never leaves the tabs
    disabled. A value that is valid JSON but has no length (e.g. a bare
    number instead of a list) is reported the same way instead of raising
    an uncaught TypeError.
    '''
    if not self.check_file():
        return
    try:
        learning_rate = json.loads(self.ui.lr_auto_nn.text())
        number_layers = json.loads(self.ui.number_of_hidden_layers_auto_nn.text())
        dropout = json.loads(self.ui.dropout_auto_nn.text())
        number_neurons = json.loads(self.ui.neurons_auto_nn.text())
        # JSON only accepts double quotes; single quotes typed by the user are converted.
        activations = json.loads(self.ui.activation_auto_nn.text().replace('\'', '"'))
        optimizer = json.loads(self.ui.optimizer_auto_nn.text().replace('\'', '"'))
        batch_size = json.loads(self.ui.batch_size_auto_nn.text())
        # Product of the candidate list lengths; len() raises TypeError for non-sized values.
        max_combinations = math.prod(map(len, (learning_rate, number_layers, dropout, number_neurons,
                                               activations, optimizer, batch_size)))
    except (json.decoder.JSONDecodeError, TypeError):
        self.format_exception_pop_up()
        return

    self.ui.results_auto_nn.setPlainText("Training process is running")
    self.disable_tabs()

    scaler = self.ui.scaler_auto_nn.currentText()
    epochs = self.ui.epochs_auto_nn.value()
    iterations = self.ui.iterations_auto_nn.value()
    splits = self.ui.splits_auto_nn.value()
    repeats = self.ui.repeats_auto_nn.value()
    self.bool_kfold = self.ui.bool_kfold_auto_nn.isChecked()
    self.params = {
        "lr": learning_rate,
        'batch_size': batch_size,
        'number_layers': number_layers,
        'dropout': dropout,
        'number_neurons': number_neurons,
        'activations': activations,
        'optimizer': optimizer,
        "max_combinations": max_combinations,
    }
    self.model = Neural_Network(
        features = self.dataset.get_features(), label = self.dataset.get_labels(),
        scaler = scaler, bool_kfold = self.bool_kfold, splits = splits, repeats = repeats,
        modeltype = Neural_Network, params = self.params, iterations = iterations, epochs = epochs)

    # The result slots write into output_textfield, so it is set before the thread starts.
    self.output_textfield = self.ui.results_auto_nn
    self.worker = WorkerThread(self.model)
    self.worker.finished.connect(self.finish_training_thread)
    self.worker.results.connect(self.get_params_and_accuracy)
    self.worker.update_progress.connect(self.update_prograss_bar)
    self.worker.start()
def get_params_and_accuracy(self, results):
    '''
    Slot for the worker's "results" signal.

    Takes arguments:
    - results: tuple (model, accuracy, params); params is None when no
      parameter search was performed, in which case self.params is kept.

    Stores the model, shows the results and re-enables the tabs.
    '''
    model, accuracy, params = results
    self.model = model
    if params is not None:
        self.params = params
    self.display_model_results(accuracy)
    self.enable_tabs()
def display_model_results(self, accuracy):
    '''
    Writes accuracy, model type and parameters into the current output text field.

    Takes arguments:
    - accuracy: a single score, or (when self.bool_kfold is set) a sequence of
      fold scores that is summarised as "mean +/- standard deviation",
      both rounded to three decimals.
    '''
    params = "Default Parameters" if self.bool_default else self.params
    if self.bool_kfold:
        mean = np.round(np.mean(accuracy), 3)
        std = np.round(np.std(accuracy), 3)
        accuracy_print = f"{mean} +/- {std}"
    else:
        accuracy_print = accuracy
    model_name = self.model.modeltype.__name__
    # The label was previously misspelled as "Paramters" in the displayed text.
    self.output_textfield.setPlainText(
        f"Accuracy: {accuracy_print}\nModel: {model_name}\nParameters: {params}")
def export_model(self):
    '''
    Calls save_model() on the current model and confirms the export with a message box.
    '''
    self.model.save_model()
    confirmation = QMessageBox()
    confirmation.setWindowTitle("Model saved")
    confirmation.setText("Saving the model was successful")
    confirmation.exec_()
def format_exception_pop_up(self):
    '''
    Shows a modal message box asking the user to follow the parameter input format.
    '''
    warning = QMessageBox()
    warning.setWindowTitle("wrong parameter input")
    warning.setText("Please follow the format for the parameter input")
    warning.exec_()
def finish_training_thread(self):
    '''
    Slot connected to the worker's "finished" signal: shows a "training completed" message box.
    '''
    notice = QMessageBox()
    notice.setWindowTitle("Done!")
    notice.setText("Training is completed")
    notice.exec_()
def terminate_training_thread(self):
    '''
    Terminates the worker thread, writes a notice into the output field,
    re-enables the tabs, resets the progress bar to 0 and informs the user
    with a message box.
    '''
    # NOTE(review): QThread.terminate() stops the thread at an arbitrary point and
    # presumably still triggers the "finished" slot - confirm against the Qt docs.
    self.worker.terminate()
    self.output_textfield.setPlainText("Training process was terminated")
    self.enable_tabs()
    self.update_prograss_bar(0)
    notice = QMessageBox()
    notice.setWindowTitle("Terminated!")
    notice.setText("Training process was terminated")
    notice.exec_()
class PandasTableModel(QtGui.QStandardItemModel):
    '''
    Item model filled with the string representation of every cell of a
    pandas DataFrame. The DataFrame itself is kept to serve the headers and
    to hand out the feature and label columns.
    '''

    def __init__(self, data, parent=None):
        super().__init__(parent)
        self._data = data
        # One model row per DataFrame row, every cell formatted as text.
        for values in data.values.tolist():
            self.appendRow([QtGui.QStandardItem(f"{value}") for value in values])

    def get_features(self, parent=None):
        '''Returns all columns except the last one.'''
        return self._data.iloc[:, :-1]

    def get_labels(self, parent=None):
        '''Returns the last column (as a one-column selection).'''
        return self._data.iloc[:, -1:]

    def headerData(self, x, orientation, role):
        '''Returns the column name or index label for display; None for any other role.'''
        if role != Qt.DisplayRole:
            return None
        if orientation == Qt.Horizontal:
            return self._data.columns[x]
        if orientation == Qt.Vertical:
            return self._data.index[x]
        return None
class WorkerThread(QThread):
    '''
    Runs the training of a model object in a separate thread.

    Signals:
    - update_progress: float, passed on to the model's training methods
    - results: tuple (model, accuracy, params); params is None unless a
      parameter search was performed
    '''
    update_progress = pyqtSignal(float)
    results = pyqtSignal(tuple)

    def __init__(self, model, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.model = model

    def run(self):
        '''
        Selects the training routine from the model's settings and emits the result:
        - iterations set: parameter_search
        - no iterations, modeltype is Neural_Network: fit_nn
        - otherwise: train_model
        '''
        self.update_progress.emit(0)
        params = None
        if self.model.iterations is not None:
            accuracy, params = self.model.parameter_search(self.update_progress)
        elif self.model.modeltype == Neural_Network:
            accuracy = self.model.fit_nn(self.update_progress)
        else:
            accuracy = self.model.train_model(self.update_progress)
        self.results.emit((self.model, accuracy, params))
def main():
    '''
    Creates the Qt application, shows the main window and runs the event loop.
    The process exits with the return value of the event loop.
    '''
    app = QApplication(sys.argv)
    # Disabled style sheet loading. It used a machine-specific absolute path
    # (".../Machine_Learning/AutoML/styles/style.qss"); resolve the path
    # relative to this file before re-enabling it:
    #     file = QFile(<path to styles/style.qss>)
    #     file.open(QFile.ReadOnly | QFile.Text)
    #     stream = QTextStream(file)
    #     app.setStyleSheet(stream.readAll())
    window = MainWindow()
    window.ui.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()
models.py
Code: Alles auswählen
from sklearn.tree import DecisionTreeClassifier
from sklearn.experimental import enable_hist_gradient_boosting
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, HistGradientBoostingClassifier, AdaBoostClassifier
from sklearn.metrics import accuracy_score
import pandas as pd
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.preprocessing import StandardScaler, MaxAbsScaler, MinMaxScaler, QuantileTransformer, PowerTransformer, RobustScaler
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
import numpy as np
import os
from joblib import dump
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from xgboost import XGBClassifier
from keras.optimizers import SGD, Adam, Adadelta, Adagrad, Adamax, RMSprop, Nadam, Ftrl
from keras.models import Sequential
from keras.layers import Dense, Dropout
import json
from keras.utils import to_categorical
import pathlib
from keras.callbacks import EarlyStopping, Callback
from skopt import gp_minimize
from bayes_opt import BayesianOptimization
import random
from PyQt5.QtCore import Qt, QFile, QTextStream, QThread, pyqtSignal
class classifier():
    '''
    Creates classifier machine learning model.
    Takes arguments:
    - features: feature table (".columns" and ".to_numpy()" are used)
    - label: label column (".to_numpy()" and ".value_counts()" are used)
    - scaler: name of a scaler class available in this module (string) or None
    - bool_kfold: boolean, use RepeatedStratifiedKFold instead of a single split
    - splits: integer, number of folds
    - repeats: integer, number of repetitions of the k-fold
    - modeltype: class that is called to create the model
    - params: dictionary; keyword arguments for modeltype (train_model) or
      lists of candidate values plus "max_combinations" (parameter_search)
    - iterations: integer, maximum number of parameter sets to try
    - bool_default: boolean, create the model without any parameters
    Contains the following methods:
    - fit_model: fits the given machine learning model and returns its accuracy
    - scale: scales the data
    - save_model: exports the trained model (and scaler) into .h5 file(s)
    - train_model: creates and trains the model on the given dataset
    - parameter_search: perfoms a randomized hyperparameter optimization
    '''
    def __init__(self, features, label, scaler, bool_kfold, splits, repeats, modeltype, params, iterations, bool_default):
        self.features = features
        self.label = label
        # The scaler name is looked up among the names of this module.
        self.scaler = globals()[scaler] if scaler is not None else None
        self.fitted_scaler = None
        self.model = None
        self.bool_kfold = bool_kfold
        self.splits = splits
        self.repeats = repeats
        self.modeltype = modeltype
        self.bool_default = bool_default
        self.params = params
        self.iterations = iterations

    def fit_model(self, model, x_train, y_train, x_test, y_test):
        '''
        Fits and evaluates the model.
        Takes arguments:
        - model: the model to be trained
        - x_train: training data (array matrix)
        - y_train: training labels (array vector)
        - x_test: test data (array matrix)
        - y_test: test labels (array vector)
        return:
        - accuracy: accuracy_score of the predictions on the test data
        '''
        model = model.fit(x_train, y_train)
        y_pred = model.predict(x_test)
        return accuracy_score(y_test, y_pred)

    def scale(self):
        '''
        Scales the data
        Converts the features to an array (scaled with a freshly fitted scaler
        if one was given) and flattens the labels into a vector. The fitted
        scaler is stored in self.fitted_scaler (None without scaler).
        return:
        - features and labels as tuple (x, y)
        '''
        # NOTE(review): the scaler is fitted on the complete dataset, i.e. before
        # the train/test split that the training methods perform afterwards.
        if self.scaler is None:
            self.fitted_scaler = None
            x = self.features.to_numpy()
        else:
            self.fitted_scaler = self.scaler()
            self.fitted_scaler.fit(self.features)
            x = self.fitted_scaler.transform(self.features)
        y = self.label.to_numpy().ravel()
        return (x, y)

    def save_model(self):
        '''
        Saves the previously trained machine learning model and (if used) the scaler
        next to this file. Both file names end in ".h5"; everything except a
        Neural_Network model is written with joblib's dump().
        '''
        dir_path = pathlib.Path(__file__).parent.absolute()
        model_name = self.modeltype.__name__
        model_path = os.path.join(dir_path, model_name + "_model.h5")
        if model_name == 'Neural_Network':
            self.model.save(model_path)
        else:
            dump(self.model, model_path)
        if self.scaler is None:
            return
        scaler_path = os.path.join(dir_path, model_name + "_" + self.scaler.__name__ + ".h5")
        dump(self.fitted_scaler, scaler_path)

    def _new_model(self):
        '''Creates a fresh model: without arguments for bool_default, else with self.params.'''
        if self.bool_default:
            return self.modeltype()
        return self.modeltype(**self.params)

    def train_model(self, update_progress = None, start = 0, stop = 100):
        '''
        Creates and trains the model, either with a RepeatedStratifiedKFold
        validation (bool_kfold) or with a single stratified 75/25 split.
        Takes arguments:
        - update_progress: optional signal; progress values between start and stop are emitted
        - start, stop: progress range covered by this call
        return:
        - list of fold accuracies (bool_kfold) or a single accuracy
        '''
        x, y = self.scale()
        if not self.bool_kfold:
            x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.25, stratify = y)
            self.model = self._new_model()
            score = self.fit_model(self.model, x_train, y_train, x_test, y_test)
            if update_progress:
                update_progress.emit(stop)
            return score

        rkfold = RepeatedStratifiedKFold(n_splits=self.splits, n_repeats=self.repeats)
        total_fits = self.splits * self.repeats
        # linspace yields exactly total_fits + 1 evenly spaced progress marks.
        sub_progress = np.linspace(start, stop, total_fits + 1)
        scores = []
        for fold, (train, test) in enumerate(rkfold.split(x, y), start=1):
            # A new model per fold, so no fold starts from an already fitted model.
            self.model = self._new_model()
            scores.append(self.fit_model(self.model, x[train], y[train], x[test], y[test]))
            if update_progress:
                update_progress.emit(sub_progress[fold])
        return scores

    def _sample_params(self, search_space):
        '''
        Draws one random parameter set from the candidate lists in search_space.
        For "number_layers" the layer sizes (input size, random hidden sizes,
        number of distinct labels) and the activations (one random choice for
        all layers but the last, 'softmax' for the last) are derived as well.
        '''
        params = {}
        for key in search_space:
            if key == "number_layers":
                number_layers = random.choice(search_space["number_layers"])
                params["number_layers"] = number_layers
                hidden = [random.choice(search_space["number_neurons"]) for _ in range(number_layers)]
                params["number_neurons"] = [len(self.features.columns)] + hidden + [len(self.label.value_counts())]
                params["activations"] = [random.choice(search_space["activations"])] * (number_layers + 1) + ['softmax']
            elif key not in ("number_neurons", "activations", "max_combinations"):
                params[key] = random.choice(search_space[key])
        return params

    def parameter_search(self, update_progress = None):
        '''
        performs a randomized hyperparameter optimization with the given parameter sets
        At most min(iterations, params["max_combinations"]) distinct parameter
        sets are trained. Afterwards self.model and self.params hold the best
        model and its parameters, so save_model() exports what is reported.
        return:
        - tuple (best_accuracy, best_params); (0, {}) if no parameter set scored above 0
        '''
        search_space = self.params
        max_model_fits = max(0, min(self.iterations, search_space["max_combinations"]))
        progress = np.linspace(0, 100, max_model_fits + 1)
        is_neural_network = self.modeltype.__name__ == 'Neural_Network'
        best_accuracy = 0
        best_params = {}
        best_model = None
        tested_params = []
        # NOTE(review): duplicates are re-drawn; if "max_combinations" exceeds the number
        # of distinct parameter sets that can be drawn, this loop does not terminate.
        while len(tested_params) < max_model_fits:
            candidate = self._sample_params(search_space)
            if candidate in tested_params:
                continue
            self.params = candidate
            start = progress[len(tested_params)]
            stop = progress[len(tested_params) + 1]
            if is_neural_network:
                accuracy = self.fit_nn(update_progress, start, stop)
            else:
                accuracy = self.train_model(update_progress, start, stop)
            tested_params.append(candidate)
            # np.mean leaves a single score unchanged and averages the fold scores.
            if np.mean(accuracy) > np.mean(best_accuracy):
                best_accuracy = accuracy
                best_params = candidate
                best_model = self.model
        if best_model is not None:
            self.model = best_model
            self.params = best_params
        else:
            self.params = {}
        return (best_accuracy, best_params)
class Neural_Network(classifier):
    '''
    Subclass of "classifier()" - Creates Neural Network.
    - features: feature table
    - label: label column
    - scaler: string
    - bool_kfold: boolean
    - splits: integer
    - repeats: integer
    - modeltype: object
    - params: dictionary with the keys 'optimizer', 'lr', 'dropout', 'batch_size',
      'number_layers', 'number_neurons' and 'activations'
    - iterations: integer
    - epochs: integer
    Contains the following methods:
    - create_neural_network: creates the neural network with the given parameters
    - fit_nn: fits the neural network
    '''
    def __init__(self, features, label, scaler, bool_kfold, splits, repeats, modeltype, params, iterations, epochs):
        super().__init__(features, label, scaler, bool_kfold, splits, repeats, modeltype, params, iterations, bool_default = None)
        self.epochs = epochs

    def create_neural_network(self):
        '''
        creates the neural network with the given parameters
        Builds a Sequential model of number_layers + 2 Dense layers. Entry i of
        'number_neurons' / 'activations' configures layer i, the last entry
        configures the output layer. Every layer except the output layer is
        followed by a Dropout layer. The model is stored in self.model.
        '''
        # The optimizer name is looked up among the names of this module.
        optimizer = globals()[self.params['optimizer']] if self.params['optimizer'] is not None else None
        neurons = self.params['number_neurons']
        activations = self.params['activations']
        dropout = self.params['dropout']
        self.model = Sequential()
        self.model.add(Dense(neurons[0], activation = str(activations[0]), input_shape = (len(self.features.columns), )))
        self.model.add(Dropout(rate=dropout))
        for i in range(1, self.params['number_layers'] + 1):
            self.model.add(Dense(neurons[i], activation = str(activations[i])))
            self.model.add(Dropout(rate=dropout))
        self.model.add(Dense(neurons[-1], activation = str(activations[-1])))
        self.model.compile(loss='categorical_crossentropy', optimizer=optimizer(lr=self.params['lr']), metrics=['accuracy'])
        self.model.summary()

    def fit_nn(self, update_progress = None, start = 0, stop = 100):
        '''
        fits the neural network
        Trains either with a RepeatedStratifiedKFold validation (bool_kfold) or
        with a single stratified 75/25 split. Labels are shifted so the smallest
        one becomes 0 and then one-hot encoded. The test data also serves as
        validation data for the early stopping.
        Takes arguments:
        - update_progress: optional signal; progress values between start and stop are emitted
        - start, stop: progress range covered by this call
        return:
        - list of fold accuracies (bool_kfold) or a single accuracy
        '''
        early_stopper = EarlyStopping(monitor='val_accuracy', patience=15)
        if self.bool_kfold:
            x, y = self.scale()
            y = y - np.min(y)
            # Encoded once for the whole dataset, so every fold has the same number
            # of label columns even if a fold lacks the highest class.
            y_onehot = to_categorical(y)
            rkfold = RepeatedStratifiedKFold(n_splits=self.splits, n_repeats=self.repeats)
            total_fits = self.splits * self.repeats
            # linspace yields exactly total_fits + 1 evenly spaced progress marks.
            sub_progress = np.linspace(start, stop, total_fits + 1)
            score = []
            # The split is stratified on the integer labels, not on the one-hot matrix.
            for fold, (train, test) in enumerate(rkfold.split(x, y), start=1):
                self.create_neural_network()
                self.model.fit(x[train], y_onehot[train], epochs = self.epochs, batch_size=self.params['batch_size'],
                               callbacks=[early_stopper], validation_data=(x[test], y_onehot[test]))
                if update_progress:
                    update_progress.emit(sub_progress[fold])
                score.append(self.model.evaluate(x[test], y_onehot[test])[1])
            return score

        self.create_neural_network()
        X, y = self.scale()
        y = to_categorical(y - np.min(y))
        x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.25, stratify=y)
        self.model.fit(x_train, y_train, epochs = self.epochs, batch_size=self.params['batch_size'],
                       callbacks=[early_stopper], validation_data=(x_test, y_test))
        accuracy = self.model.evaluate(x_test, y_test)[1]
        if update_progress:
            update_progress.emit(stop)
        return accuracy