Seite 1 von 1

Von subprocess.Popen zu Daemon mit multiprocessing.pool

Verfasst: Samstag 29. August 2015, 19:20
von pyseidon
Hallo,

ich möchte folgendes Ausgangssituation umbauen. Ich habe ein Python-File welches schaut ob neue Daten da. Falls ja, werden diese an ein weiteres File gegeben, damit diese verarbeitet werden.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import subprocess

def get_input_files(directory):
    new_files = list()
    for subdir, dirs, files in os.walk(directory, followlinks=True):
        for f in files:
            new_files.append(os.path.join(subdir, f))
    return new_files

def start_process(files_to_process):
    for f in files_to_process:
        subprocess.Popen(['python', 'process_data.py', '-i', f])

def main():
    files_to_process = get_input_files('input')
    start_process(files_to_process)

if __name__ == '__main__':
    main()

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import argparse

def process_data(input_file):
    pass

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', dest='input_file', action='store')

    args = parser.parse_args()

    if args.input_file is None:
        parser.print_help()
    else:
        process_data(args.input_file)

if __name__ == '__main__':
    main()
Ich benutze im dem Fall ``subprocess.Popen`` damit die Sachen parallel verarbeitet werden können. Jetzt laufe ich aber in das Problem, dass ich nicht steuern kann wie viele Prozesse laufen dürfen. Im Idealfall sind es "Anzahl der CPUs - 1". Zudem würde ich das gerne in einem Daemon laufen lassen, also das prüfen ob neue Daten sind und dann die Prozesse füttern. Derzeit wird das obige File über einen Cronjob aufgerufen.

So recht weiter komme ich da aber nicht. Jemand eine Idee wie man da am Besten ansetzt? Braucht man bei ``multiprocessing.Pool`` wirklich immer einen Rückgabewert bei der Funktion?

Re: Von subprocess.Popen zu Daemon mit multiprocessing.pool

Verfasst: Samstag 29. August 2015, 19:30
von BlackJack
@pyseidon: Jede Funktion hat einen Rückgabewert, das kann man gar nicht verhindern. Du musst Dich um den Rückgabewert ja nicht kümmern. Wo liegt denn das konkrete Problem?

Edit: Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python
# coding: utf8
import os
from multiprocessing import cpu_count, Pool

from process_data import process_data


def get_input_files(path):
    result = list()
    for root, _, filenames in os.walk(path, followlinks=True):
        result.extend(os.path.join(root, f) for f in filenames)
    return result


def execute_processes(filenames):
    pool = Pool(cpu_count() - 1)
    for filename in filenames:
        pool.apply_async(process_data, (filename,))
    pool.close()
    pool.join()


def main():
    execute_processes(get_input_files('input'))


if __name__ == '__main__':
    main()

Re: Von subprocess.Popen zu Daemon mit multiprocessing.pool

Verfasst: Samstag 29. August 2015, 20:26
von pyseidon
``get_input_files`` ist eine Endlosschleife weil filenames immer wieder erweitert wird. Aber sonst haut das hin. Nun, ich habe das nicht hinbekommen was jetzt in ``execute_processes`` drin steht. Vor allem das close() und join() von dem Pool.

Re: Von subprocess.Popen zu Daemon mit multiprocessing.pool

Verfasst: Samstag 29. August 2015, 21:16
von BlackJack
@pyseidon: Habe das mit `filenames` im Beitrag oben korrigiert. :-)