Einlesen von IMC Famos (.dat, .raw) daten
Ahhh, verstanden:
Eine Frage habe ich da aber noch, weil insbesondere bei großen Dateien, das Einlesen recht lange dauert.
Du hast geschrieben, dass man itter_blocks deutlich beschleunigen könnte, indem man das kopieren vermeidet. Ich verstehe leider nicht ganz was du damit meinst.
Meinst du damit diesen Schritt:
Zur Übersicht hier nochmal der ganze code:
Code: Alles auswählen
def load_channels(path):
DATA_TYPES = {
"1": 'ubyte',
"2": 'byte',
"3": 'ushort',
"4": 'short',
"5": 'ulong',
"6": 'long',
"7": 'float',
"8": 'double',
"11": 'bit16',
"12": 'bit48',
}
with open(path, "rb") as binary_file:
# Read the whole file at once
data = binary_file.read()
channels = []
channel = None
for typ, num, block in iter_blocks(data):
if typ == b'CD':
# first element a channel definition
channel = {}
channels.append(channel)
data = block.split(b',')
channel['delta_x'] = float(data[0])
elif typ == b'CP':
data = block.split(b',')
channel['decode'] = data[2].decode("windows-1252")
channel['Byte per value']= data[1].decode("windows-1252")
elif typ == b'CR':
key = 'unit_y' if not block.endswith(b',s') else 'unit_x'
data = block.split(b',', 6)
channel[key] = data[5].decode("windows-1252")
key+="_scale-factor"
channel[key] = float(data[1].decode("windows-1252"))
elif typ == b'CN':
data = block.split(b',')
channel['name'] = data[4].decode("windows-1252")
print("Load Channel: ", channel['name'])
elif typ == b'NT':
data = block.split(b',')
day, month, year, hour, minute, second = (d.decode("windows-1252").replace(" ", "0") for d in data)
channel['trigger_time'] = f"{year}-{month}-{day} {hour}:{minute}:{second}"
elif typ == b'Cb':
data = block.split(b',')
channel['buffer-size'] = int(data[5].decode("windows-1252"))
channel['buffer-start'] = int(data[4].decode("windows-1252"))
elif typ == b'CS':
_, data = block.split(b',', 1)
break
for channel in channels:
try:
dtype = DATA_TYPES[channel['decode']]
except KeyError:
raise ValueError("Unknown binary format")
offset = channel['buffer-start']
count = channel['buffer-size'] // numpy.dtype(dtype).itemsize
if 'unit_x' in channel:
channel['value_x'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset)
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset-channel['buffer-size'])
else:
if abs(channel['unit_y_scale-factor']) > 0:
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset) * channel['unit_y_scale-factor']
else:
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset-channel['buffer-size'])
return channels
Du hast geschrieben, dass man itter_blocks deutlich beschleunigen könnte, indem man das kopieren vermeidet. Ich verstehe leider nicht ganz was du damit meinst.
Meinst du damit diesen Schritt:
Code: Alles auswählen
data = data[end+length+1:]
Code: Alles auswählen
def iter_blocks(data):
while data:
entry = re.search(rb'^\s*\|(\S\S)\s*,\s*\s*\s*\s*\s*(\d+)\s*,\s*\s*\s*\s*(\d+)\s*,', data)
if entry is None:
raise ValueError("Block expected")
typ, num, length = entry.groups()
length = int(length)
end = entry.end()
block = data[end:end+length]
if data[end+length] != 59: # ;
raise ValueError(f"';' expected, found {data[end+length]}")
data = data[end+length+1:]
yield typ, num, block
@Squipy: Konstanten definiert man am besten außerhalb von Funktionen.
Jetzt hast Du wenn x-Werte existieren etwas anderes programmiert wie im anderen Fall. Zudem hast Du Code gedoppelt, was man möglichst vermeiden sollte.
Das mit dem Offset sieht komisch aus, aber wenn's so sein muß ...
Ja, das meine ich mit Kopieren.
Jetzt hast Du wenn x-Werte existieren etwas anderes programmiert wie im anderen Fall. Zudem hast Du Code gedoppelt, was man möglichst vermeiden sollte.
Das mit dem Offset sieht komisch aus, aber wenn's so sein muß ...
Code: Alles auswählen
if 'unit_x' in channel:
scale = channel['unit_x_scale-factor'] or 1.0
channel['value_x'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset) * scale
offset -= channel['buffer-size']
scale = channel['unit_y_scale-factor'] or 1.0
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset) * scale
- __blackjack__
- User
- Beiträge: 13919
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
Ich würde bei so etwas ja erst einmal nachmessen wo die Zeit denn bleibt, nicht das man am Ende an einer Stelle herum optimiert die am Ende gar nicht wirklich viel zum Ergebnis beiträgt.
In `iter_blocks()` ist die vorletzte Zeile nicht so gut. Wenn man den regulären Ausdruck mit `re.compile()` in ein Objekt übersetzt, bekommt man die Suchmethoden mit einem weiteren Argument das den Versatz angibt, ab dem in `data` gesucht werden soll. Dann braucht man `data` nicht ständig verkleinern, wobei Daten im Speicher herum kopiert werden müssen weil `bytes`-Objekte nicht veränderbar sind.
Statt `search()` würde ich `match()` nehmen, dann kann man sich den ^-Anker am Anfang sparen. Und der Ausdruck ist auch etwas komisch. Weisst Du denn was "\s*" bedeutet? Was denkst Du denn was da der Unterschied zu "\s*\s*" ist? Und zu "\s*\s*\s*"?
In `iter_blocks()` ist die vorletzte Zeile nicht so gut. Wenn man den regulären Ausdruck mit `re.compile()` in ein Objekt übersetzt, bekommt man die Suchmethoden mit einem weiteren Argument das den Versatz angibt, ab dem in `data` gesucht werden soll. Dann braucht man `data` nicht ständig verkleinern, wobei Daten im Speicher herum kopiert werden müssen weil `bytes`-Objekte nicht veränderbar sind.
Statt `search()` würde ich `match()` nehmen, dann kann man sich den ^-Anker am Anfang sparen. Und der Ausdruck ist auch etwas komisch. Weisst Du denn was "\s*" bedeutet? Was denkst Du denn was da der Unterschied zu "\s*\s*" ist? Und zu "\s*\s*\s*"?
“Java is a DSL to transform big Xml documents into long exception stack traces.”
— Scott Bellware
— Scott Bellware
Danke Sirius,
Ich habe es jetzt mal wie unten geändert, allerdings ohne signifkante änderung bzgl. der Geschwindigkeit:
@blackjack Danke für den Hinweis bzgl. der leerzeichen pattern. Der Sternoperator sagt wohl aus , dass beliebig oft wiederholt werden darf. Das jetzt 20 mal hintereinander zu schreiben ist also sinnfrei
btw. ich habe es auch mit mal mit match versucht allerdings ohne signifkante performance verbesserung.
Die Geschwinidgkeit ist stark abhängig von der Anzahl der Kanäle. Das heißt, wenn ich mir eine große Datei nehme, die aber nur einen Kanal enthält, ist diese unmittelbar eingelesen. Eine wesentlich kleinere Datei mit vielen Kanälen dauert dagegen etwa 1-2 min. Deswegen denke ich, dass die Anzahl der Blöcke und die daraus resultierenden iterationsschritte das Problem sind...
PS. Ich weiß, dass ich wieder eine variable in einer Funktion definiere, kam aber nicht drauf, wie ich es sonst hätte machen können :/
Ich habe es jetzt mal wie unten geändert, allerdings ohne signifkante änderung bzgl. der Geschwindigkeit:
Code: Alles auswählen
def iter_blocks(data):
start = 0
while data[start:]:
entry = re.match(rb'\s*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,', data[start:])
if entry is None:
raise ValueError("Block expected")
typ, num, length = entry.groups()
length = int(length)
end = entry.end()
block = data[start+end:start+end+length]
if data[start+end+length] != 59: # ;
raise ValueError(f"';' expected, found {data[end+length]}")
start += end+length+1
#data = data[end+length+1:]
yield typ, num, block

Die Geschwinidgkeit ist stark abhängig von der Anzahl der Kanäle. Das heißt, wenn ich mir eine große Datei nehme, die aber nur einen Kanal enthält, ist diese unmittelbar eingelesen. Eine wesentlich kleinere Datei mit vielen Kanälen dauert dagegen etwa 1-2 min. Deswegen denke ich, dass die Anzahl der Blöcke und die daraus resultierenden iterationsschritte das Problem sind...
PS. Ich weiß, dass ich wieder eine variable in einer Funktion definiere, kam aber nicht drauf, wie ich es sonst hätte machen können :/
@Squipy: das ist ja noch schlimmer, weil Du jetzt doppelt so oft kopierst.
Code: Alles auswählen
def iter_blocks(data):
start = 0
re_block = re.compile(rb'\s*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,')
while start < len(data):
entry = re_block.match(data, start)
if entry is None:
raise ValueError("Block expected")
typ, num, length = entry.groups()
length = int(length)
start += entry.end()
block = data[start:start+length]
if data[start+length] != 59: # ;
raise ValueError(f"';' expected, found {data[start+length]}")
start += length+1
yield typ, num, block
Wow, jetzt ist es um einiges schneller und kann damit locker mit Famos mithalten! Ich muss allerdings nochmal drüber nachdenken wieso meine Variante jetzt doppelt so schlimm war
In iter_block war aber noch ein kleiner bug drin bei
Richtig ists so:
Besten Dank Sirius!

In iter_block war aber noch ein kleiner bug drin bei
Code: Alles auswählen
start += entry.end()
Code: Alles auswählen
def iter_blocks(data):
start = 0
re_block = re.compile(rb'\s*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,')
while start < len(data):
entry = re_block.match(data, start)
if entry is None:
raise ValueError("Block expected")
typ, num, length = entry.groups()
length = int(length)
start = entry.end()
block = data[start:start+length]
if data[start+length] != 59: # ;
raise ValueError(f"';' expected, found {data[start+length]}")
start += length+1
yield typ, num, block
@Squipy: Dieser Austausch scheint die einzige Internet Quelle zu wie man FAMOS raw in python einlesen kann zu sein! Könntest du bitte den kompletten, finalen, Code posten? Es wäre super hilfreich um nicht das ganze basierend auf den Snippets wieder zusammen zu stückeln. Danke!
@Sirius3: Danke dass du Squipy bis zu einem guten Ergebniss gementort hast! Davon werden hoffentlich noch deutlich mehr FAMOS-Abhängige profitieren.
@Sirius3: Danke dass du Squipy bis zu einem guten Ergebniss gementort hast! Davon werden hoffentlich noch deutlich mehr FAMOS-Abhängige profitieren.
@Mahelita
Ich hoffe mein Code hilft dir weiter.
Ist noch sehr prototypenmäßig aber durch die Pionierleistung von Squipy und Sirius3 war ich damit in der Lage Rohdaten zu plotten.
Je nach Hardware die zum Aufzeichnen der Daten verwendet wurde muss der Code leider noch manuell angepasst werden da die Dateiformate sich leicht unterscheiden.
Ich hoffe mein Code hilft dir weiter.
Ist noch sehr prototypenmäßig aber durch die Pionierleistung von Squipy und Sirius3 war ich damit in der Lage Rohdaten zu plotten.
Je nach Hardware die zum Aufzeichnen der Daten verwendet wurde muss der Code leider noch manuell angepasst werden da die Dateiformate sich leicht unterscheiden.
Code: Alles auswählen
import re
import numpy
import matplotlib.pyplot as plt
from IPython import get_ipython
get_ipython().run_line_magic('matplotlib', 'qt')
path = r"Dein Dateipfad"
def load_channels(path):
DATA_TYPES = {
"1": 'ubyte',
"2": 'byte',
"3": 'ushort',
"4": 'short',
"5": 'ulong',
"6": 'long',
"7": 'float',
"8": 'double',
"11": 'bit16',
"12": 'bit48',
}
with open(path, "rb") as binary_file:
# Read the whole file at once
data = binary_file.read()
#print(data)
channels = []
channel = None
for typ, num, block in iter_blocks(data):
if typ == b'CF':
# first element a channel definition
channel = {}
channels.append(channel)
data = block.split(b',')
channel['delta_x'] = float(data[0])
# elif typ == b'CK':
# data = block.split(b',')
# channel['decode'] = data[1].decode("windows-1252")
# channel['Byte per value']= data[0].decode("windows-1252")
elif typ == b'NO':
data = block.split(b',')
channel['Erzeuger'] = data[2].decode("windows-1252")
elif typ == b'CD':
data = block.split(b',')
channel['Abtastrate'] = float(data[0].decode("windows-1252"))
channel['Einheit_X'] = data[3].decode("windows-1252")
elif typ == b'CN':
data = block.split(b',')
channel['name'] = data[4].decode("windows-1252")
print("Load Channel: ", channel['name'])
elif typ == b'NT':
data = block.split(b',')
day, month, year, hour, minute, second = (d.decode("windows-1252").replace(" ", "0") for d in data)
channel['trigger_time'] = f"{year}-{month}-{day} {hour}:{minute}:{second}"
elif typ == b'CP':
data = block.split(b',')
channel['decode'] = data[2].decode("windows-1252") #Zahlenformat
channel['Byte per value']= data[1].decode("windows-1252")
elif typ == b'Cb':
data = block.split(b',')
channel['buffer-size'] = int(data[5].decode("windows-1252"))
channel['buffer-start'] = int(data[4].decode("windows-1252"))
elif typ == b'CR':
key = 'unit_y' if not block.endswith(b',s') else 'unit_x'
data = block.split(b',', 6)
channel[key] = data[5].decode("windows-1252")
key+="_scale-factor"
channel[key] = float(data[1].decode("windows-1252")) #0 für Skalierung von 1, 1 für Originalskalierung
elif typ == b'CS':
_, data = block.split(b',', 1)
break
for channel in channels:
try:
dtype = DATA_TYPES[channel['decode']]
except KeyError:
raise ValueError("Unknown binary format")
offset = channel['buffer-start']
count = channel['buffer-size'] // numpy.dtype(dtype).itemsize
if 'unit_x' in channel:
channel['value_x'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset)
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset-channel['buffer-size'])
else:
if abs(channel['unit_y_scale-factor']) > 0:
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset) * channel['unit_y_scale-factor']
else:
channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset-channel['buffer-size'])
return channels
def iter_blocks(data):
start = 0
re_block = re.compile(rb'\s*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,')
while start < len(data):
entry = re_block.match(data, start)
if entry is None:
raise ValueError("Block expected")
typ, num, length = entry.groups()
length = int(length)
start = entry.end()
block = data[start:start+length]
if data[start+length] != 59: # ;
raise ValueError(f"';' expected, found {data[start+length]}")
start += length+1
yield typ, num, block
StatisticsTime=load_channels(path)
... Ohje, sehe es jetzt erst. An dem Code hat sich inzwischen einiges getan (speichern von single und Multi files, laden aller Daten in einem Ordner). Die Doku ist aber schlecht... Evtl. hilft es trotzdem. Bei Fragen gerne melden:
Code: Alles auswählen
import re
import numpy as np
import os
from dir_operations import get_list_of_files
import datetime
version ='version 0.3'
programm_name = 'wtg Famos Formatter ' + version
def _create_dtype_string(channel,kind):
DATA_TYPES = {
"1": 'ubyte',
"2": 'byte',
"3": 'ushort',
"4": 'int',
"5": 'ulong',
"6": 'long',
"7": 'float',
"8": 'double',
"11": 'bit16',
"12": 'bit48',
}
try:
dtype = DATA_TYPES[channel['decode_'+kind]]
if dtype!= "double" and dtype!= "long" and dtype!= "byte":
dtype+= str((int(channel['BytePerValue_'+kind])*8))
except KeyError:
raise ValueError("Unknown binary format")
return dtype
def _iter_blocks(data):
start = 0
re_block = re.compile(rb'\s*T*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,')
while start < len(data):
entry = re_block.match(data, start)
if entry is None:
entry = re_block.match(data, start+3)
if entry is None:
raise ValueError("Block expected, no famos format")
else:
start+=3
typ, num, length = entry.groups()
length = int(length)
start = entry.end()
block = data[start:start+length]
blockValue = entry[3]
if start+length == len(data):
length-=1
elif data[start+length] != 59: # ;
raise ValueError(f"';' expected, found {data[start+length]}")
start += length+1
yield typ, num, block, blockValue
def load_channels(path,filt = None, show_print =False):
if show_print == True:
print("load file: ", path)
with open(path, "rb") as binary_file:
# Read the whole file at once
data = binary_file.read()
channels = {}
channel = None
count = -1
for typ, num, block, blockValue in _iter_blocks(data):
#### save information of channel which is for all the same (until******)
if typ == b'CF':
pass
elif typ == b'NL':
pass
elif typ == b'CK':
pass
elif typ == b'NO':
pass
#*********************************************************************
elif typ == b'CG':
channel = {}
channels[count]=channel
count+=1
elif typ == b'CD':
data = block.split(b',')
channel['delta_x'] = float(data[0]) #bug, by filteredChannels in SK auswertung sagt der 1s
elif typ == b'NT':
data = block.split(b',')
day, month, year, hour, minute, second = (d.decode("windows-1252").replace(" ", "0") for d in data)
if int(year) > 2000:# there are two loacations of trigger time (Cb or NT block)
channel['trigger_time'] = f"{year}-{month}-{day} {hour}:{minute}:{second}"
else:
pass
elif typ == b'CC':
pass
elif typ == b'CP':
data = block.split(b',')
try:
if isinstance(channel['decode_value_y'], str):
channel['decode_value_x'] = data[2].decode("windows-1252")
channel['BytePerValue_value_x']= data[1].decode("windows-1252")
except:
channel['decode_value_y'] = data[2].decode("windows-1252")
channel['BytePerValue_value_y']= data[1].decode("windows-1252")
elif typ == b'Cb':
data = block.split(b',')
try:
if isinstance(channel['buffer_size_value_y'], int):
channel['buffer_size_value_x'] = int(data[5].decode("windows-1252"))
channel['buffer_start_value_x']= int(data[4].decode("windows-1252"))
except:
channel['buffer_size_value_y'] = int(data[5].decode("windows-1252"))
channel['buffer_start_value_y']= int(data[4].decode("windows-1252"))
channel['X0'] = float(data[9])
if float(data[10]) > 0: # then NT block have not the information of triggertime
triggertime = float(data[10])+(datetime.datetime(1980,1,1)
- datetime.datetime(1970,1,1)) // datetime.timedelta(seconds=1)-3600 #famos start 1980 but datetime 1970
try:
triggertime = datetime.datetime.fromtimestamp(triggertime).strftime("%Y-%m-%d %H:%M:%S.%f")
except:
triggertime = datetime.datetime.fromtimestamp(triggertime).strftime("%Y-%m-%d %H:%M:%S")
channel['trigger_time'] = triggertime
else:
pass #NT block has the information of trigger time
elif typ == b'ND':
pass
elif typ == b'CN':
data = block.split(b',')
channel['name'] = data[4].decode("windows-1252")
elif typ == b'CC':
pass
elif typ == b'CR':
key = 'unit_y' if not 'unit_y' in channel else 'unit_x'
data = block.split(b',', 6)
channel[key] = data[5].decode("windows-1252")
if "°" in channel[key]:
channel[key] = channel[key].replace("°","grad")
elif "²" in channel[key]:
channel[key] = channel[key].replace("²","2")
#channel[key] = channel[key].replace("Â","")
elif "W/m" in channel[key]:
channel[key] = channel[key].replace("W/m","W/m2")
#channel[key] = channel[key].replace("Â","")
if "Â" in channel[key]:
channel[key] = channel[key].replace("Â","")
key+="_scale-factor"
channel[key] = float(data[1].decode("windows-1252"))
key+="_Offset"
channel[key] = float(data[2].decode("windows-1252"))
elif typ == b'ND':
pass
elif typ == b'CS':
_, data = block.split(b',', 1)
break
for i in channels:
scale = channels[i]['unit_y_scale-factor'] or 1.0
scale_offset = channels[i]['unit_y_scale-factor_Offset'] or 0.0
if 'unit_x' in channels[i]:
dtype = _create_dtype_string(channels[i], "value_x")
scale = channels[i]['unit_x_scale-factor'] or 1.0
scale_offset = channels[i]['unit_x_scale-factor_Offset'] or 0.0
if (channels[i]['unit_x_scale-factor']!=1 or
channels[i]['unit_x_scale-factor_Offset'] !=0 ):
offset = channels[i]['buffer_start_value_x']
count = channels[i]['buffer_size_value_x'] // np.dtype(dtype).itemsize
try:
channels[i]['value_x'] = np.frombuffer(data, dtype=dtype, count=count, offset=offset) * scale +scale_offset
except:
print("hupsi")
else:
offset = channels[i]['buffer_start_value_x']
count = channels[i]['buffer_size_value_x'] // np.dtype(dtype).itemsize
channels[i]['value_x'] = np.frombuffer(data, dtype=dtype, count=count, offset=offset)
offset -= count*np.dtype(dtype).itemsize - offset
scale = channels[i]['unit_y_scale-factor'] or 1.0
dtype = _create_dtype_string(channels[i], "value_y")
if (channels[i]['unit_y_scale-factor']!=1 or
channels[i]['unit_y_scale-factor_Offset'] !=0):
offset = channels[i]['buffer_start_value_y']
count = channels[i]['buffer_size_value_y'] // np.dtype(dtype).itemsize
try:
channels[i]['value_y'] = np.frombuffer(data, dtype=dtype
,count=count, offset=offset) * scale + scale_offset
except:
raise ValueError("hupsi")
else:
offset = channels[i]['buffer_start_value_y']
count = channels[i]['buffer_size_value_y'] // np.dtype(dtype).itemsize
channels[i]['value_y'] = np.frombuffer(data, dtype=dtype, count=count, offset=offset)
#### name the indices to channel names#####
channelsOutput = {}
for i in channels:
try:
channelsOutput[channels[i]['name']] = channels[i]
except:
print("Channel not found: ",i)
return channelsOutput
def save_single_files(channels,outputPath):
if outputPath[len(outputPath)-2:len(outputPath)] != '\\':
outputPath+='\\'
for channel in channels:
print("save: ", channel)
file_info = b"|CF,2,1,1;|CK,1,3,1,1;"
#CG Block |CG,1,KeyLang,AnzahlKomponenten,Feldtyp,Dimension;#
if "value_x" in channels[channel]:
file_info += b"\n|CG,1,5,2,2,2;\n"
else:
file_info += b"\n|CG,1,5,1,1,1;\n"
#CD Block |CD,1,KeyLang,dx,kalibriert,EinheitLang, Einheit, 0,0,0;#
keyLang ="?"
dx = bytes(str(channels[channel]["delta_x"]),'utf-8')
kalibriert = b"1" # 1 = klaibriert, was immer das heißt
try:
EinheitLang = bytes(str(len(channels[channel]['unit_x'])),'utf-8')
Einheit = bytes(channel["unit_x"],'utf-8')
except: ## x Komponente nicht verfügbar --> Muss Zeitachse sein
EinheitLang = b"1"
Einheit =b"s"
info_part = dx +b","+kalibriert+b","+EinheitLang+b","+Einheit + b",0,0,0"
KeyLeng = bytes(str(len(info_part)), 'utf-8')
file_info += b"|CD,1,"+KeyLeng + b"," + info_part +b";\n"
#NT Block |NT,1,KeyLang,Tag,Monat,Jahr,Stunden,Minuten,Sekunden;
try:
keyLang ="?"
Tag = channels[channel]["trigger_time"][8:10]
Monat = channels[channel]["trigger_time"][5:7]
Jahr = channels[channel]["trigger_time"][:4]
Stunden = channels[channel]["trigger_time"][11:13]
Minuten = channels[channel]["trigger_time"][14:16]
Sekunden = channels[channel]["trigger_time"][17:]
info_part = Tag+","+Monat+","+Jahr+","+Stunden+","+Minuten+","+Sekunden
keyLang = str(len(info_part))
info_part = keyLang +","+ info_part
file_info +=bytes("|NT,1,"+info_part+";\n",'utf-8')
#CC Block, y Werte erster komp.index, start der Arrayinfos: |CC,1,KeyLang,KomponentenIndex,AnalogDigital;
file_info += b"|CC,1,3,1,1;\n"
except:
pass #Channel has no trigger time
#CP Block, |CP,1,KeyLang,BufferReferenz,Bytes,Zahlenformat, SignBits , Maske,Offset, DirekteFolgeAnzahl,AbstandBytes;
BufferReferenz ="1" #aus irgendeinen Grund kann für unsere Zwecke immer 1
Bytes = channels[channel]["BytePerValue_value_y"]
Zahlenformat = channels[channel]["decode_value_y"]
SignBits = str(int(Bytes) * 8)
Maske= "0"
Offset = "0"
DirekteFolgeAnzahl ="1"
AbstandBytes = "0"
info_part = (BufferReferenz + "," + Bytes + "," + Zahlenformat + "," +
SignBits + "," + Maske + "," + Offset +","+ DirekteFolgeAnzahl
+ "," + AbstandBytes)
file_info += bytes("|CP,1,"+ str(len(info_part))+","+info_part+";\n",'utf-8')
#Cb Block |Cb,1,KeyLang,AnzahlBufferInKey,BytesInUserInfo, BufferReferenz, IndexSamplesKey, OffsetBufferInSamplesKey, BufferLangBytes, OffsetFirstSampleInBuffer, BufferFilledBytes, 0, X0, AddZeit, UserInfo, NeuesEvent, [BufferReferenz, ... ];
AnzahlBufferInKey = "1"
BytesInUserInfo ="0"
BufferReferenz = "1"
IndexSampleKey = "1"
OffsetBufferInSamplesKey = "0" #singlefiles, y value comes at first
if channels[channel]["decode_value_y"] == "4":
scale = channels[channel]["unit_y_scale-factor"]
offset = channels[channel]["unit_y_scale-factor_Offset"]
y = np.copy(channels[channel]["value_y"]) - offset
channels[channel]["value_y"] = y
channels[channel]["value_y"] *= 1/scale
channels[channel]["value_y"] = channels[channel]["value_y"].astype("int16")
BufferLangBytes = str(channels[channel]['value_y'].itemsize*channels[channel]['value_y'].size)
OffsetFirstSampleInBuffer = OffsetBufferInSamplesKey
BufferFilledBytes = BufferLangBytes
x0 = str(channels[channel]["X0"])
AddZeit = "0"
UserInfo = "0"
ref = ""
info_part = (AnzahlBufferInKey+","+BytesInUserInfo+","
+BufferReferenz+","+ IndexSampleKey +","+OffsetBufferInSamplesKey
+","+BufferLangBytes+","+OffsetFirstSampleInBuffer+","+BufferFilledBytes+","+AddZeit+","+x0+","+UserInfo+","+ref)
info_part = "|Cb,1,"+str(len(info_part))+","+ info_part +";\n"
file_info+=bytes(info_part,'utf-8')
#CR Block |CR,1,KeyLang,Transformieren,Faktor,Offset,Kalibriert,EinheitLang, Einheit;
if (channels[channel]["unit_y_scale-factor"] !=1 or channels[channel]["unit_y_scale-factor"] !=0 and
channels[channel]["unit_y_scale-factor_Offset"] !=0):
Transformieren = "1"
else:
Transformieren = "0"
Faktor = str(channels[channel]["unit_y_scale-factor"])
Offset = str(channels[channel]["unit_y_scale-factor_Offset"])
Kalibriert ="1"
EinheitLang = str(len(channels[channel]["unit_y"]))
Einheit = channels[channel]["unit_y"]
info_part = Transformieren +","+Faktor+","+Offset+","+Kalibriert+","+EinheitLang+","+Einheit
file_info += bytes("|CR,1," + str(len(info_part)) + ","+info_part+ ";\n","utf-8")
#CN Block, |CN,1,KeyLang,IndexGruppe,0,IndexBit,NameLang,Name,KommLang,Kommentar;
IndexGruppe = "0"
none_ = "0"
IndexBit = "0"
NameLang = str(len(channels[channel]["name"]))
Name = channels[channel]["name"]
KommLang = "0"
Kommentar = ""
info_part = IndexGruppe +","+none_+","+IndexBit+","+NameLang+","+Name+","+KommLang+","+Kommentar
file_info += bytes("|CN,1," + str(len(info_part)) + "," + info_part +";\n","utf-8")
# x values
if "value_x" in channels[channel]: #XY datensatz
##CC Block
file_info += b"|CC,1,3,2,1;\n"
#CP Block, |CP,1,KeyLang,BufferReferenz,Bytes,Zahlenformat, SignBits , Maske,Offset, DirekteFolgeAnzahl,AbstandBytes;
BufferReferenz ="2"
Bytes = channels[channel]["BytePerValue_value_x"]
Zahlenformat = channels[channel]["decode_value_x"]
SignBits = str(int(Bytes) * 8)
Maske= "0"
Offset = "0"
DirekteFolgeAnzahl ="1"
AbstandBytes = "0"
info_part = (BufferReferenz + "," + Bytes + "," + Zahlenformat + "," +
SignBits + "," + Maske + "," + Offset +","+ DirekteFolgeAnzahl
+ "," + AbstandBytes)
file_info += bytes("|CP,1,"+ str(len(info_part))+","+info_part+";\n",'utf-8')
#Cb Block, |Cb,1,KeyLang,AnzahlBufferInKey,BytesInUserInfo, BufferReferenz, IndexSamplesKey, OffsetBufferInSamplesKey, BufferLangBytes, OffsetFirstSampleInBuffer, BufferFilledBytes, 0, X0, AddZeit, UserInfo, NeuesEvent, [BufferReferenz, ... ];
AnzahlBufferInKey = "1"
BytesInUserInfo ="0"
BufferReferenz = "2"
IndexSampleKey = "1"
OffsetBufferInSamplesKey = BufferLangBytes #singlefiles, x value comes after y values
BufferLangBytes = str(channels[channel]['value_x'].itemsize*channels[channel]['value_x'].size)
OffsetFirstSampleInBuffer = "0"
BufferFilledBytes = BufferLangBytes
x0 = str(channels[channel]["X0"])
AddZeit = "1"
UserInfo = "0"
ref = ""
info_part = (AnzahlBufferInKey+","+BytesInUserInfo+","
+BufferReferenz+","+ IndexSampleKey +","+OffsetBufferInSamplesKey
+","+BufferLangBytes+","+OffsetFirstSampleInBuffer+","+BufferFilledBytes+","+AddZeit+","+x0+","+UserInfo+","+ref)
info_part = "|Cb,1,"+str(len(info_part))+","+ info_part +";\n"
file_info+=bytes(info_part,'utf-8')
#CR Block, |CR,1,KeyLang,Transformieren,Faktor,Offset,Kalibriert,EinheitLang, Einheit;
if (channels[channel]["unit_x_scale-factor"] !=1 or channels[channel]["unit_x_scale-factor"] !=0 and
channels[channel]["unit_x_scale-factor_Offset"] !=0):
Transformieren = "1"
else:
Transformieren = "0"
Faktor = str(channels[channel]["unit_x_scale-factor"])
Offset = str(channels[channel]["unit_x_scale-factor_Offset"])
Kalibriert ="1"
EinheitLang = str(len(channels[channel]["unit_x"]))
Einheit = channels[channel]["unit_x"]
info_part = Transformieren +","+Faktor+","+Offset+","+Kalibriert+","+EinheitLang+","+Einheit
file_info += bytes("|CR,1," + str(len(info_part)) + ","+info_part+ ";\n","utf-8")
#CS Block, |CS,1,KeyLang,Index,Rohdaten;
Index =b"1"
Rohdaten = channels[channel]['value_y'].tobytes()
if "value_x" in channels[channel]: #XY datensatz:
Rohdaten += channels[channel]['value_x'].tobytes()
info_part = Index+b","+Rohdaten
file_info += bytes("|CS,1,"+str(len(info_part))+",","utf-8")+Index + b"," + Rohdaten + b";"
####save single file#####
filename = channels[channel]['name']+'.dat'
try:
# Create target Directory
os.mkdir(outputPath)
except:
pass
path = outputPath+filename
file=open(path,'wb+')
file.write(file_info)
file.close()
del channels
def save_multi_files(channels,outputPath, filename):
if outputPath[len(outputPath)-2:len(outputPath)] != '\\':
outputPath+='\\'
file_info = b"|CF,2,1,1;|CK,1,3,1,1;\n"
i=1
BufferReferenz_count = 1
OffsetBufferInSamplesKey_int = 0
for channel in channels:
print("write: ", i, " of ", len(channels), "channels")
#CG Block |CG,1,KeyLang,AnzahlKomponenten,Feldtyp,Dimension;#
if "value_x" in channels[channel]:
file_info += b"|CG,1,5,2,2,2;\n"
else:
file_info += b"|CG,1,5,1,1,1;\n"
#CD Block |CD,1,KeyLang,dx,kalibriert,EinheitLang, Einheit, 0,0,0;#
keyLang ="?"
dx = bytes(str(channels[channel]["delta_x"]),'utf-8')
kalibriert = b"1" # 1 = klaibriert, was immer das heißt
try:
EinheitLang = bytes(str(len(channels[channel]['unit_x'])),'utf-8')
Einheit = bytes(channel["unit_x"],'utf-8')
except: ## x Komponente nicht verfügbar --> Muss Zeitachse sein
EinheitLang = b"1"
Einheit =b"s"
info_part = dx +b","+kalibriert+b","+EinheitLang+b","+Einheit + b",0,0,0"
KeyLeng = bytes(str(len(info_part)), 'utf-8')
file_info += b"|CD,1,"+KeyLeng + b"," + info_part +b";\n"
#NT Block |NT,1,KeyLang,Tag,Monat,Jahr,Stunden,Minuten,Sekunden;
try:
keyLang ="?"
Tag = channels[channel]["trigger_time"][8:10]
Monat = channels[channel]["trigger_time"][5:7]
Jahr = channels[channel]["trigger_time"][:4]
Stunden = channels[channel]["trigger_time"][11:13]
Minuten = channels[channel]["trigger_time"][14:16]
Sekunden = channels[channel]["trigger_time"][17:]
info_part = Tag+","+Monat+","+Jahr+","+Stunden+","+Minuten+","+Sekunden
keyLang = str(len(info_part))
info_part = keyLang +","+ info_part
file_info +=bytes("|NT,1,"+info_part+";\n",'utf-8')
#CC Block, y Werte erster komp.index, start der Arrayinfos: |CC,1,KeyLang,KomponentenIndex,AnalogDigital;
file_info += b"|CC,1,3,1,1;\n"
except:
pass #Channel has no trigger time
#CP Block, |CP,1,KeyLang,BufferReferenz,Bytes,Zahlenformat, SignBits , Maske,Offset, DirekteFolgeAnzahl,AbstandBytes;
BufferReferenz =str(BufferReferenz_count) #wievielter Kanal
Bytes = channels[channel]["BytePerValue_value_y"]
Zahlenformat = channels[channel]["decode_value_y"]
SignBits = str(int(Bytes) * 8)
Maske= "0"
Offset = "0"
DirekteFolgeAnzahl ="1"
AbstandBytes = "0"
info_part = (BufferReferenz + "," + Bytes + "," + Zahlenformat + "," +
SignBits + "," + Maske + "," + Offset +","+ DirekteFolgeAnzahl
+ "," + AbstandBytes)
file_info += bytes("|CP,1,"+ str(len(info_part))+","+info_part+";\n",'utf-8')
#Cb Block |Cb,1,KeyLang,AnzahlBufferInKey,BytesInUserInfo, BufferReferenz, IndexSamplesKey, OffsetBufferInSamplesKey, BufferLangBytes, OffsetFirstSampleInBuffer, BufferFilledBytes, 0, X0, AddZeit, UserInfo, NeuesEvent, [BufferReferenz, ... ];
AnzahlBufferInKey = "1"
BytesInUserInfo ="0"
BufferReferenz = str(BufferReferenz_count) #wievielter Kanal
IndexSampleKey = "1"
OffsetBufferInSamplesKey = str(OffsetBufferInSamplesKey_int) # y value comes at first
if channels[channel]["decode_value_y"] == "4":
scale = channels[channel]["unit_y_scale-factor"]
offset = channels[channel]["unit_y_scale-factor_Offset"]
y = np.copy(channels[channel]["value_y"]) - offset
channels[channel]["value_y"] =y
channels[channel]["value_y"] *= 1/scale
channels[channel]["value_y"] = channels[channel]["value_y"].astype("int16")
BufferLangBytes = channels[channel]['value_y'].itemsize*channels[channel]['value_y'].size
OffsetBufferInSamplesKey_int+=BufferLangBytes
OffsetFirstSampleInBuffer = "0"
BufferFilledBytes = BufferLangBytes
x0 = str(channels[channel]["X0"])
AddZeit = "0"
UserInfo = "0"
ref = ""
info_part = (AnzahlBufferInKey+","+BytesInUserInfo+","
+BufferReferenz+","+ IndexSampleKey +","+OffsetBufferInSamplesKey
+","+str(BufferLangBytes)+","+OffsetFirstSampleInBuffer+","+str(BufferFilledBytes)+","+AddZeit+","+x0+","+UserInfo+","+ref)
info_part = "|Cb,1,"+str(len(info_part))+","+ info_part +";\n"
file_info+=bytes(info_part,'utf-8')
#CR Block |CR,1,KeyLang,Transformieren,Faktor,Offset,Kalibriert,EinheitLang, Einheit;
if (channels[channel]["unit_y_scale-factor"] !=1 or channels[channel]["unit_y_scale-factor"] !=0 and
channels[channel]["unit_y_scale-factor_Offset"] !=0):
Transformieren = "1"
else:
Transformieren = "0"
Faktor = str(channels[channel]["unit_y_scale-factor"])
Offset = str(channels[channel]["unit_y_scale-factor_Offset"])
Kalibriert ="1"
EinheitLang = str(len(channels[channel]["unit_y"]))
Einheit = channels[channel]["unit_y"]
if "W/m" in Einheit:
Einheit = Einheit.replace("Â","")
info_part = Transformieren +","+Faktor+","+Offset+","+Kalibriert+","+EinheitLang+","+Einheit
file_info += bytes("|CR,1," + str(len(info_part)) + ","+info_part+ ";\n","utf-8")
#CN Block, |CN,1,KeyLang,IndexGruppe,0,IndexBit,NameLang,Name,KommLang,Kommentar;
IndexGruppe = "0"
none_ = "0"
IndexBit = "0"
NameLang = str(len(channels[channel]["name"]))
Name = channels[channel]["name"]
KommLang = "0"
Kommentar = ""
info_part = IndexGruppe +","+none_+","+IndexBit+","+NameLang+","+Name+","+KommLang+","+Kommentar
file_info += bytes("|CN,1," + str(len(info_part)) + "," + info_part +";\n","utf-8")
# x values
if "value_x" in channels[channel]: #XY datensatz
BufferReferenz_count +=1
##CC Block
file_info += b"|CC,1,3,2,1;\n"
#CP Block, |CP,1,KeyLang,BufferReferenz,Bytes,Zahlenformat, SignBits , Maske,Offset, DirekteFolgeAnzahl,AbstandBytes;
BufferReferenz =str(BufferReferenz_count)
Bytes = str(channels[channel]["value_x"].itemsize)
if Bytes =="8":
Zahlenformat = "8"
else:
Zahlenformat = channels[channel]["decode_value_x"]
SignBits = str(int(Bytes) * 8)
Maske= "0"
Offset = "0"
DirekteFolgeAnzahl ="1"
AbstandBytes = "0"
info_part = (BufferReferenz + "," + Bytes + "," + Zahlenformat + "," +
SignBits + "," + Maske + "," + Offset +","+ DirekteFolgeAnzahl
+ "," + AbstandBytes)
file_info += bytes("|CP,1,"+ str(len(info_part))+","+info_part+";\n",'utf-8')
#Cb Block, |Cb,1,KeyLang,AnzahlBufferInKey,BytesInUserInfo, BufferReferenz, IndexSamplesKey, OffsetBufferInSamplesKey, BufferLangBytes, OffsetFirstSampleInBuffer, BufferFilledBytes, 0, X0, AddZeit, UserInfo, NeuesEvent, [BufferReferenz, ... ];
AnzahlBufferInKey = "1"
BytesInUserInfo ="0"
BufferReferenz = str(BufferReferenz_count)
IndexSampleKey = "1"
OffsetBufferInSamplesKey = str(OffsetBufferInSamplesKey_int) #singlefiles, x value comes after y values
BufferLangBytes = channels[channel]['value_x'].itemsize*channels[channel]['value_x'].size
OffsetBufferInSamplesKey_int+=BufferLangBytes
OffsetFirstSampleInBuffer = "0"
BufferFilledBytes = BufferLangBytes
x0 = str(channels[channel]["X0"])
AddZeit = "1"
UserInfo = "0"
ref = ""
info_part = (AnzahlBufferInKey+","+BytesInUserInfo+","
+BufferReferenz+","+ IndexSampleKey +","+OffsetBufferInSamplesKey
+","+str(BufferLangBytes)+","+OffsetFirstSampleInBuffer+","+str(BufferFilledBytes)+","+AddZeit+","+x0+","+UserInfo+","+ref)
info_part = "|Cb,1,"+str(len(info_part))+","+ info_part +";\n"
file_info+=bytes(info_part,'utf-8')
#CR Block, |CR,1,KeyLang,Transformieren,Faktor,Offset,Kalibriert,EinheitLang, Einheit;
if "unit_x_scale-factor" in channels[channel]:
if (channels[channel]["unit_x_scale-factor"] !=1 or channels[channel]["unit_x_scale-factor"] !=0 and
channels[channel]["unit_x_scale-factor_Offset"] !=0):
Transformieren = "1"
else:
Transformieren = "0"
Faktor = str(channels[channel]["unit_x_scale-factor"])
Offset = str(channels[channel]["unit_x_scale-factor_Offset"])
Kalibriert ="1"
else:
Faktor = "1"
Offset = "0"
Kalibriert ="1"
EinheitLang = str(len(channels[channel]["unit_x"]))
Einheit = channels[channel]["unit_x"]
info_part = Transformieren +","+Faktor+","+Offset+","+Kalibriert+","+EinheitLang+","+Einheit
file_info += bytes("|CR,1," + str(len(info_part)) + ","+info_part+ ";\n","utf-8")
#CS Block, |CS,1,KeyLang,Index,Rohdaten;
try:
Rohdaten += channels[channel]['value_y'].tobytes()
except:
Rohdaten = channels[channel]['value_y'].tobytes()
if "value_x" in channels[channel]: #XY datensatz:
Rohdaten += channels[channel]['value_x'].tobytes()
i+=1
BufferReferenz_count+=1
Index =b"1"
info_part = Index+b","+Rohdaten
file_info += bytes("|CS,1,"+str(len(info_part))+",","utf-8")+Index + b"," + Rohdaten + b";"
####save multi file#####
if not ".dat" in filename:
filename += ".dat"
try:
# Create target Directory
os.mkdir(outputPath)
except:
pass
path = outputPath+filename
file=open(path,'wb+')
file.write(file_info)
file.close()
del channels
def load_all_rawdata(dirName):
print('load all data in: ', dirName)
dirNameFile = get_list_of_files(dirName)
channels = {}
for path in dirNameFile:
if '.dat' in path or '.raw' in path:
try:
channels.update(load_channels(path))
except:
raise ValueError("couldn't load "+ path)
else:
pass
return channels
def load_specific_rawdata(dirName, channelName):
dirNameFile = get_list_of_files(dirName)
listOfFile = os.listdir(dirName)
channels = {}
for path, name in zip(dirNameFile, listOfFile):
if channelName.lower() == name[:len(name)-4].lower():
channels.update(load_channels(path))
return channels
Hallo zusammen,
Als erstes möchte ich mich bei Squipy bedanken, dein Code hat mir sehr geholfen!
Das Einlesen funktioniert soweit, aber ich hänge seit einigen Tagen an dem gleichem Problem fest.
Das Problem: Das Programm kommt mit Pausen nicht zurecht, wo keine Messwerte eingetragen sind. D.h. nur die jeweils erste Messung wird angeschaut, alle danach werden ignoriert. Nach jeder Messung ist 4-5 Sekunden Pause (Pause enthält keine Messwerte), danach beginnt die nächste Messung, wo dann wieder Werte eingetragen sind.
Ich bedanke mich für sämtliche Unterstützung im voraus!
Als erstes möchte ich mich bei Squipy bedanken, dein Code hat mir sehr geholfen!

Das Einlesen funktioniert soweit, aber ich hänge seit einigen Tagen an dem gleichem Problem fest.
Das Problem: Das Programm kommt mit Pausen nicht zurecht, wo keine Messwerte eingetragen sind. D.h. nur die jeweils erste Messung wird angeschaut, alle danach werden ignoriert. Nach jeder Messung ist 4-5 Sekunden Pause (Pause enthält keine Messwerte), danach beginnt die nächste Messung, wo dann wieder Werte eingetragen sind.
Ich bedanke mich für sämtliche Unterstützung im voraus!
Liebe Leute, eure Arbeit hat mir eine Menge Zeit und Kraft gespart. Ich bin sehr dankbar, dass es euch gibt. Inzwischen hat sich bei Famos das Format geändert, der Code funktioniert nicht wie es sein soll. Die Kanäle wurden in einer Gruppe festgelegt. Es wäre super hilfreich, wenn jemand weiterhelfen könnte. Lieben Dank!
testdatensatz findet ihr hier https://easyupload.io/m/bo95zc
oder hier https://drive.google.com/drive/folders/ ... drive_link
oder hier https://drive.google.com/drive/folders/ ... drive_link