Seite 1 von 1

String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 14:34
von nieselfriem
Hallo,

bei einer Augabe von Messages mit kfkacat bekomme ich folgende Ausgabe

Code: Alles auswählen

{
    "topic": "test_topic",
    "partition": 49,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "1:FOO"
} {
    "topic": "test_topic",
    "partition": 57,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "2:BAR"
} {
    "topic": "test_topic",
    "partition": 105,
    "offset": 0,
    "tstype": "create",
    "ts": 1640005191767,
    "key": null,
    "payload": "2:BAR"
} 
Für sich genommen ist jedes ein gültiges JSON. Diese würde ich gerne Splitten oder einzeln einlesen wollen. Der ganze String ist aber kein gültiges JSON. Wie kann ich jedes JSON sinnvoll aus dem String splitten. Mit

Code: Alles auswählen

jsonString.split({\n"topic')
wird das wohl nichts.

VG niesel

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 14:45
von __deets__
Wenn das genau so aussieht, dann splitte doch an

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 14:55
von Sirius3
@nieselfriem: ist das wirklich die Ausgabe von kfkacat? Das gibt normalerweise Json lines aus, es ist also simple, die Ausgabe Zeilenweise zu lesen und zu interpretieren.

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 15:00
von nieselfriem
hat aber den nachteil, dass aus

Code: Alles auswählen

{"topic":"hlsz__all","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"}

Code: Alles auswählen

"topic":"hlsz__all","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"
wird

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 15:30
von Sirius3
Wie kommst Du darauf?

Code: Alles auswählen

In [3]: json.loads('{"topic":"hlsz__all","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"}')
Out[3]:
{'topic': 'hlsz__all',
 'partition': 57,
 'offset': 0,
 'tstype': 'create',
 'ts': 1640007201426,
 'key': None,
 'payload': '2:BAR'}

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 15:32
von nieselfriem
ja, es ist die Ausgabe aller Jsons über alle Partitionen einer Topic.

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 15:42
von nieselfriem
also ich habe folgenden json-String:

Code: Alles auswählen

message="""{"topic":"mytopic","partition":49,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"1:FOO"} {"topic":"mytopic","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"} {"topic":"mytopic","partition":105,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"2:BAR"} {"topic":"mytopic","partition":43,"offset":2284,"tstype":"create","ts":1640043698491,"key":"43$de..test.core.transfer.testz.misc.testzLaneState$NEW$6be0150b745946e195e2fe783e5b6a38$0","payload":"{}"} {"topic":"mytopic","partition":76,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"1:FOO"}"""

liste=message_neu.split("} {")
>>> liste
['{"topic":"mytopic","partition":49,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"1:FOO"', '"topic":"mytopic","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"', '"topic":"mytopic","partition":105,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"2:BAR"', '"topic":"mytopic","partition":43,"offset":2284,"tstype":"create","ts":1640043698491,"key":"43$de.test.core.transfer.testz.misc.testzLaneState$NEW$6be0150b745946e195e2fe783e5b6a38$0","payload":"{}"', '"topic":"mytopic","partition":76,"offset":0,"tstype":"create","ts":1640005191767,"key":null,"payload":"1:FOO"}']
>>> liste[0]
'{"topic":"mytopic","partition":49,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"1:FOO"'
>>> liste[1]
'"topic":"mytopic","partition":57,"offset":0,"tstype":"create","ts":1640007201426,"key":null,"payload":"2:BAR"'
split() entfernt doch den delimeter, den ich angebe und nimmt ihn ja nicht mit in das Listenelement

VG niesel

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 16:21
von Sirius3
Aber woher kommt denn message? Warum sind keine newline-Zeichen zwischen den einzelnen Json-Blöcken? Wer hat die entfernt?

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 16:51
von kbr
nieselfriem hat geschrieben: Mittwoch 5. Januar 2022, 15:42 split() entfernt doch den delimeter, den ich angebe und nimmt ihn ja nicht mit in das Listenelement
Das ist kein Problem:

Code: Alles auswählen

import json

kfkacat_out = """\
{
    "topic": "test_topic",
    "partition": 49,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "1:FOO"
} {
    "topic": "test_topic",
    "partition": 57,
    "offset": 0,
    "tstype": "create",
    "ts": 1640007201426,
    "key": null,
    "payload": "2:BAR"
} {
    "topic": "test_topic",
    "partition": 105,
    "offset": 0,
    "tstype": "create",
    "ts": 1640005191767,
    "key": null,
    "payload": "2:BAR"
}"""

items = kfkacat_out[1:-1].split("} {")
for item in items:
    data = json.loads(f"{{{item}}}")
    print(data)
oder

Code: Alles auswählen

source = f'[{kfkacat_out.replace("} {", "},{")}]'
for item in json.loads(source):
    print(item)

Wer aber hat dieses Format so komisch konstruiert?

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 16:54
von __deets__
@nieselfriem: das du danach die entfernten geschweiften Klammern wieder anbringen musst, hatte ich als selbstverständlich vorausgesetzt.

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 17:24
von nieselfriem
Sirius3 hat geschrieben: Mittwoch 5. Januar 2022, 16:21 Aber woher kommt denn message? Warum sind keine newline-Zeichen zwischen den einzelnen Json-Blöcken? Wer hat die entfernt?
Ahhhh.... Ich habe den Fehler gemacht und meine Ausgabe mit Copy/Paste in einen string auf der python-Console zu pusten.
Jetzt habe ich einfach mal das script als prototyp geschrieben und ausgeführt. Nun läuft es wie ich es ertwarte

Code: Alles auswählen

import subprocess
import json
from datetime import datetime
def run_command(command):
    out = subprocess.Popen(
        [command], stdout=subprocess.PIPE, shell=True)
    return out.communicate()[0].rstrip("\n\r")

def get_last_message():
    docker_id_kafka = run_command(
        'docker ps | grep kafcat_kafcat | head -n 1 | cut -d" " -f1')
    print(docker_id_kafka)
    kafka_result_string = run_command(
        'docker exec -i {0} /bin/bash -c "kafkacat -C -b kafka:9092 -t mytopic -o -1 -e  -J"'.format(docker_id_kafka))
    print(kafka_result_string)
    liste=kafka_result_string.split("\n")
    print("listprint")
    for item in liste:
        json_data=json.loads(item)
        timestamp=json_data.get("ts")
        print(timestamp)
        dt_object = datetime.fromtimestamp(timestamp/1000.0)
        print(dt_object)

def main():
     get_last_message()

if __name__ == "__main__":
    main()
Danke für den Hinweis

Re: String mit mehren Jsons splitten oder einlesenen

Verfasst: Mittwoch 5. Januar 2022, 18:23
von Sirius3
Und wie immer gilt, man benutzt kein shell=True! Der zusätzliche bash-Aufruf im Container ist auch überflüssig.
Für einfache Operationen benutzt man auch keine Shellskripte, denn das kann Python genausogut.
In diesem Fall ist es aber noch einfacher, da docker-ps schon die passenden Optionen hat.

Code: Alles auswählen

import subprocess
import json
from datetime import datetime

def main():
    result = subprocess.run(
        ['docker', 'ps', '-q', '-f', 'name=kafcat_kafcat'],
        stdout=subprocess.PIPE, check=True)
    docker_id_kafka = result.stdout.strip()
    print(docker_id_kafka)

    process = subprocess.Popen(['docker', 'exec', '-i', docker_id_kafka, '/usr/bin/env',
        'kafkacat', '-C', '-b', 'kafka:9092', '-t', 'mytopic', '-o', '-1', '-e', '-J'],
        stdout=subprocess.PIPE)
    for line in prcoess.stdout:
        data = json.loads(line)
        timestamp = datetime.fromtimestamp(data["ts"] / 1000.0)
        print(timestamp)
    process.wait()

if __name__ == "__main__":
    main()