mirror of https://github.com/nymea/nymea-mqtt
Shared pointer for the packet data
parent
c2133e5539
commit
f7962614f5
|
|
@ -48,12 +48,17 @@ Q_LOGGING_CATEGORY(dbgProto, "nymea.mqtt.protocol")
|
|||
#define VERIFY_LEN(a, name) if (remainingLength != a) { qCWarning(dbgProto) << "Bad" << name << "packet. Data length unexpected."; return -1; }
|
||||
|
||||
MqttPacket::MqttPacket():
|
||||
d_ptr(new MqttPacketPrivate(this))
|
||||
d_ptr(new MqttPacketPrivate())
|
||||
{
|
||||
}
|
||||
|
||||
MqttPacket::MqttPacket(const MqttPacket &other)
|
||||
{
|
||||
d_ptr = other.d_ptr;
|
||||
}
|
||||
|
||||
MqttPacket::MqttPacket(MqttPacket::Type type, quint16 packetId, Mqtt::QoS qos, bool retain, bool dup):
|
||||
d_ptr(new MqttPacketPrivate(this))
|
||||
d_ptr(new MqttPacketPrivate())
|
||||
{
|
||||
d_ptr->packetId = packetId;
|
||||
d_ptr->header = type;
|
||||
|
|
@ -85,14 +90,19 @@ MqttPacket::MqttPacket(MqttPacket::Type type, quint16 packetId, Mqtt::QoS qos, b
|
|||
}
|
||||
}
|
||||
|
||||
MqttPacket::~MqttPacket()
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
MqttPacket::Type MqttPacket::type() const
|
||||
{
|
||||
return static_cast<Type>(d_ptr->header & 0xF0);
|
||||
return d_ptr->type();
|
||||
}
|
||||
|
||||
bool MqttPacket::dup() const
|
||||
{
|
||||
return d_ptr->header & 0x08;
|
||||
return d_ptr->dup();
|
||||
}
|
||||
|
||||
void MqttPacket::setDup(bool dup)
|
||||
|
|
@ -106,7 +116,7 @@ void MqttPacket::setDup(bool dup)
|
|||
|
||||
Mqtt::QoS MqttPacket::qos() const
|
||||
{
|
||||
return static_cast<Mqtt::QoS>((d_ptr->header & 0x06) >> 1);
|
||||
return d_ptr->qos();
|
||||
}
|
||||
|
||||
void MqttPacket::setQoS(Mqtt::QoS qoS)
|
||||
|
|
@ -117,7 +127,7 @@ void MqttPacket::setQoS(Mqtt::QoS qoS)
|
|||
|
||||
bool MqttPacket::retain() const
|
||||
{
|
||||
return d_ptr->header & 0x01;
|
||||
return d_ptr->retain();
|
||||
}
|
||||
|
||||
void MqttPacket::setRetain(bool retain)
|
||||
|
|
@ -397,17 +407,17 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
|
||||
switch (type()) {
|
||||
case TypeConnect: {
|
||||
ASSERT_LEN(2, "CONNECT");
|
||||
ASSERT_LEN(2, "CONNECT")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
|
||||
ASSERT_LEN(strLen, "CONNECT");
|
||||
ASSERT_LEN(strLen, "CONNECT")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
d_ptr->protocolName = QByteArray(str);
|
||||
|
||||
ASSERT_LEN(6, "CONNECT");
|
||||
ASSERT_LEN(6, "CONNECT")
|
||||
quint8 pl;
|
||||
inputStream >> pl;
|
||||
d_ptr->protocolLevel = static_cast<Mqtt::Protocol>(pl);
|
||||
|
|
@ -422,26 +432,26 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
|
||||
ASSERT_LEN(strLen, "CONNECT");
|
||||
ASSERT_LEN(strLen, "CONNECT")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
d_ptr->clientId = QByteArray(str);
|
||||
|
||||
if (connectFlags().testFlag(Mqtt::ConnectFlagWill)) {
|
||||
ASSERT_LEN(2, "CONNECT");
|
||||
ASSERT_LEN(2, "CONNECT")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "CONNECT");
|
||||
ASSERT_LEN(strLen, "CONNECT")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
d_ptr->willTopic = QByteArray(str);
|
||||
|
||||
ASSERT_LEN(2, "CONNECT");
|
||||
ASSERT_LEN(2, "CONNECT")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "CONNECT");
|
||||
ASSERT_LEN(strLen, "CONNECT")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
|
|
@ -454,10 +464,10 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
}
|
||||
|
||||
if (connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
|
||||
ASSERT_LEN(2, "CONNECT");
|
||||
ASSERT_LEN(2, "CONNECT")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "CONNECT");
|
||||
ASSERT_LEN(strLen, "CONNECT")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
|
|
@ -470,20 +480,20 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
}
|
||||
|
||||
if (connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
|
||||
ASSERT_LEN(2, "CONNECT");
|
||||
ASSERT_LEN(2, "CONNECT")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "CONNECT");
|
||||
ASSERT_LEN(strLen, "CONNECT")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
d_ptr->password = QByteArray(str);
|
||||
}
|
||||
VERIFY_LEN(0, "CONNECT");
|
||||
VERIFY_LEN(0, "CONNECT")
|
||||
break;
|
||||
}
|
||||
case TypeConnack: {
|
||||
VERIFY_LEN(2, "CONNACK");
|
||||
VERIFY_LEN(2, "CONNACK")
|
||||
quint8 connackFlags;
|
||||
inputStream >> connackFlags;
|
||||
remainingLength -= 1;
|
||||
|
|
@ -492,21 +502,21 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
inputStream >> connectReturnCode;
|
||||
d_ptr->connectReturnCode = static_cast<Mqtt::ConnectReturnCode>(connectReturnCode);
|
||||
remainingLength -= 1;
|
||||
VERIFY_LEN(0, "CONNACK");
|
||||
VERIFY_LEN(0, "CONNACK")
|
||||
break;
|
||||
}
|
||||
case TypePublish: {
|
||||
ASSERT_LEN(2, "PUBLISH");
|
||||
ASSERT_LEN(2, "PUBLISH")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "PUBLISH");
|
||||
ASSERT_LEN(strLen, "PUBLISH")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
d_ptr->topic = QByteArray(str);
|
||||
|
||||
if (qos() == Mqtt::QoS1 || qos() == Mqtt::QoS2) {
|
||||
ASSERT_LEN(2, "PUBLISH");
|
||||
ASSERT_LEN(2, "PUBLISH")
|
||||
inputStream >> d_ptr->packetId;
|
||||
remainingLength -= 2;
|
||||
}
|
||||
|
|
@ -517,23 +527,23 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
break;
|
||||
}
|
||||
case TypePuback:
|
||||
VERIFY_LEN(2, "PUBACK");
|
||||
VERIFY_LEN(2, "PUBACK")
|
||||
inputStream >> d_ptr->packetId;
|
||||
break;
|
||||
case TypePubrec:
|
||||
VERIFY_LEN(2, "PUBREC");
|
||||
VERIFY_LEN(2, "PUBREC")
|
||||
inputStream >> d_ptr->packetId;
|
||||
break;
|
||||
case TypePubrel:
|
||||
VERIFY_LEN(2, "PUBREL");
|
||||
VERIFY_LEN(2, "PUBREL")
|
||||
inputStream >> d_ptr->packetId;
|
||||
break;
|
||||
case TypePubcomp:
|
||||
VERIFY_LEN(2, "PUBCOMP");
|
||||
VERIFY_LEN(2, "PUBCOMP")
|
||||
inputStream >> d_ptr->packetId;
|
||||
break;
|
||||
case TypeSubscribe: {
|
||||
ASSERT_LEN(2, "SUBSCRIBE");
|
||||
ASSERT_LEN(2, "SUBSCRIBE")
|
||||
inputStream >> d_ptr->packetId;
|
||||
remainingLength -= 2;
|
||||
|
||||
|
|
@ -542,17 +552,17 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
return -1;
|
||||
}
|
||||
while (remainingLength > 0) {
|
||||
ASSERT_LEN(2, "SUBSCRIBE");
|
||||
ASSERT_LEN(2, "SUBSCRIBE")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "SUBSCRIBE");
|
||||
ASSERT_LEN(strLen, "SUBSCRIBE")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
MqttSubscription subscription;
|
||||
subscription.setTopicFilter(QByteArray(str));
|
||||
|
||||
ASSERT_LEN(1, "SUBSCRIBE");
|
||||
ASSERT_LEN(1, "SUBSCRIBE")
|
||||
quint8 requestedQoS;
|
||||
inputStream >> requestedQoS;
|
||||
remainingLength -= 1;
|
||||
|
|
@ -570,7 +580,7 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
break;
|
||||
}
|
||||
case TypeSuback:
|
||||
ASSERT_LEN(3, "SUBACK");
|
||||
ASSERT_LEN(3, "SUBACK")
|
||||
inputStream >> d_ptr->packetId;
|
||||
remainingLength -= 2;
|
||||
while (remainingLength > 0) {
|
||||
|
|
@ -581,14 +591,14 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
}
|
||||
break;
|
||||
case TypeUnsubscribe: {
|
||||
ASSERT_LEN(5, "UNSUBSCRIBE");
|
||||
ASSERT_LEN(5, "UNSUBSCRIBE")
|
||||
inputStream >> d_ptr->packetId;
|
||||
remainingLength -= 2;
|
||||
while (remainingLength > 0) {
|
||||
ASSERT_LEN(2, "UNSUBSCRIBE");
|
||||
ASSERT_LEN(2, "UNSUBSCRIBE")
|
||||
inputStream >> strLen;
|
||||
remainingLength -= 2;
|
||||
ASSERT_LEN(strLen, "UNSUBSCRIBE");
|
||||
ASSERT_LEN(strLen, "UNSUBSCRIBE")
|
||||
memset(str, 0, MAX_STRLEN);
|
||||
inputStream.readRawData(str, qMin(MAX_STRLEN, strLen));
|
||||
remainingLength -= strLen;
|
||||
|
|
@ -599,17 +609,17 @@ int MqttPacket::parse(const QByteArray &buffer)
|
|||
}
|
||||
break;
|
||||
case TypeUnsuback:
|
||||
VERIFY_LEN(2, "UNSUBACK");
|
||||
VERIFY_LEN(2, "UNSUBACK")
|
||||
inputStream >> d_ptr->packetId;
|
||||
break;
|
||||
case TypePingreq:
|
||||
VERIFY_LEN(0, "PINGREC");
|
||||
VERIFY_LEN(0, "PINGREC")
|
||||
break;
|
||||
case TypePingresp:
|
||||
VERIFY_LEN(0, "PINGRESP");
|
||||
VERIFY_LEN(0, "PINGRESP")
|
||||
break;
|
||||
case TypeDisconnect:
|
||||
VERIFY_LEN(0, "DISCONNECT");
|
||||
VERIFY_LEN(0, "DISCONNECT")
|
||||
break;
|
||||
}
|
||||
return fullRemainingLength + 1 + lenFields;
|
||||
|
|
@ -776,11 +786,40 @@ bool MqttPacket::operator==(const MqttPacket &other) const
|
|||
return serialize() == other.serialize();
|
||||
}
|
||||
|
||||
MqttPacket &MqttPacket::operator =(const MqttPacket &other)
|
||||
{
|
||||
d_ptr = other.d_ptr;
|
||||
return *this;
|
||||
}
|
||||
|
||||
MqttPacketPrivate::MqttPacketPrivate(const MqttPacketPrivate &other):
|
||||
QSharedData(other),
|
||||
header(other.header),
|
||||
protocolName(other.protocolName),
|
||||
protocolLevel(other.protocolLevel),
|
||||
connectFlags(other.connectFlags),
|
||||
connackFlags(other.connackFlags),
|
||||
keepAlive(other.keepAlive),
|
||||
clientId(other.clientId),
|
||||
willTopic(other.willTopic),
|
||||
willMessage(other.willMessage),
|
||||
username(other.username),
|
||||
password(other.password),
|
||||
packetId(other.packetId),
|
||||
topic(other.topic),
|
||||
payload(other.payload),
|
||||
connectReturnCode(other.connectReturnCode),
|
||||
subscriptions(other.subscriptions),
|
||||
subscribeReturnCodes(other.subscribeReturnCodes)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
bool MqttPacketPrivate::verifyHeaderFlags()
|
||||
{
|
||||
bool fail = false;
|
||||
|
||||
switch (q_ptr->type()) {
|
||||
switch (type()) {
|
||||
case MqttPacket::MqttPacket::TypeConnect:
|
||||
case MqttPacket::TypeConnack:
|
||||
case MqttPacket::TypePuback:
|
||||
|
|
@ -791,20 +830,40 @@ bool MqttPacketPrivate::verifyHeaderFlags()
|
|||
case MqttPacket::TypePingreq:
|
||||
case MqttPacket::TypePingresp:
|
||||
case MqttPacket::TypeDisconnect:
|
||||
fail |= q_ptr->dup();
|
||||
fail |= (q_ptr->qos() != Mqtt::QoS0);
|
||||
fail |= q_ptr->retain();
|
||||
fail |= dup();
|
||||
fail |= (qos() != Mqtt::QoS0);
|
||||
fail |= retain();
|
||||
break;
|
||||
case MqttPacket::TypePublish:
|
||||
fail |= (q_ptr->qos() == 0x03);
|
||||
fail |= (qos() == 0x03);
|
||||
break;
|
||||
case MqttPacket::TypeSubscribe:
|
||||
case MqttPacket::TypePubrel:
|
||||
case MqttPacket::TypeUnsubscribe:
|
||||
fail |= q_ptr->dup();
|
||||
fail |= (q_ptr->qos() != Mqtt::QoS1);
|
||||
fail |= q_ptr->retain();
|
||||
fail |= dup();
|
||||
fail |= (qos() != Mqtt::QoS1);
|
||||
fail |= retain();
|
||||
break;
|
||||
}
|
||||
return !fail;
|
||||
}
|
||||
|
||||
bool MqttPacketPrivate::dup() const
|
||||
{
|
||||
return header & 0x08;
|
||||
}
|
||||
|
||||
Mqtt::QoS MqttPacketPrivate::qos() const
|
||||
{
|
||||
return static_cast<Mqtt::QoS>((header & 0x06) >> 1);
|
||||
}
|
||||
|
||||
bool MqttPacketPrivate::retain() const
|
||||
{
|
||||
return header & 0x01;
|
||||
}
|
||||
|
||||
MqttPacket::Type MqttPacketPrivate::type() const
|
||||
{
|
||||
return static_cast<MqttPacket::Type>(header & 0xF0);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,6 +32,7 @@
|
|||
#include <QByteArray>
|
||||
#include <QList>
|
||||
#include <QLoggingCategory>
|
||||
#include <QSharedDataPointer>
|
||||
|
||||
#include "mqtt.h"
|
||||
#include "mqttsubscription.h"
|
||||
|
|
@ -60,9 +61,10 @@ public:
|
|||
};
|
||||
|
||||
MqttPacket();
|
||||
MqttPacket(const MqttPacket &other);
|
||||
MqttPacket(Type type, quint16 packetId = 0, Mqtt::QoS qos = Mqtt::QoS0, bool retain = false, bool dup = false);
|
||||
~MqttPacket();
|
||||
|
||||
public:
|
||||
Type type() const;
|
||||
bool dup() const;
|
||||
void setDup(bool dup);
|
||||
|
|
@ -129,9 +131,10 @@ public:
|
|||
QByteArray serialize() const;
|
||||
|
||||
bool operator==(const MqttPacket &other) const;
|
||||
MqttPacket &operator=(const MqttPacket &other);
|
||||
|
||||
private:
|
||||
MqttPacketPrivate *d_ptr = nullptr;
|
||||
QSharedDataPointer<MqttPacketPrivate> d_ptr;
|
||||
};
|
||||
|
||||
typedef QList<MqttPacket> MqttPackets;
|
||||
|
|
|
|||
|
|
@ -37,15 +37,21 @@
|
|||
#include "mqttpacket.h"
|
||||
#include "mqttsubscription.h"
|
||||
|
||||
#include <QSharedData>
|
||||
|
||||
Q_DECLARE_LOGGING_CATEGORY(dbgProto)
|
||||
|
||||
class MqttPacketPrivate
|
||||
class MqttPacketPrivate: public QSharedData
|
||||
{
|
||||
public:
|
||||
MqttPacketPrivate(MqttPacket *q) : q_ptr(q) { }
|
||||
MqttPacket *q_ptr;
|
||||
MqttPacketPrivate(){ }
|
||||
MqttPacketPrivate(const MqttPacketPrivate &other);
|
||||
|
||||
bool verifyHeaderFlags();
|
||||
MqttPacket::Type type() const;
|
||||
bool dup() const;
|
||||
Mqtt::QoS qos() const;
|
||||
bool retain() const;
|
||||
|
||||
quint8 header = 0;
|
||||
QByteArray protocolName = "MQTT";
|
||||
|
|
|
|||
Loading…
Reference in New Issue