Seite 3 von 3

Verfasst: Mittwoch 9. Dezember 2009, 15:40
von mathi
geht nicht so einfach, weil pool.map() zwingend das iterable an merge übergibt,

also habe ich versucht functools.partial wieder zu verwenden
http://paste.pocoo.org/show/xDdF2XnLoKMmibYPBUwo/
, aber das wirft folgenden Fehler aus:

Code: Alles auswählen

Traceback (most recent call last):
  File "C:\Program Files\eclipse\plugins\org.python.pydev_1.5.2.1260362205\PySrc\pydev_sitecustomize\sitecustomize.py", line 142, in <module>
    sys.path.extend(paths_removed)
AttributeError: 'NoneType' object has no attribute 'path'
Exception in thread Thread-1:
Traceback (most recent call last):
  File "C:\Python26\lib\threading.py", line 525, in __bootstrap_inner
    self.run()
  File "C:\Python26\lib\threading.py", line 477, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python26\lib\multiprocessing\pool.py", line 225, in _handle_tasks
    put(task)
  File "C:\Python26\lib\multiprocessing\queues.py", line 279, in __getstate__
    return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
  File "C:\Python26\lib\multiprocessing\queues.py", line 51, in __getstate__
    assert_spawning(self)
  File "C:\Python26\lib\multiprocessing\forking.py", line 25, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
Vererbung?? Ich hab keine Ahnung wie das hier gehen soll...
oder habe ich eine andere Möglichkeit q zu übergeben??

Verfasst: Mittwoch 9. Dezember 2009, 16:01
von HWK
Als iterable

Code: Alles auswählen

[q] * anzahl_der_seiten
MfG
HWK

Verfasst: Mittwoch 9. Dezember 2009, 16:20
von mathi
mit

Code: Alles auswählen

maplist= pool.map(merge,([q]*pages)) 


folgt der selbe Fehler:

Code: Alles auswählen

  File "C:\Python26\lib\multiprocessing\forking.py", line 25, in assert_spawning
    ' through inheritance' % type(self).__name__
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance

Verfasst: Samstag 12. Dezember 2009, 12:26
von mathi
Hallo,
ich möchte nochmal um Hilfe bitten, da ich das Problem nicht alleine lösen kann......

Verfasst: Samstag 12. Dezember 2009, 13:41
von HWK
Vielleicht hilft das? D.h. versuch mal

Code: Alles auswählen

    manager = multiprocessing.Manager()
    q = manager.Queue()
MfG
HWK

Verfasst: Samstag 12. Dezember 2009, 16:20
von HWK
Falls das auch nicht klappt, hier mein nächster Vorschlag:
Verzichte auf die Queue, übergib an merge die Seitennummer und diesen pdf_reader statt input1.

Code: Alles auswählen

...
class MyManager(BaseManager):
    pass

MyManager.register('Reader', pyPdf.PdfFileReader)
...
    manager = MyManager()
    manager.start()
    pdf_reader = manager.Reader(open('test_lang.pdf', 'rb'))
...
Lies dann in merge die Seiten ein, berechne das Wasserzeichen bzw. benutze zum Testen einen zweiten vergleichbaren
Reader und verbinde beide Seiten. Das Ergebnis ist der Rückgabewert.
MfG
HWK

Verfasst: Montag 14. Dezember 2009, 11:23
von mathi
hallo und Danke für die Antworten,

von den beiden Varianten funktioniert erstere wohl am "Besten"
http://paste.pocoo.org/show/XeOI1YncBh9HVY7LjVWN/

wobei das nur mit Lock() einigermaßen funktioniert....
mit Lock() - pool.map() scheint eine Weile zu laufen :

Code: Alles auswählen

Traceback (most recent call last):
  File "D:\Python\Code\wxPython\workspace\pdfPage\src\pdfPageCMD.py", line 48, in <module>
    main()
  File "D:\Python\Code\wxPython\workspace\pdfPage\src\pdfPageCMD.py", line 27, in main
    maplist= pool.map(merge,([q]*pages,lock))          # so geht's nicht
  File "C:\Python26\lib\multiprocessing\pool.py", line 148, in map
    return self.map_async(func, iterable, chunksize).get()
  File "C:\Python26\lib\multiprocessing\pool.py", line 422, in get
    raise self._value
AttributeError: 'list' object has no attribute 'get'
mich wundert, dass dabei nur 1 Prozess mit 2 threads gestartet wird, da je lock ein threadingobjekt erstellt wird, aber keine 4 Prozesse

ohne Lock() das altbekannte:

Code: Alles auswählen

ValueError: I/O operation on closed file

Verfasst: Montag 14. Dezember 2009, 13:23
von HWK
Was Du da tust, weißt Du nicht wirklich, oder? Was soll denn das Lock? An merge übergibst Du ein Tuple aus einer Liste von q's und dem Lock. Die Liste hat natürlich keine get-Methode. Ich denke, bevor Du Dich mit hochkomplexen Sachverhalten wie der Multiprozess-Programmierung beschäftigst, solltest Du Dir erst noch einmal die Grundlagen aneignen. Zumal nicht einmal klar ist, ob Multiprocessing die Performance Deiner Anwendung verbessern wird.
MfG
HWK

Verfasst: Montag 14. Dezember 2009, 13:57
von mathi
@HWK
hast recht

Code: Alles auswählen

    t1=time()   
    pages=input1.getNumPages()

    pool = Pool(processes=4)
    maplist= pool.map(merge,xrange(pages))         
    pool.close() 
    pool.join()
    
    print "länge=",len(maplist)
    t2=time()
    print "time=",(t2-t1)
dauert 28 sec.

Code: Alles auswählen

    t1=time() 
    output = PdfFileWriter()  
            
    with open('test_lang.pdf', 'rb') as infile: 
        input1 = PdfFileReader(infile) 
        watermark = PdfFileReader(file("temp.pdf", "rb"))  
        pages=input1.getNumPages()

        pagelist=[]    
        for pagenum in range(pages):                  
            pagelist.append(merge(pagenum))
        t2=time()
        print "time=",(t2-t1)
dauert 22 sec.

Hätte ich nicht gedacht...

ich habe jetzt herausgefunden, dass gar nicht map() schuld ist, sondern

Code: Alles auswählen

    for page in maplist:
        output.addPage(page)
ich kann nicht einmal

Code: Alles auswählen

output.addPage(maplist[0])
ausführen.

Somit konnte ich aber zumindest die Zeitmessung vornehmen.

Das bedeutet doch, dass mein Prog. eher I/O lastig ist, aber die HDD schreibt nix, daher bleibt alles im RAM.
Gibt es überhaupt eine Möglichkeit mein Programm zu beschleunigen??

Verfasst: Montag 21. Dezember 2009, 09:26
von mathi
Da ich der Meinung bin, dass das Problem darin bestand die Ergebnisse von

Code: Alles auswählen

mergePage
fehlerfrei aus der merge()-Funktionen herauszubekommen, habe ich eine Möglichkeit gefunden 8)

Ich teile die Arbeit in (bei 4 Kernen) 4 teile und erstelle in 4 merge-Funktionen je 1/4 der Seiten und erhalte insgesamt 4 .pdf-Dateien, die dann noch zu einer zusammengeführt werden sollen.

http://paste.pocoo.org/show/wBopIiVEKqi34m6KDYvy/

Der Code ist noch ungefeilt, aber es funktioniert soweit, dass mit x Kernen auch die einzelne Arbeit verrichtet wird und die Dateien in der richtigen Reihenfolge zusammengefügt werden :(

zum Test: ein Kern braucht hier 33 sec. , 4 Kerne schaffen das in 14 sec.
bei einer .pdf mit 110 Seiten, interessant wird es dann mit 500-1200 Seiten, die bei mir öffter vorkommen :-)


Ich würde mich über Eure Tipps freuen, den Code zu verbessern