Suche com_lib.py

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Easymen
User
Beiträge: 3
Registriert: Samstag 17. September 2022, 15:28

Hallo an Alle,

ich möchte ein altes Python Skript probieren.
Es geht um eine Verbindung PC und PLC(Speicherprogrammierbare Steuerung ähnlich Siemens Logo,in meinen Fall Eaton Easy800).
Die Verbindung wird Seriell bzw. über einen USB->RS232 Adapter hergestellt.
Meine Betriebssysteme sind Linux Mint und wenn es sein muss auch Windows 10.

Ich brauche die com_lib.py .Das Thema wo ich das Skript her habe ich seit fast 10Jahren tot.Ich möchte mich trotzdem nochmal
damit befassen.

Wer kennt diese com_lib.py ?

Oder kann man das Skript umbauen ?

Danke schon mal.

Grüße Martin

Code: Alles auswählen

#!/usr/bin/env python

# IPC_TO_SERIAL.py  #140416 -------------------------------------------
#
# socketserver for com_lib.py
#----------------------------------------------------------------------

import com_lib as com_lib

import os
import socket
import select
import struct
import serial
import sys
import time
import atexit

time.sleep(10)

res = com_lib.set_device_baudrate(9600)

if res == 0:
    
    print com_lib.o_serial
    com_lib.close_port()


# ------------------------------------------ default plc_items ---------------------------------------

for index in range(1,97):

    exec('M'+str(index)+'=com_lib.plc_item(0,\"M\",'+str(index)+')')
    exec('MB'+str(index)+'=com_lib.plc_item(0,\"MB\",'+str(index)+')')
    exec('MW'+str(index)+'=com_lib.plc_item(0,\"MW\",'+str(index)+')')
    exec('MD'+str(index)+'=com_lib.plc_item(0,\"MD\",'+str(index)+')')
    
for index in range(1,13):

    exec('I'+str(index)+'=com_lib.plc_item(0,\"I\",'+str(index)+')')

for index in range(1,9):

    exec('Q'+str(index)+'=com_lib.plc_item(0,\"Q\",'+str(index)+')')

for index in range(1,5):

    exec('IA'+str(index)+'=com_lib.plc_item(0,\"IA\",'+str(index)+')')

QA = com_lib.plc_item(0,'QA',1)

# -------- subscriptions --------

subscribed_items = []


def subscribe_item(_item,_port,_max_age=1,_bias=0):

    _item.port = _port
    _item.max_age = _max_age
    _item.bias = _bias

    subscribed_items.append(_item)
    
def unsubscribe_item(_item):

    _item.port = None
    
    subscribed_items.remove(_item)

subscribe_item(Q1,54000,.1)
subscribe_item(IA1,54000,1.5,10)
subscribe_item(MD1,54000,2)
subscribe_item(MD89,54000,3)

def get_interfaces():
    
    resp =[]
    
    for item in subscribed_items:

        tmp_item = (item.item_type+str(item.index+1),item.port,item.max_age,item.bias)
        resp.append(tmp_item)

    return resp

        
# ------------------ serial ----

s_name='/dev/ttyUSB0'
s_baudrate=9600
s_timeout=1

def set_serial(_name,_baudrate,_timeout):

    global s_name,s_baudrate,s_timeout
    
    s_name = _name
    s_baudrate = _baudrate
    s_timeout = _timeout
    
# ------------------------- raw socket to serial com -----------------

def write_port(outData):

    try:

        if com_lib.o_serial.inWaiting() > 0:
            
            com_lib.urb.append(com_lib.o_serial.read(com_lib.o_serial.inWaiting())) # --- flush unrequested data to urb
            
        com_lib.o_serial.write(outData)
  
        return listen_port()                                # --- go wait for data

    except:

        return 4                                            # --- no port?

def listen_port():

    try:

        cnt = 0
        
        while cnt < 500 :

            data_len=com_lib.o_serial.inWaiting()

            if data_len > 6: #( ----  smallest message 'e'/len/err_1/err_2/crc_1/crc_2
            
                in_packet = com_lib.o_serial.read(2)

                tupl = struct.unpack('<sB',in_packet)

                if tupl[0] == 'e':
                    
                    pl_len = int(tupl[1])
                    in_packet = in_packet+com_lib.o_serial.read(pl_len)
                    
                    return in_packet

                else:

                    in_packet = in_packet+com_lib.o_serial.read(com_lib.o_serial.inWaiting())
                    return in_packet
                
            else:
                cnt=cnt+1

        return 8                                    # ---- function timed out

    except:

        return 9                                    # ---- tty port is closed or does not exist
    

# ----------------- sockets --------------------------------------
# ----------------------------------------------------------------

# ----------------- socket server --------------------------------

client_objects = []
size = 1024

# setup variables and server sockets (UNIX and INET)

unix_sock = '/tmp/raspy_com'

# clean up for nix

if os.path.exists(unix_sock):

    os.remove(unix_sock)

# server sockets

unix_backlog = 5

tcp_host = None


# ------ get LAN address ----------------------------------

if tcp_host == None:
    
    if os.name != "nt":

        import fcntl

        def get_interface_ip(ifname):

            ts = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            return socket.inet_ntoa(fcntl.ioctl(ts.fileno(), 0x8915, struct.pack('256s',ifname[:15]))[20:24])
            
        
    tcp_host = socket.gethostbyname(socket.gethostname())

    if tcp_host.startswith('127.'):

        interfaces = ['eth0','eth1','eth2','wlan0','wlan1','wifi0','ath0','ath1','ppp0',]

        for ifname in interfaces:

            try:

                tcp_host = get_interface_ip(ifname)
                
                break
            
            except IOError:

                tcp_host = 'localhost'            
                pass
        
# ---------------------------------------------------------

def close_sockets():

    print 'closing sockets'
    
    for socket in client_objects:

        socket.close()

atexit.register(close_sockets)

tcp_port = 50000
tcp_backlog = 5

got_nix_server = False
server_count = 0

try :
    
    unix_server = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
    unix_server.setblocking(0)
    unix_server.bind(unix_sock)
    unix_server.listen(unix_backlog)

    client_objects.append(unix_server)
    server_count = 1
    got_nix_server = True
    print 'Unix Socket Server listening at '+unix_sock

except:

    print str(os.name)+' seems not to implement the AF_UNIX family.'

tcp_server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
tcp_server.setblocking(0)
tcp_server.bind((tcp_host,tcp_port))
tcp_server.listen(tcp_backlog)

client_objects.append(tcp_server)
server_count = server_count + 1
print 'TCP Socket Server listening at '+str(tcp_host)+':'+str(tcp_port)

com_lib.open_port()

# ------------------------ main -----------------------------

running = 1

while running:

    if len(client_objects) > server_count:
        
        startTime = time.time()

        for item in subscribed_items:

            if item.timestamp  < startTime - item.max_age:
                #print 'updating item '+item.item_type+str(item.index+1)
                res = item.get_value()

    input_ready,output_ready,in_error = select.select(client_objects,[],client_objects,.01)

    for rc in input_ready:

        if rc == unix_server:

            client,address = unix_server.accept()
            client_objects.append(client)
            
            if com_lib.o_serial == None:

                res = com_lib.open_port()

            client.send(str(len(client_objects)-2))
            print 'new NIX client '+str(len(client_objects)-2)+' connected to socket '+unix_sock
            
        elif rc == tcp_server:
            
            client,address = tcp_server.accept()
            client_objects.append(client)

            if com_lib.o_serial == None:

                res = com_lib.open_port()
                
            client.send(str(len(client_objects)-2))
            print 'new TCP client #'+str(len(client_objects)-2)+' connected by '+str(address)
            
        else:

            try:
                
                sock_type = rc.family
                sock_address = rc.getpeername()
                
                data = rc.recv(size)

            except:

                data = False

            if data:

                if data[0] == 'E':
                    
                    res = write_port(data)
                    rc.send(str(res))

                else:
                    
                    try:
                        
                        exec('res='+data)
                        rc.send(str(res))

                    except:
                        
                        rc.send('?')
                        
            else:

                rc.close()
                client_objects.remove(rc)
                
                if len(client_objects) <= server_count:
                    
                       print 'nobody connected... closing serial port'
                       com_lib.close_port()


    for rc in in_error:                    

        rc.close()
        client_objects.remove(rc)

Benutzeravatar
sparrow
User
Beiträge: 4540
Registriert: Freitag 17. April 2009, 10:28

Ich kenne das verwendete Modul nicht. Aber der Code deines Scripts macht Dinge, die man definitiv nicht macht. Ich habe "exec" gefunden um Variablen anzulegen + global. Da bin ich direkt ausgestiegen.

Erstes würde man mit dicts lösen, letzteres hat in keinem Programm etwas verloren, weil man nicht mit globalen Variablen arbeitet. Funktionen bekommen alles, was sie benötigen, als Parameter und geben ein Ergebnis mit return zurück.

"exec" ist so fatal, dass ich gar nicht tiefer eingestiegen bin.
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das ist wirklich eines der schlimmsten Scripte seit Langem. Statt com_lib benutz pyserial und Bau den ganzen Kram damit neu auf. Ist das serielle Protokoll mit der SPS dokumentiert?
Benutzeravatar
__blackjack__
User
Beiträge: 14078
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Kurz drüber geschaut und ”gestaunt” bei ``exec('res='+data)`` mit `data` das über eine TCP-Verbindung rein kommt, natürlich wie immer komplett kaputt einfach mit einem `recv()` ohne irgendwie sicher zu stellen, dass da auch eine komplette Nachricht empfangen wurde, und das ganze in einem nackten ``except`` das die Variable mit den empfangenen Daten vom Typ (Byte-)String dann mit einem `False` belegt. Meine Güte, da weiss man ja gar nicht wo man anfangen soll… 😱

Ein weiterer Grund das neu zu implementieren wäre Python 2. Das ist tot.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Easymen
User
Beiträge: 3
Registriert: Samstag 17. September 2022, 15:28

Hallo Leute,

Vielen Dank für die zahlreichen Antworten.Hab ich mir schon gedacht das es "Müll" es.Leider ist das Serielle Protokoll nicht bekannt.Ich weiß nicht ob sich der Aufwand noch lohnt.Mit Python stehe ich wieder am Anfang.Ich habe in der Vergangenheit (um die 5/6 Jahre schon her) mit Raspberry PI ein bischen probiert.Da bräuchte ich schon massiv Hilfe .

Ich habe hier noch eins.Hier klappt die Verbindung.Merker(M) kann ich ein und ausschalten. Wie schaut es mit dem aus ?Was sagt ihr dazu?
Ich benütze aber einen PC(mit Linux Mint) anstatt des Raspberry.

CU-Martin

Code: Alles auswählen

# raspi_com is a script providing serial communication between Raspberry Pi and Easy800 devices via USB-CAB
#
# raspi_com needs pySerial to be previously installed -> http://pyserial.sourceforge.net/pyserial.html
#
# Since this code is Python, it should be portable to other plattforms, but has just been written
# and tested on Raspberry Pi's ARM architecture under Raspbian (a subset of Debian Wheezy)
#
# (c) 2013 Peter Sommer ---------- Dilettanten Ole! ------------ <peter at 3d-check dot com> ---------------
#
# -- This code is free software subject to the GNU General Public License <http://www.gnu.org/licenses/>----
#
# WARNING :
#
# The PLC or it's program MUST restrain or reject received data whenever they don't match process-safe values
#
# The PLC or it's program MUST take care of the process security, being allways and anytime able to deny or
# stop the execution of unsafe actions and undo unsafe process-states triggered by this software
#
# USE ON YOUR OWN RISK
#-----------------------------------------------------------------------------------------------------------

import sys
import struct
import serial
import subprocess
import time

# ----------------------------------------- driver ----------------------------------------------------------
# The cp210x driver is included in recent Linux distributions and just needs --------------------------------
# our USB-Serial bridge's vendor and product ID notifyed to the cp210x driver -------------------------------
# Thanks and free beer plenty for Jon at http://www.ha19.no/usb/ --------------------------------------------

subprocess.Popen('echo 188a 3001 > /sys/bus/usb-serial/drivers/cp210x/new_id',shell=True)

# --------------------------------------------------- ttyUSB port usage -------------------------------------
# -- pySerial encapsulates the access for the serial port -- http://pyserial.sourceforge.net/pyserial.html --
# -----------------------------------------------------------------------------------------------------------

   
def open_port( _name='/dev/ttyUSB0' , _baudrate=9600, _timeout=1 ):

    global o_serial
    global urb

    try:

        o_serial = serial.Serial(_name, baudrate = _baudrate, timeout = _timeout) # create and open serial port
       
        test_mesg = (69,7,0,0,1,0,80,119)               # ---- test the port
        test_mesg = struct.pack('<5BHBB', *test_mesg)
        req=o_serial.write(test_mesg)
        res=o_serial.read(8)

        i_mesg = struct.unpack('<c7B',res)

        in_crc = i_mesg[6:]                             # ---- validate PLC response by CRC16

        in_payload = i_mesg[1:6]

        c_data = struct.pack('<5B',*in_payload)
       
        chk_crc = getCRC(c_data)
       
        if chk_crc == in_crc:
           
            return 0                                    # ---- success: communication ok ---------------- >>>

        else:

            return 1                                    # ---- communication is messy (wrong settings?)
   
    except:

        return 2                                        # ---- no succes creating port
       
def close_port():


    try:
       
        o_serial.close()
        return 0                                        # ---- success
   
    except:

        return 3                                        # ---- no such port
   
       
def write_port(outData):

    try:

        if o_serial.inWaiting() > 0:
           
            urb.append(o_serial.read(o_serial.inWaiting())) # --- flush unrequested data to urb
           
        o_serial.write(outData)

        return listen_port()                                # --- go wait for data

    except:

        return 4                                            # --- no port?

urb = [] # ---- buffer for unrequested data
   
def listen_port():

    try:

        cnt = 0
       
        while cnt < 50 :

            data_len=o_serial.inWaiting()
           
            if data_len > 0:                   
               
                in_header = o_serial.read(1)    # --- get header

                # -------------------------------------- REQUESTED DEVICE TO HOST -------------

                if in_header == 'e':        # --- header from device = response on host request

                    pl_len = o_serial.inWaiting()   
                    in_data = o_serial.read(pl_len)                 
                   
                    in_data = struct.unpack('<'+str(pl_len)+'B',in_data)
                   
                    in_data = list(int(val) for val in in_data)
                   
                    msg_len = in_data[0]            # ---- payload length announced by device
                   
                    if len(in_data)-1 == msg_len:   # ---- if true, guess transfer is completed

                        msg_crc_1 = in_data.pop()   # ---- check data integrity by CRC16
                        msg_crc_2 = in_data.pop()                   

                        c_payload = struct.pack('<'+str(len(in_data))+'B', *in_data)
                        crc = getCRC(c_payload)
                       
                        if msg_crc_1 == crc[1] and msg_crc_2 == crc[0]:   #  data is good!
                           
                            return in_data                      # --- return validated data -->>>
                           
                       
                        else:

                            return 5                            # --- data was a mess

                    else:                                       
                            return 6                            # --- data wrong length
                       
                           

                # -------------------------------------- UNREQUESTED DEVICE TO HOST -------------
               
                else:

                     urb.append(in_header+o_serial.readline())

                     while o_serial.inWaiting() > 0:
                         
                         urb.append(o_serial.readline())        # flush unrequested data to urb
                     
                     return 7                                   # just had unrequested stuff
                   
            else:

                time.sleep(.1)
                cnt=cnt+1

        return 8                                                # ---- function timed out

    except:

        return 9                                    # ---- port is closed or does not exist

# --------------------------------------------------------------------------------------------------   
# ----      plc_item objects provide easy read and write access                                 ----
# ----      they build the "front end" of raspi_com                                             ----
# --------------------------------------------------------------------------------------------------

# ----      dictionaries used for message encoding/decoding (kept public allow edit during runtime)

d_write= {'M':(4,1,'B'),'MB':(10,1,'B'),'MW':(10,2,'H'),'MD':(10,4,'i')} #(type,indx-mult,width)

d_read = {'QA':(9,0,'H'),'IA':(8,0,'4H'),'I':(0,0,'H'),'Q':(1,0,'H'),'M':(4,1,'B'),
          'MB':(10,1,'32B'), 'MW':(10,2,'16H'),'MD':(10,4,'8i')}         #(type,indx-mult, fmt)

# --------- create plc_item instances using this sintax: -------------------------------------------
#                                                                                               ----
#           object = plc_item(location as int, item as string, index as int)                    ----
#                                                                                               ----
# --------------------------------------------------------------------------------------------------

class plc_item:

    def __init__(self, location=0, item_type='Q', index=0):

        self.error = None
        self.timestamp = 0
        self.is_writeable = False
       
        try:
           
            self.location = int(location)

            if d_read.has_key(item_type):
               
                self.item_type = item_type

            else:

                return   # --- unknown PLC item type
       
            self.index = int(index)-1
       
            self.msg_h = [self.location,d_read[self.item_type][0],self.index*d_read[self.item_type][1]]

            self.is_writeable = d_write.has_key(item_type)

            self.error = 0
           
        except:

            return   # --- error creating plc_item
       
    def set_value(self, value):

        if self.is_writeable == True:

            try:
               
                data = struct.pack(d_write[self.item_type][2],value)
                data = struct.unpack(str(d_write[self.item_type][1])+'B',data)
                o_mesg = [self.location,d_write[self.item_type][0],self.index*d_write[self.item_type][1], data]

                resp = write_object(*o_mesg)

                try:

                    resp.pop(0)
                    err_b_1 = resp.pop(0)
                    err_b_2 = resp.pop(0)

                    if err_b_1+err_b_2 == 0:

                        return 0    # --------- success ----- >

                    else:

                        self.error=(err_b_1,err_b_2,resp.pop(0))
                       
                        return 10  # -------- device reported error to be read from self.error

                except:

                     return resp # ------ internal error code (int)
                   
            except:

                return 11  # wrong data ?

        else:

            return 12 # --- plc_item is not writeable
       
   
    def get_value(self):

        if self.error != None :

            resp = read_object(*self.msg_h)
           
            try:
               
                resp.pop(0)
                err_b_1 = resp.pop(0)
                err_b_2 = resp.pop(0)
               
                if err_b_1+err_b_2 == 0:

                    c_mesg = struct.pack(str(len(resp))+'B',*resp)
                    o_mesg = struct.unpack(d_read[self.item_type][2],c_mesg)
           
                    self.values = o_mesg
                    self.error = ()
                    self.timestamp = time.clock()
                   
                    return self.values  # ----- success (data as tuple)

                else:

                    self.error = resp.pop(0)
                   
                    return 13  #---- device reported error to be read from self.error

            except:

                return resp # ----- internal error code

        else:

            return 14 # ---- this plc_item is not working

   
# ------------------------------------------------ message encoding -------------------------------
# -------------------------------------------------------------------------------------------------
# -------------------------------------------------------------------------------------------------

def write_object(B_net_id,B_object,H_index,data): # ----- request write to device memory
   
    try:

        a_data = [int(val) for val in data]
   
        o_msg = [len(a_data)+7, 32, B_net_id, B_object, H_index]+a_data

        c_msg = struct.pack('<4BH'+str(len(a_data))+'B', *o_msg)

    except:

        return 15 # ---- wrong data

    crc = getCRC(c_msg)

    o_msg.insert(0,0x45) # --- header ('E')
    o_msg.append(crc[0]) # --- CRC-16 byte 1
    o_msg.append(crc[1]) # --- CRC-16 byte 0
   
    c_msg = struct.pack('<5BH'+str(2+len(a_data))+'B',*o_msg)

    return write_port(c_msg)    # --- wait for success ------------------------------------------->


def read_object(B_net_id,B_object,H_index):       # ----- request data from device memory

    try:
       
        o_msg = [7, 0, B_net_id, B_object, H_index]
       
        c_msg = struct.pack('<4BH', *o_msg)

    except:

        return 16 # ---- wrong data

    crc = getCRC(c_msg)

    o_msg.insert(0,0x45)  # --- header ('E')
    o_msg.append(crc[0])  # --- CRC-16 byte 1   
    o_msg.append(crc[1])  # --- CRC-16 byte 0

    c_msg = struct.pack('<5BHBB',*o_msg)

    return write_port(c_msg)    # --- wait for data ----------------------------------------------->

# --------------------------------------------------------------------------------------------------
# ------  methods dealing with unrequested device to host messages                              ----
# --------------------------------------------------------------------------------------------------
# ------  flush_data set to False performs a conservative dump

# --------------------------------------------------------- read one item from urb -----------------

def pop_urb(flush_data = True, offset=0):

    global urb

    if len(urb) > offset+1:

        if flush_data:
           
            return urb.pop(offset)        # - returns one line as string and deletes it from the list

        else:

            return urb[offset]            # - just returns one line as string
    else:
       
        return ''

# -------------------------------------------------------------- read all data from urb -------------
   
def dump_urb(flush_data = True):
   
    global urb
   
    ret = urb

    if flush_data:
       
        urb=[]     
   
    return ret                              # - returns urb's data as a list



#---------------------------------------------------- CRC ------------------------------------------------
# Table-based CRC-16 calculation from http://www.digi.com/wiki/developer/index.php/Python_CRC16_Modbus_DF1
# shortened functionality for ease of use in our case, and messy byte juggling added by needs ------------
# Lots of thanks and plenty of gallons free beer go to Greg Cook, for his tool "crc revEng" + advise -----
# http://regregex.bbcmicro.net ---------------------------------------------------------------------------
# --------------------------------------------------------------------------------------------------------


def getCRC( st ):

    crc16_table = (
        0x0000, 0xc0c1, 0xc181, 0x0140, 0xc301, 0x03c0, 0x0280, 0xc241,
        0xc601, 0x06c0, 0x0780, 0xc741, 0x0500, 0xc5c1, 0xc481, 0x0440,
        0xcc01, 0x0cc0, 0x0d80, 0xcd41, 0x0f00, 0xcfc1, 0xce81, 0x0e40,
        0x0a00, 0xcac1, 0xcb81, 0x0b40, 0xc901, 0x09c0, 0x0880, 0xc841,
        0xd801, 0x18c0, 0x1980, 0xd941, 0x1b00, 0xdbc1, 0xda81, 0x1a40,
        0x1e00, 0xdec1, 0xdf81, 0x1f40, 0xdd01, 0x1dc0, 0x1c80, 0xdc41,
        0x1400, 0xd4c1, 0xd581, 0x1540, 0xd701, 0x17c0, 0x1680, 0xd641,
        0xd201, 0x12c0, 0x1380, 0xd341, 0x1100, 0xd1c1, 0xd081, 0x1040,
        0xf001, 0x30c0, 0x3180, 0xf141, 0x3300, 0xf3c1, 0xf281, 0x3240,
        0x3600, 0xf6c1, 0xf781, 0x3740, 0xf501, 0x35c0, 0x3480, 0xf441,
        0x3c00, 0xfcc1, 0xfd81, 0x3d40, 0xff01, 0x3fc0, 0x3e80, 0xfe41,
        0xfa01, 0x3ac0, 0x3b80, 0xfb41, 0x3900, 0xf9c1, 0xf881, 0x3840,
        0x2800, 0xe8c1, 0xe981, 0x2940, 0xeb01, 0x2bc0, 0x2a80, 0xea41,
        0xee01, 0x2ec0, 0x2f80, 0xef41, 0x2d00, 0xedc1, 0xec81, 0x2c40,
        0xe401, 0x24c0, 0x2580, 0xe541, 0x2700, 0xe7c1, 0xe681, 0x2640,
        0x2200, 0xe2c1, 0xe381, 0x2340, 0xe101, 0x21c0, 0x2080, 0xe041,
        0xa001, 0x60c0, 0x6180, 0xa141, 0x6300, 0xa3c1, 0xa281, 0x6240,
        0x6600, 0xa6c1, 0xa781, 0x6740, 0xa501, 0x65c0, 0x6480, 0xa441,
        0x6c00, 0xacc1, 0xad81, 0x6d40, 0xaf01, 0x6fc0, 0x6e80, 0xae41,
        0xaa01, 0x6ac0, 0x6b80, 0xab41, 0x6900, 0xa9c1, 0xa881, 0x6840,
        0x7800, 0xb8c1, 0xb981, 0x7940, 0xbb01, 0x7bc0, 0x7a80, 0xba41,
        0xbe01, 0x7ec0, 0x7f80, 0xbf41, 0x7d00, 0xbdc1, 0xbc81, 0x7c40,
        0xb401, 0x74c0, 0x7580, 0xb541, 0x7700, 0xb7c1, 0xb681, 0x7640,
        0x7200, 0xb2c1, 0xb381, 0x7340, 0xb101, 0x71c0, 0x7080, 0xb041,
        0x5000, 0x90c1, 0x9181, 0x5140, 0x9301, 0x53c0, 0x5280, 0x9241,
        0x9601, 0x56c0, 0x5780, 0x9741, 0x5500, 0x95c1, 0x9481, 0x5440,
        0x9c01, 0x5cc0, 0x5d80, 0x9d41, 0x5f00, 0x9fc1, 0x9e81, 0x5e40,
        0x5a00, 0x9ac1, 0x9b81, 0x5b40, 0x9901, 0x59c0, 0x5880, 0x9841,
        0x8801, 0x48c0, 0x4980, 0x8941, 0x4b00, 0x8bc1, 0x8a81, 0x4a40,
        0x4e00, 0x8ec1, 0x8f81, 0x4f40, 0x8d01, 0x4dc0, 0x4c80, 0x8c41,
        0x4400, 0x84c1, 0x8581, 0x4540, 0x8701, 0x47c0, 0x4680, 0x8641,
        0x8201, 0x42c0, 0x4380, 0x8341, 0x4100, 0x81c1, 0x8081, 0x4040 )


    crc = 0x0000
   
    for ch in st:
       
        crc = (crc >> 8) ^ crc16_table[(crc ^ ord(ch)) & 0xFF]
       
    crc = struct.pack('<H',crc)
    crc = struct.unpack('<BB',crc)
         
    return crc
 
# ------------------------------------------------------------------------------------------------ END


Benutzeravatar
kbr
User
Beiträge: 1508
Registriert: Mittwoch 15. Oktober 2008, 09:27

Das ist low-level Bit-Geschiebe übersetzt (vermutlich aus C) in schlechtes Python. Auch ein Kandidat zum neu schreiben. Der erste Schritt wäre, das Protokoll in Erfahrung zu bringen.
Easymen
User
Beiträge: 3
Registriert: Samstag 17. September 2022, 15:28

Hallo,

das Protokoll gibt Eaton nicht mehr her.Gut vielleicht jetzt eher,da die Geräte-Serie seit 2 oder 3 Jahren nicht mehr produziert wird.
OK,Danke nochmal an Alle.Ich melde mich wieder wenn ich wieder Hilfe brauche.
CU-Martin
Antworten