Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
CptK
User
Beiträge: 7
Registriert: Sonntag 16. Januar 2022, 12:04
Wohnort: Darmstadt

Hallo, ich habe eine Liste von Beispieldaten und möchte diese jetzt in numpy-arrays umwandeln.

Die Funktionen, die alles starten und die Parallelisierung einleiten sind folgende:

Code: Alles auswählen

def split(data, n):
    k, m = divmod(len(data), n)
    return list(data[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))

def start_sample_generation(args):
    data, shape, cols = args
    return to_samples(data, shape, cols)
    
def generate_samples(sample_shape, cols_no_normalization):
    with open('raw_data.pickle', 'rb') as handle:
        raw_data= pickle.load(handle)
        
    pool = mp.Pool(mp.cpu_count())
    data = pool.map(start_sample_generation, [(x, sample_shape, cols_no_normalization) for x in split(raw_data, mp.cpu_count())])

    X, Y = [], []
    for x, y in data:
        print(x.shape, y.shape)
        X.append(x)
        Y.append(y)

    X = np.concatenate(X, axis=0)
    Y = np.concatenate(Y, axis=0)
    return X, Y

if __name__ == '__main__':
    generate_samples((25,5),['state'])
Für die Eigentliche Umwandlung zuständig ist folgender Code, wobei erst einmal alle raw-samples in Sample-Klassen gemapped werde, wo auch die Normalisierung stattfindet und dann die np.Arrays erzeugt werden:

Code: Alles auswählen

class Sample:
    def __init__(self, data, cols_no_normalization):
        self.sample = normalize(data.history, cols_no_normalization)
        self.label = ...
        self.label = tf.one_hot(self.label, depth=3)

def to_samples(raw_data, input_shape, cols_no_normalization=[]):
    samples = list(map(lambda x: Sample(x, cols_no_normalization), raw_data))
    return samples_to_numpy(samples, input_shape)
 
def samples_to_numpy(samples : list[Sample], input_shape):
    shuffle(samples)
    x = []
    y = []
    count = 0
    l = int(len(samples) / 10)
    print(">>> creating numpy arrays --> started; len(samples) =", len(samples))
    for i in range(len(samples)):
        x_n = samples[i].sample.to_numpy()
        if x_n.shape == input_shape:
            x.append(x_n)
            y.append(samples[i].label)
        if (l + count * l) == i:
            print(">>> creating numpy arrays --> progress:", (count+1)*10, '%')
            count = count + 1
    return np.stack(x, axis=0), np.stack(y, axis=0)
    
def normalize(df: pd.DataFrame, cols_no_normalization=[], drop_na=True) -> pd.DataFrame:
    tmp = df
    for col in tmp.columns:
        if col not in cols_no_normalization:
            tmp[col] = tmp[col].pct_change()
            tmp = tmp.replace([np.inf, -np.inf], np.nan)
            tmp[col] = preprocessing.scale(tmp[col].values) # scale values
            
    if drop_na:
        tmp.dropna(inplace=True)

    return tmp
Wie schon im Titel gesagt, sorgt das Ganze bei mir dafür, dass ein CUDA_ERROR_OUT_OF_MEMORY geworfen wird. Dazu habe ich jetzt zwei Fragen:
1. Was an meinem Code sorgt überhaupt dafür, dass CUDA bzw. meine GPU genutzt wird?
2. Lässt sich der Code noch hinsichtlich des Speicherbedarfs optimieren? Wenn ja wie?

Vielen Dank!
Sirius3
User
Beiträge: 18279
Registriert: Sonntag 21. Oktober 2012, 17:20

Da Du nur Bruchstücke Deines Codes zeigst, muß man wohl raten, dass tf Tensorflow ist, und was benutzt die GPU.
Es macht also gar keinen Sinn, mit Multiprocessing zu arbeiten, weil dadurch bekommst Du auch nicht mehr GPUs.
CptK
User
Beiträge: 7
Registriert: Sonntag 16. Januar 2022, 12:04
Wohnort: Darmstadt

Sirius3 hat geschrieben: Mittwoch 2. März 2022, 18:51 Da Du nur Bruchstücke Deines Codes zeigst, muß man wohl raten, dass tf Tensorflow ist, und was benutzt die GPU.
Es macht also gar keinen Sinn, mit Multiprocessing zu arbeiten, weil dadurch bekommst Du auch nicht mehr GPUs.
Das Ganze soll ja eigentlich auch nicht auf der GPU laufen, aber der Fehler sagt mir, dass es das doch irgendwie macht. Und tf nutze ich doch nur für die One-Hot-Kodierung oder hat das auch einen Einfluss auf den Rest?
Benutzeravatar
ThomasL
User
Beiträge: 1379
Registriert: Montag 14. Mai 2018, 14:44
Wohnort: Kreis Unna NRW

Um das beurteilen zu können, brauchen wir mehr Code, nämlich deine Importe und wie du das ganze startest.
CUDA wird nicht einfach so aus der Luft heraus gestartet und benutzt.
Ich bin Pazifist und greife niemanden an, auch nicht mit Worten.
Für alle meine Code Beispiele gilt: "There is always a better way."
https://projecteuler.net/profile/Brotherluii.png
CptK
User
Beiträge: 7
Registriert: Sonntag 16. Januar 2022, 12:04
Wohnort: Darmstadt

Hier der gesamte Code inkl. Imports

Code: Alles auswählen

import numpy as np
import pandas as pd
from random import shuffle
from tensorflow import one_hot
from sklearn import preprocessing
import multiprocessing as mp
import pickle

class Sample:
    def __init__(self, trade, cols_no_normalization):
        self.sample = normalize(trade.history, cols_no_normalization)
        self.label = 1
        self.label = one_hot(self.label, depth=3)

def to_samples(raw_data, input_shape, cols_no_normalization=[]):
    samples = list(map(lambda x: Sample(x, cols_no_normalization), raw_data))
    return samples_to_numpy(samples, input_shape)
 
def samples_to_numpy(samples : list[Sample], input_shape):
    shuffle(samples)
    x = []
    y = []
    count = 0
    l = int(len(samples) / 10)
    print(">>> creating numpy arrays --> started; len(samples) =", len(samples))
    for i in range(len(samples)):
        x_n = samples[i].sample.to_numpy()
        if x_n.shape == input_shape:
            x.append(x_n)
            y.append(samples[i].label)
        if (l + count * l) == i:
            print(">>> creating numpy arrays --> progress:", (count+1)*10, '%')
            count = count + 1
    return np.stack(x, axis=0), np.stack(y, axis=0)
    
def normalize(df: pd.DataFrame, cols_no_normalization=[], drop_na=True) -> pd.DataFrame:
    tmp = df
    for col in tmp.columns:
        if col not in cols_no_normalization:
            tmp[col] = tmp[col].pct_change()
            tmp = tmp.replace([np.inf, -np.inf], np.nan)
            tmp[col] = preprocessing.scale(tmp[col].values) # scale values
            
    if drop_na:
        tmp.dropna(inplace=True)

    return tmp

def split(data, n):
    k, m = divmod(len(data), n)
    return list(data[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))

def start_sample_generation(args):
    data, shape, cols = args
    return to_samples(data, shape, cols)
    
def generate_samples(sample_shape, cols_no_normalization):
    with open('trades_first_10000.pickle', 'rb') as handle:
        raw_data= pickle.load(handle)
    print(len(raw_data))    
    pool = mp.Pool(mp.cpu_count())
    data = pool.map(start_sample_generation, [(x, sample_shape, cols_no_normalization) for x in split(raw_data, mp.cpu_count())])

    X, Y = [], []
    for x, y in data:
        print(x.shape, y.shape)
        X.append(x)
        Y.append(y)

    X = np.concatenate(X, axis=0)
    Y = np.concatenate(Y, axis=0)
    print(X.shape, Y.shape)
    return X, Y

if __name__ == '__main__':
    generate_samples((25,5),['state'])
Allerdings kommt der CUDA-Fehler nicht immer - manchmal stürzt das System schon vorher wegen Überlastung ab (RAM voll). Für mich deutet das darauf hin, dass viele Objekte zu lange leben. Zum Beispiel: Ich lade ja im Hauptprogramm die Daten aus der Datei und verteile diese dann an die Prozesse. Was passiert mit den kopierten Daten, die im Hauptprogramm nach dem starten der Prozesse noch rumliegen, werden die gelöscht oder sind die da immer noch, bzw. wie kriege ich die gelöscht, dass ich nicht mehrere Kopien der gleichen Daten habe? Genauso in der Funktion samples_to_numpy: Die samples, die schon zu np.arrays umgewadelt wurden werden nicht mehr gebraucht und verschwenden nur unnötig Speicher, kann man dort die for-Schleife durch folgende while-Schleife ersetzen, um das zu verbessern? Und hat das Auswirkungen auf die Performance?

Code: Alles auswählen

while len(samples) > 0:
    x_n = samples[0].sample.to_numpy()
    if x_n.shape == input_shape:
        x.append(x_n)
        y.append(samples[0].label)
    samples = samples[1:]
CptK
User
Beiträge: 7
Registriert: Sonntag 16. Januar 2022, 12:04
Wohnort: Darmstadt

Ich habe das Problem gefunden: Die Benutzung der tensorflow.one_hot-Funktion führt zu einer totalen Überlastung meines Arbeitsspeichers. Ich habe mir jetzt einfach eine eigene One-Hot-Funktion geschrieben, mit der alles problemlos durchläuft:

Code: Alles auswählen

import numpy as np

def one_hot(value, depth):
    result = np.zeros(depth)
    result[value] = 1
    return result
Antworten