Asynchrones IMAP?

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Hallo miteinander,

ich versuche seit geraumer Zeit IMAP Login Informationen asynchron / gleichzeitig zu überprüfen.
Das System war bereits immer das gleiche, Queue bauen, Threads starten. Das Resultat ist leider nie zufriedenstellend gewesen. Irgendetwas scheint zu blocken bei der imaplib. Habe es mit greenlets (mit monkey patch), asyncio, threads und threadpoolexecutor probiert. "AIOImaplib" scheint nicht mehr aktuell zu sein bzw. nicht zu funktionieren. Das einzige was ich noch nicht probiert habe und wo es mir ehrlich gesagt auch an codesnippets / dokumentation fehlt ist twisted.

Kennt vielleicht noch jemand Methoden oder gar eine fertige Library die ich direkt nutzen könnte?
Falls das ganze auch noch mit der qt5 event loop funktioniert wäre das genial.

Hier mal ein Beispiel des Vorhabens:

Code: Alles auswählen

from threading import Thread
from queue import Queue
from imaplib import IMAP4_SSL

account_queue = Queue()
account_infos = [  # Beispiel
    ('benutzer1@web.com', 'password1', 'imap.server1.com'),
    ('benutzer2@web.com', 'password2', 'imap.server2.com'),
    ('benutzer3@web.com', 'password3', 'imap.server3.com'),
    ('benutzer4@web.com', 'password4', 'imap.server4.com'),
]


def check_imap(username, password, server):
    try:
        s = IMAP4_SSL(server)
        s.login(username, password)
        return True
    except Exception as e:
        return False


def worker():
    while True:
        (username, password, server) = account_queue.get()
        if username is None:
            break

        print(check_imap(username, password, server))


def main():
    max_threads = 3
    [account_queue.put(i) for i in account_infos]
    [account_queue.put((None, None, None)) for _ in range(max_threads)]
    threads = [Thread(target=worker) for _ in range(max_threads)]
    [t.start() for t in threads]
    [t.join() for t in threads]


if __name__ == '__main__':
    main()
Gruß,
Sansch
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,
"AIOImaplib" scheint nicht mehr aktuell zu sein bzw. nicht zu funktionieren.
Die aktuelle Version ist vom 1.5.2018 - das ist ziemlich aktuell. "funktioniert nicht" ist keine brauchbare Beschreibung. Was funktioniert denn bei dir am Beispielcode nicht?
Falls das ganze auch noch mit der qt5 event loop funktioniert wäre das genial.
Muss das oder kann das? Das macht hier einen ziemlichen Unterschied, ob man das in den Qt5 Mainloop integrieren muss oder nicht. asyncio hat ja z.B. seinen eigenen Loop. Wenn du Threads benutzen willst, müsstest du IMHO die Threadimplementierung QThread nutzen.

Gruß, noisefloor
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Hallo Noisefloor,

ich muss ehrlicherweise sagen das ich mit asyncio noch nicht so viel Erfahrung sammeln konnte, allerdings funktioniert das erstellen einer asyncio event loop in einem QThread von daher wär aioimaplib schon optimal.
Habe das ganze nun noch einmal mit asyncio rekonstruiert - so funktioniert das in aller Regel mit anderen asynchronen modulen ziemlich gut. Nur aioimap bekomme ich nicht zum laufen. Es scheint auch so als würden die "worker" jeweils auch aufeinander warten und dann wird neue arbeit aus der queue geholt. Ich weiß auch nicht wieso ich "gaierror" nich fangen kann.

Hier mein Ansatz:

Code: Alles auswählen

import asyncio
from aioimaplib import IMAP4_SSL
from socket import gaierror

max_workers = 2
account_queue = asyncio.Queue()
account_infos = [  # Beispiel
    ('benutzer1@web.com', 'password1', 'imap.server1.com'),
    ('benutzer2@web.com', 'password2', 'imap.server2.com'),
    ('benutzer3@web.com', 'password3', 'imap.server3.com'),
]


async def produce_work():
    for i in account_infos:
        await account_queue.put(i)
    for _ in range(max_workers):
        await account_queue.put((None, None, None))


async def check_imap(username, password, server):
    try:
        s = IMAP4_SSL(server)
        await s.wait_hello_from_server()
        await s.login(username, password)
        return username, password, server, True
    except Exception as e:
        return username, password, server, str(e)
    except gaierror as e:
        return username, password, server, str(e)


async def worker():
    while True:
        (username, password, server) = await account_queue.get()
        if username is None:
            break

        print(await check_imap(username, password, server))


def main():
    loop = asyncio.get_event_loop()
    tasks = [worker() for _ in range(max_workers)]
    loop.run_until_complete(asyncio.gather(produce_work(), *tasks, loop=loop, return_exceptions=True))


if __name__ == '__main__':
    main()
Ausgabe:

Code: Alles auswählen

('benutzer1@web.com', 'password1', 'imap.server1.com', '')
('benutzer2@web.com', 'password2', 'imap.server2.com', '')
Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py:679> exception=gaierror(11001, 'getaddrinfo failed')>
Traceback (most recent call last):
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\tasks.py", line 240, in _step
    result = coro.send(None)
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py", line 732, in create_connection
    infos = f1.result()
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\futures.py", line 294, in result
    raise self._exception
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\concurrent\futures\thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\socket.py", line 733, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno 11001] getaddrinfo failed
('benutzer3@web.com', 'password3', 'imap.server3.com', '')
Task was destroyed but it is pending!
task: <Task pending coro=<BaseEventLoop.create_connection() running at G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py:763> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(648)(), Task._wakeup()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<BaseEventLoop.create_connection() running at G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py:763> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_connect_done(716)(), Task._wakeup()]>>
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

das Konstrukt mit den Workern ist ein wenig abenteuerlich... asyncio funktioniert anders als Multiprocessing und Threading. Es gibt _einen_ Eventloop, der die Kontrolle hat. Man braucht üblicherweise keine Worker.

Nochmal: Hast du das Beispiel von der Homepage von aioimaplib mal so probiert? Bzw. das lässt sich testweise ja via `asyncio.gather()` leicht auf mehrere E-Mail Konten ausweiten.

Gruß, noisefloor
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Das passiert mit dem Beispielcode:

Code: Alles auswählen

Task exception was never retrieved
future: <Task finished coro=<BaseEventLoop.create_connection() done, defined at G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py:679> exception=gaierror(11003, 'getaddrinfo failed')>
Traceback (most recent call last):
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\tasks.py", line 240, in _step
    result = coro.send(None)
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py", line 732, in create_connection
    infos = f1.result()
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\futures.py", line 294, in result
    raise self._exception
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\concurrent\futures\thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\socket.py", line 733, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno 11003] getaddrinfo failed
Traceback (most recent call last):
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\base_events.py", line 467, in run_until_complete
    return future.result()
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\futures.py", line 294, in result
    raise self._exception
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\tasks.py", line 240, in _step
    result = coro.send(None)
  File "G:\Sannes\Python3\Tests\async imap 2.py", line 8, in check_mailbox
    yield from imap_client.wait_hello_from_server()
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\site-packages\aioimaplib\aioimaplib.py", line 645, in wait_hello_from_server
    yield from asyncio.wait_for(self.protocol.wait('AUTH|NONAUTH'), self.timeout)
  File "G:\WinPython-3.5.4\python-3.5.4.amd64\lib\asyncio\tasks.py", line 410, in wait_for
    raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

Lt. Traceback hast du einen Timeout-Error. Also antwortet die Gegenseite nicht beim `HELLO` oder beim Login.

Funktioniert das Abfragen, wenn du nur ein einzelnes Postfach synchron abfragst?

Gruß, noisefloor
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Hallo,

sorry für die späte Antwort.
Ich habe es gerade noch einmal probiert. Mein Beispiel funktioniert so lang der Server auch antwortet und alles glatt läuft.

Allerdings habe ich noch das Problem (wie bereits oben zu sehen). Das Timeouts, ConnectionRefused etc. nicht gefangen werden.
Woran könnte dies liegen?
Antworten