ich versuche seit geraumer Zeit IMAP Login Informationen asynchron / gleichzeitig zu überprüfen.
Das System war bereits immer das gleiche, Queue bauen, Threads starten. Das Resultat ist leider nie zufriedenstellend gewesen. Irgendetwas scheint zu blocken bei der imaplib. Habe es mit greenlets (mit monkey patch), asyncio, threads und threadpoolexecutor probiert. "AIOImaplib" scheint nicht mehr aktuell zu sein bzw. nicht zu funktionieren. Das einzige was ich noch nicht probiert habe und wo es mir ehrlich gesagt auch an codesnippets / dokumentation fehlt ist twisted.
Kennt vielleicht noch jemand Methoden oder gar eine fertige Library die ich direkt nutzen könnte?
Falls das ganze auch noch mit der qt5 event loop funktioniert wäre das genial.
Hier mal ein Beispiel des Vorhabens:
Code: Alles auswählen
from threading import Thread
from queue import Queue
from imaplib import IMAP4_SSL
account_queue = Queue()
account_infos = [ # Beispiel
('benutzer1@web.com', 'password1', 'imap.server1.com'),
('benutzer2@web.com', 'password2', 'imap.server2.com'),
('benutzer3@web.com', 'password3', 'imap.server3.com'),
('benutzer4@web.com', 'password4', 'imap.server4.com'),
]
def check_imap(username, password, server):
try:
s = IMAP4_SSL(server)
s.login(username, password)
return True
except Exception as e:
return False
def worker():
while True:
(username, password, server) = account_queue.get()
if username is None:
break
print(check_imap(username, password, server))
def main():
max_threads = 3
[account_queue.put(i) for i in account_infos]
[account_queue.put((None, None, None)) for _ in range(max_threads)]
threads = [Thread(target=worker) for _ in range(max_threads)]
[t.start() for t in threads]
[t.join() for t in threads]
if __name__ == '__main__':
main()
Sansch