String mit mehren Jsons splitten oder einlesenen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
nieselfriem
User
Beiträge: 135
Registriert: Sonntag 13. Januar 2013, 16:00

Hallo,

bei einer Augabe von Messages mit kfkacat bekomme ich folgende Ausgabe

Code: Alles auswählen

{
    "topic": "test_topic",
    "partition": 49,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "1:FOO"
} {
    "topic": "test_topic",
    "partition": 57,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "2:BAR"
} {
    "topic": "test_topic",
    "partition": 105,
    "offset": 0,
    "tstype": "create",
    "ts": 1640005191767,
    "key": null,
    "payload": "2:BAR"
} 
Für sich genommen ist jedes ein gültiges JSON. Diese würde ich gerne Splitten oder einzeln einlesen wollen. Der ganze String ist aber kein gültiges JSON. Wie kann ich jedes JSON sinnvoll aus dem String splitten. Mit

Code: Alles auswählen

jsonString.split({\n"topic')
wird das wohl nichts.

VG niesel
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wenn das genau so aussieht, dann splitte doch an
Sirius3
User
Beiträge: 18279
Registriert: Sonntag 21. Oktober 2012, 17:20

@nieselfriem: ist das wirklich die Ausgabe von kfkacat? Das gibt normalerweise Json lines aus, es ist also simple, die Ausgabe Zeilenweise zu lesen und zu interpretieren.
Benutzeravatar
nieselfriem
User
Beiträge: 135
Registriert: Sonntag 13. Januar 2013, 16:00

hat aber den nachteil, dass aus

Code: Alles auswählen

{"topic":"hlsz__all","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"}

Code: Alles auswählen

"topic":"hlsz__all","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"
wird
Sirius3
User
Beiträge: 18279
Registriert: Sonntag 21. Oktober 2012, 17:20

Wie kommst Du darauf?

Code: Alles auswählen

In [3]: json.loads('{"topic":"hlsz__all","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"}')
Out[3]:
{'topic': 'hlsz__all',
 'partition': 57,
 'offset': 0,
 'tstype': 'create',
 'ts': 1640007201426,
 'key': None,
 'payload': '2:BAR'}
Benutzeravatar
nieselfriem
User
Beiträge: 135
Registriert: Sonntag 13. Januar 2013, 16:00

ja, es ist die Ausgabe aller Jsons über alle Partitionen einer Topic.
Benutzeravatar
nieselfriem
User
Beiträge: 135
Registriert: Sonntag 13. Januar 2013, 16:00

also ich habe folgenden json-String:

Code: Alles auswählen

message="""{"topic":"mytopic","partition":49,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"1:FOO"} {"topic":"mytopic","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"} {"topic":"mytopic","partition":105,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"2:BAR"} {"topic":"mytopic","partition":43,"offset":2284,"tstype":"create","ts":1640043698491,"key":"43$de..test.core.transfer.testz.misc.testzLaneState$NEW$6be0150b745946e195e2fe783e5b6a38$0","payload":"{}"} {"topic":"mytopic","partition":76,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"1:FOO"}"""

liste=message_neu.split("} {")
>>> liste
['{"topic":"mytopic","partition":49,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"1:FOO"', '"topic":"mytopic","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"', '"topic":"mytopic","partition":105,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"2:BAR"', '"topic":"mytopic","partition":43,"offset":2284,"tstype":"create","ts":1640043698491,"key":"43$de.test.core.transfer.testz.misc.testzLaneState$NEW$6be0150b745946e195e2fe783e5b6a38$0","payload":"{}"', '"topic":"mytopic","partition":76,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"1:FOO"}']
>>> liste[0]
'{"topic":"mytopic","partition":49,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"1:FOO"'
>>> liste[1]
'"topic":"mytopic","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"'
split() entfernt doch den delimeter, den ich angebe und nimmt ihn ja nicht mit in das Listenelement

VG niesel
Sirius3
User
Beiträge: 18279
Registriert: Sonntag 21. Oktober 2012, 17:20

Aber woher kommt denn message? Warum sind keine newline-Zeichen zwischen den einzelnen Json-Blöcken? Wer hat die entfernt?
Benutzeravatar
kbr
User
Beiträge: 1508
Registriert: Mittwoch 15. Oktober 2008, 09:27

nieselfriem hat geschrieben: Mittwoch 5. Januar 2022, 15:42 split() entfernt doch den delimeter, den ich angebe und nimmt ihn ja nicht mit in das Listenelement
Das ist kein Problem:

Code: Alles auswählen

import json

kfkacat_out = """\
{
    "topic": "test_topic",
    "partition": 49,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "1:FOO"
} {
    "topic": "test_topic",
    "partition": 57,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "2:BAR"
} {
    "topic": "test_topic",
    "partition": 105,
    "offset": 0,
    "tstype": "create",
    "ts": 1640005191767,
    "key": null,
    "payload": "2:BAR"
}"""

items = kfkacat_out[1:-1].split("} {")
for item in items:
    data = json.loads(f"{{{item}}}")
    print(data)
oder

Code: Alles auswählen

source = f'[{kfkacat_out.replace("} {", "},{")}]'
for item in json.loads(source):
    print(item)

Wer aber hat dieses Format so komisch konstruiert?
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

@nieselfriem: das du danach die entfernten geschweiften Klammern wieder anbringen musst, hatte ich als selbstverständlich vorausgesetzt.
Benutzeravatar
nieselfriem
User
Beiträge: 135
Registriert: Sonntag 13. Januar 2013, 16:00

Sirius3 hat geschrieben: Mittwoch 5. Januar 2022, 16:21 Aber woher kommt denn message? Warum sind keine newline-Zeichen zwischen den einzelnen Json-Blöcken? Wer hat die entfernt?
Ahhhh.... Ich habe den Fehler gemacht und meine Ausgabe mit Copy/Paste in einen string auf der python-Console zu pusten.
Jetzt habe ich einfach mal das script als prototyp geschrieben und ausgeführt. Nun läuft es wie ich es ertwarte

Code: Alles auswählen

import subprocess
import json
from datetime import datetime
def run_command(command):
    out = subprocess.Popen(
        [command], stdout=subprocess.PIPE, shell=True)
    return out.communicate()[0].rstrip("\n\r")

def get_last_message():
    docker_id_kafka = run_command(
        'docker ps | grep kafcat_kafcat | head -n 1 | cut -d" " -f1')
    print(docker_id_kafka)
    kafka_result_string = run_command(
        'docker exec -i {0} /bin/bash -c "kafkacat -C -b kafka:9092 -t mytopic -o -1 -e  -J"'.format(docker_id_kafka))
    print(kafka_result_string)
    liste=kafka_result_string.split("\n")
    print("listprint")
    for item in liste:
        json_data=json.loads(item)
        timestamp=json_data.get("ts")
        print(timestamp)
        dt_object = datetime.fromtimestamp(timestamp/1000.0)
        print(dt_object)

def main():
     get_last_message()

if __name__ == "__main__":
    main()
Danke für den Hinweis
Sirius3
User
Beiträge: 18279
Registriert: Sonntag 21. Oktober 2012, 17:20

Und wie immer gilt, man benutzt kein shell=True! Der zusätzliche bash-Aufruf im Container ist auch überflüssig.
Für einfache Operationen benutzt man auch keine Shellskripte, denn das kann Python genausogut.
In diesem Fall ist es aber noch einfacher, da docker-ps schon die passenden Optionen hat.

Code: Alles auswählen

import subprocess
import json
from datetime import datetime

def main():
    result = subprocess.run(
        ['docker', 'ps', '-q', '-f', 'name=kafcat_kafcat'],
        stdout=subprocess.PIPE, check=True)
    docker_id_kafka = result.stdout.strip()
    print(docker_id_kafka)

    process = subprocess.Popen(['docker', 'exec', '-i', docker_id_kafka, '/usr/bin/env',
        'kafkacat', '-C', '-b', 'kafka:9092', '-t', 'mytopic', '-o', '-1', '-e', '-J'],
        stdout=subprocess.PIPE)
    for line in prcoess.stdout:
        data = json.loads(line)
        timestamp = datetime.fromtimestamp(data["ts"] / 1000.0)
        print(timestamp)
    process.wait()

if __name__ == "__main__":
    main()
Antworten