Seite 1 von 1

Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Verfasst: Mittwoch 2. März 2022, 17:52
von CptK
Hallo, ich habe eine Liste von Beispieldaten und möchte diese jetzt in numpy-arrays umwandeln.

Die Funktionen, die alles starten und die Parallelisierung einleiten sind folgende:

Code: Alles auswählen

def split(data, n):
    k, m = divmod(len(data), n)
    return list(data[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))

def start_sample_generation(args):
    data, shape, cols = args
    return to_samples(data, shape, cols)
    
def generate_samples(sample_shape, cols_no_normalization):
    with open('raw_data.pickle', 'rb') as handle:
        raw_data= pickle.load(handle)
        
    pool = mp.Pool(mp.cpu_count())
    data = pool.map(start_sample_generation, [(x, sample_shape, cols_no_normalization) for x in split(raw_data, mp.cpu_count())])

    X, Y = [], []
    for x, y in data:
        print(x.shape, y.shape)
        X.append(x)
        Y.append(y)

    X = np.concatenate(X, axis=0)
    Y = np.concatenate(Y, axis=0)
    return X, Y

if __name__ == '__main__':
    generate_samples((25,5),['state'])
Für die Eigentliche Umwandlung zuständig ist folgender Code, wobei erst einmal alle raw-samples in Sample-Klassen gemapped werde, wo auch die Normalisierung stattfindet und dann die np.Arrays erzeugt werden:

Code: Alles auswählen

class Sample:
    def __init__(self, data, cols_no_normalization):
        self.sample = normalize(data.history, cols_no_normalization)
        self.label = ...
        self.label = tf.one_hot(self.label, depth=3)

def to_samples(raw_data, input_shape, cols_no_normalization=[]):
    samples = list(map(lambda x: Sample(x, cols_no_normalization), raw_data))
    return samples_to_numpy(samples, input_shape)
 
def samples_to_numpy(samples : list[Sample], input_shape):
    shuffle(samples)
    x = []
    y = []
    count = 0
    l = int(len(samples) / 10)
    print(">>> creating numpy arrays --> started; len(samples) =", len(samples))
    for i in range(len(samples)):
        x_n = samples[i].sample.to_numpy()
        if x_n.shape == input_shape:
            x.append(x_n)
            y.append(samples[i].label)
        if (l + count * l) == i:
            print(">>> creating numpy arrays --> progress:", (count+1)*10, '%')
            count = count + 1
    return np.stack(x, axis=0), np.stack(y, axis=0)
    
def normalize(df: pd.DataFrame, cols_no_normalization=[], drop_na=True) -> pd.DataFrame:
    tmp = df
    for col in tmp.columns:
        if col not in cols_no_normalization:
            tmp[col] = tmp[col].pct_change()
            tmp = tmp.replace([np.inf, -np.inf], np.nan)
            tmp[col] = preprocessing.scale(tmp[col].values) # scale values
            
    if drop_na:
        tmp.dropna(inplace=True)

    return tmp
Wie schon im Titel gesagt, sorgt das Ganze bei mir dafür, dass ein CUDA_ERROR_OUT_OF_MEMORY geworfen wird. Dazu habe ich jetzt zwei Fragen:
1. Was an meinem Code sorgt überhaupt dafür, dass CUDA bzw. meine GPU genutzt wird?
2. Lässt sich der Code noch hinsichtlich des Speicherbedarfs optimieren? Wenn ja wie?

Vielen Dank!

Re: Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Verfasst: Mittwoch 2. März 2022, 18:51
von Sirius3
Da Du nur Bruchstücke Deines Codes zeigst, muß man wohl raten, dass tf Tensorflow ist, und was benutzt die GPU.
Es macht also gar keinen Sinn, mit Multiprocessing zu arbeiten, weil dadurch bekommst Du auch nicht mehr GPUs.

Re: Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Verfasst: Mittwoch 2. März 2022, 19:57
von CptK
Sirius3 hat geschrieben: Mittwoch 2. März 2022, 18:51 Da Du nur Bruchstücke Deines Codes zeigst, muß man wohl raten, dass tf Tensorflow ist, und was benutzt die GPU.
Es macht also gar keinen Sinn, mit Multiprocessing zu arbeiten, weil dadurch bekommst Du auch nicht mehr GPUs.
Das Ganze soll ja eigentlich auch nicht auf der GPU laufen, aber der Fehler sagt mir, dass es das doch irgendwie macht. Und tf nutze ich doch nur für die One-Hot-Kodierung oder hat das auch einen Einfluss auf den Rest?

Re: Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Verfasst: Mittwoch 2. März 2022, 21:55
von ThomasL
Um das beurteilen zu können, brauchen wir mehr Code, nämlich deine Importe und wie du das ganze startest.
CUDA wird nicht einfach so aus der Luft heraus gestartet und benutzt.

Re: Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Verfasst: Donnerstag 3. März 2022, 11:43
von CptK
Hier der gesamte Code inkl. Imports

Code: Alles auswählen

import numpy as np
import pandas as pd
from random import shuffle
from tensorflow import one_hot
from sklearn import preprocessing
import multiprocessing as mp
import pickle

class Sample:
    def __init__(self, trade, cols_no_normalization):
        self.sample = normalize(trade.history, cols_no_normalization)
        self.label = 1
        self.label = one_hot(self.label, depth=3)

def to_samples(raw_data, input_shape, cols_no_normalization=[]):
    samples = list(map(lambda x: Sample(x, cols_no_normalization), raw_data))
    return samples_to_numpy(samples, input_shape)
 
def samples_to_numpy(samples : list[Sample], input_shape):
    shuffle(samples)
    x = []
    y = []
    count = 0
    l = int(len(samples) / 10)
    print(">>> creating numpy arrays --> started; len(samples) =", len(samples))
    for i in range(len(samples)):
        x_n = samples[i].sample.to_numpy()
        if x_n.shape == input_shape:
            x.append(x_n)
            y.append(samples[i].label)
        if (l + count * l) == i:
            print(">>> creating numpy arrays --> progress:", (count+1)*10, '%')
            count = count + 1
    return np.stack(x, axis=0), np.stack(y, axis=0)
    
def normalize(df: pd.DataFrame, cols_no_normalization=[], drop_na=True) -> pd.DataFrame:
    tmp = df
    for col in tmp.columns:
        if col not in cols_no_normalization:
            tmp[col] = tmp[col].pct_change()
            tmp = tmp.replace([np.inf, -np.inf], np.nan)
            tmp[col] = preprocessing.scale(tmp[col].values) # scale values
            
    if drop_na:
        tmp.dropna(inplace=True)

    return tmp

def split(data, n):
    k, m = divmod(len(data), n)
    return list(data[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))

def start_sample_generation(args):
    data, shape, cols = args
    return to_samples(data, shape, cols)
    
def generate_samples(sample_shape, cols_no_normalization):
    with open('trades_first_10000.pickle', 'rb') as handle:
        raw_data= pickle.load(handle)
    print(len(raw_data))    
    pool = mp.Pool(mp.cpu_count())
    data = pool.map(start_sample_generation, [(x, sample_shape, cols_no_normalization) for x in split(raw_data, mp.cpu_count())])

    X, Y = [], []
    for x, y in data:
        print(x.shape, y.shape)
        X.append(x)
        Y.append(y)

    X = np.concatenate(X, axis=0)
    Y = np.concatenate(Y, axis=0)
    print(X.shape, Y.shape)
    return X, Y

if __name__ == '__main__':
    generate_samples((25,5),['state'])
Allerdings kommt der CUDA-Fehler nicht immer - manchmal stürzt das System schon vorher wegen Überlastung ab (RAM voll). Für mich deutet das darauf hin, dass viele Objekte zu lange leben. Zum Beispiel: Ich lade ja im Hauptprogramm die Daten aus der Datei und verteile diese dann an die Prozesse. Was passiert mit den kopierten Daten, die im Hauptprogramm nach dem starten der Prozesse noch rumliegen, werden die gelöscht oder sind die da immer noch, bzw. wie kriege ich die gelöscht, dass ich nicht mehrere Kopien der gleichen Daten habe? Genauso in der Funktion samples_to_numpy: Die samples, die schon zu np.arrays umgewadelt wurden werden nicht mehr gebraucht und verschwenden nur unnötig Speicher, kann man dort die for-Schleife durch folgende while-Schleife ersetzen, um das zu verbessern? Und hat das Auswirkungen auf die Performance?

Code: Alles auswählen

while len(samples) > 0:
    x_n = samples[0].sample.to_numpy()
    if x_n.shape == input_shape:
        x.append(x_n)
        y.append(samples[0].label)
    samples = samples[1:]

Re: Trainingsdatenerstellung parallelisieren führt zu CUDA_ERROR_OUT_OF_MEMORY

Verfasst: Freitag 4. März 2022, 15:55
von CptK
Ich habe das Problem gefunden: Die Benutzung der tensorflow.one_hot-Funktion führt zu einer totalen Überlastung meines Arbeitsspeichers. Ich habe mir jetzt einfach eine eigene One-Hot-Funktion geschrieben, mit der alles problemlos durchläuft:

Code: Alles auswählen

import numpy as np

def one_hot(value, depth):
    result = np.zeros(depth)
    result[value] = 1
    return result