Generell ist das Design suboptimal finde ich, denn momentan dominiert der laengste Request eines Batches immer die Laufzeit von allen anderen desselben Batches. Das merkt man das der dann ne Weile wartet bis die naechsten 10 kommen in meinem Skript.
Ich hab's mal milde umgebaut, so das es nun 10 worker gibt, die aus einer Queue die gesamnten Auftraege abarbeiten. Der Gewinn ist nicht riesig, aber ~2 Sekunden auf 100 Requests.
Code: Alles auswählen
import asyncio
import aiohttp
from itertools import count
WORKERS = 10
TOTAL_COUNT = 100
import asyncio
import ssl
import sys
SSL_PROTOCOLS = (asyncio.sslproto.SSLProtocol,)
def ignore_aiohttp_ssl_eror(loop):
"""Ignore aiohttp #3535 / cpython #13548 issue with SSL data after close
There is an issue in Python 3.7 up to 3.7.3 that over-reports a
ssl.SSLError fatal error (ssl.SSLError: [SSL: KRB5_S_INIT] application data
after close notify (_ssl.c:2609)) after we are already done with the
connection. See GitHub issues aio-libs/aiohttp#3535 and
python/cpython#13548.
Given a loop, this sets up an exception handler that ignores this specific
exception, but passes everything else on to the previous exception handler
this one replaces.
Checks for fixed Python versions, disabling itself when running on 3.7.4+
or 3.8.
"""
if sys.version_info >= (3, 7, 4):
return
orig_handler = loop.get_exception_handler()
def ignore_ssl_error(loop, context):
if context.get("message") in {
"SSL error in data received",
"Fatal error on transport",
}:
# validate we have the right exception, transport and protocol
exception = context.get('exception')
protocol = context.get('protocol')
if (
isinstance(exception, ssl.SSLError)
and exception.reason == 'KRB5_S_INIT'
and isinstance(protocol, SSL_PROTOCOLS)
):
if loop.get_debug():
asyncio.log.logger.debug('Ignoring asyncio SSL KRB5_S_INIT error')
return
if orig_handler is not None:
orig_handler(loop, context)
else:
loop.default_exception_handler(context)
loop.set_exception_handler(ignore_ssl_error)
async def make_worker(session, queue):
while True:
try:
req_n = queue.get_nowait()
url = f"https://tbit-web.de/exec/ximg.php?fid={req_n}"
async with session.get(url) as resp:
print(req_n, "finish")
except asyncio.QueueEmpty:
break
async def main():
async with aiohttp.ClientSession() as session:
queue = asyncio.Queue()
workers = [make_worker(session, queue) for _ in range(WORKERS)]
for request_no in range(TOTAL_COUNT):
queue.put_nowait(request_no)
await asyncio.gather(*workers)
loop = asyncio.get_event_loop()
ignore_aiohttp_ssl_eror(loop)
loop.run_until_complete(main())