Linienauswertung Nahverkehr

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Agascha88
User
Beiträge: 23
Registriert: Sonntag 6. März 2022, 21:04

Hallo zusammen,
ich bin neu hier im Forum und habe leider sehr wenig Erfahrung in Python-Programmierung.
Ich beabsichtige die Pünktlichkeit einzelner Linien des Nahverkehrs zu analysieren. Dafür eignet sich die frei zugängliche elektronische Fahrplanauskunft des VRR.
Im Netz habe ich einen für einen etwas anderen Ansatz funktionierenden Code gefunden. Hierbei ging es im Wesentlichen um Fahrtausfälle.
Wenn man die aktuelle Abfrage mit screen startet, sind alle Informationen im screen vorhanden.
Folgendermaßen stelle ich mir die Linienanalyse vor:
• Im Code können die Haltestellen der Linie hinterlegt werden (funktioniert schon)
• Es sollen nur Fahrten mit Ist-Zeiten ausgegeben werden (aktuell werden Fahrten mit Ist- und ohne Ist-Zeiten ausgegeben)
• Es sollen nur Fahrten, die in 0 bis 10 Minuten abfahren ausgegeben werden (aktuell werden alle Fahrten ausgegeben)
• Es sollen nur die Abfahrten der untersuchten Linie ausgegeben werden (aktuell werden alle Linien der Haltestelle ausgegeben)
• Ausgegeben sollen die Daten der Abfrage in eine csv-Datei (aktuell wird die Abfrage nur im screen angezeigt)
• Zur Analyse sind je Datenreihe erforderlich: „Abfragezeit“, „Haltestellen-Nr.“, „Haltestellenname“, „Liniennummer“, „Linienzieltext“, „Abfahrt in Minuten“, „Verspätungsminuten“
• Es soll diese Abfrage alle 10 Minuten automatisiert ausgeführt werden und das Ergebnis in die Auswertungsdatei angefügt werden.
Ich würde mich freuen, wenn ich hier im Forum Hilfe beim Erweitern des Python-Codes erhalte.
Viele Grüße
Agascha

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
So sieht der Grundlagencode bisher aus
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

import asyncio
from aiohttp import ClientSession
from time import sleep
from typing import NamedTuple, List
from datetime import datetime
import xml.etree.ElementTree as ET
from colorama import init, Fore
from csv import writer
from async_retrying import retry


screen = True #False = ohne screen und mit Datei / True = mit screen und ohne Datei
csvfn = './Fahrtausfälle.csv'
csvln = './Linienauswerung.csv'


class Departure(NamedTuple):
stop: str
countdown: int
linenum: str
direction: str
plantime: tuple
isrealtime: bool
delay: int
rtstatus: str
genattr: List[str]


@retry(attempts=10)
async def dmreq(session, ifopt, limit=20):
payload = {'name_dm':ifopt, 'type_dm':'any', 'mode':'direct', 'useRealtime':'1', 'limit':str(limit)} #, 'includedMeans':['5', '6', '7']}
#payload['itdTime']='1200'
#payload['itdDate']='180109'
async with session.get('http://openservice-test.vrr.de/vrr/XML_DM_REQUEST', params=payload) as r:
content = await r.read()
root = ET.fromstring(content)
#tree = ET.parse(...)
#root = tree.getroot()
return (ifopt, root)


sem = asyncio.Semaphore(10)


async def safe_dmreq(*args, **kwa):
async with sem:
return await dmreq(*args, **kwa)


def deps(root):
departures = []
for departure in root.iter('itdDeparture'):
stop = "".join(root.find('itdDepartureMonitorRequest').find('itdOdv').find('itdOdvName').find('odvNameElem').itertext())
servingline = departure.find('itdServingLine')
countdown = int(departure.attrib['countdown'])
linenum = servingline.attrib['number']
direction = servingline.attrib['direction']
realtime = bool(int(servingline.attrib['realtime']))
delay = 0
rtstatus = ""
itdtime = departure.find('itdDateTime').find('itdTime')
plantime = (int(itdtime.attrib['hour']), int(itdtime.attrib['minute']))
#if countdown > 59:
#realtime = False #False
if realtime:
delay = int(servingline.find('itdNoTrain').attrib['delay'])
rtstatus = departure.attrib.get('realtimeStatus', "")
genattr = []
ga = departure.find('genAttrList')
if ga:
for gae in ga:
gaename = gae.findtext('name', default="")
gaevalue = gae.findtext('value', default="")
genattr.append(gaename+"="+gaevalue)
departures.append(Departure(stop=stop,
countdown=countdown,
linenum=linenum,
direction=direction,
plantime=plantime,
isrealtime=realtime,
delay=delay,
rtstatus=rtstatus,
genattr=genattr))
return departures


def depstrings(departures, directionwidth=30):
strings = []
for dep in departures:
color = Fore.RESET
currentstring = ""
if not dep.isrealtime:
color = Fore.WHITE
elif dep.delay > 5 or dep.delay == -9999:
color = Fore.RED
elif dep.delay > 1:
color = Fore.YELLOW
else:
color = Fore.GREEN
currentstring += ('{:>4}'.format(dep.linenum) + " " + ('{:<'+str(directionwidth)+'}').format(dep.direction[:directionwidth]) + " ")
currentstring += color
if dep.countdown == 0:
currentstring += "jetzt"
elif dep.delay == -9999:
currentstring +=" xxx "
elif dep.countdown > 59:
currentstring += tt(dep.plantime)
elif dep.countdown > 9:
currentstring += (str(dep.countdown) + "min")
else:
currentstring += (" " + str(dep.countdown) + "min")
if dep.isrealtime and dep.delay != -9999:
currentstring += (color + "(+" + str(dep.delay) + ")" + Fore.RESET)
else:
currentstring += Fore.RESET
strings.append([dep.countdown, currentstring])
strings.sort(key=lambda x: x[0])
return strings


def tt(timetuple):
return str(timetuple[0]).zfill(2) + ":" + str(timetuple[1]).zfill(2)


async def loopreqs(ifopts):
alldeps = {}
tasks = []
async with ClientSession() as session:
for ifopt in ifopts:
#try:
task = asyncio.ensure_future(safe_dmreq(session, ifopt, 25)) #25 gleich Reihen
tasks.append(task)
responses = await asyncio.gather(*tasks)
for ifopt, root in responses:
alldeps[ifopt] = deps(root)
return alldeps


if __name__ == "__main__":
init() # colorama

# Haltestellen
ifopts = ['de:05711:5447', 'de:05711:5001', 'de:05711:5002', 'de:05711:10049', 'de:05711:5003']

rtmax = 35
ausfaelle = []
ausfalldeps = []
rtabfahrtenmax = {}

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(loopreqs(ifopts))
alldeps = loop.run_until_complete(future)

for ifopt in alldeps:
departures = alldeps[ifopt]
newausfall = False
for departure in departures:
if departure.countdown <= rtmax:
if departure.linenum not in rtabfahrtenmax:
rtabfahrtenmax[departure.linenum] = {}
if departure.direction not in rtabfahrtenmax[departure.linenum]:
rtabfahrtenmax[departure.linenum][departure.direction] = []
rtabfahrtenmax[departure.linenum][departure.direction].append(departure.isrealtime)
if departure.delay == -9999:
newausfall = True
ausfaelle.append(f"{departure.stop} ({ifopt}): {departure.linenum} => {departure.direction} ({tt(departure.plantime)})")
ausfalldeps.append((ifopt, departure))

# Bildschirmausgabe
if screen:
if departures:
print(f"=== {departures[0].stop} ({ifopt}) ===")
else:
print(f"Keine Abfahrten für {ifopt} gefunden")
strs = depstrings(departures)
for s in strs:
print(s[1])
print("")
if newausfall:
print("==== Ausfälle ====", "\n".join(ausfaelle), "", sep="\n")
# sleep(0.1)

if screen:
print("Sammlung beendet\n")

print(f"Echtzeitabfahrten (<= {rtmax} min) nach Linie und Richtung:")
for linenum, directions in sorted(rtabfahrtenmax.items()):
print(linenum)
for direction, rta in sorted(directions.items()):
rtabfahrten = rta.count(True)
alleabfahrten = len(rta)
print(f" - {direction}: {rtabfahrten}/{alleabfahrten} ({round((rtabfahrten/alleabfahrten)*100)} %)")
print("")

nowtimestr = datetime.now().strftime("%Y-%m-%d %H:%M")
if screen:
print(f"Gefundene Ausfälle ~{nowtimestr}:", ("\n"+"\n".join(ausfaelle)) if ausfaelle else " keine", sep="")
else:
with open(csvfn, 'a') as f:
writer(f).writerows(sorted(list([(nowtimestr, dep.stop, ifopt, dep.linenum, dep.direction, tt(dep.plantime), dep.rtstatus, "|".join(dep.genattr)) for ifopt, dep in ausfalldeps]), key=lambda x: (x[3], x[4], x[5])))
Benutzeravatar
Kebap
User
Beiträge: 687
Registriert: Dienstag 15. November 2011, 14:20
Wohnort: Dortmund

Agascha88 hat geschrieben: Sonntag 6. März 2022, 21:15 Ich würde mich freuen, wenn ich hier im Forum Hilfe beim Erweitern des Python-Codes erhalte.
Hallo, willkommen im Forum!
Möchtest du den Code selbst erweitern (und wie sah dein letzter Versuch aus?), oder soll das jemand anders für dich tun?
MorgenGrauen: 1 Welt, 8 Rassen, 13 Gilden, >250 Abenteuer, >5000 Waffen & Rüstungen,
>7000 NPC, >16000 Räume, >200 freiwillige Programmierer, nur Text, viel Spaß, seit 1992.
Agascha88
User
Beiträge: 23
Registriert: Sonntag 6. März 2022, 21:04

Hallo Kebap,
danke für die freundliche Aufnahme hier im Forum.

Den aktuellen Code habe ich direkt unter meine Frage kopiert, da ich nicht herausbekommen habe, wie ich hier den Code teilen kann.
Der Code liefert auch schon Ergebnisse im screen. Nur sollen die Ergebnisse begrenzt werden und in eine Datei geschrieben werden.

Wenn du oder ein anders Mitglied des Forums die Zeit und Lust hätte den Code ein wenig in die gewünschte Richtung zu erweitern, würde ich mich freuen.

Ich lerne zwar schnell, aber denke eure Nerven nicht unnötig strapazieren zu wollen, wenn ich es selber versuche.
Liebe Grüße
Agascha
Sirius3
User
Beiträge: 17750
Registriert: Sonntag 21. Oktober 2012, 17:20

Konstanten schreibt man komplett gross, also SCREEN.
Benutze keine kryptischen Abkürzungen. Ich verstehe nicht woher das n bei csvfn kommt, denn Fahrtausfälle hat kein n. Und das n bei Linienauswerung ist auch sehr komisch gewählt für eine Abkürzung. Da fehlt übrigens noch ein t.

dmreq oder deps sind schlechte Funktionsnamen, weil sie nichts aussagen, was diese Funktionen machen. Funktionen werden üblicherweise nach Tätigkeiten benannt.

Dass man Klassen aus typing tatsächlich als Klassen benutzt, ist mir neu, aber die Leute, die Typannotationen erfunden haben, waren eh etwas wirr im Kopf. Deshalb würde ich an Deiner Stelle auf dataclass umsteigen.

Benutze keine globalen Variablen. Wenn safe_dmreq `sem` braucht, übergib das als Argument.
Die vielen find-Aufrufe in deps lassen sich zu einem zusammenfassen: `stop = root.findtext("itdDepartureMonitorRequest/itdOdv/itdOdvName/odvNameElem")`.

In `depstrings` stückelst Du Strings mit + zusammen, obwohl Du format anscheinend schon kennst. Warum?
Besser noch f-Strings verwenden! zfill braucht man auch nicht mehr, seit es Formatierungsangaben gibt.

Code: Alles auswählen

def get_color(departure):
    color = Fore.RESET
    if not departure.isrealtime:
        color = Fore.WHITE
    elif departure.delay > 5 or departure.delay == -9999:
        color = Fore.RED
    elif departure.delay > 1:
        color = Fore.YELLOW
    else:
        color = Fore.GREEN
    return color

def tt(timetuple):
    return f"{timetuple[0]:02d}:{timetuple[1]:02d}"

def get_countdown(departure):
    if departure.countdown == 0:
        return "jetzt"
    elif departure.delay == -9999:
        return " xxx "
    elif departure.countdown > 59: 
        return tt(departure.plantime)
    else:
        return f"{departure.countdown:2d}min"

def depstrings(departures, directionwidth=30):
    strings = []
    for departure in departures:
        color = get_color(departure)
        countdown = get_countdown(departure)
        delay = f"(+{departure.delay})" if departure.isrealtime and departure.delay != -9999 else ""
        currentstring = (
            f"{departure.linenum:>4} {departure.direction:<{directionwidth}.{directionwidth}} "
            f"{color}{countdown}{delay}{Fore.RESET}"
        )
        strings.append((departure.countdown, currentstring))
    strings.sort(key=lambda x: x[0])
    return strings
Agascha88
User
Beiträge: 23
Registriert: Sonntag 6. März 2022, 21:04

Hallo Sirius3,

danke schön für deine Anregungen, Erweiterungen und deine Zeit den Code zu überarbeiten.
Ich werde versuchen, den Gesamtcode daraufhin zu überarbeiten und hier wieder teilen.

Liebe Grüße
Agascha
Agascha88
User
Beiträge: 23
Registriert: Sonntag 6. März 2022, 21:04

Hallo zusammen,

mit Sirius3 Hilfe ist der Code schon einmal wesentlich übersichtlicher und kürzer geworden.
Ich konnte testen, dass die Abfrage auch weiterhin funktioniert.
Wenn man die aktuelle Abfrage mit screen startet, sind alle Informationen schon im screen vorhanden.
Jetzt müssen aber noch die Abfragedaten reduziert und in eine Datei geschrieben werden, damit ich sie auswerten kann.

Folgendermaßen stelle ich mir die Linienanalyse vor:
• Es sollen nur Fahrten mit Ist-Zeiten ausgegeben werden (aktuell werden Fahrten mit Ist- und ohne Ist-Zeiten ausgegeben)
• Es sollen nur Fahrten, die in 0 bis 10 Minuten abfahren ausgegeben werden (aktuell werden alle Fahrten ausgegeben)
• Es sollen nur die Abfahrten der untersuchten Linie ausgegeben werden (aktuell werden alle Linien der Haltestelle ausgegeben)
• Ausgegeben sollen die Daten der Abfrage in eine csv-Datei (aktuell wird die Abfrage nur im screen angezeigt)
• Zur Analyse sind je Datenreihe erforderlich: „Abfragezeit“, „Haltestellen-Nr.“, „Haltestellenname“, „Liniennummer“, „Linienzieltext“, „Abfahrt in Minuten“, „Verspätungsminuten“
• Es soll diese Abfrage alle 10 Minuten automatisiert ausgeführt werden und das Ergebnis in die Auswertungsdatei angefügt werden.

Danke schön für eure Hilfe, Anregung, Unterstützung

Liebe Grüße
Agascha

So sieht aktuell der Code aus

Code: Alles auswählen

import asyncio
from aiohttp import ClientSession
from time import sleep
from typing import NamedTuple, List
from datetime import datetime
import xml.etree.ElementTree as ET
from colorama import init, Fore
from csv import writer
from async_retrying import retry

# dep kommt von departure
# deps kommt von departures

# Wie soll die Ausgabe erfolgen
SCREEN = True #False = Ausgabe der Fahrtausfälle in einer csv-Datei / True = Ausgabe auf dem screen
CSVAUSFÄLLE = './Fahrtausfälle.csv'  # csv Fahrtausfälle
CSVLINIE = './Linienanalyse.csv'  # csv Linienanalyse

# Haltestellenklasse
class Departure(NamedTuple):
    stop: str
    countdown: int
    linenum: str
    direction: str
    plantime: tuple
    isrealtime: bool
    delay: int
    rtstatus: str
    genattr: List[str]

# Das ist die Abfrage beim VRR
@retry(attempts=10)
async def dmreq(session, ifopt, limit=20):
    payload = {'name_dm':ifopt, 'type_dm':'any', 'mode':'direct', 'useRealtime':'1', 'limit':str(limit)} #, 'includedMeans':['5', '6', '7']}
    #payload['itdTime']='1200'       # Zu einer bestimmten Zeit die Daten abfragen
    #payload['itdDate']='180109'     # Zu einem bestimmten Datum die Daten abfragen
    async with session.get('http://openservice-test.vrr.de/vrr/XML_DM_REQUEST', params=payload) as r:
        content = await r.read()
        root = ET.fromstring(content)
        #tree = ET.parse(...)
        #root = tree.getroot()
        return (ifopt, root)

# Hier werden die Abfragen zwischengespeichert
sem = asyncio.Semaphore(10)   # Die 10 bedeutet max. 10 Abfragen zeitgleich

# https://stackoverflow.com/a/48486557    
async def safe_dmreq(*args, **kwa):
    async with sem:
        return await dmreq(*args, **kwa)

def deps(root):
    departures = []
    for departure in root.iter('itdDeparture'):
        stop = root.findtext("itdDepartureMonitorRequest/itdOdv/itdOdvName/odvNameElem")
        #stop = "".join(root.find('itdDepartureMonitorRequest').find('itdOdv').find('itdOdvName').find('odvNameElem').itertext())
        servingline = departure.find('itdServingLine')
        countdown = int(departure.attrib['countdown'])
        linenum = servingline.attrib['number']
        direction = servingline.attrib['direction']
        realtime = bool(int(servingline.attrib['realtime']))
        delay = 0
        rtstatus = ""
        itdtime = departure.find('itdDateTime').find('itdTime')
        plantime = (int(itdtime.attrib['hour']), int(itdtime.attrib['minute']))
        #if countdown > 59:
        #realtime = False #False
        if realtime:
            delay = int(servingline.find('itdNoTrain').attrib['delay'])
            rtstatus = departure.attrib.get('realtimeStatus', "")
        genattr = []
        ga = departure.find('genAttrList')
        if ga:
            for gae in ga:
                gaename = gae.findtext('name', default="")
                gaevalue = gae.findtext('value', default="")
                genattr.append(gaename+"="+gaevalue)
        departures.append(Departure(stop=stop,
                                    countdown=countdown,
                                    linenum=linenum,
                                    direction=direction,
                                    plantime=plantime,
                                    isrealtime=realtime,
                                    delay=delay,
                                    rtstatus=rtstatus,
                                    genattr=genattr))
    return departures


def get_countdown(departure):
    if departure.countdown == 0:
        return "jetzt"
    elif departure.delay == -9999:
        return " xxx "
    elif departure.countdown > 59: 
        return tt(departure.plantime)
    else:
        return f"{departure.countdown:2d}min"
 
def depstrings(departures, directionwidth=30):
    strings = []
    for departure in departures:
        color = get_color(departure)
        countdown = get_countdown(departure)
        delay = f"(+{departure.delay})" if departure.isrealtime and departure.delay != -9999 else ""
        currentstring = (
            f"{departure.linenum:>4} {departure.direction:<{directionwidth}.{directionwidth}} "
            f"{color}{countdown}{delay}{Fore.RESET}"
        )
        strings.append((departure.countdown, currentstring))
    strings.sort(key=lambda x: x[0])
    return strings

def get_color(departure):
    color = Fore.RESET
    if not departure.isrealtime:
        color = Fore.WHITE
    elif departure.delay > 5 or departure.delay == -9999:
        color = Fore.RED
    elif departure.delay > 1:
        color = Fore.YELLOW
    else:
        color = Fore.GREEN
    return color

def tt(timetuple):
    return f"{timetuple[0]:02d}:{timetuple[1]:02d}"

# Zwischengespeicherte Abfragen
async def loopreqs(ifopts):
    alldeps = {}
    tasks = []
    async with ClientSession() as session:
        for ifopt in ifopts:
            #try:
            task = asyncio.ensure_future(safe_dmreq(session, ifopt, 25)) # Zahl im letzen Feld = gleich wie viele Abfahrten werden je Haltestelle dagestellt
            tasks.append(task)
        responses = await asyncio.gather(*tasks)
        for ifopt, root in responses:
            alldeps[ifopt] = deps(root)
        return alldeps

if __name__ == "__main__":
    init()  # colorama
    
    # Linie XX, HST Baumheide, Jahnplatz, Dürerstrasse
    ifopts = ['de:05711:5122:0:1', 'de:05711:5272:74:74', 'de:05711:5460:71:73']

    rtmax = 35
    ausfaelle = []
    ausfalldeps = []
    rtabfahrtenmax = {}

    loop = asyncio.get_event_loop()
    future = asyncio.ensure_future(loopreqs(ifopts))
    alldeps = loop.run_until_complete(future)

    for ifopt in alldeps:
        departures = alldeps[ifopt]
        newausfall = False
        for departure in departures:
            if departure.countdown <= rtmax:
                if departure.linenum not in rtabfahrtenmax:
                    rtabfahrtenmax[departure.linenum] = {}
                if departure.direction not in rtabfahrtenmax[departure.linenum]:
                    rtabfahrtenmax[departure.linenum][departure.direction] = []
                rtabfahrtenmax[departure.linenum][departure.direction].append(departure.isrealtime)
            if departure.delay == -9999:
                newausfall = True
                ausfaelle.append(f"{departure.stop} ({ifopt}): {departure.linenum} => {departure.direction} ({tt(departure.plantime)})")
                ausfalldeps.append((ifopt, departure))

      
        if SCREEN:  # Bildschirmausgabe
            if departures:
                print(f"=== {departures[0].stop} ({ifopt}) ===")
            else:
                print(f"Keine Abfahrten für {ifopt} gefunden")
            strs = depstrings(departures)
            for s in strs:
                print(s[1])
            print("")
            if newausfall:
                print("==== Ausfälle ====", "\n".join(ausfaelle), "", sep="\n")
        # sleep(0.1)

    if SCREEN:   # Bildschirmausgabe
        print("Sammlung beendet\n") 

        print(f"Echtzeitabfahrten (<= {rtmax} min) nach Linie und Richtung:")
        for linenum, directions in sorted(rtabfahrtenmax.items()):
            print(linenum)
            for direction, rta in sorted(directions.items()):
                rtabfahrten = rta.count(True)
                alleabfahrten = len(rta)
                print(f" - {direction}: {rtabfahrten}/{alleabfahrten} ({round((rtabfahrten/alleabfahrten)*100)} %)")
            print("")

    nowtimestr = datetime.now().strftime("%Y-%m-%d %H:%M")
    if SCREEN:   # Bildschirmausgabe
        print(f"Gefundene Ausfälle ~{nowtimestr}:", ("\n"+"\n".join(ausfaelle)) if ausfaelle else " keine", sep="")
    else:    # Datei Fahrtausfälle
        with open(CSVAUSFÄLLE, 'a') as f:
            writer(f).writerows(sorted(list([(nowtimestr, dep.stop, ifopt, dep.linenum, dep.direction, tt(dep.plantime), dep.rtstatus, "|".join(dep.genattr)) for ifopt, dep in ausfalldeps]), key=lambda x: (x[3], x[4], x[5])))
Antworten