Dateien einlesen via multiprocessing

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Roman Rettich
User
Beiträge: 4
Registriert: Donnerstag 14. Juni 2018, 13:36

Hey Leute,

ich versuche seit einigen Tagen herauszufinden, wie ich den Inhalt von Dateien zügig mithilfe von 'multiprocessing' einlesen kann, verarbeiten und wieder speichern kann. Ich habe in einem Ordner einige hundert Dateien, die jeweils einige tausend Zahlenwerte enthalten und möchte diese in einer Matrix speichern. Ich habe mich jetzt einige Zeit daran versucht, nur leider bin ich noch nicht dahinter gestiegen, da ich auch noch eher ein Python-Neuling bin. Ich habe einen Code im Internet gefunden, der für mein Problem glaube ich geeignet ist, und habe versucht ihn umzuschreiben. Hier ist meine umgeschriebene Version (Erklärung unten):

Code: Alles auswählen

import numpy as np
import multiprocessing
import math

def writedata(rs,out_q):
 data_sim=np.zeros((4281,80))
 for kkk in rs:
  rs_str=str(kkk)
  fnrs = '../OUTPUT_FILES' + '/Datei' + rs_str 
  data_sim_rec=np.genfromtxt(fnrs)
  data_sim_rec=np.array([data_sim_rec[:,1]]).T
  data_sim[:,kkk]=data_sim_rec[:,0]   
 out_q.put(data_sim)

rs=np.arange(1,80,1)
nprocs=10
out_q=multiprocessing.Queue()
chunksize=int(math.ceil(len(rs)/float(nprocs)))
procs=[]

for i in range(nprocs):
    p=multiprocessing.Process(target=writedata,args=(rs[chunksize*i:chunksize*(i+1)],out_q))
    procs.append(p)
    p.start()
resultdict={}

for i in range(nprocs):  
 resultdict.update((out_q.get()))

for p in procs:
    p.join()

data_sim=resultdict


Ich definiere die Funktion writedata, die in den Ordner mit hier beispielhaften 80 Dateien geht, die Dateien der Reihe nach in data_sim_rec ausliest und den zweiten Reiheneintrag in der Matrix data_sim an entsprechender Stelle speichern soll. Diese Matrix soll in einem Queue gespeichert werden. Ich verteile danndie Jobs über eine Liste mit Zahlen für den Loop an die verschiedenen Prozessoren.
Danach sollen die Daten in resultdict gespeichert werden. Die aktuelle Fehlermeldung lautet

Code: Alles auswählen

 "dictionary update sequence element #0 has length 80, 2 is required."
Mir ist klar, dass ich irgendetwas mit Datentypen vertausche und das mit dem update auch nicht richtig ist. Allerdings habe ich schon vieles probiert und bin ratlos.
Ich wäre über etwas Hilfe sehr dankbar :) Der Code ist in 2.7 geschrieben.

Beste Grüße,

Max
Benutzeravatar
__blackjack__
User
Beiträge: 13004
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Ein kompletter Traceback ist in der Regel nett, denn sonst muss man suchen und eventuell raten wo das Problem denn genau auftritt. Hier war's ja noch relativ einfach die Zeile `resultdict.update((out_q.get()))` auszumachen, aber das ist ja nicht immer so.

Da ist ein Klammerpaar zu viel, das ist eventuell verwirrend.

Der Fehler ist, das die `dict.update()`-Methode eine Sequenz¹ von Schlüssel/Wert-Paarenerwartet. Bekommt aber aus der Queue ein Array mit 4281 Einträgen die nicht je aus einem Schlüssel und einem Wert bestehen sondern jeweils aus 80 Werten. Was soll denn da Deiner Meinung nach mit den Daten passieren?

Mir ist auch nicht so ganz klar was der Schlüssel denn sein soll?

Eventuell ist `concurrent.futures` die einfachere API für Dein Vorhaben. Da gibt's einen Backport von für Python 2.7.
_____
¹ Es muss nicht einmal eine Sequenz sein, jedes iteriertbare Objekt das Schlüssel/Wert-Paare liefert tut's auch.
“Most people find the concept of programming obvious, but the doing impossible.” — Alan J. Perlis
Sirius3
User
Beiträge: 17712
Registriert: Sonntag 21. Oktober 2012, 17:20

So wie Du das schreibst, ist es reichlich kompliziert. Nutze besser multiprocessing.Pool und eine der verschiedenen map-Methoden.
Eingerückt wird immer mit 4 Leerzeichen pro Ebene. Bei nur einem Leerzeichen ist der Code schwer zu lesen.

`writedata` ist ein falscher Name, da sie ja Daten liest.

Nimmt man pool, bleibt das hier übrig:

Code: Alles auswählen

import numpy as np
import multiprocessing

NUM_PROCESSES = 10
FILE_PATTERN = '../OUTPUT_FILES/Datei{}'

def read_data(nr):
    return np.genfromtxt(FILE_PATTERN.format(nr), usecols=1)

pool = multiprocessing.Pool(NUM_PROCESSES)
data_sim = np.fromiter(pool.map(read_data, range(1, 81)), dtype=float)
Roman Rettich
User
Beiträge: 4
Registriert: Donnerstag 14. Juni 2018, 13:36

Besten Dank, Sirius! So hat das schonmal in einem Testlauf geklappt.
Allerdings habe ich nun ein anderes Problem:
Da der Code bloß Teil eines großen Codes ist, kann dieser nicht im main-script ausgeführt werden, wodurch ich - wie ich glaube - auf folgendes neues Problem stoße, nämlich die Fehlermeldung:

Code: Alles auswählen

Traceback (most recent call last):
  File "Misfit.py", line 75, in <module>
    misfit_arr,time,ampl_simu,ampl_ref=calcmisfit(N_RECEIVERS,data_ref)
  File "/media/mas/376592ba-cc00-45a6-9d91-26ca397247af/SpecFEM3D_UbuntuVersion/Beton4/100kHz/Inversion/OUTPUTS_rightmodel/B_calcmisfit.py", line 23, in calcmisfit
    [time,ampl_simu,ampl_ref]=A_process_measurement(data_ref)
  File "/media/mas/376592ba-cc00-45a6-9d91-26ca397247af/SpecFEM3D_UbuntuVersion/Beton4/100kHz/Inversion/OUTPUTS_rightmodel/A_process_measurement.py", line 57, in A_process_measurement
    data_sim_rg=np.asarray(pool.map(eval(readfunc), range(limit1,limit2))).T
  File "/home/mas/anaconda2/lib/python2.7/multiprocessing/pool.py", line 253, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/home/mas/anaconda2/lib/python2.7/multiprocessing/pool.py", line 572, in get
    raise self._value
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
Wie man sieht, ist der Code relativ 'verschachtelt'. Gibt es eine einfache Möglichkeit, das Problem zu umgehen? Die Lösungen im Netz sehen mir doch recht kompliziert aus :(

Beste Grüße und danke nochmal,

Max
Benutzeravatar
__blackjack__
User
Beiträge: 13004
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Ich vermute es liegt am `eval(readfunc)`. Also `eval()` an sich ist schon etwas was ich dringend vermeiden würde. Was versuchst Du denn damit zu lösen? Funktionen kann `pickle` nur wenn sie in einem Modul definiert sind, so dass der Empfängerprocess das Modul laden und auf die dort definierte Funktion zugreifen kann.
“Most people find the concept of programming obvious, but the doing impossible.” — Alan J. Perlis
Roman Rettich
User
Beiträge: 4
Registriert: Donnerstag 14. Juni 2018, 13:36

okay, also hier der Programmcode:

Code: Alles auswählen

    ######input data######

import numpy as np
import multiprocessing
import time
import matplotlib
import matplotlib.pyplot as plt
from multiprocessing import Pool, freeze_support
import scipy.signal

######input data######

time_beg=time.time()
NUM_PROCESSES = 26
N_RECEIVERS=249

N_RECGROUPS=3
REC_group_1=[(1,80),'X'] ### thinking in range!!
REC_group_2=[(80,158),'Y']
REC_group_3=[(158,250),'Z']

######################
FILE_PATTERN_X = '../OUTPUT_FILES_TWOSENDERS/DB.X{}.FXX.semd'
FILE_PATTERN_Y = '../OUTPUT_FILES_TWOSENDERS/DB.X{}.FXY.semd'
FILE_PATTERN_Z = '../OUTPUT_FILES_TWOSENDERS/DB.X{}.FXZ.semd'
######################


##################### Function definition ######################

def read_data_X(nr):
    return np.array((np.genfromtxt(FILE_PATTERN_Z.format(nr), usecols=1)))

def read_data_Y(nr):
    return np.array((np.genfromtxt(FILE_PATTERN_Z.format(nr), usecols=1)))

def read_data_Z(nr):
    return np.array((np.genfromtxt(FILE_PATTERN_Z.format(nr), usecols=1)))


################## READ SIMULATION DATA WITH THE HELP OF MULTIPLE PROCESSORS ##########################


fnrs = '../OUTPUT_FILES_TWOSENDERS' + '/DB.X1.FXZ.semd' # file contains wavefield
data_sim_0 = np.genfromtxt(fnrs)
data_sim = np.empty([max(data_sim_0.shape),N_RECEIVERS+1])
data_sim[:,0]=data_sim_0[:,0] # first vector is time vector
pool = multiprocessing.Pool(NUM_PROCESSES) #Pool for Jobs

for rg in range (1,N_RECGROUPS):

    recgroup='REC_group_'+str(rg)
    readfunc='read_data_'+eval(recgroup+'[1]')
    limit1=eval(recgroup+'[0]')[0]
    limit2=eval(recgroup+'[0]')[1]
    data_sim_rg=np.asarray(pool.map(eval(readfunc), range(limit1,limit2))).T
    data_sim[:,limit1:limit2]=data_sim_rg





Ich habe gelesen, dass die Funktion pool.map() immer im Hauptskript ausgeführt werden muss und es wohl daran liegt. Das obenliegende Skript ist Teil einer Funktion, welche wieder in einer Funktion liegt.

Mit dem eval() möchte ich wie zu sehen einen String evaluieren. Was könnte man denn alternativ machen?

PS: Mir ist klar, dass mir einige Grundkenntnisse fehlen und ich deswegen an manchen Stellen etwas umständlich programmiere. Fange jetzt aber mit nem Buch an was dagegen zu tun ;)
Benutzeravatar
__blackjack__
User
Beiträge: 13004
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Roman Rettich: Argh. Vergiss das es `eval()` überhaupt gibt und die komische Idee Code in Zeichenketten ausführen zu wollen! Das ist Murks der fragil ist und denn am Ende keiner wirklich nachvollziehen kann, und vor allem ist es unnötig.

Wenn Du anfängst Namen zu nummerieren willst Du eigentlich eine Datenstruktur verwenden. In diesem Fall eine Liste für `REC_group_1` bis `REC_group_3`. Dann brauchst Du auch `N_RECGROUPS` nicht, denn von der Liste kann man die Länge ja einfach abfragen (`len()`-Funktion).

Dann kannst Du ein Wörterbuch erstellen, das Achsennamen auf Lesefunktionen abbildet und bist so alle `eval()`\s los:

Code: Alles auswählen

REC_GROUPS = [((1, 80), 'X'), ((80, 158), 'Y'), ((158, 250), 'Z')]

# ...

AXIS_NAME_TO_READ_FUNC = {
    'X': read_data_X,
    'Y': read_data_Y,
    'Z': read_data_Z,
}

# ...

for (limit1, limit2), axis_name in REC_GROUPS:
    data_sim_rg = np.asarray(
        pool.map(AXIS_NAME_TO_READ_FUNC[axis_name], range(limit1, limit2))
    ).T
    data_sim[:, limit1:limit2] = data_sim_rg
Und jetzt `eval()` vergessen! :-)
“Most people find the concept of programming obvious, but the doing impossible.” — Alan J. Perlis
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Wieso nutzt du hier überhaupt multiprocessing? Sequentiell von einer Festplatte zu lesen ist wesentlich schneller als dies "zufällig" (parallel) zu tun. Das trifft auch auf SSDs zu. Daten von einem Prozess zu einem anderen zu kopieren kostet auch nochmal Zeit.
Roman Rettich
User
Beiträge: 4
Registriert: Donnerstag 14. Juni 2018, 13:36

Danke, blackjack für den Hinweis :) Leider kommt die Fehlermeldung immer noch :/
Wie gesagt, im Internet steht, dass das passiert, wenn multiprocessing in Funktionen steht. Es gibt einige Lösungsansätze, nur alles für mich recht kompliziert.. Hat noch jemand einen Rat?

@DasIch: Ich habe es getestet, es über mehrere Prozessoren laufen zu lassen ist deutlich schneller - oder was meinst du?
Benutzeravatar
__blackjack__
User
Beiträge: 13004
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Roman Rettich: Kann ich mir jetzt kaum vorstellen, denn eine Funktion auf Modulebene lässt sich picklen und die `read_data_*()`-Funktionen stehen auf Modulebene. Ich weiss auch nicht so ganz was „wenn multiprocessing in Funktionen steht“ bedeuten soll. Das *muss* ja in Funktionen stehen, denn `Pool.map()` muss man ja eine Funktion/ein aufrufbares Objekt als erstes Argument übergeben.

Im Gegenteil: Bei Dir steht zu wenig in Funktionen, denn üblicherweise steht das Hauptprogramm auch in einer Funktion (konventionell `main()`) und wird durch das ``if __name__ == '__main__':``-Idiom dafür geschützt, dass es auch läuft wenn man das Modul nur importiert, statt es als Programm auszuführen. Letzteres muss man bei `multiprocessing` sogar machen wenn das auch unter Windows funktionieren soll.

DasIch meint ziemlich sicher das Dein gezeigter Code so wenig CPU-lastiges macht, dass es wahrscheinlich ist, dass es keinen Geschwindigkeitsvorteil bringt das parallel zu machen. Es wird sogar irgendwann kippen, wenn sich zu viele CPUs um Zugriff auf den gleichen Hintergrundspeicher (Festplatte, SSD, …) kloppen und sich dadurch gegenseitig ausbremsen.
“Most people find the concept of programming obvious, but the doing impossible.” — Alan J. Perlis
Antworten