thread / threading mit Aktualisierung Globaler Variabeln

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

Hallo,

ich möchte 2 Threads starten einer soll einen Bewegungssensor überwachen der zweite soll zu dem Sensor eine Verzögerung erstellen

beide Threads sollen die Globalen Variabeln stehts aktuell halten so das ich din der Schleife des Hauptscrites darauf zugreifen kann.

hier mein bisheriges Script:

Code: Alles auswählen

#!/usr/bin/python
# -*- coding: utf-8 -*-

import RPi.GPIO as GPIO
import time
import thread
import threading

# RPi.GPIO Layout verwenden (wie Pin-Nummern)
GPIO.setmode(GPIO.BOARD)

# Pin 15 (GPIO 24) auf Input setzen
GPIO.setup(15, GPIO.IN)

def sensor_delay(delay):
    # Dauersschleife
    while True:
        global sd
        global be
        if be is True:
            sd = True
        else:
            sd = False
        time.sleep(1)
        
def gpio_check(dalay):
    # Dauersschleife
    while True:
        global be
        # GPIO lesen
        if GPIO.input(15) == GPIO.HIGH:
            #GPIO ist an
            be = True
        else:
            # GPIO ist aus
            be = False
        # Warte 5s
        time.sleep(delay)
    
# Hauptscript
be = False #bewegung
sd = False #sensor delay

thread.start_new_thread(gpio_check,(5,))
thread.start_new_thread(sensor_delay,(5,))

try: # Dauer-Schleife
  while True:
      print("GPIO Check : %s Sensor Delay : %s") % (be,sd)
      time.sleep(1)
except KeyboardInterrupt:
    # CTRL-C gedrueckt
    # Reset GPIO
    GPIO.cleanup()
# ----------------------- EOF  ---------------------------
ich habe dazu schon jedemenge gegoogelt und einiges rumprobiert - da ich aber noch relativ neuling in Python bin und das nicht richtig gelernt habe hoffe ich das jemand mal drüber schaut und mir in diesen Punkt vielleicht schnell weiter helfen kann.
Ich bin jetzt soweit das ich zu der Erkenntnis gekommen bin das das irgendwie mit "Queue" gehen sollte. Leider habe ich noch nicht das passende für mich gefunden um den Code zum teil zu übernehmen - darum nun hier "mein kleiner hilfeschrei"
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

Zuerst mal: die Antwort auf die Frage, wie man globale Variablen mit Multithreading verwendet ist dieselbe, wie die Antwort auf die Frage, wie man am besten zu Fuß quer über eine Autobahn kommt: gar nicht.

Deine Erkenntnis bzgl. der Queue ist schon richtig, denn Queue nimmt einem die meisten Probleme bzgl. Locking ab.

Soweit ich dich verstehe, möchtest du alle fünf Sekunden prüfen, ob am Sensor gerade etwas anliegt. Darufhin soll irgendwie eine Verzögerung stattfinden. Von was? Und wie lange? delay Sekunden oder 1 Sekunde? Vielleicht kannst du dazu ja nochmal was schreiben, auch was die Verzögerung bewirken soll.

Zum Code:

Man sollte nicht das Low Level Modul thread verwenden, sondern das bequemer zu verwendende High Level Modul threading, das du ja sogar importierst, aber nicht verwendest.

Kommentare sollten einen Mehrwert ggü. dem Code liefern und nicht einfach dasselbe wie der Code in anderen Worten sagen. Was der Code tut kann ich ja sehen, das braucht nicht nochmal wiederholt zu werden. Kommentare sollten einem statt dessen mitteilen, warum der Code so geschrieben wurde wie er geschrieben wurde.

Der Kommentar in Zeile 37 impliziert, dass delay 5 sec. ist. Wenn du den übergebenen Wert in Zeile 45 auf, sagen wir 7 sec. änderst, wirst du auch daran denken, alle Kommentare in deinem Programm zu überprüfen, ob sie noch mit deinem tatsächlichen Code übereinstimmen? Aber auch hier gilt, was ich schon im letzten Absatz geschrieben habe, der Code in Zeile 38 sagt ja schon, dass hier eine gewisse Zeit gewartet werden soll, und nicht etwa ein Socket geöffnet oder die Festplatte formatiert werden soll. Der Kommentar liefert also keinen Mehrwert. Insbesondere beantwortet er nicht die Frage, warum hier gewartet wird.

Code: Alles auswählen

if be is True:
    sd = True
else:
    sd = False
Das kann offensichtlich einfacher geschrieben werden:

Code: Alles auswählen

sd = be
Und

Code: Alles auswählen

if GPIO.input(15) == GPIO.HIGH:
    be = True
else:
    be = False
kann man einfacher als

Code: Alles auswählen

be = GPIO.input(15) == GPIO.HIGH
ausdrücken.

In deinem Code wird sd zwar in sensor_delay() in Abhängigkeit von be gesetzt, aber sd wird sonst nirgendwo verwendet.

Und sensor_delay() bekommt zwar delay übergeben, aber es tut nichts damit.
In specifications, Murphy's Law supersedes Ohm's.
Benutzeravatar
__LC__
User
Beiträge: 32
Registriert: Dienstag 1. März 2011, 14:08
Wohnort: 127.0.0.1
Kontaktdaten:

Hier mal ein kurzes Beispiel wie man es vielleicht mit einer Queue machen könnte.

Code: Alles auswählen

#!/usr/bin/python3

import threading
import queue
import time
import random

def check_sensor(_queue, delay):
    while True:
        try:
            _queue.put(random.randint(0, 1), block = False)
        except queue.Full:
            pass
        time.sleep(delay)

def sensor_delay(_queue):
    while True:
        try:
            item = _queue.get(timeout = 1)
            print('Thread2 item: {0}'.format(item))
        except queue.Empty:
            pass

if __name__ == '__main__':
    
    _queue = queue.Queue()
    t1 = threading.Thread(target=check_sensor, args=(_queue, 1)).start()
    t2 = threading.Thread(target=sensor_delay, args=(_queue,)).start()
Ist sicherlich nicht perfekt, aber vielleicht reicht es ja als kleine Anregung.

Schöne Grüße und gute Nacht.
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

Okay, das Sensor delay soll später ein display länger in Funktion halten als der time.sleep in der gpio_check Funktion. (Genauso wie bei einem Bewegungsmelder der mit einer Lampe gekoppelt ist - bleibt das Licht ja länger an als der Sensor die Bewegung erfasst.) Deshalb Besitzt auch die Funktion Sensor_delay den Übergabe Parameter delay(das soll quasi die nach Laufzeit des Licht an sein). Da ich jetzt bei den threds hängen geblieben bin ist dies noch nicht mit drin genauso wie das alle Kommentare "schön" sind.

Schade ist das du auf das eigentliche Problem nicht so ausführlich eingegangen bist, wie auf meine code-komentare und der Code Optimierung.

Modul threading habe ich für einige gegoogelte Sachen benötigt die aber auch nicht funktionierten. Den Rest umzuschreiben ist mittlerweile auch ein teil meiner Erkenntnis.

Ich hoffe die Abhängigkeit von be und sd ist jetzt klar geworden.

Die Hauptfrage ist: wie bekomme ich den Parameter "be" in den thread mit der Funktion Sensor_delay und den darin enthaltenen sd in das hauptscript mit Queue. Danke.
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

Danke __LC__ ich werd morgen das mal entsprechend ein und umändern.
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

@cyberchris: Versuch mal das hier:

Code: Alles auswählen

#!/bin/env python
# -*- coding: utf-8 -*-

import RPi.GPIO as GPIO
import time

from contextlib import contextmanager
from queue import Queue
from threading import Thread


SENSOR_PIN = 15
SENSOR_CHECK_INTERVAL_TIME = 5
DELAY_TIME_AFTER_SWITCH_OFF = 2
ACTIVE = 1
INACTIVE = 2


@contextmanager
def gpio_context():
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(SENSOR_PIN, GPIO.IN)
    yield
    GPIO.cleanup()


def sensor_active():
    return GPIO.input(SENSOR_PIN) == GPIO.HIGH


def switch_on():
    ...


def switch_off():
    ...


def sensor(queue):
    while True:
        queue.put(ACTIVE if sensor_active() else INACTIVE)
        time.sleep(SENSOR_CHECK_INTERVAL_TIME)


def switch(queue):
    old_status = INACTIVE
    while True:
        new_status = queue.get()
        if old_status == new_status:
            continue
        old_status = new_status
        if new_status == ACTIVE:
            switch_on()
        elif new_status == INACTIVE:
            time.sleep(DELAY_TIME_AFTER_SWITCH_OFF)
            switch_off()
        else:
            raise ValueError('Invalid: %s' % str(new_status))


def main():
    try:
        with gpio_context:
            queue = Queue()
            switcher = Thread(target=switch, args=(queue,))
            switcher.daemon = True
            switcher.start()
            sensor(queue)
    except KeyboardInterrupt:
        pass


if __name__ == '__main__':
    main()
Alles ungetestet. Die ... musst du natürlich noch ausprobieren. Weil ich jetzt Bettchen muss werde ich heute nichts mehr erklären. Morgen gerne mehr, wenn gewünscht.
In specifications, Murphy's Law supersedes Ohm's.
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@pillermunch: ich würde ja nur etwas in die Queue packen, wenn sich der Status geändert hat, und beim switch_off nicht die Queue Sekundenlang blockieren.

Code: Alles auswählen

def sensor(queue):
    old = INACTIVE
    while True:
        new = ACTIVE if sensor_active() else INACTIVE
        if new != old:
            queue.put(new)
            old = new
        time.sleep(SENSOR_CHECK_INTERVAL_TIME)

def switch(queue):
    timeout = None
    while True:
        try:
            status = queue.get(timeout=timeout)
            if status == ACTIVE:
                timeout = None
                switch_on()
            else:
                timeout = DELAY_TIME_AFTER_SWITCH_OFF
        except Empty:
            timeout = None
            switch_off()
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

@cyberchris: statt "ausprobieren" muss es natürlich heißen "ausprogrammieren".

@Sirius3: Ja, das ist besser.
In specifications, Murphy's Law supersedes Ohm's.
lackschuh
User
Beiträge: 281
Registriert: Dienstag 8. Mai 2012, 13:40

Ich versteh nicht ganz, um was es geht. Du hast ein Bewegungssensor und wenn dieser eine Bewegung registriert, dann soll irgend etwas "zeitversetzt" passieren/ausgelöst werden?
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

@lackschuh: Ich hab es auch erst nicht verstanden. Nachdem ich etwas darüber nachgedacht habe, binn ich auf das hier gekommen:

In regelmäßigen Intervallen soll überprüft werden, ob eine Sensor eine Bewegung feststellt. Wenn eine Bewegung festgestellt wurde, soll sofort ein Display angeschaltet werden. Sobald keine Bewegung mehr festgestellt werden kann, soll das Display wieder ausgeschaltet werden, allerdings erst nach einer gewissen Verzögerung. Stell dir dazu eine über einen Bewegungsmelder gesteuerte Lampe im Hausgang vor. Die soll ja nicht gleich ausgehen, bloß weil du vor deiner Wohnungstür stehengeblieben bist.
In specifications, Murphy's Law supersedes Ohm's.
Benutzeravatar
__LC__
User
Beiträge: 32
Registriert: Dienstag 1. März 2011, 14:08
Wohnort: 127.0.0.1
Kontaktdaten:

@cyberchris:

Vielleicht noch als Hinweis. Schau dir mal die Callback-Funktionalität des GPIO-Moduls an: http://sourceforge.net/p/raspberry-gpio ... ki/Inputs/
Dort kannst du eine Funktion auf die Zustandsänderung eines oder mehrerer Pins registrieren, musst also den Zustand selbst nicht pollen.
Hier ein kurzes Beispiel:

Code: Alles auswählen

#!/usr/bin/python3

import RPi.GPIO as GPIO
import time

def edge_detect(pin):
    if GPIO.input(pin):
        switch_on()
    else:
        switch_off()

def switch_on():
    print('Switched on...')

def switch_off():
    print('Switched off...')


if __name__ == '__main__':
    
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(15, GPIO.IN)
    GPIO.add_event_detect(15, GPIO.BOTH, callback=edge_detect, bouncetime=100)
    try:
        while True:
            time.sleep(1)
    finally:
        GPIO.cleanup()
Vielleicht geht das ja in die Richtung zu dem was du konkret vor hast.

Schöne Grüße
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

@ __LC__ vielen dank, die Idee mit Callback-Funktionalität des GPIO-Moduls finde ich sehr interessant.
Jetzt brauch ich nur noch das Deley nach Switch on aus zu Programmieren. Dann sollte es so funktionieren.

Danke an alle die sich meines kleinen Problems angenommen haben.
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

Da ich es selber suboptimal finde wen es am Ende keine Lösung gibt nun hier meine Lösung:

es ist mit Sicherheit nicht die vollkommenste Lösung - für mich reicht es.

Code: Alles auswählen

#!/usr/bin/python
# -*- coding: utf-8 -*-

import RPi.GPIO as GPIO
import time
import threading

def edge_detect(pin):
    if GPIO.input(pin):
        switch_on()
        for t in threading.enumerate():
            if t.getName() == 'TimerAfterSwitchOn':
                t.cancel()
        t = threading.Timer(15,switch_off)
        t.setName('TimerAfterSwitchOn')
        t.start()
        
def switch_on():
    print('Switched on...')

def switch_off():
    print('Switched off...')

if __name__ == '__main__':
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(15, GPIO.IN)
    GPIO.add_event_detect(15, GPIO.BOTH, callback=edge_detect, bouncetime=100)
    try:
        while True:
            time.sleep(1)
            print("warte")
    except KeyboardInterrupt:
        # CTRL-C gedrueckt
        GPIO.cleanup()
@ __LC__: es geht genau in die Richtung zu dem was ich vor habe - Dank dir.

Schöne Grüße
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@cyberchris: wieviele Timer können denn maximal parallel laufen? Da ist es doch unschön, alle Threads durchsuchen zu müssen, statt den einen Timer explizit sich zu merken. Zudem muß man genau aufpassen, dass nicht während eines halbe switch_off plötzlich wieder ein switch_on dazwischen kommt. Das müßte man wieder durch eine Reihe von Locks absichern. Ich denke aber, das einfachste wäre wieder, eine Queue zu benutzen:

Code: Alles auswählen

import RPi.GPIO as GPIO
import time
import threading
import queue


class Switcher():
    def __init__(self, pin, timeout):
        GPIO.setup(pin, GPIO.IN)
        GPIO.add_event_detect(pin, GPIO.BOTH, callback=self.edge_detect, bouncetime=100)
        self.timeout = timeout
        self.queue = queue.Queue()
        switcher = threading.Thread(target=self.loop)
        switcher.daemon = True
        switcher.start()

    def loop(self):
        timeout = None
        while True:
            try:
                queue.get(timeout=timeout)
                if timeout is None:
                    timeout = self.timeout
                    self.switch_on()
            except queue.Empty:
                timeout = None
                self.switch_off()

    def edge_detect(self, pin):
        if GPIO.input(pin):
            self.queue.put("on")

    def switch_on(self):
        print('Switched on...')

    def switch_off(self):
        print('Switched off...')

if __name__ == '__main__':
    GPIO.setmode(GPIO.BOARD)
    switcher = Switcher(15, 10)
    try:
        while True:
            time.sleep(1)
            print("warte")
    except KeyboardInterrupt:
        # CTRL-C gedrueckt
        pass
    finally:
        GPIO.cleanup()
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

@Sirius3: Wieso? Es läuft doch immer nur ein Timer-Thread da ich den vorhergehenden über denn Namen suche und beende(mit cancel() Funktion)
danach starte ich einen neuen Timer-Thread mit erneut 15sec Verzögerung. Das bedeutet das ich immer nach einer Bewegung am Sensor Nachfolgend die 15sec Verzögerung habe.

Wen ich heute später mir noch Zeit nehme schaue ich mir dein Script gern noch genauer an. Ich kann ja dadurch nur etwas dazu lernen. Mir ist nartürlich bewusst das die Suche nach dem Thread über den Namen bestimmt nicht die optimalste Lösung ist.
BlackJack

@cyberchris: Das ist ja gerade der Punkt das immer nur einer Deiner `Timer`-Threads laufen kann. Darum wäre es ja auch sinnvoller sich genau diesen einen gezielt zu merken statt alle zu durchsuchen die gerade laufen.
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@cyberchris: das mit dem passenden Thread suchen ist nur kosmetisch. Der andere Punkt ist viel schlimmer, weil er wahrscheinlich fast nie auftritt, sich nicht reproduzieren läßt, aber unter Umständen, je nachdem, was Du beim Switchen alles machst, katastrophale Folgen haben kann. Beim nebenläufigen Programmieren muß man wirklich theoretisch alle Möglichkeiten durchdenken und bewiesenermaßen sicher sein, weil sich Fehler, die sich durch Synchronisationsungenauigkeiten ergeben, nicht testen lassen: Es kommt ein Signal; exakt 15s später kurz nachdem der Timer ausgelöst hat, wird die switch_off-Funktion unterbrochen, weil es kommt das nächste Signal, switch_on wird durchlaufen; erst jetzt wird switch_off fortgeführt, und Du stehst im Dunkeln.
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

Es hat mir keine Ruhe gelassen, dass mir die bisher präsentierten Lösungen nicht gefallen haben. Irgendwie fand ich, dass sie alle zu wenig das Problem explizit machen, das gelöst werden soll. Das Problem wurde bisher auch nur recht unterspezifiziert dargelegt. Wenn es mein Problem wäre, würde ich es vielleicht so formulieren:

Prüfe alle ... Sekunden, ob eine Bewegung festgestellt werden kann. Sobald eine Bewegung festgestellt wurde, schalte das Display an. Wenn bei angeschaltetem Display wenigstens ... Sekunden lang keine Bewegung mehr festgestellt werden konnte, schalte das Display wieder aus.

Das kann man so in Code gießen:

Code: Alles auswählen

#!/bin/env python
# -*- coding: utf-8 -*-

import RPi.GPIO as GPIO
import contextlib
import threading
import time

SENSOR = 15
SENSOR_CHECK_INTERVAL_TIME = 5.0
DELAY_TIME_AFTER_LAST_MOVEMENT = 8.0

@contextlib.contextmanager
def switch():
    print('switch on')
    try:
        yield
    finally:
        print('switch off')

@contextlib.contextmanager
def gpio_context():
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(SENSOR, GPIO.IN)
    try:
        yield
    finally:
        GPIO.cleanup()

def movement_detected():
    return GPIO.input(SENSOR) == GPIO.HIGH

def detect(motion_event):
    while True:
        if movement_detected():
            motion_event.set()
        time.sleep(SENSOR_CHECK_INTERVAL_TIME)

def switch_when(motion_event):
    while True:
        motion_event.wait()
        with switch():
            while True:
                motion_event.clear()
                if not motion_event.wait(DELAY_TIME_AFTER_LAST_MOVEMENT):
                    break

def main():
    try:
        with gpio_context():
            motion_event = threading.Event()
            switcher = threading.Thread(target=lambda:switch_when(motion_event))
            switcher.daemon = True
            switcher.start()
            detect(motion_event)
    except KeyboardInterrupt:
        pass

if __name__ == '__main__':
    main()
Statt einer Queue kann man einfacher ein Event Objekt verwenden. Der Hauptthread fungiert als Event-Quelle und der Nebenthread als Event-Senke, d.h. das Event wird ausschließlich im Hauptthread gesetzt und im Nebenthread verwendet und wieder gelöscht. Dort wird zuerst auf eine Bewegung gewartet, worauf das Display sofort angeschaltet wird, und anschließend wird in einer Endlosschleife der Event solange immer wieder zurückgesetzt, wie eine weitere Bewegung innerhalb einer festgelegten Wartezeit festgestellt werden kann. Nur wenn innerhalb dieses Zeitrahmens keine Bewegung mehr festgestellt wurde wird die Schleife verlassen und das Display wieder ausgeschaltet.

Sowohl das GPIO-Initialisieren und -Aufräumen als auch das An- und Ausschalten des Displays folgen dem Muster, für das Contextmanager erfunden wurden.
In specifications, Murphy's Law supersedes Ohm's.
cyberchris
User
Beiträge: 7
Registriert: Samstag 3. Januar 2015, 21:06

Hallo zusammen,

nun der Reihe nach ;-)
@BlackJack: Sicher das wäre durchaus ein Punkt den ich in dem Scipt noch verbessern könnte. Das ist auch das was ich meinte, das mein Script bestimmt nicht die vollkommenste Lösung ist. Eventuell könnte das aber auch, wen man den Punkt den Sirius anführt, ein Problem werden das die Zeitspanne des Überlappens sich vergrößert und dadurch ein problem verusacht.
@Sirius3: Ich denke eine 100% Fehler freie Lösung wird es bestimmt nie geben - Für mich ist es auch wichtig den Aufwand in Relation zur Wahrscheinlichkeit das solche möglichkeit auftritt zu setzen. Ich denke dass die Zeit zwischen dem durchsuchen und dem Cancel des Timer-Threads so verschwindent gering ist das man das durchaus vernachlässigen kann. Im dunkeln werde ich nicht stehen da ja die funktion switch_off in der Funktion edge_detect mit switch_on inbegriffen ist.
@pillmuncher: wen du dir mal das Script nach deiner Formulierung anschaust könnte man feststellen das das durchaus erfüllt ist.
Ich prüfe (sogar über ein event) alle 100ms ob eine Bewegung festgestellt wird. Sobald eine Bewegung festgestellt wurde wird die Funktion edge_detect mit switch_on aufgerufen und das Display eingeschaltet. Dieses befindet sich immer 15s nach der letzten Bewegung in Funktion und wird dan erst über die Timer-Thread beendet.(das wird durch die zeilen 11 bis 16 sicher gestellt)
Wie schon erwäht stehe neuen offen gegenüber und werde mir dein Code noch ausführlich anschauen. Da ich aber mit der contextlib keine Erfahrung habe werde ich vorerst nicht damit arbeiten.
Vielleicht hast du ja für mich ein paar nüzliche links oder auch eine Buch Empfehlung damit ich mit der contextlib was anfangen kann.
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

@cyberchris: Oft findet man derartigen Code:

Code: Alles auswählen

f = open(my_file)
try:
    ...
finally:
    f.close()

cursor = Database.connect(db_name, my_name, my_password)
try:
    ...
finally:
    cursor.close()

lock.acquire()
try:
    ...
finally:
    lock.release()

some_device.setup()
try:
    ...
finally:
    some_device.cleanup()

switch_on()
try:
    ...
finally:
    switch_off()
Das ContextManager-Protokoll standardisiert dieses Pattern zu:

Code: Alles auswählen

with open(my_file) as f:
    ...

with Database.connect(db_name, my_name, my_password) as cursor:
    ...

with lock:
    ...

with some_device:
    ...

with switch():
    ...
Die einfachste Art, solch einen ContextManager zu erstellen ist:

Code: Alles auswählen

import contextlib

@contextlib.contextmanager
def some_context():
    # initialization code here
    try:
        yield
    finally:
        # cleanup code here
Verwendet wird das dann so:

Code: Alles auswählen

with some_context():
    ...
. Dabei wird some_context() bis zum yield ausgeführt, die Ausführung unterbrochen, ... ausgeführt, und am Ende des Blocks wird some_context() nach dem yield weiter ausgeführt bis zum Ende.

Hier die Doku zu contextlib: https://docs.python.org/2/library/contextlib.html
Hier der PEP der das with Statement definiert: https://www.python.org/dev/peps/pep-0343/
Ansonsten google einfach mal nach python "context manager", da sollte einiges an Information zusammenkommen.
In specifications, Murphy's Law supersedes Ohm's.
Antworten