Kurze Frage zur Effizienz eines Snippets (for-Loops...)

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

Hallo, ich bin neu hier. Ich such nach einem Bottleneck oder Fehler im folgenden Code. Die äußere for-Schleife bleibt die ganze Zeit zwischen 0<i<100 und das Script hält nicht an... len(list(listings_dic.keys())) ist ca. 15.000:

Code: Alles auswählen

systems: list = []
systems_dic: dict = {}
stats: list = []
stats_dic: dict = {}
listings: list = []
listings_dic: dict = {}

//...

def get_best_listing(a: list, b: list):
    max_pro = 0
    max_l1 = None
    max_l2 = None
    for l1 in a:
        for l2 in b:
            cid1 = int(l1["commodity_id"])
            cid2 = int(l2["commodity_id"])
            buyp = int(l1["buy_price"])
            sellp = int(l2["sell_price"])
            if cid1 == cid2 and buyp > 0 and sellp > buyp:
                if sellp - buyp > max_pro:
                    max_pro = sellp - buyp
                    max_l1 = l1
                    max_l2 = l2
    if max_pro > 0:
        return (max_l1, max_l2, max_pro)
    return None


def get_best_loop(a: list, b: list):
    max_a = get_best_listing(a, b)
    max_b = get_best_listing(b, a)
    if max_a is not None and max_b is not None:
        return (max_a, max_b, max_a[2] + max_b[2])
    return None


def calc_1():
    max = 0
    max_loop = None
    keys = list(listings_dic.keys())
    for i in range(len(keys)):
        l_key_1 = keys[i]
        a: list = listings_dic[l_key_1]
        b: dict = stats_dic[l_key_1]
        c: dict = systems_dic[b["system_id"]]
        if i % 100 == 0:
            print(i)
        for j in range(i + 1, len(keys)):
            l_key_2 = keys[j]
            a2: list = listings_dic[l_key_2]
            b2: dict = stats_dic[l_key_2]
            c2: dict = systems_dic[b2["system_id"]]
            l = get_best_loop(a, a2)
            if l is not None and l[2] > max:
                max = l[2]
                max_loop = l
    print(max_loop)
    print(systems_dic[stats_dic[max_loop[0][0]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[0][0]["station_id"]]["name"])
    print(systems_dic[stats_dic[max_loop[0][1]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[0][1]["station_id"]]["name"])
    print(systems_dic[stats_dic[max_loop[1][0]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[1][0]["station_id"]]["name"])
    print(systems_dic[stats_dic[max_loop[1][1]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[1][1]["station_id"]]["name"])
    print(max_loop[0][0]["commodity_id"])
    print(max_loop[0][1]["commodity_id"])
    print(max_loop[1][0]["commodity_id"])
    print(max_loop[1][1]["commodity_id"])
    
    calc_1()
Benutzeravatar
__blackjack__
User
Beiträge: 13103
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@greetings1: Du fragst nach der Effizienz und dann klingt die Frage aber so als wenn das gar nicht macht was es soll. Mal offensichtliche Laufzeit- oder Speicherkomplexitätsfragen aussen vor gelassen, sollte man immer erst mal Code korrekt haben, bevor man sich an Optimierungsfragen macht.

Dann ist das offensichtlichste Problem das Du da globale Variablen verwendest. Das macht man nicht. Eine Funktion oder Methode bekommt alles was sie ausser Konstanten benötigt als Argument(e) übergeben. Und sie verändert keine Datenstrukturen sofern das nicht deutlich aus dem Funktions-/Methodennamen hervorgeht oder dokumentiert ist.
Das nächste sind Namen: Einbuchstabige Namen sind selten sinnvolle gute Namen, und man hängt keine Zahlen an Namen an.

Die Typannotationen sind fast alle überflüssig. Und da wo sie es nicht sind, sind sie nicht wirklich spezifisch genug um hilfreich zu sein. Sie sind auch nur hilfreich wenn man sie auch prüfen lässt. Machst Du das? Und womit? Denn bei mir werden bei den ganzen `print()`-Aufrufen Typfehler angezeigt weil die Typen nicht spezifisch genug sind und vieles als `Optional[Any]` von der Typinferenz erkannt wird und das ist dann nicht indexierbar. Wenn man Typannotationen nicht regelmässig, am besten automatisiert überprüft, sollte man sie besser weg lassen, denn falsche Typannotation sind wirklich schlecht, weil der Leser dann starke Annahmen trifft und verwirrt ist.

`calc_1()` ruft sich am Ende selbst auf. Und zwar endlos. Das rennt also ganz sicher in eine Ausnahme. Auch wenn das nicht ganz so sicher in einer Ausnahme enden würde: Rekursion ist kein funktionierender Ersatz für imperative Schleifen. So etwas kann man nur in Programmiersprachen machen die „tail call optimization“ garantieren.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

__blackjack__ hat geschrieben: Donnerstag 22. Oktober 2020, 19:06 `calc_1()` ruft sich am Ende selbst auf. Und zwar endlos. Das rennt also ganz sicher in eine Ausnahme. Auch wenn das nicht ganz so sicher in einer Ausnahme enden würde: Rekursion ist kein funktionierender Ersatz für imperative Schleifen. So etwas kann man nur in Programmiersprachen machen die „tail call optimization“ garantieren.
Sorry, bei dem "calc_1()" Aufruf hat sich eine Einrückung zu viel eingeschlichen. Ich wollte damit nur schreiben, dass calc_1 zuerst aufgerufen wird.

Ich werde das jetzt noch mal bezüglich der Datentypen ändern, und melde mich gleich noch mal.
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

Hier ist jetzt einmal das komplette Script mit Datentypen in "calc_1". Ich habe die Datenmenge durch eine Anpassung in Zeile 121 auch etwas reduziert, aber es hält nach wie vor nicht an und "0" ist die einzige Ausgabe:

Code: Alles auswählen

import json
import csv
import math
import os
import psutil
import inspect

systems: list = []
systems_dic: dict = {}
stats: list = []
stats_dic: dict = {}
listings: list = []
listings_dic: dict = {}


def print_mem_info():
    curframe = inspect.currentframe()
    calframe = inspect.getouterframes(curframe, 2)
    print("caller:", calframe[1][3])
    print("systems, stats, listings")
    print(type(systems), len(systems), type(systems_dic), len(systems_dic), sep=" , ")
    print(type(stats), len(stats), type(stats_dic), len(stats_dic), sep=" , ")
    print(
        type(listings), len(listings), type(listings_dic), len(listings_dic), sep=" , "
    )
    mybytes = psutil.Process(os.getpid()).memory_info()[0]
    print(mybytes / (1024.0 * 1024.0), "mb", mybytes, "b")
    print()


def get_id_dict(d: dict):
    mid = d["id"]
    if mid is not None:
        try:
            return int(mid)
        except ValueError:
            pass
    print("ValueError:", d)
    return None


def get_id_list(l: list, i: int = 0):
    mid = l[i]
    if mid is not None:
        try:
            return int(mid)
        except ValueError:
            pass
    print("ValueError:", l)
    return None


def parse_systems():
    global systems, systems_dic
    with open("systems_populated.json") as f:
        systems = json.load(f)
    for s in systems:
        n = get_id_dict(s)
        if n is not None:
            systems_dic[n] = s
    print_mem_info()
    headers: list
    with open("systems_recently.csv") as f:
        csv_reader = csv.reader(f, delimiter=",")
        i: int = 0
        for row in csv_reader:
            if i == 0:
                headers = row
            else:
                n = get_id_list(row)
                if n is not None and n not in systems_dic:
                    e = dict(zip(headers, row))
                    systems.append(e)
                    systems_dic[n] = e
            i += 1
    print_mem_info()


def get_dist_to_sun(s: dict):
    try:
        x = s["x"]
        y = s["y"]
        z = s["z"]
        if None not in (x, y, z):
            x = float(x)
            y = float(y)
            z = float(z)
            tosun: float = math.sqrt(x * x + y * y + z * z)
            return tosun
    except ValueError:
        print("ValueError:", s)
    return -1.0


def get_dist(s1: dict, s2: dict):
    try:
        x = s1["x"]
        y = s1["y"]
        z = s1["z"]
        a = s2["x"]
        b = s2["y"]
        c = s2["z"]
        if None not in (x, y, z, a, b, c):
            x = float(x)
            y = float(y)
            z = float(z)
            a = float(a)
            b = float(b)
            c = float(c)
            dist: float = math.sqrt(
                math.pow(x - a, 2) + math.pow(y - b, 2) + math.pow(z - c, 2)
            )
            return dist
    except ValueError:
        print("ValueError:", s1, s2)
    return -1.0


def grep_systems():
    global systems, systems_dic
    systems = list(filter(lambda s: 0 <= get_dist_to_sun(s) <= 50, systems))
    systems_dic.clear()
    for s in systems:
        n = get_id_dict(s)
        systems_dic[n] = s
    print_mem_info()


def parse_stats():
    global stats, stats_dic
    with open("stations.json") as f:
        stats = json.load(f)
    print_mem_info()
    stats = list(filter(lambda sta: sta["system_id"] in systems_dic, stats))
    stats_dic.clear()
    for s in stats:
        n = get_id_dict(s)
        stats_dic[n] = s
    print_mem_info()


def grep_stats():
    global systems, systems_dic, stats, stats_dic
    stats = list(
        filter(
            lambda sta: sta["max_landing_pad_size"] == "L"
            and sta["distance_to_star"] is not None
            and 0 < int(sta["distance_to_star"]) <= 5000,
            stats,
        )
    )
    stats_dic.clear()
    for s in stats:
        n = get_id_dict(s)
        stats_dic[n] = s
    myset: set = set()
    systems.clear()
    for s in stats:
        myid = s["system_id"]
        if myid not in myset:
            myset.add(myid)
            systems.append(systems_dic[myid])
    systems_dic.clear()
    for s in systems:
        n = get_id_dict(s)
        systems_dic[n] = s
    print_mem_info()


def parse_listings():
    global listings
    headers: list
    with open("listings.csv") as f:
        csv_reader = csv.reader(f, delimiter=",")
        i: int = 0
        for row in csv_reader:
            if i == 0:
                headers = row
            else:
                e = dict(zip(headers, row))
                listings.append(e)
            i += 1
    print_mem_info()
    myset: set = set()
    mylist: list = []
    for l in listings:
        if l["id"] not in myset:
            myset.add(l["id"])
            mylist.append(l)
    listings = mylist
    print_mem_info()


def grep_listings():
    global systems, systems_dic, stats, stats_dic, listings, listings_dic
    for l in listings:
        myid = int(l["station_id"])
        if myid in stats_dic:
            if myid not in listings_dic:
                listings_dic[myid] = [l]
            else:
                listings_dic[myid].append(l)
    print_mem_info()
    listings.clear()
    print_mem_info()
    stats = list(
        filter(
            lambda sta: sta["id"] in listings_dic,
            stats,
        )
    )
    stats_dic.clear()
    for s in stats:
        n = get_id_dict(s)
        stats_dic[n] = s
    myset: set = set()
    systems.clear()
    for s in stats:
        myid = int(s["system_id"])
        if myid not in myset:
            myset.add(myid)
            systems.append(systems_dic[myid])
    systems_dic.clear()
    for s in systems:
        n = get_id_dict(s)
        systems_dic[n] = s
    print_mem_info()


def get_best_listing(a: list, b: list):
    max_pro = 0
    max_l1 = None
    max_l2 = None
    for l1 in a:
        for l2 in b:
            cid1 = int(l1["commodity_id"])
            cid2 = int(l2["commodity_id"])
            buyp = int(l1["buy_price"])
            sellp = int(l2["sell_price"])
            if cid1 == cid2 and buyp > 0 and sellp > buyp:
                if sellp - buyp > max_pro:
                    max_pro = sellp - buyp
                    max_l1 = l1
                    max_l2 = l2
    if max_pro > 0:
        return (max_l1, max_l2, max_pro)
    return None


def get_best_loop(a: list, b: list):
    max_a = get_best_listing(a, b)
    max_b = get_best_listing(b, a)
    if max_a is not None and max_b is not None:
        return (max_a, max_b, max_a[2] + max_b[2])
    return None


def calc_1():
    max: int = 0
    max_loop: set = None
    keys: list = list(listings_dic.keys())
    for i in range(len(keys)):
        key_1: int = keys[i]
        a: list = listings_dic[key_1]
        b: dict = stats_dic[key_1]
        c: dict = systems_dic[b["system_id"]]
        if i % 100 == 0:
            print(i)
        for j in range(i + 1, len(keys)):
            key_2: int = keys[j]
            a2: list = listings_dic[key_2]
            b2: dict = stats_dic[key_2]
            c2: dict = systems_dic[b2["system_id"]]
            l: set = get_best_loop(a, a2)
            if l is not None and l[2] > max:
                max = l[2]
                max_loop = l
    print(max_loop)
    print(systems_dic[stats_dic[max_loop[0][0]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[0][0]["station_id"]]["name"])
    print(systems_dic[stats_dic[max_loop[0][1]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[0][1]["station_id"]]["name"])
    print(systems_dic[stats_dic[max_loop[1][0]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[1][0]["station_id"]]["name"])
    print(systems_dic[stats_dic[max_loop[1][1]["station_id"]]["system_id"]]["name"])
    print(stats_dic[max_loop[1][1]["station_id"]]["name"])
    print(max_loop[0][0]["commodity_id"])
    print(max_loop[0][1]["commodity_id"])
    print(max_loop[1][0]["commodity_id"])
    print(max_loop[1][1]["commodity_id"])


parse_systems()
grep_systems()
parse_stats()
grep_stats()
parse_listings()
grep_listings()
calc_1()
Benutzeravatar
__blackjack__
User
Beiträge: 13103
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@greetings1: Geh erst einmal das offensichtlichste Problem an: Keine globalen Variablen. Sonst musst Du jemanden finden der Lust hat sich 300 Zeilen Code mit 6 eventuell noch verschachtelten Datenstrukturen und 14 Funktionen die darauf operieren oder auch nicht durchzulesen. Funktionen sollten *echte* Funktionen sein, also zumindest mal nichts ausserhalb der Funktion auf magische Weise verwenden oder gar zu ändern, so dass man die leicht einzeln testen kann.

Das nächste, was auch nett wäre vor dem nächsten mal fragen zu ändern sind die schlechten Namen. Zusätzlich zum letzten Beitrag: Grunddatentypen haben nichts in Namen zu suchen. Bei Abbildungen wie beispielsweise Wörterbüchern ist es nicht unüblich den Namen nach einem Muster wie <key>_to_<value> zu bilden, wobei <key> ein sinnvoller Name ist der für einen einzelnen Schlüssel ist und <value> ein sinnvoller Name für einen Wert. Dann weiss der Leser a) dass es eine Abbildung ist, und b) was Schlüssel und Werte bedeuten.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

__blackjack__ hat geschrieben: Donnerstag 22. Oktober 2020, 20:10 Sonst musst Du jemanden finden der Lust hat sich 300 Zeilen Code mit...
Es sind 299 Zeilen. ;)

Ich konnte es lösen, indem ich die Datenstruktur leicht verändert habe:

Code: Alles auswählen

def grep_listings():
    global systems, systems_dic, stats, stats_dic, listings, listings_dic
    for l in listings:
        myid = int(l["station_id"])
        if myid in stats_dic:
            myid2 = int(l["commodity_id"])
            if myid not in listings_dic:
                listings_dic[myid] = {myid2: [l]}
            elif myid2 not in listings_dic[myid]:
                listings_dic[myid][myid2] = [l]
            else:
                listings_dic[myid][myid2].append(l)
//...
def get_best_listing(a: dict, b: dict):
    max_pro = 0
    max_l1 = None
    max_l2 = None
    for cid_a in a:
        for c_a in a[cid_a]:
            buyp = int(c_a["buy_price"])
            if cid_a in b:
                for c_b in b[cid_a]:
                    sellp = int(c_b["sell_price"])
                    if buyp > 0 and sellp > buyp:
                        if sellp - buyp > max_pro:
                            max_pro = sellp - buyp
                            max_l1 = c_a
                            max_l2 = c_b
    if max_pro > 0:
        return (max_l1, max_l2, max_pro)
    return None


def get_best_loop(a: dict, b: dict):
    max_a = get_best_listing(a, b)
    max_b = get_best_listing(b, a)
    if max_a is not None and max_b is not None:
        return (max_a, max_b, max_a[2] + max_b[2])
    return None


def calc_1():
    max: int = 0
    max_loop: set = None
    keys: list = list(listings_dic.keys())
    for i in range(len(keys)):
        key_1: int = keys[i]
        a: dict = listings_dic[key_1]
        b: dict = stats_dic[key_1]
        c: dict = systems_dic[b["system_id"]]
        if i % 100 == 0:
            print(i)
        for j in range(i + 1, len(keys)):
            key_2: int = keys[j]
            a2: dict = listings_dic[key_2]
            b2: dict = stats_dic[key_2]
            c2: dict = systems_dic[b2["system_id"]]
            l: set = get_best_loop(a, a2)
            if l is not None and l[2] > max:
                max = l[2]
                max_loop = l
    print(max_loop)
Es ist jetzt viel schneller als vorher. :)

Fällt denn sonst noch etwas auf? Hier hab ich gelesen, dass Tuple schneller sein sollen als Lists: https://stackoverflow.com/questions/686 ... -in-python , deswegen habe ich immer Tuple als Rückgabedatentyp...
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Neben dem was __blackjack__ schon geschrieben hat und was du in deiner "Lösung" nicht berücksichtigt hast, gibt es noch viele Punkte. Keine globalen Variablen und richtige Funktionen sind ja erste der Anfang, um sauberen und testbaren Code zu haben.
Du benutzt json, das sollte der Input schon die richtigen Typen haben. Und auch wenn nicht, die Umwandlung macht man gleich beim Einlesen und nicht jedesmal beim Verarbeiten. Die "Fehlerbehandlung" mit None-Werten nimmt einen Großteil des Codes ein. Exceptions sind da der bessere Weg.
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

Ich hab noch eine Frage, ich berechne jetzt ja das Optimum oder das profitabelste Element. Jetzt möchte ich aber die 10 größten Elemente/profitabelsten Elemente berechnen. Gibt es so etwas wie eine immer sortierte Liste in Python? Wenn nicht, wie könnte man das umsetzen?

"und was du in deiner "Lösung" nicht berücksichtigt hast"
Das stimmt nicht, die Datentypen habe ich hinzugefügt. Auf globale Variablen kann ich nicht verzichten, weil das die Parameteranzahl sprengen würde...
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Früher gab es mal ein Limit von 255 Parametern für Funktionen. Das ist inzwischen aufgehoben. Du hast 6 globale Variablen.
Wenn Dein Ziel ist, irgendwie Zahlen zu produzieren, ok. Wenn aber Dein Ziel ist, Python zu lernen, weil Du effektiv Probleme lösen willst, oder Dein Ziel ist, ein verlässlich richtige Zahlen auszurechnen, dann solltest Du etwas an Deiner Haltung ändern. Die Tipps, die wir hier geben, sind ja nicht zum Spaß, oder zum Zeigen, dass wir hier viel besser sind. Sie haben sich in tausenden Projekten bewährt, bzw. sind tausende Projekte schief gelaufen, weil wir uns selbst (noch) nicht daran gehalten haben.
Es ist ganz normal, dass man den Code, den man geschrieben hat, etliche Male überarbeitet oder ganz verwirft.
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

Ich hab jetzt zwei Funktionen, um die Daten direkt beim Parsen umzuwandeln, geschrieben:

Code: Alles auswählen

def convert_row(row: list):
    for i in range(len(row)):
        if type(row[i]) is str:
            try:
                row[i] = int(row[i])
                continue
            except ValueError:
                pass
            try:
                row[i] = float(row[i])
            except ValueError:
                pass


def convert_dic(d: dict):
    for k in d:
        if type(d[k]) is str:
            try:
                d[k] = int(d[k])
                continue
            except ValueError:
                pass
            try:
                d[k] = float(d[k])
            except ValueError:
                pass
Aufrufen tue ich das dann so:
for e in a_dict:
convert_dic(e)
Dadurch entfallen alle weiteren Typ-Umwandlungen und die Berechnung ist schneller. Ist das ok so?
Benutzeravatar
__blackjack__
User
Beiträge: 13103
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@greetings1: Die Typannotationen sind wieder überflüssig und unnötig einschränkend.

Die Namen sind schlecht gewählt.

`type()` verwendet man nicht zur Typprüfung, dafür gibt es `isinstance()`.

Die Verwendung von ``continue`` macht den Code unübersichtlich. Warum steht der damit ”übersprungene” Code nicht im ``except``-Zweig?

Die beiden Funktionen sehen nahezu identisch aus, das lässt sich ohne Codewiederholung schreiben.

Beide Funktionen nehmen keine Rücksicht auf die Bedeutung der Struktur. Es kann also insbesondere bei dem Wörterbuch passieren, das man Zeichenketten in Zahlen umwandelt die von der Bedeutung her keine Zahlen sind, nur weil sich die Zeichenkette zufällig in eine Zahl umwandeln lässt.

Wird etwas abgemildert dadurch, dass die Funktionen nicht rekursiv arbeiten. Was dann aber auch wieder ein Problem darstellen könnte, falls man das braucht.

Die Funktionen verändern Datenstrukturen wo es besser wäre eine neue Struktur mit den veränderten Werten zu erstellen. Insbesondere beim Wörterbuch wäre ich mit jetzt auch gar nicht sicher, ob das sichergestellt ist, dass das auch funktioniert. Falls es bei Dir funktioniert, würde ich das nicht als universell gegeben ansehen. Strukturen zu verändern, über die man gerade iteriert ist immer mit Vorsicht zu geniessen.

Es macht das Programm nicht wirklich verständlicher weil man die Struktur und Bedeutung der Eingabedaten entweder kennen muss, oder sich die *sehr* mühsam über den ganzen Code klar machen muss. Da sollte mehr als dieser generische Code stehen. Vielleicht auch gleich mit Klassen, damit auch die JSON-Objekte verständliche Typnamen bekommen und nicht alles eine anonyme Sosse ist. Bibliotheken wie `marshmellow` können dabei helfen.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Datenstrukturen ändert man nicht, sondern erzeugt neue. Über einen Index zu iterieren ist immer ein Code-Smell.

Code: Alles auswählen

def convert_to_number(text):
    try:
        return int(text)
    except ValueError:
        try:
            return float(text)
        except ValueError:
            pass
    return text

def convert_row(row):
    return [convert_to_number(element) for element in row]


def convert_dictionary(dictionary):
    return {k: convert_to_number(v)
        for k, v in dictionary.items()}
Das ist aber alles zu generisch. Wenn ich das richtig sehe, wird in `parse_listings` die Datei geladen. Da ist erstmal i:int völlig unnötig, weil erstens klar ist, dass es sich um ein int handelt, wenn man i den Wert 0 zuweist. i sollte aber auch gar nicht exisitieren, weil es nur dazu dient, die erste Zeile zu erkennen; die kann man aber einfach vor der for-Schleife behandeln. Die my-Präfixe bei myset und mylist sind überflüssig, und der Datentyp sollte nicht im Variablennamen vorkommen. Zudem ist die Typannotation wieder total unsinnig, weil Du ja myset in der selben Zeile ein Set zuweist.

Code: Alles auswählen

def parse_listings():
    with open("listings.csv") as file:
        csv_reader = csv.reader(file, delimiter=",")
        headers = next(csv_reader)
        listings = []
        for row in csv_reader:
            listings.append(dict(zip(headers, row)))
    ids_seen = set()
    result = []
    for row in listings:
        if row["id"] not in ids_seen:
            ids_seen.add(row["id"])
            result.append(row)
    return result
Die Funktion hat nun sinnvolle Variablennamen, keine unnötigen Typannotationen mehr und liefert sauber eine Liste zurück, statt eine globale Variable zu verwenden. Das was Du da aber mit dem Wörterbuch machst, macht csv.DictReader schon automatisch.

Code: Alles auswählen

    with open("listings.csv") as file:
        listings = list(csv.DictReader(file, delimiter=","))
Jetzt fehlt noch die Datenkonvertierung. Die würde ich hier wirklich explizit hinschreiben, weil so gleich die Sturktur der Daten klar wird, und das Programm mit einem Fehler abbricht, falls die Daten nicht dem gewünschten Format entsprechen:

Code: Alles auswählen

LISTING_FILENAME = "listings.csv"
def parse_listings():
    with open(LISTING_FILENAME ) as file:
        listings = list(csv.DictReader(file, delimiter=","))
    ids_seen = set()
    result = []
    for row in listings:
        if row["id"] not in ids_seen:
            ids_seen.add(row["id"])
            result.append({
                "station_id": int(row["station_id"]),
                "commodity_id": int(row["commodity_id"]),
                "buy_price": float(row["buy_price"]),
                "sell_price": float(row["sell_price"]),
            })
    return result
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

Hier einmal das komplette Programm, mit den Verbesserungsvorschlägen (bis auf das mit den global Vars...):

Code: Alles auswählen

import json
import csv
import math
import os
import psutil
import inspect
from datetime import datetime

timestamp = datetime.timestamp(datetime.now())

systems: list = []
systems_dic: dict = {}
stats: list = []
stats_dic: dict = {}
listings: list = []
listings_dic: dict = {}


def print_mem_info():
    curframe = inspect.currentframe()
    calframe = inspect.getouterframes(curframe, 2)
    print("caller:", calframe[1][3])
    print("systems, stats, listings")
    print(type(systems), len(systems), type(systems_dic), len(systems_dic), sep=" , ")
    print(type(stats), len(stats), type(stats_dic), len(stats_dic), sep=" , ")
    print(
        type(listings), len(listings), type(listings_dic), len(listings_dic), sep=" , "
    )
    mybytes = psutil.Process(os.getpid()).memory_info()[0]
    print(mybytes / (1024.0 * 1024.0), "mb", mybytes, "b")
    print()


def convert_list(list: list):
    new_list: list = []
    for e in list:
        new_e = e
        if type(e) is str:
            try:
                new_e = int(e)
            except ValueError:
                try:
                    new_e = float(e)
                except ValueError:
                    pass
        new_list.append(new_e)
    return new_list


def parse_systems():
    global systems, systems_dic
    with open("systems_populated.json") as f:
        systems = json.load(f)
    for s in systems:
        systems_dic[s["id"]] = s
    print_mem_info()
    headers: list
    with open("systems_recently.csv") as f:
        csv_reader = csv.reader(f, delimiter=",")
        i: int = 0
        for row in csv_reader:
            if i == 0:
                headers = row
            else:
                row = convert_list(row)
                n = row[0]
                if n not in systems_dic:
                    e = dict(zip(headers, row))
                    systems.append(e)
                    systems_dic[n] = e
            i += 1
    print_mem_info()


def get_dist_to_sol(s: dict):
    x = s["x"]
    y = s["y"]
    z = s["z"]
    return math.sqrt(x * x + y * y + z * z)


def get_dist(s1: dict, s2: dict):
    x = s1["x"]
    y = s1["y"]
    z = s1["z"]
    a = s2["x"]
    b = s2["y"]
    c = s2["z"]
    dist: float = math.sqrt(
        math.pow(x - a, 2) + math.pow(y - b, 2) + math.pow(z - c, 2)
    )
    return dist


def grep_systems():
    global systems, systems_dic
    systems = list(filter(lambda s: 0 <= get_dist_to_sol(s) <= 250, systems))
    systems_dic.clear()
    for s in systems:
        systems_dic[s["id"]] = s
    print_mem_info()


def parse_stats():
    global stats, stats_dic
    with open("stations.json") as f:
        stats = json.load(f)
    print_mem_info()
    stats = list(filter(lambda sta: sta["system_id"] in systems_dic, stats))
    stats_dic.clear()
    for s in stats:
        stats_dic[s["id"]] = s
    print_mem_info()


def grep_stats():
    global systems, systems_dic, stats, stats_dic
    stats = list(
        filter(
            lambda sta: sta["max_landing_pad_size"] == "L"
            and sta["distance_to_star"] is not None
            and 0 < sta["distance_to_star"] <= 5000,
            stats,
        )
    )
    stats_dic.clear()
    for s in stats:
        stats_dic[s["id"]] = s
    myset: set = set()
    systems.clear()
    for s in stats:
        myid = s["system_id"]
        if myid not in myset:
            myset.add(myid)
            systems.append(systems_dic[myid])
    systems_dic.clear()
    for s in systems:
        systems_dic[s["id"]] = s
    print_mem_info()


def get_hours(l: dict):
    return (timestamp - l["collected_at"]) / (60.0 * 60.0)


def parse_listings():
    global listings
    headers: list
    with open("listings.csv") as f:
        csv_reader = csv.reader(f, delimiter=",")
        i: int = 0
        for row in csv_reader:
            if i == 0:
                headers = row
            else:
                row = convert_list(row)
                e = dict(zip(headers, row))
                if get_hours(e) <= 48:
                    listings.append(e)
            i += 1
    print_mem_info()


def grep_listings():
    global systems, systems_dic, stats, stats_dic, listings, listings_dic
    for l in listings:
        myid = l["station_id"]
        if myid in stats_dic:
            myid2 = l["commodity_id"]
            if myid not in listings_dic:
                listings_dic[myid] = {myid2: [l]}
            elif myid2 not in listings_dic[myid]:
                listings_dic[myid][myid2] = [l]
            else:
                listings_dic[myid][myid2].append(l)
    print_mem_info()
    listings.clear()
    print_mem_info()
    stats = list(
        filter(
            lambda sta: sta["id"] in listings_dic,
            stats,
        )
    )
    stats_dic.clear()
    for s in stats:
        stats_dic[s["id"]] = s
    myset: set = set()
    systems.clear()
    for s in stats:
        myid = s["system_id"]
        if myid not in myset:
            myset.add(myid)
            systems.append(systems_dic[myid])
    systems_dic.clear()
    for s in systems:
        systems_dic[s["id"]] = s
    print_mem_info()


def get_best_listing(a: dict, b: dict):
    max_pro = 0
    max_l1 = None
    max_l2 = None
    for cid_a in a:
        for c_a in a[cid_a]:
            buyp = c_a["buy_price"]
            if cid_a in b:
                for c_b in b[cid_a]:
                    sellp = c_b["sell_price"]
                    if buyp > 0 and sellp > buyp:
                        if sellp - buyp > max_pro:
                            max_pro = sellp - buyp
                            max_l1 = c_a
                            max_l2 = c_b
    if max_pro > 0:
        return (max_l1, max_l2, max_pro)
    return None


def get_best_loop(a: dict, b: dict):
    max_a = get_best_listing(a, b)
    max_b = get_best_listing(b, a)
    if max_a is not None and max_b is not None:
        return (max_a, max_b, max_a[2] + max_b[2])
    return None


def calc_1():
    max: int = 0
    max_loop: set = None
    keys: list = list(listings_dic.keys())
    for i in range(len(keys)):
        key_1: int = keys[i]
        a: dict = listings_dic[key_1]
        b: dict = stats_dic[key_1]
        c: dict = systems_dic[b["system_id"]]
        if i % 100 == 0:
            print(i)
        for j in range(i + 1, len(keys)):
            key_2: int = keys[j]
            a2: dict = listings_dic[key_2]
            b2: dict = stats_dic[key_2]
            c2: dict = systems_dic[b2["system_id"]]
            if get_dist(c, c2) < 50:
                l: set = get_best_loop(a, a2)
                if l is not None and l[2] > max:
                    max = l[2]
                    max_loop = l
    print(max_loop)
    l1 = max_loop[0][0]
    l2 = max_loop[0][1]
    # l3 = max_loop[1][0]
    # l4 = max_loop[1][1]
    sta1 = stats_dic[l1["station_id"]]
    sta2 = stats_dic[l2["station_id"]]
    sys1 = systems_dic[sta1["system_id"]]
    sys2 = systems_dic[sta2["system_id"]]
    print(sys1["name"])
    print(sta1["name"])
    print(sys2["name"])
    print(sta2["name"])
    print(get_dist(sys1, sys2))


parse_systems()
grep_systems()
parse_stats()
grep_stats()
parse_listings()
grep_listings()
calc_1()
Ihr könnt das testen, wenn ihr die entsprechenden Daten einmal von https://eddb.io/ herunterladet (unter Other > API) (ca. 500mb). Bei mir wird das zwar das richtige Ergebnis ausgegeben, aber ich bin mir sicher, dass noch einiges verbessert werden könnte.
Benutzeravatar
__blackjack__
User
Beiträge: 13103
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

(Diese Antwort bezieht sich noch auf den Stand vom letzten Beitrag von Sirius3)

Wobei hier jetzt beim Dateiöffnen noch eine Kodierung und das `newline`-Argument fehlt das man für das `csv`-Modul braucht.

Man muss auch nicht erst alles in eine Liste einlesen sondern kann über das `DictReader`-Objekt iterieren.

Code: Alles auswählen

def parse_listings():
    with open(LISTING_FILENAME, encoding="utf-8", newline="") as file:
        ids_seen = set()
        result = []
        for row in csv.DictReader(file, delimiter=","):
            row_id = int(row["id"])
            if row_id not in ids_seen:
                ids_seen.add(row_id)
                result.append(
                    {
                        "commodity_id": int(row["commodity_id"]),
                        "station_id": int(row["station_id"]),
                        "buy_price": float(row["buy_price"]),
                        "sell_price": float(row["sell_price"]),
                    }
                )
        return result
Dieses rausfiltern von Doubletten ist eine Standardaufgabe, das würde ich mir aus `more_itertools` holen (`unique_everseen()`). Womit die Schleife dann so simpel wird, dass man da eine „list comprehension“ draus machen kann:

Code: Alles auswählen

def parse_listings():
    with open(LISTING_FILENAME, encoding="utf-8", newline="") as file:
        return [
            {
                "station_id": int(row["station_id"]),
                "commodity_id": int(row["commodity_id"]),
                "buy_price": float(row["buy_price"]),
                "sell_price": float(row["sell_price"]),
            }
            for row in unique_everseen(
                csv.DictReader(file, delimiter=","), lambda row: int(row["id"])
            )
        ]
Wörterbücher die immer einen festen Satz an Schlüsseln haben sind eigentlich keine Wörterbücher sondern ein Fall für eine Klasse:

Code: Alles auswählen

#!/usr/bin/env python3
import csv

from attr import attrs, attrib
from more_itertools import unique_everseen

LISTING_FILENAME = "listings.csv"

...


@attrs(frozen=True)
class Commodity:
    commodity_id = attrib()
    station_id = attrib()
    buy_price = attrib()
    sell_price = attrib()

    @classmethod
    def from_row(cls, row):
        return cls(
            int(row["commodity_id"]),
            int(row["station_id"]),
            float(row["buy_price"]),
            float(row["sell_price"]),
        )


def parse_listings():
    with open(LISTING_FILENAME, encoding="utf-8", newline="") as file:
        return list(
            map(
                Commodity.from_row,
                unique_everseen(
                    csv.DictReader(file, delimiter=","),
                    lambda row: int(row["id"]),
                ),
            )
        )
Eventuell möchte man noch anpassen welche Attribute beim Vergleichen und Hashen so eines `Commodity`-Objekts herangezogen werden beziehungsweise welche nicht.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

@__blackjack__ das Problem ist, dass ich nicht weiß, welche Daten/Attribute ich später im Programm noch alle brauchen werde und welche nicht... Kla, momentan ist es noch überschaubar, nur ca. 4 Werte braucht das Programm, ich möchte die filter aber später noch anpassen.
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Dann pass sie doch später an. Niemand verbietet es Dir, eine Funktion später nochmal anzupassen.
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

Man soll doch vorausschauend programmieren. ;) (bzw. modular...)
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Und meistens wenn man das macht endet man mit YAGNI, weil man gar nicht wirklich verstanden hat, wie das Problem später genau ausschaut. Auf Verdacht programmieren sollte man also sein lassen.
Benutzeravatar
__blackjack__
User
Beiträge: 13103
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Vorausschauend programmieren heisst verständlich programmieren, weil man ja voraussieht, dass so ein Programm in der Regel nie fertig ist, weil man es anpassen muss. Weil man selbst etwas anders machen will, oder weil sich beispielsweise die Struktur der Eingabedaten geändert hat. Ist bei den Daten ja offenbar vor nicht allzu langer Zeit passiert, weil man da auch Daten in einem älteren Format herunterladen kann.

Globale Variablen sind dabei dann beispielsweise nicht vorausschauend, weil die das Verständnis und die Wart- und Testbarkeit erschweren. Unit-Tests währen etwas was Änderungen und Refaktorisieren leichter/sicherer machen können.

Vorausschauend wäre es das so zu programmieren, dass man am Code ablesen kann mit was da überhaupt operiert wird. Also nicht Unmengen an anonymen Wörterbüchern sondern problemspezifische Klassen mit sinnvoll benannten Attributen und eventuell auch Verhalten, also Methoden die auf diesen Typen dann Sinn machen.

Einige von den grossen JSON-Dateien gibt's dort auch als JSON-Lines-Format. Würde sich vielleicht anbieten um weniger Speicher zu verbrauchen, weil man die nicht komplett auf einmal in den Speicher laden muss.

Bei den Datenmengen könnte auch eine Datenbank eventuell Sinn machen. Am besten eine mit Erweiterungen um auf räumliche Nähe effizient abfragen zu können.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
greetings1
User
Beiträge: 51
Registriert: Donnerstag 22. Oktober 2020, 18:19

greetings1 hat geschrieben: Freitag 23. Oktober 2020, 06:38 Ich hab noch eine Frage, ich berechne jetzt ja das Optimum oder das profitabelste Element. Jetzt möchte ich aber die 10 größten Elemente/profitabelsten Elemente berechnen. Gibt es so etwas wie eine immer sortierte Liste in Python? Wenn nicht, wie könnte man das umsetzen?
Ich hab das jetzt folgendermaßen gelöst, ich sammle einfach alle Elemente größer eines Thresholds in einer Liste und sortiere dann:

Code: Alles auswählen

def calc_1():
    loops: list = []
    keys: list = list(listings_dic.keys())
    for i in range(len(keys)):
        key_1: int = keys[i]
        a: dict = listings_dic[key_1]
        b: dict = stats_dic[key_1]
        c: dict = systems_dic[b["system_id"]]
        if i % 100 == 0:
            print(i)
        for j in range(i + 1, len(keys)):
            key_2: int = keys[j]
            a2: dict = listings_dic[key_2]
            b2: dict = stats_dic[key_2]
            c2: dict = systems_dic[b2["system_id"]]
            if get_dist(c, c2) <= 20:
                l: set = get_best_loop(a, a2)
                if l is not None and l[2] > 10000:
                    loops.append(l)
    loops = sorted(loops, key=lambda l: l[2], reverse=True)
    print(len(loops))
    for i in range(3):
        print_loop(loops[i])


parse_systems()
parse_stats()
parse_listings()
parse_coms()
calc_1()
Antworten