sqlite Multiprozess und Thread sicher

Installation und Anwendung von Datenbankschnittstellen wie SQLite, PostgreSQL, MariaDB/MySQL, der DB-API 2.0 und sonstigen Datenbanksystemen.
Antworten
Boa
User
Beiträge: 190
Registriert: Sonntag 25. Januar 2009, 12:34

Hallo,

Ich möchte eine einfache, robuste Möglichkeit zwischen geforkten Prozessen Daten auszutauschen. Da die queues aus dem multiprocessing Modul nicht robust genug sind, versuche ich es über eine Datenbank. Das klappt soweit ganz gut.

Ich schreibe gerade einen dictionary wrapper mit sqlite. Meine Überlegung ist, dass ich bei jeder Methode die auf die Datenbank zugreift ein Lock setze um vor multithreaded Zugriffen zu schützen. Außerdem soll in jeder dieser Methoden geprüft werden, ob sich das Objekt noch im gleichen Thread/Prozess befindet und ggf. eine neue Verbindung zur DB herstellen:

Code: Alles auswählen

def reconnect(self):
        if self.pid != os.getpid() or self.tid != threading.currentThread().ident: 
            self.tid = threading.currentThread().ident
            self.pid = os.getpid()
            self.conn = sqlite3.connect(self.db_file, check_same_thread = False)
Die Verbindung wird mit

Code: Alles auswählen

check_same_thread = False
erstellt.
Stimmt der Ansatz, bzw. geht es einfacher?
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Datenbanken sind zum Speichern von Daten, nicht zum Austausch von Daten gedacht. Was ist an Queues nicht robust genug?
Boa
User
Beiträge: 190
Registriert: Sonntag 25. Januar 2009, 12:34

Wenn ein Prozess abbricht, der gerade auf einer Queue arbeitet kann dieser kaputt gehen.
Wenn Datenbanken nicht dafür gedacht sind, wäre es nett wenn du mir diesen Irrtum erklären kannst. Das gleiche Szenario hat man doch, wenn mehrere Nutzer auf einer Datenbank arbeiten. Dabei regelt doch die Datenbank, dass die Zugriffe mehrerer Prozesse nicht zu Inkonsistenz führt, und alle Nutzer und Prozesse die gleichen Daten sehen. Das ist soweit ich es versteht die Lösung, die ich brauche.

Thx,
Boa
Boa
User
Beiträge: 190
Registriert: Sonntag 25. Januar 2009, 12:34

Wikipedia meint, das sei ein Antipattern: http://en.wikipedia.org/wiki/Database-as-IPC
Mir geht es aber gerade um einfach Verwendung und Robustheit. Geschwindigkeit spielt eine untergeordnete Rolle. Der Queue Ansatz gefällt mir vom Prinzip her besser, ich möchte diesen aber nun ersetzen, weil es sonst zu Fehlern kommen kann.
EyDu
User
Beiträge: 4881
Registriert: Donnerstag 20. Juli 2006, 23:06
Wohnort: Berlin

Boa hat geschrieben:Wenn Datenbanken nicht dafür gedacht sind, wäre es nett wenn du mir diesen Irrtum erklären kannst. Das gleiche Szenario hat man doch, wenn mehrere Nutzer auf einer Datenbank arbeiten. Dabei regelt doch die Datenbank, dass die Zugriffe mehrerer Prozesse nicht zu Inkonsistenz führt, und alle Nutzer und Prozesse die gleichen Daten sehen. Das ist soweit ich es versteht die Lösung, die ich brauche.
Datenbanken sind dazu gedacht Daten zu teilen und nicht zum Austausch von Daten. Bei dir könnten sich noch Sockets anbieten. Protokolle gibt es dafür wie Sand am Meer.
Das Leben ist wie ein Tennisball.
Boa
User
Beiträge: 190
Registriert: Sonntag 25. Januar 2009, 12:34

Hmm, was ich eigentlich haben möchte ist ein Objekt, das mitgeforkt wird und ohne zutun synchron gehalten wird, wie z.B. das multiprocessing Manager.dict().
Ich weiß nicht wie ich das über Sockets realisieren soll. Bei Sockets muss ich zusätzlich fehlerhafte Nachrichten behandeln, die wie im vorigen Szenario entstehen, wenn der Prozess abgebrochen wird, während das Socket sendet, oder?
lunar

@Boa Nun, am besten gar nicht. Ich würde sagen, in einem parallelen Programm ist ein solches Objekt ist an sich schon ein Antipattern. Ich sehe auch nicht, wozu man das benötigen sollte, denn man kann genauso gut das Objekt einfach in einem Prozess vorhalten, und über indirekt über IPC darauf zugreifen.

Du könntest beispielsweise einen eigenen Prozess laufen lassen, der dieses ominöse Objekt verwaltet, und den Zugriff darauf über eine Message Queue oder was auch immer bereit stellt.

Magst Du uns vielleicht ein paar Hintergründe Deiner Anwendung erklären? Vielleicht gibt es ja einen besseren Weg als den, der Dir gerade vorschwebt.
Boa
User
Beiträge: 190
Registriert: Sonntag 25. Januar 2009, 12:34

Ich musste mir das Problem in Ruhe durch den Kopf gehen lassen. Also ich habe ein store Object, welches Daten auf einen Online Speicher schreibt. Die Implementierung des Store Interfaces soll möglichst einfach gehalten werden. Eine Cache Schicht liegt um den Store herum und sorgt dafür, dass Daten im Cache an das store Objekt übergeben werden. Aus Performance Gründen schreibt der Cache mehrere Dateien gleichzeitig in den Store. Um die Bandbreite effizient zu nutzen kann der Cache, wenn eine Datei daraus gelöscht wird, sich aber gerade im Upload befindet, diesen Vorgang abbrechen.
Meine Lösung dafür nutzt multiprocessing in der Cache Schicht. Dabei wird um eine Datei in den Store zu schreiben der Prozess geforkt. Wenn die Datei gelöscht wird, sich aber gerade im Upload befindet, wird der Prozess per terminate beendet.
multiprocessing bietet mit Manager.dict() und den Queues schöne Schnittstellen für IPC. Diese brauche ich einmal um dem Cache zu antworten, dass die Datei erfolgreich hochgeladen wurde. Zweitens brauche ich eine einfache Schnittstelle wie dict, damit das geforkte store Objekt z.B. intern Statistik Daten über die Dauer des Uploads updaten kann.
Mich stört jetzt dabei, dass multiprocessing Queues als Mittel der IPC anbietet, diese aber möglicherweise korrumpiert werden, wenn man den Prozess abbricht.
Deshalb habe ich nach einer anderen Lösung gesucht, wobei mir die Datenbanken als erstes Mittel einfiel. Der Vorteil ist, dass Datenbanken keine Probleme machen, wenn ein Schreib Prozess abbricht. Und mit mehreren Prozessen können sie auch von Haus aus umgehen.
Ich meine, dass bei der lokalen IPC sonst z.B. Memory Mapped Files zum Einsatz kommen, aber da muss man sich vermutlich wieder selbst um die Konsistenz kümmern, genau wie bei dem Ansatz mit den Sockets.
Das Gelbe vom Ei ist die Implementierung mit den Datenbanken nicht. Beim Logging funktioniert der Ansatz denke ich ganz gut. Aber das dict Interface so zu implementieren, dass man sich beim Implementieren des Store Interfaces keine Gedanken um Multiprocessing machen muss ist ein ziemlicher Krampf.
Ich verfolge auch noch ein paar alternative Ideen, die allerdings noch nicht sehr ausgereift sind. U.a. eine robustere Queue Implementierung, die falls eine Queue kaputt geht wie ein Revolver die nächste Queue lädt und zusätzlich eine weitere Queue erzeugt. Mit den greenlets und wie die Thread Implementierungen sonst so heißen bin ich nicht weitergekommen. Diese scheinen zwar die Möglichkeit zu bieten IO zu unterbrechen, aber der Aufwand mein Programm umzuschreiben scheint mir zu groß.

Wenn euch eine gute Lösung einfällt wäre das spitze. Meistens hilft es mit auch schon weiter jemandem das Problem zu schildern, aber bisher hatte ich noch keine zündende Idee.

Thx,
Boa
lunar

@Boa Verwende doch einfach zwei Threads in jedem Upload-Prozess: Der eine Thread führt den eigentlichen Upload durch, der andere Thread hört auf Signale des Cache-Prozesses. Statt den Upload-Prozess nun einfach mit ".terminate()" abzubrechen (was man eigentlich ohnehin nie tun sollte), sende einfach ein Signal vom Cache Prozess an den Upload-Prozess und „bitte“ diesen Prozess, sich zu beenden.

Beim Erhalt dieses Signals kann der Upload-Prozess den Thread, der den eigentlichen Upload durchführt beenden (i.e. einfach das Socket schließen), die entsprechenden Daten über den Abbruch des Prozesses wieder in die Cache-Queue schreiben und sich ordnungsgemäß beenden.

Je nach Komplexität des Nachrichtenaustauschs reicht zum Beenden ja vielleicht einfach ein "multiprocessing.Event".
Boa
User
Beiträge: 190
Registriert: Sonntag 25. Januar 2009, 12:34

Hallo lunar,

Das Store Interface soll möglichst einfach gehalten werden. Aber es sieht so aus, dass die Alternativen sehr aufwändig und zudem unschön sind, da sie bedeuten, dass ich die Queues aus multiprocessing nicht verwenden kann. Ich denke ich verlagere den Teil mit dem Abbrechen des Uploads wie du es beschreibst in den Store und mache ihn dafür optional.

Vielen Dank,
Boa
Antworten