ich versuche gerade grosse Textfiles entsprechend bestimmter Regularien zu splitten und wuerde es gerne parallel aufsetzen.
Dabei teste ich zur Zeit erst einmal die Grundfunktion im Memory und erleide leider dort schon schlechte Performance.
Ich hab den Code so klein wie moeglich auf das Wesentliche runtergebrochen. Die Grundidee nochmal kurz beschrieben:
Eine Liste a enthaelt bestimmte Werte und es soll geprueft werden, ob die Werte der Liste a entweder in der Liste b oder c enthalten sind.
Dafuer erzeuge ich ein zusaetzliches index array bzw eine index liste mit dem namen a_index_listing, die dann entweder
den Wert 1 (ist in Liste b) oder den Wert 2 (ist in Liste c) bekommt. Der Grundalgorithmus ist am besten in der Funktion:
"splitting_no_parallel" zu sehen.
Code: Alles auswählen
for i in range(self.num_lines):
id = self.list_a[i]
if id in self.list_b:
self.list_a_index[i] = int(1)
if id in self.list_c:
self.list_a_index[i] = int(2)
Wieso ist der parallele Job nicht schneller als der nicht-parallele Job ?
Wenn ihr mit der groesse des jeweiligen arrays bzw liste variieren wollt, koennt ihr die Groesse aendern in:
self.num_lines = int(100000)
Habt ihr da Tipps, was ich machen kann, damit man einen Speedup im parallel Modus sieht?
Auch mit 4 oder 8 Processoren hab ich die gleichen Probleme.
Vielen Dank im voraus
lp
Code: Alles auswählen
#!/usr/bin/env python
import string, sys, datetime, threading, time
# ------------------------------------------------------------------------------
class se_threading(threading.Thread):
def __init__(self, co, name="TestThread"):
self.co = co
threading.Thread.__init__(self, name=name)
def run(self):
self.co.splitting_parallel(self.getName())
pass
# ------------------------------------------------------------------------------
class parallel_test():
def __init__(self):
self.thr_nproc = int(2)
self.num_lines = int(100000)
self.list_a = []
self.list_a_index = []
self.list_b = []
self.list_c = []
def get_time_stamp(self):
my_time = datetime.datetime.now()
print my_time.strftime("%H:%M:%S")
def prepare(self):
print "prepare list a, b, c ..."
self.list_a = self.num_lines*[0]
self.list_a_index = self.num_lines*[0]
self.p_list_a_index_1 = self.num_lines/2*[0]
self.p_list_a_index_2 = self.num_lines/2*[0]
self.list_b = self.num_lines/2*[0]
self.list_c = self.num_lines/2*[0]
for i in range(self.num_lines):
self.list_a[i] = i
for i in range(self.num_lines/2):
self.list_b[i] = i
self.list_c[i] = i + self.num_lines/2
# ------------------------------------------------------------------------------
def splitting_parallel(self, thread_id):
start_id = int(0)
end_id = int(0)
pct_counter = int(0)
pct_printed = int(0)
pct = int(0)
# prepare local list and start_id and end_id
if ( thread_id == "1" ):
start_id = int(0)
end_id = self.num_lines/2-1
if ( thread_id == "2" ):
start_id = self.num_lines/2
end_id = self.num_lines
self.get_time_stamp()
print "start threading: ", thread_id
print "start_id, end_id: " + str(start_id) + ", " + str(end_id )
# go through the list
for i in range(start_id,end_id):
id = self.list_a[i]
if id in self.list_b:
self.list_a_index[i] = int(1)
if id in self.list_c:
self.list_a_index[i] = int(2)
# percent calculation
pct_counter = pct_counter + 1
pct = int( float(pct_counter)/float(len(self.list_a)/2)*100 )
if ( (pct%20 == 0) and (pct > 0) and (pct_printed!=pct) ):
print pct*"." + " - " + str(pct) + "%" + " - " + thread_id
pct_printed = pct
def task_parallel(self):
print "parallel task ..."
ct1 = se_threading(self, "1")
ct2 = se_threading(self, "2")
ct1.start()
ct2.start()
ct1.join()
ct2.join()
# ------------------------------------------------------------------------------
def splitting_no_parallel(self, dummy):
pct_counter = int(0)
pct_printed = int(0)
pct = int(0)
print "splitting w/o parallel ..."
self.list_a_index = self.num_lines*[0]
for i in range(self.num_lines):
id = self.list_a[i]
if id in self.list_b:
self.list_a_index[i] = int(1)
if id in self.list_c:
self.list_a_index[i] = int(2)
# percent calculation
pct_counter = pct_counter + 1
pct = int( float(pct_counter)/float(self.num_lines)*100 )
if ( (pct%20 == 0) and (pct > 0) and (pct_printed!=pct) ):
print pct*"." + " - " + str(pct) + "%" + " - " + dummy
pct_printed = pct
# ------------------------------------------------------------------------------
p = parallel_test()
p.get_time_stamp()
# no parallel
p.prepare()
p.get_time_stamp()
p.splitting_no_parallel("0")
p.get_time_stamp()
# parallel
p.prepare()
p.get_time_stamp()
p.task_parallel()
p.get_time_stamp()