Von subprocess.Popen zu Daemon mit multiprocessing.pool

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
pyseidon
User
Beiträge: 19
Registriert: Donnerstag 24. September 2009, 20:25

Hallo,

ich möchte folgendes Ausgangssituation umbauen. Ich habe ein Python-File welches schaut ob neue Daten da. Falls ja, werden diese an ein weiteres File gegeben, damit diese verarbeitet werden.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import subprocess

def get_input_files(directory):
    new_files = list()
    for subdir, dirs, files in os.walk(directory, followlinks=True):
        for f in files:
            new_files.append(os.path.join(subdir, f))
    return new_files

def start_process(files_to_process):
    for f in files_to_process:
        subprocess.Popen(['python', 'process_data.py', '-i', f])

def main():
    files_to_process = get_input_files('input')
    start_process(files_to_process)

if __name__ == '__main__':
    main()

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse

def process_data(input_file):
    pass

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', dest='input_file', action='store')

    args = parser.parse_args()

    if args.input_file is None:
        parser.print_help()
    else:
        process_data(args.input_file)

if __name__ == '__main__':
    main()
Ich benutze im dem Fall ``subprocess.Popen`` damit die Sachen parallel verarbeitet werden können. Jetzt laufe ich aber in das Problem, dass ich nicht steuern kann wie viele Prozesse laufen dürfen. Im Idealfall sind es "Anzahl der CPUs - 1". Zudem würde ich das gerne in einem Daemon laufen lassen, also das prüfen ob neue Daten sind und dann die Prozesse füttern. Derzeit wird das obige File über einen Cronjob aufgerufen.

So recht weiter komme ich da aber nicht. Jemand eine Idee wie man da am Besten ansetzt? Braucht man bei ``multiprocessing.Pool`` wirklich immer einen Rückgabewert bei der Funktion?
BlackJack

@pyseidon: Jede Funktion hat einen Rückgabewert, das kann man gar nicht verhindern. Du musst Dich um den Rückgabewert ja nicht kümmern. Wo liegt denn das konkrete Problem?

Edit: Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python
# coding: utf8
import os
from multiprocessing import cpu_count, Pool

from process_data import process_data


def get_input_files(path):
    result = list()
    for root, _, filenames in os.walk(path, followlinks=True):
        result.extend(os.path.join(root, f) for f in filenames)
    return result


def execute_processes(filenames):
    pool = Pool(cpu_count() - 1)
    for filename in filenames:
        pool.apply_async(process_data, (filename,))
    pool.close()
    pool.join()


def main():
    execute_processes(get_input_files('input'))


if __name__ == '__main__':
    main()
Zuletzt geändert von BlackJack am Samstag 29. August 2015, 21:16, insgesamt 1-mal geändert.
Grund: Bugfix im Quelltext.
pyseidon
User
Beiträge: 19
Registriert: Donnerstag 24. September 2009, 20:25

``get_input_files`` ist eine Endlosschleife weil filenames immer wieder erweitert wird. Aber sonst haut das hin. Nun, ich habe das nicht hinbekommen was jetzt in ``execute_processes`` drin steht. Vor allem das close() und join() von dem Pool.
BlackJack

@pyseidon: Habe das mit `filenames` im Beitrag oben korrigiert. :-)
Antworten