DLT645-2007模拟表 python源码

 2023-09-05 阅读 91 评论 0

摘要:DLT645-2007模拟表 python源码 # -*- coding: utf-8 -*- import sys import time import binascii import serial import serial.tools.list_ports import logging import os import struct import datetime logging.basicConfig(format='%(asctime)s,%(msecs)d %(leve…(摘要在此处被截断,完整代码见下文)

DLT645-2007模拟表 python源码

# -*- coding: utf-8 -*-
import sys
import time
import binascii
import serial
import serial.tools.list_ports
import logging
import os
import struct
import datetime

logging.basicConfig(
    format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
    datefmt='%d-%m-%Y:%H:%M:%S')
logging.getLogger().setLevel(logging.WARNING)
logger = logging.getLogger()

# Pre-compiled struct codecs: *_lit are little-endian, *_big are big-endian.
u8_lit = struct.Struct('B')
s8_lit = struct.Struct('b')
u16_lit = struct.Struct('<H')
s16_lit = struct.Struct('<h')
u32_lit = struct.Struct('<I')
s32_lit = struct.Struct('<i')
double_lit = struct.Struct('<d')
float_lit = struct.Struct('<f')
u16_big = struct.Struct('>H')
s16_big = struct.Struct('>h')
u32_big = struct.Struct('>I')
s32_big = struct.Struct('>i')
double_big = struct.Struct('>d')
float_big = struct.Struct('>f')

# Debug switches.
support_dbg_out = 0
support_dbg_send_frame = 1

# Receive state machine states used by dlt_645_2007.decode().
ST_IDLE = 0   # waiting for the first start mark
ST_HEAD = 1   # collecting the 10-byte header (start, 6 address bytes, start, ctrl, len)
ST_FRAME = 2  # collecting data field, checksum and end mark

START_MARK = 0x68

# Error codes passed to decode_cb().
ERR_FRAME = 1
ERR_SUM = 2
ERR_NONE = 0

# Supported data identifiers and their display names (names are written to
# the log verbatim, so they are runtime text and stay untranslated).
id_name_dir = {
    0x0201ff00: '电压数据块',
    0x0202ff00: '电流数据块',
    0x0203ff00: '瞬时总有功功率数据块',
    0x0204ff00: '瞬时总无功功率数据块',
    # The original literal listed 0x0205ff00 twice; the later value won, so
    # that is the one kept here.
    0x0205ff00: '瞬时视在功率',
    0x0206ff00: '功率因数数据块',
    0x02800001: '零线电流',
    0x0001ff00: '正向有功总电能数据块',
    0x0002ff00: '反向有功总电能',
    0x0003ff00: '组合无工1总电能',
    0x0004ff00: '组合无工2总电能',
    0x01010000: '正向有功总最大需量及发生时间',
    0x040005ff: '运行状态字',
    0x02800002: '电网频率',
    0x02800004: '当前有功需量',
}


def out_send_frame(name, msg):
    """Print ``name`` and ``msg`` when ``support_dbg_send_frame`` is truthy."""
    if support_dbg_send_frame:
        print(name, msg)


def dis_wdbg(msg, color=u'白'):
    """Print ``msg``, wrapped in an ANSI colour sequence for known colours.

    ``color`` is one of the keys below; any other value (including the
    default) prints the message without colouring.
    """
    # NOTE(review): the entry for '绿' (green) uses SGR code 33, which ANSI
    # terminals render as yellow. Left unchanged to keep the output as-is.
    color_val = {u'红': '\033[1;31m', u'绿': '\033[1;33m', u'蓝': '\033[1;34m'}
    if color in color_val:
        # Presumably a trick to switch on escape-sequence handling in the
        # Windows console; it runs an empty shell command - TODO confirm.
        os.system('')
        print(color_val[color] + str(msg) + '\033[0m')
    else:
        print(msg)


def get_sum(data):
    """Return the modulo-256 sum of the integers in ``data`` (frame checksum)."""
    return sum(data) % 256


class dlt_645_2007:
    """Frame encoder and byte-stream decoder for the framing used in this file.

    Frame layout as built by :meth:`encode`:
    ``68 | 6 address bytes | 68 | ctrl | len | data | checksum | 16``.

    Subclasses override :meth:`decode_cb` to receive complete frames.
    """

    def __init__(self):
        self.cache = bytearray()  # bytes of the frame being assembled
        self.st = ST_IDLE         # current receive state
        self.idx = 0              # number of bytes currently in ``cache``
        self.addr = None          # address of the frame being received
        self.frame_size = 0       # total expected frame length in bytes

    def encode(self, ctrl, base_addr, data):
        """Build a complete frame.

        Args:
            ctrl: control byte.
            base_addr: iterable with the 6 address bytes.
            data: data-field bytes, already carrying any 0x33 offset; must
                be at most 255 bytes because the length is one byte.

        Returns:
            bytearray with the frame including checksum and end mark.
        """
        msg = bytearray()
        msg.append(0x68)
        msg.extend(base_addr)
        msg.append(0x68)
        msg.append(ctrl)
        msg.append(len(data))
        msg.extend(data)
        # The checksum covers everything from the first start mark on.
        msg.append(get_sum(msg))
        msg.append(0x16)
        return msg

    def decode(self, data):
        """Feed received bytes into the parser.

        ``data`` may hold a partial frame, several frames, or leading noise;
        parser state is kept between calls. :meth:`decode_cb` is invoked once
        for every frame whose checksum and end mark are valid. Errors are
        logged rather than raised.
        """
        try:
            if logger.isEnabledFor(logging.INFO):
                logger.info(binascii.hexlify(data))
            for byte in data:
                self._feed(byte)
        except Exception:
            logger.exception('decode failed')

    def _reset(self):
        """Drop the partial frame and return to the idle state."""
        self.addr = bytearray(0)
        self.cache = bytearray(0)
        self.idx = 0
        self.st = ST_IDLE

    def _feed(self, byte):
        """Advance the receive state machine by one byte."""
        if self.st == ST_IDLE:
            if byte == START_MARK:
                self.cache = bytearray([byte])
                self.idx = 1
                self.st = ST_HEAD
            return

        self.cache.append(byte)
        self.idx += 1

        if self.st == ST_HEAD:
            if self.idx < 10:
                return
            if self.cache[7] != START_MARK:
                # The first 0x68 was not a real frame start. Re-scan the
                # bytes buffered after it so that a genuine start mark among
                # them is not lost.
                pending = self.cache[1:]
                self._reset()
                # ``pending`` has 9 bytes, fewer than a header, so this
                # cannot trigger another re-scan.
                for old in pending:
                    self._feed(old)
                return
            self.addr = bytearray(self.cache[1:7])
            # 10 header bytes + data field + checksum + end mark.
            self.frame_size = self.cache[9] + 12
            self.st = ST_FRAME
            return

        # ST_FRAME
        if self.idx >= self.frame_size:
            self._finish_frame()

    def _finish_frame(self):
        """Validate the buffered frame, dispatch it, and reset the parser."""
        crc = get_sum(self.cache[:-2])
        try:
            logger.info('sum:0x%x[->0x%x]' % (self.cache[-2], crc))
            if crc == self.cache[-2] and self.cache[-1] == 0x16:
                self.decode_cb(self.cache, ERR_NONE)
            else:
                logger.error('sum:0x%x[->0x%x]' % (self.cache[-2], crc))
        except Exception:
            logger.exception('frame handler failed')
        finally:
            # Always reset: a failing handler must not leave the parser stuck
            # in ST_FRAME, where it would never complete a frame again.
            self._reset()

    def decode_cb(self, data, err):
        """Hook called with each valid frame (``err`` is ``ERR_NONE``)."""
        pass


def decode_data_unit(data):
    """Remove the 0x33 offset from one data-field byte (modulo 256)."""
    return (data - 0x33) & 0xFF


def _bcd_plus_33(value, size):
    """Pack ``value`` as ``size`` BCD bytes, low digits first, each + 0x33.

    Only the lowest ``2 * size`` decimal digits are encoded.
    """
    if value < 0:
        raise ValueError('cannot BCD-encode a negative value: %r' % (value,))
    out = bytearray(size)
    for pos in range(size):
        value, pair = divmod(value, 100)
        tens, ones = divmod(pair, 10)
        out[pos] = ((tens << 4) | ones) + 0x33
    return out


def get_u32_val(idx, now=None):
    """Return a clock-derived 8-digit value as ``(text, 4 encoded bytes)``.

    The value is ``DDHHMMs0 + idx`` where ``s`` is the last digit of the
    seconds. ``now`` defaults to the current local time.
    """
    now = datetime.datetime.now() if now is None else now
    val = (now.day * 1000000 + now.hour * 10000 + now.minute * 100
           + (now.second % 10) * 10 + idx)
    return '%08d' % val, _bcd_plus_33(val, 4)


def get_u16_val(idx, now=None):
    """Return ``MMs0 + idx`` as ``(4-digit text, 2 encoded bytes)``."""
    now = datetime.datetime.now() if now is None else now
    val = now.minute * 100 + (now.second % 10) * 10 + idx
    return '%04d' % val, _bcd_plus_33(val, 2)


def get_u16_val1(idx, now=None):
    """Return ``s0 + idx`` as ``(4-digit text, 2 encoded bytes)``."""
    now = datetime.datetime.now() if now is None else now
    val = (now.second % 10) * 10 + idx
    return '%04d' % val, _bcd_plus_33(val, 2)


def get_u24_val(idx, now=None):
    """Return ``HHMMs0 + idx`` as ``(6-digit text, 3 encoded bytes)``."""
    now = datetime.datetime.now() if now is None else now
    val = now.hour * 10000 + now.minute * 100 + (now.second % 10) * 10 + idx
    return '%06d' % val, _bcd_plus_33(val, 3)


def get_u24_val1(idx, now=None):
    """Return ``(MMs0 + idx) * 10`` as ``(6-digit text, 3 encoded bytes)``."""
    now = datetime.datetime.now() if now is None else now
    val = (now.minute * 100 + (now.second % 10) * 10 + idx) * 10
    return '%06d' % val, _bcd_plus_33(val, 3)


def get_time_bin(now=None):
    """Return minute, hour, day, month and 2-digit year as 5 encoded bytes."""
    now = datetime.datetime.now() if now is None else now
    msg = bytearray()
    for part in (now.minute, now.hour, now.day, now.month, now.year % 100):
        msg.extend(_bcd_plus_33(part, 1))
    return msg


# Reply layout per data identifier:
# (value generator, number of values, append get_time_bin() afterwards).
_RESPONSE_LAYOUT = {
    0x0201ff00: (get_u16_val, 3, False),
    0x0202ff00: (get_u24_val1, 3, False),
    0x0203ff00: (get_u24_val, 4, False),
    0x0204ff00: (get_u24_val, 4, False),
    0x0205ff00: (get_u24_val, 4, False),
    0x0206ff00: (get_u16_val1, 4, False),
    0x02800001: (get_u24_val, 1, False),
    0x0001ff00: (get_u32_val, 5, True),
    0x0002ff00: (get_u32_val, 5, True),
    0x0003ff00: (get_u32_val, 5, True),
    0x0004ff00: (get_u32_val, 5, True),
    0x01010000: (get_u24_val, 1, True),
    0x040005ff: (get_u16_val, 7, False),
    0x02800002: (get_u16_val, 1, False),
    0x02800004: (get_u24_val, 1, False),
}


class main_fn(dlt_645_2007):
    """Interactive meter simulator: answers read requests on a serial port.

    Constructing an instance prompts for a port, opens it at 9600 8E1 and
    then serves requests until the process is terminated.
    """

    def decode_cb(self, data, err):
        """Answer one valid request frame and log it.

        Only control code 0x11 with a 4-byte data field is handled; anything
        else is reported as unsupported. Known identifiers get a 0x91 reply
        filled with clock-derived values; unknown ones are only logged.
        """
        if err != ERR_NONE:
            return
        ctrl = data[8]
        data_len = data[9]
        # Check the length before touching the identifier bytes: shorter
        # frames do not contain data[10:14].
        if ctrl != 0x11 or data_len != 4:
            print('not support:{}'.format(binascii.hexlify(data)))
            return

        raw_id = bytes(data[10:14])
        data_id = u32_lit.unpack(bytes(decode_data_unit(b) for b in raw_id))[0]
        val_name = ''

        layout = _RESPONSE_LAYOUT.get(data_id)
        if layout is not None:
            make_value, count, with_time = layout
            # One timestamp for the whole reply keeps its values consistent.
            now = datetime.datetime.now()
            # The reply echoes the identifier exactly as it was received.
            msg = bytearray(raw_id)
            for i in range(count):
                text, encoded = make_value(i, now)
                if i == 0:
                    val_name = text  # the log shows the first value only
                msg.extend(encoded)
            if with_time:
                msg.extend(get_time_bin(now))
            self.com.write(self.encode(0x91, self.addr, msg))

        if data_id in id_name_dir:
            log = '%s_%s 地址:%s 控制字:%x len:%d 值:%10s id:%s ' % (
                self.used, datetime.datetime.now(), binascii.hexlify(self.addr),
                ctrl, data_len, val_name, id_name_dir[data_id])
        else:
            log = '%s_%s地址:%s 控制字:%x len:%d 值:%10s id:0x%08x ' % (
                self.used, datetime.datetime.now(), binascii.hexlify(self.addr),
                ctrl, data_len, val_name, data_id)
        self.write_log(log)
        print(log)

    def write_log(self, data):
        """Append ``data`` as one line to the hourly log file.

        The file lives next to the script (``sys.argv[0]``). If it cannot be
        written, a hint is printed and the process exits.
        """
        # Port names such as '/dev/ttyUSB0' are absolute paths; using only
        # the last component keeps the log file inside the script directory.
        port_tag = os.path.basename(self.used) or self.used
        log_file = os.path.join(
            os.path.dirname(sys.argv[0]),
            port_tag + datetime.datetime.now().strftime('DTL645_2007%Y%m%d_%H.txt'))
        try:
            with open(log_file, 'a', encoding='utf-8') as hd:
                hd.write(data + '\n')
        except OSError as e:
            logger.error(e)
            print('管理员权限运行')
            sys.exit()

    def __init__(self):
        super().__init__()
        self.val = 1
        self.used = self._select_port()
        self.com = self._open_port(self.used)
        self._serve_forever()

    def _select_port(self):
        """Show the banner and port list, then return the chosen port name.

        Entering 9 exits the program (this takes precedence over a port
        with index 9, as in the original behaviour).
        """
        ports = [info[0] for info in serial.tools.list_ports.comports()]
        dis_wdbg('------------------------', '绿')
        dis_wdbg('DLT645-2007 模拟表', '红')
        dis_wdbg('version: 0.0.0', '红')
        dis_wdbg('auth: wzd', '红')
        dis_wdbg('time: 2020-10-21', '红')
        dis_wdbg('RS485参数:9600 E-8-1', '红')
        dis_wdbg('------------------------', '绿')
        dis_wdbg('------------------------', '绿')
        dis_wdbg('序号  名称', '红')
        for idx, name in enumerate(ports):
            print('%2d    %s' % (idx, name))
        if not ports:
            print('please insert Uart.')
            sys.exit()
        dis_wdbg('------------------------', '绿')
        while True:
            select = input('序号:')
            try:
                val = int(select)
            except ValueError as e:
                print(e)
                continue
            if val == 9:
                sys.exit()
            # Valid indexes are 0 .. len(ports) - 1; anything else re-prompts.
            if 0 <= val < len(ports):
                return ports[val]

    def _open_port(self, port):
        """Open ``port`` at 9600 baud, 8 data bits, even parity, 1 stop bit."""
        com = serial.Serial()
        try:
            com.baudrate = 9600
            com.port = port
            com.bytesize = 8
            com.parity = 'E'
            com.stopbits = 1
            com.timeout = 0.1
            com.open()
        except (OSError, ValueError) as e:
            logger.error(e)
            com.close()
            sys.exit()
        return com

    def _serve_forever(self):
        """Read from the port and decode until the process is interrupted."""
        try:
            while True:
                msg = self.com.read(19)
                if msg:
                    self.decode(msg)
        finally:
            # Release the port even when the loop ends through an exception.
            self.com.close()


if __name__ == '__main__':
    out = main_fn()

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/714.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息