[Konzept-Frage] Größere Datenmenge in Diagramm auswerten

Alles, was nicht direkt mit Python-Problemen zu tun hat. Dies ist auch der perfekte Platz für Jobangebote.
Benutzeravatar
Dennis89
User
Beiträge: 1560
Registriert: Freitag 11. Dezember 2020, 15:13

Hallo zusammen,

ich bin auf InfluxDB umgestiegen und habe folgendes erstes Konzept:

Code: Alles auswählen

#!/usr/bin/env python

from pathlib import Path

import pandas as pd
from influxdb import DataFrameClient

CHUNK_SIZE = 500
CSV_FILE_PATH = Path().home() / "DataLog.csv"
TEMPORARY_FOLDER = Path(__file__).parent / "Temp"



def split_file(file, temporary_folder, chunksize):
    for number, chunk in enumerate(
        pd.read_csv(file, on_bad_lines="skip", chunksize=chunksize), 1
    ):
        chunk.to_csv(f"{temporary_folder / file.stem}_{number}.csv", index=False)


def get_data(temporary_folder):
    for csv_file in temporary_folder.glob("*.csv"):
        data_frame = pd.read_csv(
            csv_file,
        )
        data_frame = data_frame.dropna()
        data_frame["Datetime"] = pd.to_datetime(
            data_frame["Date"] + data_frame["Time"], format="%a %b %d %Y%I:%M:%S %p"
        )
        data_frame = data_frame.set_index("Datetime")
        data_frame = data_frame.drop(["Date", "Time"], axis=1)
        for column in data_frame.columns:
            try:
                data_frame[column] = data_frame[column].astype(float)
            except ValueError:
                pass
        yield data_frame


def main():
    split_file(CSV_FILE_PATH, TEMPORARY_FOLDER, CHUNK_SIZE)
    with DataFrameClient(host="localhost", port=8086) as client:
        client.switch_database("Test")
        for data in get_data(TEMPORARY_FOLDER):
            client.write_points(data, "Sensors", protocol="line")


if __name__ == "__main__":
    main()
Den temporären Ordner würde ich nach dem Schreiben in die Datenbank wieder löschen. Ich mache das, weil sich mein Laptop an dem ich das teste, ab und zu aufgehängt hat, beim gesamten Einlesen der Datei. Das kam so jetzt nicht mehr vor. Das Einlesen bzw. aufteilen der Dateien geht zwar auch "ewig" und auch das schreiben in die Datenbank ist nicht sonderlich schnell. Gut ist, dass das ja eine einmalige Aktion ist und das Abfragen und Darstellen mit Grafana sollte ja flüssig gehen.

Habt ihr noch Optimierungen/Verbesserungen/Änderungen für mich?

Vielen Dank und Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Sirius3
User
Beiträge: 18276
Registriert: Sonntag 21. Oktober 2012, 17:20

@Dennis89: jetzt nutzt Du schon pathlib, wandelst den Pfad aber doch wieder in einen String um, um ihn dann per Stringoperation zu verändern. WARUM?
Das schreiben in temporäre Dateien ist unnötig. Die Spalten vom Typ float sollten automatisch richtig erkannt werden.

Code: Alles auswählen

from pathlib import Path
import pandas as pd
from influxdb import DataFrameClient

CHUNK_SIZE = 500
CSV_FILE_PATH = Path().home() / "DataLog.csv"

def get_data(file, chunksize):
    for number, data_frame in enumerate(
        pd.read_csv(file, on_bad_lines="skip", chunksize=chunksize), 1
    ):
        data_frame = data_frame.dropna()
        data_frame["Datetime"] = pd.to_datetime(
            data_frame["Date"] + data_frame["Time"], format="%a %b %d %Y%I:%M:%S %p"
        )
        data_frame = data_frame.set_index("Datetime")
        data_frame = data_frame.drop(["Date", "Time"], axis=1)
        yield data_frame


def main():
    with DataFrameClient(host="localhost", port=8086) as client:
        client.switch_database("Test")
        for data in get_data(CSV_FILE_PATH, CHUNK_SIZE):
            client.write_points(data, "Sensors", protocol="line")


if __name__ == "__main__":
    main()
Benutzeravatar
Dennis89
User
Beiträge: 1560
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für die schnelle Antwort.
WARUM?
Habe nicht mitgedacht 😖

Das Problem mit `float` war, dass in eine Meldung bekam, ich hätte gemixte Typen in einer Spalte. Ich vermute das erste mal wurde eine 0 geschrieben und als `int` erkannt und "später" kam da ein anderer Messwert, der nicht 0 ist sondern vom Typ `float`. Daher habe ich mir so beholfen.

Ich habe diese Meldung bekommen:

Code: Alles auswählen

(.venv) dennis@dennis:~/PycharmProjects/SensorData$ python xxx.py 
/home/dennis/PycharmProjects/SensorData/xxx.py:13: SettingWithCopyWarning: 
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  data_frame["Datetime"] = pd.to_datetime(
Die ist nicht abgeschnitten, sondern kam wirklich so. Habe mir deinen Code noch einmal angeschaut und nur das `enumerate` noch raus geworfen und beim zweiten Mal lief er dann durch. Ja auf den Link habe ich auch geklickt, bin aber nicht schlauer geworden, wie ich das in Zukunft verhindern könnte. Wisst ihr da was?


Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
LeanUnd
User
Beiträge: 2
Registriert: Mittwoch 16. Juli 2025, 10:09

Ich persönlich erstelle in solchen Fällen normalerweise eine Kopie des ursprünglichen Slice, um Probleme zu vermeiden:

Code: Alles auswählen

subset_df = df[some_condition].copy()
. Dann verschwindet die Warnung und Sie können die Daten in Ruhe ändern.
Benutzeravatar
Dennis89
User
Beiträge: 1560
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für die Antwort.

Kannst du das bitte erklären? Ich erstelle die Spalte `Datetime` gerade erst und ändere keine Daten auf die ich "gleichzeitig" auch zugreife.


Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
Dennis89
User
Beiträge: 1560
Registriert: Freitag 11. Dezember 2020, 15:13

Hallo zusammen,

heute stehe ich wieder vor einem Problem. Bis jetzt lief soweit alles gut, allerdings hat mein Datensatz heute Werte enthalten, die da nicht reingehören und ich würde gerne die entsprechenden Reihen löschen.
Die Funktion, die die Daten verarbeitet sieht so aus:

Code: Alles auswählen

@logger.catch
def format_csv_files(file, chunk_size):
    for data_frame in pd.read_csv(file, on_bad_lines="skip", chunksize=chunk_size):
        data_frame["Datetime"] = pd.to_datetime(
            data_frame["Date"] + data_frame["Time"],
            format="%a %b %d %Y%I:%M:%S %p",
            utc=True,
        )
        data_frame = data_frame.set_index("Datetime")
        data_frame = data_frame.drop(["Date", "Time"], axis=1)
        for column in data_frame.columns:
            try:
                data_frame[column] = data_frame[column].astype(float)
            except ValueError:
                pass
        yield data_frame
Die "Spalte" `Date` und `Time` enthält teilweise Werte wie `3114128112793` oder `5.2525634765625` oder `.278160572052002`. Mit `try/except` kann ich nach meinem Verständnis nicht arbeiten, weil ich Operationen auf das ganze `data_frame` ausübe und nicht jede Zeile einzeln betrachte und ich will nicht das ganze `data_frame` wegwerfen und `chunk_size` auf 1 zu verkleinern macht meiner Meinung nach auch kein Sinn.(?)
Die Zeilen mit den falschen Werten haben auch sonst viele Fehlstehlen in den Spalten, allerdings hat mein ganzer Datensatz teilweise Leerstellen, weil einige Sensoren noch nicht aktiv sind, daher kann ich nach leeren Spalten auch nicht aussortieren.
Wie könnte ich den hier eine Fehlerbehandlung machen, damit nur die Reihe mit den falschen Werten verworfen wird?

Wo die Werte herkommen muss ich auch noch raus finden, allerdings muss die Maschine laufen und das ist kein Notfall für den ich sie ausschalten will, deshalb hätte ich lieber ein Skript dass damit umgehen kann.


Vielen Dank und Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
Dennis89
User
Beiträge: 1560
Registriert: Freitag 11. Dezember 2020, 15:13

Habe folgendes gefunden:

Code: Alles auswählen

def format_csv_files(file, chunk_size):
    for data_frame in pd.read_csv(file, on_bad_lines="skip", chunksize=chunk_size):
        data_frame["Datetime"] = pd.to_datetime(
            data_frame["Date"] + data_frame["Time"],
            format="%a %b %d %Y%I:%M:%S %p",
            utc=True,
            errors='coerce'
        )
        data_frame = data_frame.dropna(subset=["Datetime"])
        data_frame = data_frame.set_index("Datetime")
        data_frame = data_frame.drop(["Date", "Time"], axis=1)
        for column in data_frame.columns:
            try:
                data_frame[column] = data_frame[column].astype(float)
            except ValueError:
                pass
        yield data_frame
Bis jetzt sieht es ganz gut aus.

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
DeaD_EyE
User
Beiträge: 1243
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

grubenfox hat geschrieben: Donnerstag 17. Juli 2025, 11:14 Ach, schade... ;) ich hatte auf zukünftige Erfahrungsberichte zu `dask` gehofft. Die Webseite liest sich ja ganz nett. Ich glaube ich würde es vielleicht gerne einsetzen, aber mir fehlen dafür irgendwie die passenden Problemfälle. ;)
Du kannst doch selbst Dask mit irgendwelchen Testdaten probieren und mit Pandas vergleichen. Aber wahrscheinlich gibt es darüber sogar Blog-Einträge und Benchmarks.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Antworten