# Copyright (C) 2021 - 2022 nymea GmbH # # This program is free software; you can redistribute it and/or # modify it under the terms of the GNU General Public License # as published by the Free Software Foundation; either version 2 # of the License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import os import re import sys import json import shutil import datetime import logging logger = logging.getLogger('modbus-tools') def convertToAlphaNumeric(text): finalText = '' for character in text: if character.isalnum(): finalText += character else: finalText += ' ' return finalText def splitCamelCase(text): return re.sub('([A-Z][a-z]+)', r' \1', re.sub('([A-Z]+)', r' \1', text)).split() def convertToCamelCase(text, capitalize = False): s = convertToAlphaNumeric(text) s = s.replace("-", " ").replace("_", " ") words = s.split() logger.debug('--> words', words) finalWords = [] for i in range(len(words)): camelCaseSplit = splitCamelCase(words[i]) if len(camelCaseSplit) == 0: finalWords.append(words[i]) else: logging.debug('Camel calse split words', camelCaseSplit) for j in range(len(camelCaseSplit)): finalWords.append(camelCaseSplit[j]) if len(finalWords) == 0: return text finalText = '' if capitalize: finalText = finalWords[0].capitalize() + ''.join(i.capitalize() for i in finalWords[1:]) else: finalText = finalWords[0].lower() + ''.join(i.capitalize() for i in finalWords[1:]) logging.debug('Convert camel case:', text, '-->', finalText) return finalText def loadJsonFile(filePath): logger.info('Loading JSON file %s', filePath) jsonFile = open(filePath, 'r') return json.load(jsonFile) def writeLine(fileDescriptor, line = ''): fileDescriptor.write(line + '\n') def writeLicenseHeader(fileDescriptor): writeLine(fileDescriptor, '/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* Copyright 2013 - %s, nymea GmbH' % datetime.datetime.now().year) writeLine(fileDescriptor, '* Contact: contact@nymea.io') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* This fileDescriptor is part of nymea.') writeLine(fileDescriptor, '* This project including source code and documentation is protected by') writeLine(fileDescriptor, '* copyright law, and remains the property of nymea GmbH. All rights, including') writeLine(fileDescriptor, '* reproduction, publication, editing and translation, are reserved. The use of') writeLine(fileDescriptor, '* this project is subject to the terms of a license agreement to be concluded') writeLine(fileDescriptor, '* with nymea GmbH in accordance with the terms of use of nymea GmbH, available') writeLine(fileDescriptor, '* under https://nymea.io/license') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* GNU Lesser General Public License Usage') writeLine(fileDescriptor, '* Alternatively, this project may be redistributed and/or modified under the') writeLine(fileDescriptor, '* terms of the GNU Lesser General Public License as published by the Free') writeLine(fileDescriptor, '* Software Foundation; version 3. This project is distributed in the hope that') writeLine(fileDescriptor, '* it will be useful, but WITHOUT ANY WARRANTY; without even the implied') writeLine(fileDescriptor, '* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU') writeLine(fileDescriptor, '* Lesser General Public License for more details.') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* You should have received a copy of the GNU Lesser General Public License') writeLine(fileDescriptor, '* along with this project. If not, see .') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* For any further details and any questions please contact us under') writeLine(fileDescriptor, '* contact@nymea.io or see our FAQ/Licensing Information on') writeLine(fileDescriptor, '* https://nymea.io/license/faq') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */') writeLine(fileDescriptor) writeLine(fileDescriptor, '/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* WARNING') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* This file has been autogenerated. Any changes in this file may be overwritten.') writeLine(fileDescriptor, '* If you want to change something, update the register json or the tool.') writeLine(fileDescriptor, '*') writeLine(fileDescriptor, '* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */') writeLine(fileDescriptor) def writeRegistersEnum(fileDescriptor, registerJson): logger.debug('Writing enum for all registers') registerEnums = {} # Read all register names and addresses if 'blocks' in registerJson: for blockDefinition in registerJson['blocks']: blockRegisters = blockDefinition['registers'] for blockRegister in blockRegisters: registerName = blockRegister['id'] registerAddress = blockRegister['address'] registerEnums[registerAddress] = registerName for registerDefinition in registerJson['registers']: registerName = registerDefinition['id'] registerAddress = registerDefinition['address'] registerEnums[registerAddress] = registerName # Sort the enum map registersKeys = registerEnums.keys() sortedRegistersKeys = sorted(registersKeys) sortedRegisterEnumList = [] logger.debug('Sorted registers') for registerAddress in sortedRegistersKeys: logger.debug('--> %s : %s' % (registerAddress, registerEnums[registerAddress])) enumData = {} enumData['key'] = registerEnums[registerAddress] enumData['value'] = registerAddress sortedRegisterEnumList.append(enumData) enumName = 'Registers' writeLine(fileDescriptor, ' enum %s {' % enumName) for i in range(len(sortedRegisterEnumList)): enumData = sortedRegisterEnumList[i] line = (' Register%s = %s' % (enumData['key'][0].upper() + enumData['key'][1:] , enumData['value'])) if i < (len(sortedRegisterEnumList) - 1): line += ',' writeLine(fileDescriptor, line) writeLine(fileDescriptor, ' };') writeLine(fileDescriptor, ' Q_ENUM(%s)' % enumName) writeLine(fileDescriptor) def writeEnumDefinition(fileDescriptor, enumDefinition): logger.debug('Writing enum %s', enumDefinition) enumName = enumDefinition['name'] enumValues = enumDefinition['values'] writeLine(fileDescriptor, ' enum %s {' % enumName) for i in range(len(enumValues)): enumData = enumValues[i] line = (' %s%s = %s' % (enumName, enumData['key'], enumData['value'])) if i < (len(enumValues) - 1): line += ',' writeLine(fileDescriptor, line) writeLine(fileDescriptor, ' };') writeLine(fileDescriptor, ' Q_ENUM(%s)' % enumName) writeLine(fileDescriptor) def getCppDataType(registerDefinition, rawType = False): if not rawType: if 'enum' in registerDefinition: return registerDefinition['enum'] if 'scaleFactor' in registerDefinition or 'staticScaleFactor' in registerDefinition: return 'float' if registerDefinition['type'] == 'uint16': return 'quint16' if registerDefinition['type'] == 'int16': return 'qint16' if registerDefinition['type'] == 'uint32': return 'quint32' if registerDefinition['type'] == 'int32': return 'qint32' if registerDefinition['type'] == 'uint64': return 'quint64' if registerDefinition['type'] == 'int64': return 'qint64' if registerDefinition['type'] == 'float': return 'float' if registerDefinition['type'] == 'float64': return 'double' if registerDefinition['type'] == 'string': return 'QString' def getConversionToValueMethod(registerDefinition): # Handle enums propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition, True) if 'enum' in registerDefinition: enumName = registerDefinition['enum'] if registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertFromUInt16(static_cast<%s>(%s))' % (propertyTyp, propertyName)) elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertFromInt16(static_cast<%s>(%s))' % (propertyTyp, propertyName)) elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertFromUInt32(static_cast<%s>(%s), m_endianness)' % (propertyTyp, propertyName)) elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertFromInt32(static_cast<%s>(%s), m_endianness)' % (propertyTyp, propertyName)) # Handle scale factors if 'scaleFactor' in registerDefinition: scaleFactorProperty = 'm_%s' % registerDefinition['scaleFactor'] if registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertFromUInt16(static_cast<%s>(%s * 1.0 / pow(10, %s)))' % (propertyTyp, propertyName, scaleFactorProperty)) elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertFromInt16(static_cast<%s>(%s * 1.0 / pow(10, %s)))' % (propertyTyp, propertyName, scaleFactorProperty)) elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertFromUInt32(static_cast<%s>(%s * 1.0 / pow(10, %s)), m_endianness)' % (propertyTyp, propertyName, scaleFactorProperty)) elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertFromInt32(static_cast<%s>(%s * 1.0 / pow(10, %s)), m_endianness)' % (propertyTyp, propertyName, scaleFactorProperty)) elif 'staticScaleFactor' in registerDefinition: scaleFactor = registerDefinition['staticScaleFactor'] if registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertFromUInt16(static_cast<%s>(%s * 1.0 / pow(10, %s)))' % (propertyTyp, propertyName, scaleFactor)) elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertFromInt16(static_cast<%s>(%s * 1.0 / pow(10, %s)))' % (propertyTyp, propertyName, scaleFactor)) elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertFromUInt32(static_cast<%s>(%s * 1.0 / pow(10, %s)), m_endianness)' % (propertyTyp, propertyName, scaleFactor)) elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertFromInt32(static_cast<%s>(%s * 1.0 / pow(10, %s)), m_endianness)' % (propertyTyp, propertyName, scaleFactor)) # Handle default types elif registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertFromUInt16(%s)' % propertyName) elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertFromInt16(%s)' % propertyName) elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertFromUInt32(%s, m_endianness)' % (propertyName)) elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertFromInt32(%s, m_endianness)' % (propertyName)) elif registerDefinition['type'] == 'uint64': return ('ModbusDataUtils::convertFromUInt64(%s, m_endianness)' % (propertyName)) elif registerDefinition['type'] == 'int64': return ('ModbusDataUtils::convertFromInt64(%s, m_endianness)' % (propertyName)) elif registerDefinition['type'] == 'float': return ('ModbusDataUtils::convertFromFloat32(%s, m_endianness)' % propertyName) elif registerDefinition['type'] == 'float64': return ('ModbusDataUtils::convertFromFloat64(%s, m_endianness)' % propertyName) elif registerDefinition['type'] == 'string': return ('ModbusDataUtils::convertFromString(%s)' % propertyName) def getValueConversionMethod(registerDefinition): # Handle enums if 'enum' in registerDefinition: enumName = registerDefinition['enum'] if registerDefinition['type'] == 'uint16': return ('static_cast<%s>(ModbusDataUtils::convertToUInt16(values))' % (enumName)) elif registerDefinition['type'] == 'int16': return ('static_cast<%s>(ModbusDataUtils::convertToInt16(values))' % (enumName)) elif registerDefinition['type'] == 'uint32': return ('static_cast<%s>(ModbusDataUtils::convertToUInt32(values, m_endianness))' % (enumName)) elif registerDefinition['type'] == 'int32': return ('static_cast<%s>(ModbusDataUtils::convertToInt32(values, m_endianness))' % (enumName)) # Handle scale factors if 'scaleFactor' in registerDefinition: scaleFactorProperty = 'm_%s' % registerDefinition['scaleFactor'] if registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertToUInt16(values) * 1.0 * pow(10, %s)' % (scaleFactorProperty)) elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertToInt16(values) * 1.0 * pow(10, %s)' % (scaleFactorProperty)) elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertToUInt32(values, m_endianness) * 1.0 * pow(10, %s)' % (scaleFactorProperty)) elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertToInt32(values, m_endianness) * 1.0 * pow(10, %s)' % (scaleFactorProperty)) elif 'staticScaleFactor' in registerDefinition: scaleFactor = registerDefinition['staticScaleFactor'] if registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertToUInt16(values) * 1.0 * pow(10, %s)' % (scaleFactor)) elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertToInt16(values) * 1.0 * pow(10, %s)' % (scaleFactor)) elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertToUInt32(values, m_endianness) * 1.0 * pow(10, %s)' % (scaleFactor)) elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertToInt32(values, m_endianness) * 1.0 * pow(10, %s)' % (scaleFactor)) # Handle default types elif registerDefinition['type'] == 'uint16': return ('ModbusDataUtils::convertToUInt16(values)') elif registerDefinition['type'] == 'int16': return ('ModbusDataUtils::convertToInt16(values)') elif registerDefinition['type'] == 'uint32': return ('ModbusDataUtils::convertToUInt32(values, m_endianness)') elif registerDefinition['type'] == 'int32': return ('ModbusDataUtils::convertToInt32(values, m_endianness)') elif registerDefinition['type'] == 'uint64': return ('ModbusDataUtils::convertToUInt64(values, m_endianness)') elif registerDefinition['type'] == 'int64': return ('ModbusDataUtils::convertToInt64(values, m_endianness)') elif registerDefinition['type'] == 'float': return ('ModbusDataUtils::convertToFloat32(values, m_endianness)') elif registerDefinition['type'] == 'float64': return ('ModbusDataUtils::convertToFloat64(values, m_endianness)') elif registerDefinition['type'] == 'string': return ('ModbusDataUtils::convertToString(values)') def writeBlockGetMethodDeclarations(fileDescriptor, registerDefinitions): for registerDefinition in registerDefinitions: propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition) if 'unit' in registerDefinition and registerDefinition['unit'] != '': writeLine(fileDescriptor, ' /* %s [%s] - Address: %s, Size: %s */' % (registerDefinition['description'], registerDefinition['unit'], registerDefinition['address'], registerDefinition['size'])) else: writeLine(fileDescriptor, ' /* %s - Address: %s, Size: %s */' % (registerDefinition['description'], registerDefinition['address'], registerDefinition['size'])) writeLine(fileDescriptor, ' %s %s() const;' % (propertyTyp, propertyName)) writeLine(fileDescriptor) def writePropertyUpdateMethodDeclarations(fileDescriptor, registerDefinitions): for registerDefinition in registerDefinitions: if 'readSchedule' in registerDefinition and registerDefinition['readSchedule'] == 'init': continue propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition) writeLine(fileDescriptor, ' void update%s();' % (propertyName[0].upper() + propertyName[1:])) def validateBlocks(blockDefinitions): for blockDefinition in blockDefinitions: blockName = blockDefinition['id'] blockRegisters = blockDefinition['registers'] blockStartAddress = 0 registerCount = 0 blockSize = 0 registerAccess = "" registerType = "" for i, blockRegister in enumerate(blockRegisters): if i == 0: blockStartAddress = blockRegister['address'] registerAccess = blockRegister['access'] registerType = blockRegister['registerType'] else: previouseRegisterAddress = blockRegisters[i - 1]['address'] previouseRegisterSize = blockRegisters[i - 1]['size'] previouseRegisterType = blockRegisters[i - 1]['registerType'] if previouseRegisterAddress + previouseRegisterSize != blockRegister['address']: logger.warning('Error: block %s has invalid register order in register %s. There seems to be a gap between the registers.' % (blockName, blockRegister['id'])) exit(1) if blockRegister['access'] != registerAccess: logger.warning('Error: block %s has inconsistent register access in register %s. The block registers dont seem to have the same access rights.' % (blockName, blockRegister['id'])) exit(1) if blockRegister['registerType'] != registerType: logger.warning('Error: block %s has inconsistent register type in register %s. The block registers dont seem to be from the same type.' % (blockName, blockRegister['id'])) exit(1) registerCount += 1 blockSize += blockRegister['size'] logger.debug('Define valid block \"%s\" starting at %s with length %s containing %s properties to read.' % (blockName, blockStartAddress, blockSize, registerCount)) def writeBlocksUpdateMethodDeclarations(fileDescriptor, blockDefinitions): for blockDefinition in blockDefinitions: blockName = blockDefinition['id'] blockRegisters = blockDefinition['registers'] blockStartAddress = 0 blockSize = 0 registerCount = 0 for i, blockRegister in enumerate(blockRegisters): if i == 0: blockStartAddress = blockRegister['address'] registerCount += 1 blockSize += blockRegister['size'] # Write the block update method writeLine(fileDescriptor, ' /* Read block from start addess %s with size of %s registers containing following %s properties:' % (blockStartAddress, blockSize, registerCount)) for i, registerDefinition in enumerate(blockRegisters): if 'unit' in registerDefinition and registerDefinition['unit'] != '': writeLine(fileDescriptor, ' - %s [%s] - Address: %s, Size: %s' % (registerDefinition['description'], registerDefinition['unit'], registerDefinition['address'], registerDefinition['size'])) else: writeLine(fileDescriptor, ' - %s - Address: %s, Size: %s' % (registerDefinition['description'], registerDefinition['address'], registerDefinition['size'])) writeLine(fileDescriptor, ' */' ) writeLine(fileDescriptor, ' void update%sBlock();' % (blockName[0].upper() + blockName[1:])) writeLine(fileDescriptor) def writeRegistersDebugLine(fileDescriptor, debugObjectParamName, registerDefinitions): for registerDefinition in registerDefinitions: propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition) line = ('" - %s: " << %s->%s()' % (registerDefinition['description'], debugObjectParamName, propertyName)) if 'unit' in registerDefinition and registerDefinition['unit'] != '': line += (' << " [%s]"' % registerDefinition['unit']) writeLine(fileDescriptor, ' debug.nospace().noquote() << %s << "\\n";' % (line)) def writePropertyChangedSignals(fileDescriptor, registerDefinitions): for registerDefinition in registerDefinitions: propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition) if propertyTyp == 'QString': writeLine(fileDescriptor, ' void %sChanged(const %s &%s);' % (propertyName, propertyTyp, propertyName)) writeLine(fileDescriptor, ' void %sReadFinished(const %s &%s);' % (propertyName, propertyTyp, propertyName)) else: writeLine(fileDescriptor, ' void %sChanged(%s %s);' % (propertyName, propertyTyp, propertyName)) writeLine(fileDescriptor, ' void %sReadFinished(%s %s);' % (propertyName, propertyTyp, propertyName)) def writeProtectedPropertyMembers(fileDescriptor, registerDefinitions): for registerDefinition in registerDefinitions: propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition) if 'defaultValue' in registerDefinition: writeLine(fileDescriptor, ' %s m_%s = %s;' % (propertyTyp, propertyName, registerDefinition['defaultValue'])) else: writeLine(fileDescriptor, ' %s m_%s;' % (propertyTyp, propertyName)) def writePropertyProcessMethodDeclaration(fileDescriptor, registerDefinitions): propertyVariables = [] for registerDefinition in registerDefinitions: propertyName = registerDefinition['id'] writeLine(fileDescriptor, ' void process%sRegisterValues(const QVector values);' % (propertyName[0].upper() + propertyName[1:])) writeLine(fileDescriptor) def writePropertyProcessMethodImplementations(fileDescriptor, className, registerDefinitions): propertyVariables = [] for registerDefinition in registerDefinitions: propertyName = registerDefinition['id'] propertyTyp = getCppDataType(registerDefinition) writeLine(fileDescriptor, 'void %s::process%sRegisterValues(const QVector values)' % (className, propertyName[0].upper() + propertyName[1:])) writeLine(fileDescriptor, '{') writeLine(fileDescriptor, ' %s received%s = %s;' % (propertyTyp, propertyName[0].upper() + propertyName[1:], getValueConversionMethod(registerDefinition))) writeLine(fileDescriptor, ' emit %sReadFinished(received%s);' % (propertyName, propertyName[0].upper() + propertyName[1:])) writeLine(fileDescriptor) writeLine(fileDescriptor, ' if (m_%s != received%s) {' % (propertyName, propertyName[0].upper() + propertyName[1:])) writeLine(fileDescriptor, ' m_%s = received%s;' % (propertyName, propertyName[0].upper() + propertyName[1:])) writeLine(fileDescriptor, ' emit %sChanged(m_%s);' % (propertyName, propertyName)) writeLine(fileDescriptor, ' }') writeLine(fileDescriptor, '}') writeLine(fileDescriptor)