... Oje, ich sehe es jetzt erst. An dem Code hat sich inzwischen einiges getan (Speichern von Single- und Multi-Files, Laden aller Daten in einem Ordner). Die Doku ist aber schlecht ... Evtl. hilft es trotzdem. Bei Fragen gerne melden:
Code: Alles auswählen
import re
import numpy as np
import os
from dir_operations import get_list_of_files
import datetime
# Release tag of this formatter; it is embedded verbatim in the program name.
version = 'version 0.3'
# Display name of the tool, including the release tag above.
programm_name = f'wtg Famos Formatter {version}'
def _create_dtype_string(channel, kind):
    """Return a NumPy dtype string for one component of a channel.

    Args:
        channel: Mapping holding ``'decode_' + kind`` (the numeric format
            code as a string) and, for the width-dependent formats,
            ``'BytePerValue_' + kind`` (bytes per sample).
        kind: Component suffix; the callers in this file pass
            ``"value_x"`` or ``"value_y"``.

    Returns:
        A string accepted by ``numpy.dtype``, e.g. ``'uint16'``,
        ``'int16'``, ``'float32'``, ``'byte'`` or ``'double'``.

    Raises:
        ValueError: If the format code or the byte width is missing or
            unknown, or if the format is one of the digital formats
            (codes 11 and 12), which cannot be expressed as a NumPy dtype.
    """
    # Formats whose width is appended from the bytes-per-value entry.
    # Unsigned formats must use the "uint" family: names such as
    # "ushort16" or "ubyte8" are not valid NumPy dtype strings.
    sized_families = {
        "1": "uint",   # unsigned byte
        "3": "uint",   # unsigned short
        "4": "int",    # signed short
        "5": "uint",   # unsigned long
        "6": "int",    # signed long
        "7": "float",  # float
    }
    # Formats that are returned as a fixed name, width not consulted.
    fixed_names = {
        "2": "byte",
        "8": "double",
    }
    # Known format codes that have no NumPy equivalent.
    unsupported = {
        "11": "bit16",
        "12": "bit48",
    }
    try:
        code = channel['decode_' + kind]
        if code in fixed_names:
            return fixed_names[code]
        if code in unsupported:
            raise ValueError(
                f"Unsupported binary format: {unsupported[code]}")
        family = sized_families[code]
        if code == "6":
            # NumPy's 'long' is 4 or 8 bytes depending on the platform, so
            # the width is taken from the file instead. The 4-byte default
            # keeps channels without a width entry working, as before.
            width = channel.get('BytePerValue_' + kind, 4)
        else:
            width = channel['BytePerValue_' + kind]
    except KeyError as err:
        raise ValueError("Unknown binary format") from err
    return f"{family}{int(width) * 8}"
def _iter_blocks(data):
    """Yield the key blocks contained in a FAMOS byte string.

    Every key has the form ``|XX,<num>,<length>,<payload>;`` where
    ``<length>`` is the payload size in bytes.

    Args:
        data: Complete file content as ``bytes``.

    Yields:
        Tuples ``(typ, num, block, blockValue)``: the two-character key
        type, the number following it, the payload, and the length field
        exactly as it appears in the file (all ``bytes``).

    Raises:
        ValueError: If no key header is found where one is expected, if a
            payload is not terminated by ``;``, or if the declared payload
            length runs past the end of ``data``.
    """
    re_block = re.compile(rb'\s*T*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,')
    re_blank_tail = re.compile(rb'\s*\Z')
    total = len(data)
    start = 0
    while start < total:
        entry = re_block.match(data, start)
        if entry is None:
            # Nothing but whitespace after the last key is not an error.
            if re_blank_tail.match(data, start):
                return
            # Retry three bytes further on.
            # NOTE(review): presumably this skips a UTF-8 byte order mark - confirm.
            entry = re_block.match(data, start + 3)
            if entry is None:
                raise ValueError("Block expected, no famos format")
        typ, num, length = entry.groups()
        begin = entry.end()
        end = begin + int(length)
        if end > total:
            # Truncated file: report it instead of failing with IndexError.
            raise ValueError(
                f"Block {typ!r} declares {int(length)} bytes but only "
                f"{total - begin} are available")
        # A final block that ends exactly at the end of the data is accepted
        # without its terminating ';'.
        if end < total and data[end] != 59:  # 59 == ord(';')
            raise ValueError(f"';' expected, found {data[end]}")
        start = end + 1
        yield typ, num, data[begin:end], entry[3]
def load_channels(path, filt=None, show_print=False):
    """Read one FAMOS file and return its channels keyed by channel name.

    The file is split into key blocks by ``_iter_blocks``. Every ``CG``
    key starts a new channel; the keys that follow fill its description.
    Parsing stops at the ``CS`` key, whose payload holds the binary
    samples of all channels.

    Each returned channel dict may contain:
        ``delta_x``, ``X0``, ``trigger_time``, ``name``,
        ``decode_value_y`` / ``BytePerValue_value_y`` (number format),
        ``buffer_start_value_y`` / ``buffer_size_value_y`` (byte range in
        the sample buffer), ``unit_y``, ``unit_y_scale-factor``,
        ``unit_y_scale-factor_Offset``, ``value_y`` (NumPy array), and the
        same ``*_x`` entries when the channel has a second component.

    Args:
        path: Path of the file to read.
        filt: Unused; kept so existing calls keep working.
        show_print: When true, print the path before loading.

    Returns:
        Dict mapping channel name to channel dict. A channel without a
        ``CN`` key has no name; it is reported on stdout and left out.

    Raises:
        ValueError: If the file is not valid FAMOS data, contains channels
            but no ``CS`` block, or the y samples cannot be decoded.
        KeyError: If a channel lacks the format, buffer or calibration
            keys needed to decode its samples.
    """
    if show_print:
        print("load file: ", path)
    with open(path, "rb") as binary_file:
        # Read the whole file at once
        file_bytes = binary_file.read()

    def text(raw_field):
        """Decode one header field."""
        return raw_field.decode("windows-1252")

    found = []      # channel dicts in file order
    channel = None  # channel currently being described
    raw = None      # payload of the CS block (binary samples)
    # Keys CF, NL, CK, NO, CC and ND carry nothing this loader needs and
    # are skipped, as is any unknown key type.
    for typ, num, block, blockValue in _iter_blocks(file_bytes):
        if typ == b'CG':
            channel = {}
            found.append(channel)
        elif typ == b'CD':
            fields = block.split(b',')
            channel['delta_x'] = float(fields[0])
        elif typ == b'NT':
            day, month, year, hour, minute, second = (
                text(d).replace(" ", "0") for d in block.split(b','))
            # The trigger time is stored either here or in the Cb key; a
            # year of 2000 or earlier means this key does not carry it.
            if int(year) > 2000:
                channel['trigger_time'] = (
                    f"{year}-{month}-{day} {hour}:{minute}:{second}")
        elif typ == b'CP':
            fields = block.split(b',')
            # The first CP of a channel describes y, any later one x.
            comp = 'value_x' if 'decode_value_y' in channel else 'value_y'
            channel['decode_' + comp] = text(fields[2])
            channel['BytePerValue_' + comp] = text(fields[1])
        elif typ == b'Cb':
            fields = block.split(b',')
            # The first Cb of a channel describes y, any later one x.
            comp = ('value_x' if 'buffer_size_value_y' in channel
                    else 'value_y')
            channel['buffer_size_' + comp] = int(text(fields[5]))
            channel['buffer_start_' + comp] = int(text(fields[4]))
            channel['X0'] = float(fields[9])
            add_time = float(fields[10])
            if add_time > 0:
                # The NT key did not carry the trigger time; it is stored
                # here as seconds counted from 1980, whereas datetime
                # timestamps count from 1970.
                epoch_shift = ((datetime.datetime(1980, 1, 1)
                                - datetime.datetime(1970, 1, 1))
                               // datetime.timedelta(seconds=1))
                # NOTE(review): the fixed 3600 s presumably compensates a
                # one-hour local time offset - confirm for other time zones.
                stamp = add_time + epoch_shift - 3600
                channel['trigger_time'] = datetime.datetime.fromtimestamp(
                    stamp).strftime("%Y-%m-%d %H:%M:%S.%f")
        elif typ == b'CN':
            fields = block.split(b',')
            channel['name'] = text(fields[4])
        elif typ == b'CR':
            # The first CR of a channel calibrates y, any later one x.
            axis = 'unit_x' if 'unit_y' in channel else 'unit_y'
            fields = block.split(b',', 6)
            unit = text(fields[5])
            # Replace characters that cause trouble further downstream.
            if "°" in unit:
                unit = unit.replace("°", "grad")
            elif "²" in unit:
                unit = unit.replace("²", "2")
            elif "W/m" in unit:
                unit = unit.replace("W/m", "W/m2")
            if "Â" in unit:
                unit = unit.replace("Â", "")
            channel[axis] = unit
            channel[axis + "_scale-factor"] = float(text(fields[1]))
            channel[axis + "_scale-factor_Offset"] = float(text(fields[2]))
        elif typ == b'CS':
            # Payload is "<index>,<binary samples>".
            _, raw = block.split(b',', 1)
            break

    if found and raw is None:
        raise ValueError("No CS data block found in " + str(path))

    for chan in found:
        y_factor = chan['unit_y_scale-factor']
        y_shift = chan['unit_y_scale-factor_Offset']
        if 'unit_x' in chan:
            x_dtype = _create_dtype_string(chan, "value_x")
            x_factor = chan['unit_x_scale-factor']
            x_shift = chan['unit_x_scale-factor_Offset']
            x_start = chan['buffer_start_value_x']
            x_count = (chan['buffer_size_value_x']
                       // np.dtype(x_dtype).itemsize)
            if x_factor != 1 or x_shift != 0:
                try:
                    # A factor of 0 is treated as "no scaling".
                    chan['value_x'] = np.frombuffer(
                        raw, dtype=x_dtype, count=x_count, offset=x_start
                    ) * (x_factor or 1.0) + x_shift
                except Exception:
                    # Best effort: the channel is kept without x values.
                    print("hupsi")
            else:
                chan['value_x'] = np.frombuffer(
                    raw, dtype=x_dtype, count=x_count, offset=x_start)
        y_dtype = _create_dtype_string(chan, "value_y")
        y_start = chan['buffer_start_value_y']
        y_count = chan['buffer_size_value_y'] // np.dtype(y_dtype).itemsize
        if y_factor != 1 or y_shift != 0:
            try:
                # The y samples use the y calibration only; the offset of
                # the x component must not leak into them.
                chan['value_y'] = np.frombuffer(
                    raw, dtype=y_dtype, count=y_count, offset=y_start
                ) * (y_factor or 1.0) + y_shift
            except Exception as err:
                raise ValueError("hupsi") from err
        else:
            chan['value_y'] = np.frombuffer(
                raw, dtype=y_dtype, count=y_count, offset=y_start)

    # Re-key the channels by their names.
    channelsOutput = {}
    for index, chan in enumerate(found):
        try:
            channelsOutput[chan['name']] = chan
        except KeyError:
            print("Channel not found: ", index)
    return channelsOutput
# File header written at the start of every file: CF and CK keys.
_FAMOS_FILE_HEADER = b"|CF,2,1,1;|CK,1,3,1,1;\n"


def _famos_byte_len(text):
    """Return the number of bytes ``text`` occupies once written."""
    return len(text.encode("utf-8"))


def _famos_key(tag, *fields):
    """Frame ``fields`` as one key: ``|tag,1,<byte length>,<fields>;``.

    ``bytes`` fields are used as they are, everything else is converted
    with ``str`` and encoded as UTF-8. The length is counted in bytes so
    that it stays correct for non-ASCII text.
    """
    # NOTE(review): load_channels decodes header text as windows-1252 while
    # this writer emits UTF-8, as the original code did - confirm which
    # encoding the consumers of these files expect.
    payload = b",".join(
        field if isinstance(field, bytes) else str(field).encode("utf-8")
        for field in fields)
    return (b"|" + tag + b",1," + str(len(payload)).encode("ascii")
            + b"," + payload + b";")


def _famos_cr_key(factor, offset, unit):
    """Build a CR key: |CR,1,len,transform,factor,offset,calibrated,unitLen,unit;"""
    transform = "1" if (factor != 1 or offset != 0) else "0"
    return _famos_key(b"CR", transform, factor, offset, "1",
                      _famos_byte_len(unit), unit)


def _famos_cp_key(buffer_ref, byte_count, number_format):
    """Build a CP key: |CP,1,len,bufferRef,bytes,format,signBits,mask,offset,run,gap;"""
    return _famos_key(b"CP", buffer_ref, byte_count, number_format,
                      int(byte_count) * 8, "0", "0", "1", "0")


def _famos_cb_key(buffer_ref, buffer_offset, byte_count, flag, x0):
    """Build a Cb key describing one buffer inside the CS sample block.

    ``flag`` is written right before X0 ("0" for y, "1" for x buffers),
    exactly where the original writer put it.
    """
    return _famos_key(b"Cb", "1", "0", buffer_ref, "1", buffer_offset,
                      byte_count, "0", byte_count, flag, x0, "0", "")


def _encode_channel(chan, buffer_ref, buffer_offset):
    """Serialise one channel.

    Args:
        chan: Channel dict as produced by ``load_channels``.
        buffer_ref: Buffer reference number for the y component; the x
            component, if present, gets ``buffer_ref + 1``.
        buffer_offset: Byte offset of this channel's samples inside the
            CS block.

    Returns:
        ``(header, raw, buffers_used)``: the channel's header keys (each
        followed by a newline), its binary samples (y first, then x), and
        the number of buffer references consumed (1 or 2).
    """
    has_x = "value_x" in chan
    # CG key: |CG,1,len,componentCount,fieldType,dimension;
    keys = [b"|CG,1,5,2,2,2;" if has_x else b"|CG,1,5,1,1,1;"]

    # CD key: |CD,1,len,dx,calibrated,unitLen,unit,0,0,0;
    # Without an x component the x axis has to be a time axis ("s").
    axis_unit = chan.get("unit_x")
    if not isinstance(axis_unit, str):
        axis_unit = "s"
    keys.append(_famos_key(b"CD", chan["delta_x"], "1",
                           _famos_byte_len(axis_unit), axis_unit,
                           "0", "0", "0"))

    # NT key: |NT,1,len,day,month,year,hours,minutes,seconds;
    # Only written when the channel has a "YYYY-MM-DD HH:MM:SS" time.
    stamp = chan.get("trigger_time")
    if isinstance(stamp, str):
        keys.append(_famos_key(b"NT", stamp[8:10], stamp[5:7], stamp[:4],
                               stamp[11:13], stamp[14:16], stamp[17:]))

    # CC key starts the y component: |CC,1,len,componentIndex,analogDigital;
    # It is written for every channel, with or without a trigger time.
    keys.append(b"|CC,1,3,1,1;")

    y_format = chan["decode_value_y"]
    keys.append(_famos_cp_key(buffer_ref, chan["BytePerValue_value_y"],
                              y_format))

    y_values = chan["value_y"]
    if y_format == "4":
        # Signed-short channels are stored as raw integers: undo the
        # calibration on a private copy so the caller's array stays as it
        # is, and round so float noise cannot shift a sample by one count.
        scale = chan["unit_y_scale-factor"]
        offset = chan["unit_y_scale-factor_Offset"]
        y_values = np.rint(
            (np.asarray(y_values) - offset) * (1 / scale)).astype("int16")
    # NOTE(review): other integer formats are written as held in memory;
    # if they were scaled to float64 on load, the CP key above no longer
    # matches the sample width - confirm whether that case occurs.
    y_raw = y_values.tobytes()
    keys.append(_famos_cb_key(buffer_ref, buffer_offset, len(y_raw),
                              "0", chan["X0"]))

    y_unit = chan["unit_y"]
    if "W/m" in y_unit:
        y_unit = y_unit.replace("Â", "")
    keys.append(_famos_cr_key(chan["unit_y_scale-factor"],
                              chan["unit_y_scale-factor_Offset"], y_unit))

    # CN key: |CN,1,len,groupIndex,0,bitIndex,nameLen,name,commentLen,comment;
    name = chan["name"]
    keys.append(_famos_key(b"CN", "0", "0", "0",
                           _famos_byte_len(name), name, "0", ""))

    raw = y_raw
    buffers_used = 1
    if has_x:  # XY data set: the x samples follow the y samples
        x_values = chan["value_x"]
        x_raw = x_values.tobytes()
        x_ref = buffer_ref + 1
        keys.append(b"|CC,1,3,2,1;")
        # Describe the samples as they are actually written: 8-byte items
        # are declared as format 8 (double).
        x_bytes = str(x_values.itemsize)
        x_format = "8" if x_bytes == "8" else chan["decode_value_x"]
        keys.append(_famos_cp_key(x_ref, x_bytes, x_format))
        keys.append(_famos_cb_key(x_ref, buffer_offset + len(y_raw),
                                  len(x_raw), "1", chan["X0"]))
        if "unit_x_scale-factor" in chan:
            keys.append(_famos_cr_key(chan["unit_x_scale-factor"],
                                      chan["unit_x_scale-factor_Offset"],
                                      chan["unit_x"]))
        else:
            # No calibration known: identity transform.
            keys.append(_famos_cr_key(1, 0, chan["unit_x"]))
        raw += x_raw
        buffers_used = 2

    return b"\n".join(keys) + b"\n", raw, buffers_used


def _write_famos_file(directory, filename, content):
    """Create ``directory`` if needed and write ``content`` into it."""
    if directory:
        os.makedirs(directory, exist_ok=True)
    with open(os.path.join(directory, filename), 'wb') as handle:
        handle.write(content)


def save_single_files(channels, outputPath):
    """Write every channel into its own ``<name>.dat`` file.

    Args:
        channels: Dict mapping a key to a channel dict as produced by
            ``load_channels``. The channel dicts are not modified.
        outputPath: Target directory; it is created when missing.

    Raises:
        KeyError: If a channel lacks an entry needed for the header.
    """
    for key, chan in channels.items():
        print("save: ", key)
        header, raw, _ = _encode_channel(chan, 1, 0)
        # CS key: |CS,1,len,index,rawData;
        content = (_FAMOS_FILE_HEADER + header
                   + _famos_key(b"CS", b"1", raw))
        _write_famos_file(outputPath, chan['name'] + '.dat', content)


def save_multi_files(channels, outputPath, filename):
    """Write all channels into one file.

    The header keys of all channels come first, followed by a single CS
    key that holds the samples of all channels back to back.

    Args:
        channels: Dict mapping a key to a channel dict as produced by
            ``load_channels``. The channel dicts are not modified.
        outputPath: Target directory; it is created when missing.
        filename: Name of the file; ``.dat`` is appended unless the name
            already contains it.

    Raises:
        ValueError: If ``channels`` is empty.
        KeyError: If a channel lacks an entry needed for the header.
    """
    if not channels:
        raise ValueError("no channels to save")
    headers = []
    payloads = []
    buffer_ref = 1     # running buffer reference number
    buffer_offset = 0  # running byte offset inside the CS block
    total = len(channels)
    for position, chan in enumerate(channels.values(), start=1):
        print("write: ", position, " of ", total, "channels")
        header, raw, used = _encode_channel(chan, buffer_ref, buffer_offset)
        headers.append(header)
        payloads.append(raw)
        buffer_ref += used
        buffer_offset += len(raw)
    # CS key: |CS,1,len,index,rawData;
    content = (_FAMOS_FILE_HEADER + b"".join(headers)
               + _famos_key(b"CS", b"1", b"".join(payloads)))
    if ".dat" not in filename:
        filename += ".dat"
    _write_famos_file(outputPath, filename, content)
def load_all_rawdata(dirName):
    """Load the channels of every raw data file found under ``dirName``.

    A file is loaded when its path contains ``.dat`` or ``.raw``. The
    channels of all files are merged into one dict; when two files hold a
    channel of the same name, the one loaded later wins.

    Args:
        dirName: Directory handed to ``get_list_of_files``.

    Returns:
        Dict mapping channel name to channel dict.

    Raises:
        ValueError: If a file cannot be loaded; the original error is
            attached as the cause.
    """
    print('load all data in: ', dirName)
    channels = {}
    for path in get_list_of_files(dirName):
        if '.dat' in path or '.raw' in path:
            try:
                channels.update(load_channels(path))
            except Exception as err:
                # Keep the cause, and let KeyboardInterrupt / SystemExit
                # pass through instead of turning them into ValueError.
                raise ValueError("couldn't load " + path) from err
    return channels
def load_specific_rawdata(dirName, channelName):
    """Load the files under ``dirName`` whose name matches ``channelName``.

    A file matches when its name without the last four characters (the
    extension, e.g. ``.dat``) equals ``channelName``, ignoring case.

    Args:
        dirName: Directory handed to ``get_list_of_files``.
        channelName: File name to look for, without extension.

    Returns:
        Dict mapping channel name to channel dict; empty when no file
        matches.
    """
    wanted = channelName.lower()
    channels = {}
    # The name is taken from each path itself rather than from a separate
    # os.listdir() call, so the match cannot break when both listings
    # differ in order or length.
    # NOTE(review): assumes get_list_of_files returns file paths whose last
    # component is the file name - confirm against its implementation.
    for path in get_list_of_files(dirName):
        name = os.path.basename(path)
        if wanted == name[:len(name)-4].lower():
            channels.update(load_channels(path))
    return channels