Einlesen von IMC Famos (.dat, .raw) daten

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Hallo Zusammen,
ich bin recht neu im Bereich Python unterwegs und habe großes Interesse, es für einige auswertungen und plotten von Messdaten zu verwenden. Allerdings gibt es offensichtlich noch keine Funktion, die das Einlesen von Famos dateien ermöglicht.
Ich bin bei der Suche danach jedoch auf MathLab Funktionen gestoßen (importieren oder Lesen), die genau das machen soll.

Meine Frage an euch ist nun, ob jemand mal drüber gucken kann und mir sagen kann, ob man es auch umschreiben kann für Python. Mir fehlt dafür leider noch etwas die Erfahrung und wenn es nicht funktioniert, kann ich mir erstmal die Zeit sparen um mich dort einzuarbeiten und muss evtl. doch auf MathLab umsteigen :/

Besten Dank und Gruß aus dem Rheinland!

Den Code zum Importieren von Famos daten mittel MathLab wäre dieser hier:

Code: Alles auswählen

function [dataOut]=FAMOSimport(filename)
% Usage: data=FAMOSimport(filename);
% 
% FAMOSimport() opens (MARC generated) FAMOS files and imports all signals.
% 
% ************************************************************************* 
% 
%


% Preset output to empty;
dataOut=[];


%% Check for valid input file
if exist('filename','var')~=1 ...
   || isempty(filename)
    [filename, pathname] = uigetfile( ...
        {'*.dat','FAMOS measurement files'; ...
         '*.*','All files'},'Select FAMOS measurement file ...');
     if isequal(filename,0)
         disp('FAMOS-measurement file import cancelled.');
         return;
     end
     filename=fullfile(pathname, filename);
     clear pathname;
end
if exist(filename,'file')~=2
    disp('Given file could not be found. Aborting import.');
    return;
end

%% Load input file 
fid=fopen(filename,'r','l');
data=fread(fid,inf,'uint8=>char','l')';
fclose(fid);
clear fid 


%% Parse header information
dataOut.FileName=filename;
dataOut.TYPE='AFT MARC (FAMOS)';
header=strfind(data(1:200),[char(13) char(10) '|NO,']);
subIdx=strfind(data(header(1):header(1)+50),',');
dataOut.Device=strtrim(data(header(1)+subIdx(5):header(1)+subIdx(6)-2));

%% Parse measurement entries information
units=strfind(data,[char(13) char(10) '|CR,'])';
dataOut.Unit=cell(size(units));
dataOut.Factor=zeros(size(dataOut.Unit),'double');
dataOut.Offset=zeros(size(dataOut.Unit),'double');
for i=1:length(units)
    subIdx=sort([strfind(data(units(i):units(i)+255),',') ...
                strfind(data(units(i):units(i)+255),';')]);
	dataOut.Factor(i)=str2double( ...
        data(units(i)+subIdx(4):units(i)+subIdx(5)-2));
	dataOut.Offset(i)=str2double( ...
        data(units(i)+subIdx(5):units(i)+subIdx(6)-2));
    dataOut.Unit{i}=data(units(i)+subIdx(8):units(i)+subIdx(9)-2);
    
end
clear units subIdx;

%Extract measurement variables names and corresponding time column
varName=strfind(data,[char(13) char(10) '|CN,'])';
dataOut.TimeIndex=zeros(size(varName),'uint32');
dataOut.Label=cellstr(char(zeros(size(dataOut.TimeIndex))));
dataOut.Data=zeros(size(dataOut.TimeIndex),'double');
for i=1:length(varName)
    subIdx=strfind(data(varName(i):varName(i)+255),',');
    dataOut.Label{i}=data(varName(i)+subIdx(7):varName(i)+subIdx(8)-2);
end
for i=1:length(varName)
    subIdx=sort([strfind(data(varName(i):varName(i)+255),',') ...
                   strfind(data(varName(i):varName(i)+255),';')]);
    TimeVarName=data(varName(i)+subIdx(9):varName(i)+subIdx(10)-2);
    if i==1 || ~strcmp(dataOut.TimeIndex(i-1),TimeVarName)
        idx=strmatch(TimeVarName,dataOut.Label,'exact');
        if ~isempty(idx)
            dataOut.TimeIndex(i)=idx(1);
        else
            warning('FAMOSconnect:invalidTimeLabel', ...
                ['Signal ''%s'' (%d) refers to non-existing ' ...
                'time signal label ''%s''.'], ...
                dataOut.Label{i},i,TimeVarName);    
        end
    else
        dataOut.TimeIndex(i)=dataOut.TimeIndex(i-1);
    end
end
clear varName TimeVarName subIdx;

%Extract measurement value data type and bitlength
dataType=strfind(data,[char(13) char(10) '|CP,1,'])';
dataOut.DataType=cell(size(dataType));
dataOut.DataBits=zeros(size(dataOut.DataType),'uint8');
for i=1:length(dataType)
    subIdx=strfind(data(dataType(i):dataType(i)+50),',');
    switch (data(dataType(i)+subIdx(5):dataType(i)+subIdx(6)-2))
        case '1'                        % uint8
            dataOut.DataType{i}='uint8';
        case '2'                        % int8
            dataOut.DataType{i}='int8';
        case {'3','9','11'}             % uint16
            dataOut.DataType{i}='uint16';
        case '4'                        % int16
            dataOut.DataType{i}='int16';
        case '5'                        % uint32
            dataOut.DataType{i}='uint32';
        case '6'                        % int32
            dataOut.DataType{i}='int32';
        case '7'                        % float
            dataOut.DataType{i}='single';
        case {'8','10','13'}            % double
            dataOut.DataType{i}='double';
        otherwise
            dataOut.DataType{i}='UNKNOWN';
    end 
    dataOut.DataBits(i)=str2double( ...
        data(dataType(i)+subIdx(6):dataType(i)+subIdx(7)-2));
end
clear dataType subIdx;

%Extract measurement value data block and item number
dataInfo=strfind(data,[char(13) char(10) '|Cb,1,'])';
dataOut.DataBlock=zeros(size(dataInfo),'uint16');
dataOut.DataItem=zeros(size(dataOut.DataBlock),'uint16');
for i=1:length(dataInfo)
    subIdx=strfind(data(dataInfo(i):dataInfo(i)+50),',');
    dataOut.DataItem(i)=str2double( ...
        data(dataInfo(i)+subIdx(5):dataInfo(i)+subIdx(6)-2));
    dataOut.DataBlock(i)=str2double( ...
        data(dataInfo(i)+subIdx(6):dataInfo(i)+subIdx(7)-2));
end
clear dataInfo subIdx;


%Extract measurement value binary data length and offset
dataBlock=strfind(data,[char(13) char(10) '|CS,'])';
dataOut.DataBlocks=length(dataBlock);
dataOut.DataBlocksLength=zeros(size(dataBlock),'uint32');
dataOut.DataBlocksItemLength=zeros(size(dataBlock),'uint32');
dataOut.DataBlocksOffset=zeros(size(dataBlock),'uint32');
for i=1:length(dataBlock)
    subIdx=strfind(data(dataBlock(i):dataBlock(i)+50),',');
    
    dataOut.DataBlocksOffset(i)=dataBlock(i)+subIdx(4)-1;
    dataOut.DataBlocksLength(i)=str2double( ...
        data(dataBlock(i)+subIdx(2):dataBlock(i)+subIdx(3)-2)) ...
        -(subIdx(4)-subIdx(3));                                 %Fix offset
    dataOut.DataBlocksItemLength(i)=dataOut.DataBlocksLength(i) ...
        /(sum(dataOut.DataBits(dataOut.DataBlock==i)/8));
end
clear dataBlock subIdx;


%% Sort entries - note: DataItem-value continues over blocks.
[~, dataOrder]=sort(dataOut.DataItem);
dataOutField=fieldnames(dataOut);
for i=1:length(dataOutField)
    if length(dataOut.(dataOutField{i}))==length(dataOut.DataItem) ...
       && ~strcmp(dataOutField{i},'DataBlocksLength')
        dataOut.(dataOutField{i})=dataOut.(dataOutField{i})(dataOrder);
    end
end
clear dataOrder dataOutField;


%% Extract measurement data, format: shots-aligned (not variables aligned)
data=cast(data,'uint8');
dataOut.Data=cell(length(dataOut.DataItem),1);
dataBlockId=1;
dataOffset=dataOut.DataBlocksOffset(dataBlockId);
dataVarIdx1=uint32(1: ...
                   dataOut.DataBlocksLength(dataBlockId) ...
                   /dataOut.DataBlocksItemLength(dataBlockId): ...
                   dataOut.DataBlocksLength(dataBlockId));
dataVarIdx2=reshape([dataVarIdx1;    dataVarIdx1+1],1,[]);
dataVarIdx4=reshape([dataVarIdx1;    dataVarIdx1+1; ...
                     dataVarIdx1+2;  dataVarIdx1+3],1,[]);
for i=1:length(dataOut.Label)
    if dataOut.DataBlock(i)>dataBlockId
        dataBlockId=dataOut.DataBlock(i);
        dataOffset=dataOut.DataBlocksOffset(dataBlockId);
        dataVarIdx1=uint32(1: ...
                           dataOut.DataBlocksLength(dataBlockId) ...
                           /dataOut.DataBlocksItemLength(dataBlockId): ...
                           dataOut.DataBlocksLength(dataBlockId));
        dataVarIdx2=reshape([dataVarIdx1;    dataVarIdx1+1],1,[]);
        dataVarIdx4=reshape([dataVarIdx1;    dataVarIdx1+1; ...
                             dataVarIdx1+2;  dataVarIdx1+3],1,[]);
    end
    switch dataOut.DataBits(i)
        case 8
            dataVal=cast(typecast(data(dataVarIdx1+dataOffset),...
                             dataOut.DataType{i}),'double');
            dataOffset=dataOffset+1;
        case 16
            dataVal=cast(typecast(data(dataVarIdx2+dataOffset),...
                             dataOut.DataType{i}),'double');
            dataOffset=dataOffset+2;
        case 32
            dataVal=cast(typecast(data(dataVarIdx4+dataOffset),...
                             dataOut.DataType{i}),'double');
            dataOffset=dataOffset+4;
        otherwise
            fprintf(2,['Unsupported data width in item %d:' ...
                       '%d Bits - Skipping.\n'], ...
                      dataOut.DataItem(i),dataOut.DataBits(i));
            dataOffset=dataOut.DataBits(i)/8;
            continue;
    end
    dataVal=dataVal*dataOut.Factor(i)+dataOut.Offset(i);
    dataOut.Data{i}=dataVal';
end
clear dataOffset dataBlockId dataVarIdx1 dataVarIdx2 dataVarIdx4 dataVal; 
clear i data;
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das kann man sicher auch nach Python konvertieren.
Benutzeravatar
ThomasL
User
Beiträge: 1377
Registriert: Montag 14. Mai 2018, 14:44
Wohnort: Kreis Unna NRW

Die Frage ist, hast du nur Zugriff auf die Dateien in dem Format oder Zugriff auf die IMC Software.
Wenn letzteres, kann diese Dateien auch in ASCII oder Excel Format speichern.
https://en.wikipedia.org/wiki/Imc_FAMOS
Ich bin Pazifist und greife niemanden an, auch nicht mit Worten.
Für alle meine Code Beispiele gilt: "There is always a better way."
https://projecteuler.net/profile/Brotherluii.png
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Danke für eure Antworten,
ich habe zwar Zugriff auf die Software, allerdings steht es außer Frage, dass ich mit was anderem als .dat Dateien arbeite. Andere Software ist darauf ausgelegt und leider sind ASCII Dateien wesentlich größer.
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wie gesagt - gehen tuts. Machen musst du es selbst, und dir dafür genug Python raufschaffen.
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

@Squipy: das was sich da DAT nennt, ist ja auch ASCII. Und mit ein bißchen Pythonkenntnissen auch nicht schwer zu entschlüsseln.
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Hi Zusammen,
wenn ich mir eine kleinen Beispieldatei ansehe, sind die ersten Zeilen nicht schwer zu identifizieren

Code: Alles auswählen

|CF,2,1,1;|CK,1,3,1,1;
|Nv,1,13,7,1,14,32,0,0;
|NO,1,7,1,0,,0,;
|NL,1,10,1252,0x409;
|CG,1,5,2,3,2;
|CD,1,13,1,1,1,s,0,0,0;
...
Das kann ich mir auch aus dem MatLab code herauslesen, was, welche Angabe ist. Allerdings sehen die tatsächlichen Messdaten (Zeitstempel und Messwert) so aus:
Bild

Kann mir da evtl. jemand auf die Sprünge helfen?
Danke!
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

Das sind binär codierte Doubles. Dafür gibts entweder das `struct`-Modul, oder Du benutzt `numpy.frombuffer` oder ähnliches.
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Danke für den Hinweis.

Ist es möglich die binär codierten Daten zu entpacken, wenn man die größe nicht kennt? bspw. die unpack funktion erwartet einen definerten buffer, wenn ich es richtig verstanden habe. Das heißt ich erhalte immer den error:
unpack requires a buffer of 32 bytes
Ist es möglich diese Daten auch ohne Information der Größe zu decodieren?

Die Datei ist etwa so aufgebaut, dass zu anfang strings Informationen zu den Messdaten enthalten sind und dann die binär codierten Zeitdaten kommen. Die Beschreibung dafür, habe ich gefunden und ich kann mir dementsprechend die für mich nötigen Informationen herausziehen. Das würde ich gerne alles in einem dict speichern, was auch bis zu den codierten doubles recht gut klappt. Die Dateistruktur sieht wie folgt aus:

Code: Alles auswählen

|CF,2,1,1;|CK,1,3,1,1;
|Nv,1,13,7,1,14,32,0,0;
|NO,1,7,1,0,,0,;
|NL,1,10,1252,0x409;
|CG,1,5,2,3,2;
|CD,1,13,1,1,1,s,0,0,0;
|NT,1,27,26, 8,2018,23,38,16.6099999;
|CC,1,3,1,1;
|CP,1,16,1,8,8,64,0,0,1,0;
|Cb,1,28,1,0,1,1,0,4808,0,4808,1,0,0,;
|CR,1,11,0,0,0,1,1,A;
|CN,1,29,0,0,0,17,AI_CuRotorCurrent,0,;
|CC,1,3,2,1;
|CP,1,16,2,8,8,64,0,0,1,0;
|Cb,1,31,1,0,2,1,4808,4808,0,4808,1,0,0,;
|CR,1,11,0,0,0,1,1,s;
|CS,1,9618,1,   `Íý…@   àtÄ…@   À’3†@   `Û6†@   àY†@   À˜†@    •˜†@    ò†@    ÷‡@    ~‡@   €ª‡@    ã‡@    `/ˆ@   `TXˆ@    —&ˆ@    º&ˆ@   ÀjYˆ@   à/*ˆ@   `ˆ@    cˆ@     'ˆ@   `oˆ@   `Wù‡@   Àä.ˆ@   `v±‡@   @¶‡@   ଦ‡@   àðH‡@   €Õì†@   €å¦†@   ๆ@   @[y†@   @k~†@   `x’†@   `u†@    H…†@   À#І@    “܆@   €B£†@   `‡¥†@    pþ†@   `›Ë†@    ¢¼†@   @U‡@    ‡@    Õ3‡@   @¯H‡@   À虇@   àîw‡@    !º‡@    ­‡@    `‡@   €9T‡@     3‡@   àÜu‡@    (:‡@    ‘*‡@    ±Y‡@   à‡@   ÀíI‡@    3-‡@   @ñ‡@    {‡@   àzð†@   @Kí†@    
‡@    7‡@   @‡@   à–‡@ ... usw. 
Besten Dank!
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

Wenn Du eine Beschreibung hast, dann sollte darin ja auch irgendwo vorkommen, wie viele Bytes bzw. Doubles dort binär gespeichert sind. Diese Anzahl mußt Du einfach nur in ein bytes-Objekt laden und decodieren. Es könnten ja vielleicht die 9618 sein?
Woher hast Du denn die Fehlermeldung mit den 32 Bytes?
Was hast Du bisher schon geschrieben? Wie sieht die Beschreibung aus? Beispieldaten?
Es ist halt schwierig zu helfen, wenn man nur Fragmente hat.
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

Die zweite Zahl scheint immer die Anzahl Bytes danach bis zum ; zu sein, damit läßt sich doch schonmal ein Block-Leser bauen:

Code: Alles auswählen

def iter_blocks(data):
    while data:
        entry = re.search(rb'^\s*\|(\S\S),(\d+),(\d+),', data)
        if entry is None:
            raise ValueError("Block expected")
        typ, num, length = entry.groups()
        length = int(length)
        end = entry.end()
        block = data[end:end+length]
        if data[end+length] != 59: # ;
            raise ValueError(f"';' expected, found {data[end+length]}")
        data = data[end+length+1:]
        yield typ, num, block
Für den Block mit der Kennung CS muß man dann aber noch das 1, wegsplitten, was auch immer die 1 bedeuten mag:

Code: Alles auswählen

for typ, num, block in iter_blocks(data):
    if typ == b'CS':
        _, numbers = block.split(b',', 1)
        numbers = numpy.frombuffer(numbers)
So oder so ähnlich.
Benutzeravatar
sparrow
User
Beiträge: 4501
Registriert: Freitag 17. April 2009, 10:28

Wenn ich den Code aus dem ersten Post anschaue, sollte die 1 für dem Datentyp uint8 stehen. Das deutet auch darauf hin, dass der sich ändern kann.
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Moin Zusammen,
besten Dank Sirius! Durch dein Besipielcode habe ich immerhin schonmal etwas lauffähiges hinbekommen.
Funktioniert auch soweit ganz gut. Das Problem ist leider nur die Performance...

Bei etwas größeren Dateien ist es sogar so, dass mein Rechner völlig stecken bleibt, ich verstehe aber nicht wieso...

Der Code sieht wie folgt aus:

Code: Alles auswählen

import re
import numpy



path = r"C:\Users\Sascha\Desktop\python_test\testData.DAT"


def load_channels(path):
  
    
    channelunit=dict()
    channeltriggertime=dict()
    channelsamplingrate = dict()
    channelvalue = dict()
    
    
    with open(path, "rb") as binary_file:
        
    # Read the whole file at once
        data = binary_file.read()
        dataBlock = data.split(b";")
    
    #*******************Looking for channel names and units*******************************#
    cnInfo = [s for s in dataBlock if b"|CN," in s]
    crInfo = [s for s in dataBlock if b"|CR," in s]
    crInfo = [s for s in crInfo if not b",s" in s]
    ntInfo = [s for s in dataBlock if b"|NT," in s]
    cdInfo = [s for s in dataBlock if b"|CD," in s]
    
    
    
    if len(cnInfo) == len(crInfo) == len(ntInfo) == len(cdInfo):
        
        for i in range(0,len(cnInfo)):
            
            
            name = cnInfo[i].split(b",")[7].decode("windows-1252")
    
            #**************Determine channel unit****************
            
            channelunit[name] = crInfo[i].split(b",")[8].decode("windows-1252")
            
            ###************Determine trigger time******##
            
            year = ntInfo[i].split(b",")[5].decode("windows-1252")
            
            month = ntInfo[i].split(b",")[4].decode("windows-1252").replace(" ","0")
            
            day = ntInfo[i].split(b",")[3].decode("windows-1252").replace(" ","0")
            
            hour = ntInfo[i].split(b",")[6].decode("windows-1252").replace(" ","0")
            
            minute = ntInfo[i].split(b",")[7].decode("windows-1252").replace(" ","0")
           
            second = ntInfo[i].split(b",")[8].decode("windows-1252").replace(" ","0")
         
            triggerTime = year+"-"+month+"-"+day+" "+hour+":"+minute+ ":"+second
            channeltriggertime[name] = triggerTime
            
            ###************Determine sampling rate******##        
            channelsamplingrate[name] = float(cdInfo[i].split(b",")[3].decode("windows-1252").replace(" ", "0"))
            
    
            
    else:
        raise ValueError("Blocks have not the same number!")
    
    
    
    def iter_blocks(data):
        while data:
            entry = re.search(rb'^\s*\|(\S\S),(\d+),(\d+),', data)
            if entry is None:
                raise ValueError("Block expected")
            typ, num, length = entry.groups()
            length = int(length)
            end = entry.end()
            block = data[end:end+length]
            if data[end+length] != 59: # ;
                raise ValueError(f"';' expected, found {data[end+length]}")
            data = data[end+length+1:]
            yield typ, num, block
    
    
    for typ, num, block in iter_blocks(data):
        
        if typ == b'CS':
            
            for i in range(0,len(cnInfo)):
                z = i+1
                
                _, channel = block.split(b',', 1)
                name = cnInfo[i].split(b",")[7].decode("windows-1252")
                allvalues = numpy.frombuffer(channel, dtype='float32')
                lengthChannel = allvalues.size
                start = i*int(lengthChannel/len(cnInfo))
                end = start + z*int(lengthChannel/len(cnInfo))
                channelvalue[name] = allvalues[start:end]
                            
    return channelvalue, channelunit, channelsamplingrate,channeltriggertime




StatisticsTime=load_channels(path)


Es ist durchaus möglich, dass eine Datei mehrere Kanäle enthält, das kann bspw. dadurch erkannt werden wenn mehrere "|CN" Blöcke enthalten sind.
Wichtige Infos die ich brauche sind in den Blöcken CN, CR, NT und CD enthalten.


Mein Plan ist es mehrere dicts zu erzeugen, die die entsprechenden Informationen ( Name, Einheit, Triggerzeit, Abtastrate und die jeweiligen y-Werte) enthalten die ich benötige. Damit ich diese richtig zuordnen kann, ist der key der dicts immer der jeweilige Kanalname.

Meine Frage ist, warum der Code völlig versagt sobald die Datei etwas größer ist...

Eine Beispieldatei habe ich hier hochgeladen: https://www.file-upload.net/download-13 ... a.DAT.html

Besten Dank und Gruß aus dem Rheinland,
Squipy

PS.
Kurze Erkärung zu den folgenden beiden Zeilen:

Code: Alles auswählen

 
 crInfo = [s for s in dataBlock if b"|CR," in s]
 crInfo = [s for s in crInfo if not b",s" in s]
Damit will ich die Zeitspur bei der Abfrage der Einheit erstmal nicht berücksichtigen, das habe ich so gemacht, weil sonst die Anzahl der dicts sich unterscheidet, weil ein Kanal immer jeweils zwei Spuren hat (Werte- und Zeitspur). Elegant ist es sicher nicht. Evtl. speichere ich auch einen zusätzlichen dict ab, der dann die jeweilige Zeitspur des Kanals enthält.
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

Du verwendest meinen Code ja auch gar nicht, sondern hackst wie wild auf den Daten herum. Wenn das funktioniert, dann ist das Zufall.

Warum verwendest Du `iter_blocks` nicht, um die Blöcke zu lesen?
Wenn Du eine Schleife über einen Index benutzt, dann machst Du in Python etwas falsch.

Um Geschwindigkeit zu gewinnen, hilft es, die selben Daten nicht immer wieder zu verarbeiten, sondern nur einmal.

Statt vier Wörterbücher zu verwalten, die alle die selben Schlüssel haben, solltest Du ein Wörterbuch benutzen, dessen Werte komplexer sind.
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Naja das Problem ist, das mehrere Blöcke ausgelesen werden müssen, die unterschiedliche Länge haben und jenachdem wieviele Kanäle in der Datei vorhanden sind, müssen Blöcke mit dem selben Namen (bspw. |CN) ausgelesen werden. iterblocks geht davon aus nur einen bestimmten Block zu decodieren, deswegen habe ich das gemacht. Ansosten verwende ich ihn eigentlich ziemlich genau so wie du ihn mir bereitgestellt hast, nur habe ich das Gefühl, dass dieser sehr lange rumrödelt sobald die Datei etwas größer wird :/
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

Wie kommst Du drauf, dass `iter_blocks` nur bestimmte Blöcke decodieren könnte?

Da die Blöcke immer in einer festen Reihenfolge kommen, kann man das ausnutzen. Besser wäre es natürlich, wenn man dafür noch ein paar Tests hinzufügt (das sei dem Leser als Übung überlassen):

Code: Alles auswählen

def iter_blocks(data):
    while data:
        entry = re.search(rb'^\s*\|(\S\S),(\d+),(\d+),', data)
        if entry is None:
            raise ValueError("Block expected")
        typ, num, length = entry.groups()
        length = int(length)
        end = entry.end()
        block = data[end:end+length]
        if data[end+length] != 59: # ;
            raise ValueError(f"';' expected, found {data[end+length]}")
        data = data[end+length+1:]
        yield typ, num, block
    
def load_channels(path):
    with open(path, "rb") as binary_file:
        # Read the whole file at once
        data = binary_file.read()

    channels = []
    channel = None

    for typ, num, block in iter_blocks(data):
        if typ == b'CD':
            # first element a channel definition
            channel = {}
            channels.append(channel)
            data = block.split(b',')
            channel['sampling-rate'] = float(data[0])
        elif typ == b'CR' and not block.endswith(b',s'):
            data = block.split(b',', 6)
            channel['unit'] = data[5].decode("windows-1252")
        elif typ == b'CN':
            data = block.split(b',')
            channel['name'] = data[4].decode("windows-1252")
        elif typ == b'NT':
            data = block.split(b',')
            day, month, year, hour, minute, second = (d.decode("windows-1252").replace(" ", "0") for d in data)
            channel['trigger_time'] = f"{year}-{month}-{day}T{hour}-{minute}-{second}"
        elif typ == b'CS':
            _, data = block.split(b',', 1)
            allvalues = numpy.frombuffer(data, dtype='float64')
            break
    for channel, values in zip(channels, allvalues.reshape(-1, len(channels)).T):
        channel['values'] = values
    return channels
Wenn das noch zu langsam ist, kann man iter_block deutlich beschleunigen, wenn man das Kopieren der Daten vermeidet.
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Danke Sirius.

Ich habe wohl jetzt erst den Code verstanden. Ich war der Meinung, dass mit

Code: Alles auswählen

entry = re.search(rb'^\s*\|(\S\S),(\d+)\s*\s*,(\d+),', data)
lediglich in einem Block nach zwei Zeichen und zwei Zahlen gesucht wird und alle anderen Blöcke ignoriert.

Wie auch immer, der Code funktioniert mittlerweile gut, ich habe jetzt auch mal die buffer Information pro Kanal im|"Cb" Block verstanden. Dort steckt die Information drin, von und bis welchen Buffer der jeweilige Kanal zu finden ist. Dementsprechend habei ich eine FOR schleife eingefügt, um die richtigen Werte den Kanälen (dicts) zuzuordnen.

Code: Alles auswählen

def iter_blocks(data):
    while data:
        entry = re.search(rb'^\s*\|(\S\S),(\d+)\s*\s*,(\d+),', data)
        if entry is None:
            raise ValueError("Block expected")
        typ, num, length = entry.groups()
        length = int(length)
        end = entry.end()
        block = data[end:end+length]
        if data[end+length] != 59: # ;
            raise ValueError(f"';' expected, found {data[end+length]}")
        data = data[end+length+1:]
        yield typ, num, block
    
def load_channels(path):
    with open(path, "rb") as binary_file:
        # Read the whole file at once
        data = binary_file.read()
    
    channels = []
    channel = None
    count = 0
    for typ, num, block in iter_blocks(data):
        
        if typ == b'CD':
            # first element a channel definition
            channel = {}
            channels.append(channel)
            data = block.split(b',')
            channel['sampling-rate'] = float(data[0])
                     
        elif typ == b'CP':
            data = block.split(b',')
            channel['decode'] = data[2].decode("windows-1252")
            channel['Byte per value']= data[1].decode("windows-1252")
        elif typ == b'CR' and not block.endswith(b',s'):
            data = block.split(b',', 6)
            channel['unit_y'] = data[5].decode("windows-1252")
            count+=1
            print("loading channel: " , count)
        elif typ == b'CR' and block.endswith(b',s'):
            data = block.split(b',', 6)
            channel['unit_x'] = data[5].decode("windows-1252")
        elif typ == b'CN':
            data = block.split(b',')
            channel['name'] = data[4].decode("windows-1252")
        elif typ == b'NT':
            data = block.split(b',')
            day, month, year, hour, minute, second = (d.decode("windows-1252").replace(" ", "0") for d in data)
            channel['trigger_time'] = f"{year}-{month}-{day} {hour}:{minute}:{second}"
        elif typ == b'Cb':
            data = block.split(b',')
            channel['buffer-size'] = int(data[5].decode("windows-1252"))
            channel['buffer-start'] = int(data[4].decode("windows-1252"))
        elif typ == b'CS':
            
            if channel['decode'] == "1":
                dtype= 'ubyte'
            elif channel['decode'] == "2":
                dtype= 'byte'
            elif channel['decode'] == "3":
                dtype= 'ushort'
            elif channel['decode'] == "4":
                dtype= 'short'
            elif channel['decode'] == "5":
                dtype= 'ulong'
            elif channel['decode'] == "6":
                dtype= 'long'
            elif channel['decode'] == "7":
                dtype= 'float'
            elif channel['decode'] == "8":
                dtype= 'double'
            elif channel['decode'] == "11":
                dtype= 'bit16'
            elif channel['decode'] == "11":
                dtype= 'bit48'
            else:            
                raise ValueError("Unknown binary format")
            
            
            _, data = block.split(b',', 1)
            for x in range(0,len(channels)):
                offset = channels[x]['buffer-start']
                if 'value_x' in channels[x]:
                    channels[x]['value_x'] = numpy.frombuffer(data, dtype=dtype, count =count, offset = offset+channels[x]['buffer-size'])
                    channels[x]['buffer-start']= channels[x]['buffer-start']- channel['buffer-size']
                    offset = channels[x]['buffer-start']
                count = int(channels[x]['buffer-size']/int(channels[x]['decode']))                
                channels[x]['value_y'] = numpy.frombuffer(data, dtype=dtype, count =count, offset = offset)
            break
    return channels 
Ich habe noch folgende Probleme zu denen ich bisher keine Lösung gefunden habe:
  • Teilweise kommt es vor, dass in Blöcken Leerzeichen sind, die soviel ich weiß keine Information beeinhalten. Wie bring ich dem regex code bei, diese zu ignorieren?
  • Der 'CP' Block gibt Auskunft über das Zahlenformat welches ich für dtype benötige. Allerdings decodiert der mit bei short variablen keine sinnvollen Werte. Diese Information speichere ich unter channel['decode']
Bild
  • Das was beim Einlesen lange dauert ist die Anzahl der Kanäle und viel weniger die Größe der Datei. Das heißt die Anzahl der Blöcke ist der zeitfressende Faktor oder?
Vielen Dank im voraus!
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

@Squipy: wie schon einmal geschrieben, macht man in Python keine Schleife über einen Index, weil man direkt über die Elemente der Liste iterieren kann. Jetzt heißt der Index sogar `x`, was noch schlechter ist als `i`, weil man bei `x` keinen Index erwartet.

Solche langen if-elif-Ketten löst man in Python mit Wörterbüchern.

Die Behandlung der Daten sieht falsch aus. "value_x" ist nie definiert, also wird der if-Block auch nie betreten. Was darin gemacht wird sieht sehr falsch aus. Warum wird da das `count` des letzten Schleifengangs benutzt, bzw. gar das `count` das die Anzahl der Channels zählt, obwohl das schon durch die Länge der channels-Liste klar wäre.

`decode` enthält eine Nummer, die den Typ beschreibt, nicht die Größe des Typs.

Mal geraten, wie es sein könnte:

Code: Alles auswählen

DATA_TYPES = {
    "1": 'ubyte',
    "2": 'byte',
    "3": 'ushort',
    "4": 'short',
    "5": 'ulong',
    "6": 'long',
    "7": 'float',
    "8": 'double',
    "11": 'bit16',
    "11": 'bit48',
}

def load_channels(path):
    with open(path, "rb") as binary_file:
        # Read the whole file at once
        data = binary_file.read()
    
    channels = []
    channel = None
    for typ, num, block in iter_blocks(data):
        if typ == b'CD':
            # first element a channel definition
            channel = {}
            channels.append(channel)
            data = block.split(b',')
            channel['sampling-rate'] = float(data[0])        
        elif typ == b'CP':
            data = block.split(b',')
            channel['decode'] = data[2].decode("windows-1252")
            channel['Byte per value']= data[1].decode("windows-1252")
        elif typ == b'CR':
            key = 'unit_y' if not block.endswith(b',s') else 'unit_x'
            data = block.split(b',', 6)
            channel[key] = data[5].decode("windows-1252")
        elif typ == b'CN':
            data = block.split(b',')
            channel['name'] = data[4].decode("windows-1252")
        elif typ == b'NT':
            data = block.split(b',')
            day, month, year, hour, minute, second = (d.decode("windows-1252").replace(" ", "0") for d in data)
            channel['trigger_time'] = f"{year}-{month}-{day} {hour}:{minute}:{second}"
        elif typ == b'Cb':
            data = block.split(b',')
            channel['buffer-size'] = int(data[5].decode("windows-1252"))
            channel['buffer-start'] = int(data[4].decode("windows-1252"))
        elif typ == b'CS':
            _, data = block.split(b',', 1)
            break

    for channel in channels:
        try:
            dtype = DATA_TYPES[channel['decode']]
        except KeyError:
            raise ValueError("Unknown binary format")
        offset = channel['buffer-start']
        count = channel['buffer-size'] // numpy.dtype(dtype).itemsize
        channel['value_y'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset)
        if 'unit_x' in channel:
            channel['value_x'] = numpy.frombuffer(data, dtype=dtype, count=count, offset=offset+channel['buffer-size'])
    return channels
Zu den Restlichen Fragen: wie soll man die ohne konkrete Beispiele beantworten können?
Squipy
User
Beiträge: 39
Registriert: Sonntag 30. Juni 2019, 16:42

Moin Sirius,
deine Variante sieht mal wieder um einiges Übersichtlicher aus. Danke für die Hinweise. Jetzt habe ich auch verstanden, was du damit meintest, dass ich nicht über einen index iterieren soll... -.-

Zu meinen Fragen:
  • Insbesondere bei 'short' variablen, sind die Verläufe richtig die decodiert werden, allerdings sind die Werte viel zu groß, außerdem kann es nicht sein, dass die Nachkommastellen fehlen. Links sind die korrekten Daten und rechts die eingelesenen mit Code.
    Bild
    Mit ziemlich sicherheit fehlen mir hier Grundkenntnisse. Ich weiß, dass short variablen lediglich 2byte integer sind, das heißt dezimalstellen gibts sowieso nicht. Wenn ich mir die Daten aber bei Famos angucke sind diese offensichtlich vorhanden... :/ Ich vermute, dass ich ein min und max der 16 bit definieren muss, kann das sein? Bspw. sehe ich in der info des Kanals 2 byte Int min: 940.76 max: -940.76
  • Bzgl. des regex codes:

    Code: Alles auswählen

    entry = re.search(rb'^\s*\|(\S\S),(\d+),(\d+),', data)
    damit wird nach einem Block mit zwei Zeichen und zwei darauf folgenden Zahlen gesucht, es kommt aber auch vor, dass ein Block so aussieht:

    Code: Alles auswählen

    |CK,1,    3,1,1
    Ich würde dem jetzt gerne sagen, dass Leerzeichen ignoriert werden sollen...
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

Bei short steht irgendwo in einem anderen Block noch die Skalierung, die Du anwenden mußt.

Wenn an allen möglichen Stellen Leerzeichen erlaubt sind, dann füge Leerzeichenpattern ein: '^\s*\|(\S\S)\s*,\s*(\d+)\s*,\s*(\d+)\s*,'
Antworten