add reconnect feature to client and improve server api a bit
This commit is contained in:
parent
63e8f2055e
commit
7e35b3e79e
@ -30,12 +30,18 @@ MqttClientPrivate::MqttClientPrivate(MqttClient *q):
|
||||
{
|
||||
qRegisterMetaType<Mqtt::SubscribeReturnCodes>();
|
||||
qRegisterMetaType<Mqtt::ConnackFlags>();
|
||||
reconnectTimer.setSingleShot(true);
|
||||
connect(&reconnectTimer, &QTimer::timeout, this, &MqttClientPrivate::reconnectTimerTimeout);
|
||||
}
|
||||
|
||||
void MqttClientPrivate::connectToHost(const QString &hostName, quint16 port, bool cleanSession)
|
||||
{
|
||||
serverHostname = hostName;
|
||||
serverPort = port;
|
||||
if (serverHostname != hostName || serverPort != port) {
|
||||
serverHostname = hostName;
|
||||
serverPort = port;
|
||||
reconnectAttempt = 1;
|
||||
reconnectTimer.stop();
|
||||
}
|
||||
this->cleanSession = cleanSession;
|
||||
|
||||
sessionActive = true;
|
||||
@ -100,6 +106,16 @@ void MqttClient::setAutoReconnect(bool autoReconnect)
|
||||
d_ptr->autoReconnect = autoReconnect;
|
||||
}
|
||||
|
||||
quint16 MqttClient::maxAutoReconnectTimeout() const
|
||||
{
|
||||
return d_ptr->maxReconnectTimeout;
|
||||
}
|
||||
|
||||
void MqttClient::setMaxAutoReconnectTimeout(quint16 maxAutoReconnectTimeout)
|
||||
{
|
||||
d_ptr->maxReconnectTimeout = maxAutoReconnectTimeout;
|
||||
}
|
||||
|
||||
void MqttClient::setKeepAlive(quint16 keepAlive)
|
||||
{
|
||||
d_ptr->keepAlive = keepAlive;
|
||||
@ -262,7 +278,10 @@ void MqttClientPrivate::onDisconnected()
|
||||
qCDebug(dbgClient) << "Disconnected from server";
|
||||
emit q_ptr->disconnected();
|
||||
if (sessionActive && autoReconnect) {
|
||||
connectToHost(serverHostname, serverPort, cleanSession);
|
||||
reconnectAttempt = qMin(maxReconnectTimeout / 60 / 60, reconnectAttempt * 2);
|
||||
qCDebug(dbgClient) << "Reconnecint in" << reconnectAttempt << "seconds";
|
||||
reconnectTimer.setInterval(reconnectAttempt * 1000);
|
||||
reconnectTimer.start();
|
||||
}
|
||||
}
|
||||
|
||||
@ -287,18 +306,23 @@ void MqttClientPrivate::onReadyRead()
|
||||
|
||||
switch (packet.type()) {
|
||||
case MqttPacket::TypeConnack:
|
||||
emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags());
|
||||
if (packet.connectReturnCode() != Mqtt::ConnectReturnCodeAccepted) {
|
||||
qCWarning(dbgClient) << "MQTT connection refused:" << packet.connectReturnCode();
|
||||
// Always emit connected, even if just to indicate a "ClientRefusedError"
|
||||
emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags());
|
||||
socket->abort();
|
||||
emit q_ptr->disconnected();
|
||||
emit q_ptr->error(QAbstractSocket::ConnectionRefusedError);
|
||||
return;
|
||||
}
|
||||
foreach (quint16 retryPacketId, unackedPacketList) {
|
||||
MqttPacket retryPacket = unackedPackets.value(retryPacketId);
|
||||
retryPacket.setDup(true);
|
||||
if (retryPacket.type() == MqttPacket::TypePublish) {
|
||||
retryPacket.setDup(true);
|
||||
}
|
||||
socket->write(retryPacket.serialize());
|
||||
}
|
||||
// Make sure we emit connected after having handled all the retransmission queue
|
||||
emit q_ptr->connected(packet.connectReturnCode(), packet.connackFlags());
|
||||
restartKeepAliveTimer();
|
||||
break;
|
||||
case MqttPacket::TypePublish:
|
||||
@ -413,3 +437,11 @@ void MqttClientPrivate::restartKeepAliveTimer()
|
||||
keepAliveTimer.start(keepAlive * 1000);
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClientPrivate::reconnectTimerTimeout()
|
||||
{
|
||||
if (!autoReconnect) {
|
||||
return;
|
||||
}
|
||||
connectToHost(serverHostname, serverPort, false);
|
||||
}
|
||||
|
||||
@ -39,6 +39,9 @@ public:
|
||||
bool autoReconnect() const;
|
||||
void setAutoReconnect(bool autoReconnect);
|
||||
|
||||
quint16 maxAutoReconnectTimeout() const;
|
||||
void setMaxAutoReconnectTimeout(quint16 maxAutoReconnectTimeout);
|
||||
|
||||
quint16 keepAlive() const;
|
||||
void setKeepAlive(quint16 keepAlive);
|
||||
|
||||
|
||||
@ -52,6 +52,8 @@ public slots:
|
||||
void sendPingreq();
|
||||
void restartKeepAliveTimer();
|
||||
|
||||
void reconnectTimerTimeout();
|
||||
|
||||
public:
|
||||
QString serverHostname;
|
||||
quint16 serverPort = 0;
|
||||
@ -59,6 +61,9 @@ public:
|
||||
bool sessionActive = false;
|
||||
bool cleanSession = true;
|
||||
QTcpSocket *socket = nullptr;
|
||||
QTimer reconnectTimer;
|
||||
int reconnectAttempt = 0;
|
||||
quint16 maxReconnectTimeout = 36000;
|
||||
|
||||
QString clientId;
|
||||
quint16 keepAlive;
|
||||
|
||||
@ -60,7 +60,9 @@ MqttPacket::MqttPacket(MqttPacket::Type type, quint16 packetId, Mqtt::QoS qos, b
|
||||
case TypeSubscribe:
|
||||
case TypePubrel:
|
||||
case TypeUnsubscribe:
|
||||
setDup(false);
|
||||
setQoS(Mqtt::QoS1);
|
||||
setRetain(false);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,9 +35,6 @@ MqttServerPrivate::MqttServerPrivate(MqttServer *q):
|
||||
q_ptr(q)
|
||||
{
|
||||
qRegisterMetaType<Mqtt::QoS>();
|
||||
|
||||
server = new QTcpServer(this);
|
||||
connect(server, &QTcpServer::newConnection, this, &MqttServerPrivate::onNewConnection);
|
||||
}
|
||||
|
||||
QHash<QString, quint16> MqttServerPrivate::publish(const QString &topic, const QByteArray &payload)
|
||||
@ -94,26 +91,56 @@ void MqttServer::setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS)
|
||||
d_ptr->maximumSubscriptionQoS = maximumSubscriptionQoS;
|
||||
}
|
||||
|
||||
bool MqttServer::listen(const QHostAddress &address, quint16 port, MqttUserValidator *userValidator)
|
||||
void MqttServer::setAuthorizer(MqttAuthorizer *authorizer)
|
||||
{
|
||||
d_ptr->userValidator = userValidator;
|
||||
d_ptr->authorizer = authorizer;
|
||||
}
|
||||
|
||||
if (!d_ptr->server->listen(address, port)) {
|
||||
qCWarning(dbgServer()) << "Error listening on port" << port;
|
||||
return false;
|
||||
int MqttServer::listen(const QHostAddress &address, quint16 port, const QSslConfiguration &sslConfiguration)
|
||||
{
|
||||
SslServer *server = new SslServer(sslConfiguration);
|
||||
connect(server, &SslServer::clientConnected, d_ptr, &MqttServerPrivate::onClientConnected);
|
||||
connect(server, &SslServer::clientDisconnected, d_ptr, &MqttServerPrivate::onClientDisconnected);
|
||||
connect(server, &SslServer::dataAvailable, d_ptr, &MqttServerPrivate::onDataAvailable);
|
||||
|
||||
if (!server->listen(address, port)) {
|
||||
qCWarning(dbgServer) << "Error listening on port" << port;
|
||||
server->deleteLater();
|
||||
return -1;
|
||||
}
|
||||
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port;
|
||||
return true;
|
||||
static int addressId = -1;
|
||||
d_ptr->servers.insert(++addressId, server);
|
||||
qCDebug(dbgServer) << "nymea MQTT server running on" << address.toString() << ":" << port << "( Address ID" << addressId << ")";
|
||||
return addressId;
|
||||
}
|
||||
|
||||
bool MqttServer::isListening() const
|
||||
bool MqttServer::isListening(const QHostAddress &address, quint16 port) const
|
||||
{
|
||||
return d_ptr->server->isListening();
|
||||
foreach (SslServer *server, d_ptr->servers) {
|
||||
if (server->serverAddress() == address && server->serverPort() == port && server->isListening()) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void MqttServer::close()
|
||||
QList<int> MqttServer::listeningAddressIds() const
|
||||
{
|
||||
d_ptr->server->close();
|
||||
return d_ptr->servers.keys();
|
||||
}
|
||||
|
||||
void MqttServer::close(int interfaceId)
|
||||
{
|
||||
if (!d_ptr->servers.contains(interfaceId)) {
|
||||
qCWarning(dbgServer) << "No such server address ID" << interfaceId;
|
||||
return;
|
||||
}
|
||||
SslServer *server = d_ptr->servers.take(interfaceId);
|
||||
while (!d_ptr->clientServerMap.keys(server).isEmpty()) {
|
||||
d_ptr->cleanupClient(d_ptr->clientServerMap.keys(server).first());
|
||||
}
|
||||
server->close();
|
||||
server->deleteLater();
|
||||
}
|
||||
|
||||
QStringList MqttServer::clients() const
|
||||
@ -125,14 +152,24 @@ QStringList MqttServer::clients() const
|
||||
return clientIds;
|
||||
}
|
||||
|
||||
void MqttServer::disconnectClient(const QString &clientId)
|
||||
{
|
||||
foreach (ClientContext *ctx, d_ptr->clientList) {
|
||||
if (ctx->clientId == clientId) {
|
||||
d_ptr->cleanupClient(d_ptr->clientList.key(ctx));
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
QHash<QString, quint16> MqttServer::publish(const QString &topic, const QByteArray &payload)
|
||||
{
|
||||
return d_ptr->publish(topic, payload);
|
||||
}
|
||||
|
||||
void MqttServerPrivate::onNewConnection()
|
||||
void MqttServerPrivate::onClientConnected(QSslSocket *client)
|
||||
{
|
||||
QTcpSocket *client = server->nextPendingConnection();
|
||||
SslServer *server = static_cast<SslServer*>(sender());
|
||||
|
||||
// Start a 10 second timer to clean up the connection if we don't get data until then.
|
||||
QTimer *timeoutTimer = new QTimer(this);
|
||||
@ -143,16 +180,43 @@ void MqttServerPrivate::onNewConnection()
|
||||
client->deleteLater();
|
||||
});
|
||||
timeoutTimer->start(10000);
|
||||
clientServerMap.insert(client, server);
|
||||
pendingConnections.insert(client, timeoutTimer);
|
||||
|
||||
connect(client, &QTcpSocket::readyRead, this, &MqttServerPrivate::onClientReadyRead);
|
||||
connect(client, SIGNAL(error(QAbstractSocket::SocketError)), this, SLOT(onClientError(QAbstractSocket::SocketError)));
|
||||
connect(client, &QTcpSocket::disconnected, this, &MqttServerPrivate::onClientDisconnected);
|
||||
}
|
||||
|
||||
void MqttServerPrivate::onClientDisconnected()
|
||||
void MqttServerPrivate::onDataAvailable(QSslSocket *client, const QByteArray &data)
|
||||
{
|
||||
clientBuffers[client].append(data);
|
||||
|
||||
do {
|
||||
MqttPacket packet;
|
||||
int ret = packet.parse(clientBuffers[client]);
|
||||
if (ret == 0) {
|
||||
qCDebug(dbgServer) << "Packet too short... Waiting for more...";
|
||||
return;
|
||||
}
|
||||
|
||||
// Ok, we've got a full packet (or garbage data). If this client is still pending
|
||||
// we can stop the timer, the protocol will take it from here.
|
||||
if (pendingConnections.contains(client)) {
|
||||
pendingConnections.take(client)->deleteLater();
|
||||
}
|
||||
|
||||
if (ret == -1) {
|
||||
qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection" << packet.serialize().toHex();
|
||||
cleanupClient(client);
|
||||
return;
|
||||
}
|
||||
|
||||
clientBuffers[client].remove(0, ret);
|
||||
|
||||
processPacket(packet, client);
|
||||
|
||||
} while (!clientBuffers.value(client).isEmpty());
|
||||
}
|
||||
|
||||
void MqttServerPrivate::onClientDisconnected(QSslSocket *client)
|
||||
{
|
||||
QTcpSocket *client = static_cast<QTcpSocket*>(sender());
|
||||
cleanupClient(client);
|
||||
}
|
||||
|
||||
@ -161,6 +225,9 @@ void MqttServerPrivate::cleanupClient(QTcpSocket *client)
|
||||
if (clientBuffers.contains(client)) {
|
||||
clientBuffers.remove(client);
|
||||
}
|
||||
if (clientServerMap.contains(client)) {
|
||||
clientServerMap.remove(client);
|
||||
}
|
||||
if (clientList.contains(client)) {
|
||||
ClientContext *ctx = clientList.value(client);
|
||||
qCDebug(dbgServer) << "Client" << ctx->clientId << "disconnected.";
|
||||
@ -174,12 +241,20 @@ void MqttServerPrivate::cleanupClient(QTcpSocket *client)
|
||||
processPacket(willPacket, client);
|
||||
}
|
||||
|
||||
while (!ctx->subscriptions.isEmpty()) {
|
||||
emit q_ptr->clientUnsubscribed(ctx->clientId, ctx->subscriptions.takeFirst().topicFilter());
|
||||
}
|
||||
|
||||
emit q_ptr->clientDisconnected(ctx->clientId);
|
||||
|
||||
clientList.remove(client);
|
||||
delete ctx;
|
||||
}
|
||||
client->flush();
|
||||
|
||||
if (client->isOpen()) {
|
||||
client->flush();
|
||||
client->close();
|
||||
}
|
||||
client->deleteLater();
|
||||
}
|
||||
|
||||
@ -215,7 +290,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
clientId = QUuid::createUuid().toString().remove(QRegExp("[{}-]*"));
|
||||
}
|
||||
|
||||
if (userValidator) {
|
||||
if (authorizer) {
|
||||
QString username;
|
||||
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagUsername)) {
|
||||
username = packet.username();
|
||||
@ -224,7 +299,9 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
if (packet.connectFlags().testFlag(Mqtt::ConnectFlagPassword)) {
|
||||
password = packet.password();
|
||||
}
|
||||
Mqtt::ConnectReturnCode userValidationReturnCode = userValidator->validateConnect(clientId, username, password, client->peerAddress());
|
||||
SslServer *server = clientServerMap.value(client);
|
||||
int serverAddressId = servers.key(server);
|
||||
Mqtt::ConnectReturnCode userValidationReturnCode = authorizer->authorizeConnect(serverAddressId, clientId, username, password, client->peerAddress());
|
||||
if (userValidationReturnCode != Mqtt::ConnectReturnCodeAccepted) {
|
||||
qCWarning(dbgServer) << "Rejecting connection due to user validation.";
|
||||
response.setConnectReturnCode(userValidationReturnCode);
|
||||
@ -311,7 +388,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
clientList.insert(client, ctx);
|
||||
response.setConnectReturnCode(Mqtt::ConnectReturnCodeAccepted);
|
||||
client->write(response.serialize());
|
||||
emit q_ptr->clientConnected(ctx->clientId, ctx->username, client->peerAddress());
|
||||
emit q_ptr->clientConnected(servers.key(clientServerMap.value(client)), ctx->clientId, ctx->username, client->peerAddress());
|
||||
|
||||
foreach (quint16 retryPacketId, ctx->unackedPacketList) {
|
||||
qCDebug(dbgServer) << "Resending unacked packet" << retryPacketId << "to" << ctx->clientId;;
|
||||
@ -336,8 +413,6 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
|
||||
if (packet.type() == MqttPacket::TypePublish) {
|
||||
qCDebug(dbgServer).nospace() << "Publish received from client " << ctx->clientId << ": Topic: " << packet.topic() << ", Payload: " << packet.payload() << " (Packet ID: " << packet.packetId() << ", DUP: " << packet.dup() << ", QoS: " << packet.qos() << ", Retain: " << packet.retain() << ')';
|
||||
emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload(), packet.dup());
|
||||
|
||||
switch (packet.qos()) {
|
||||
case Mqtt::QoS0:
|
||||
break;
|
||||
@ -379,6 +454,12 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
}
|
||||
}
|
||||
|
||||
if (authorizer && !authorizer->authorizePublish(servers.key(clientServerMap.value(client)), ctx->clientId, packet.topic())) {
|
||||
qCDebug(dbgServer) << "Client not authorized to publish to this topic. Discarding packet";
|
||||
return;
|
||||
}
|
||||
|
||||
emit q_ptr->publishReceived(ctx->clientId, packet.packetId(), packet.topic(), packet.payload());
|
||||
publish(packet.topic(), packet.payload());
|
||||
|
||||
return;
|
||||
@ -413,8 +494,9 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
// qCDebug(dbgServer).nospace() << ctx->clientId ": Subscribe packet received.";
|
||||
MqttPacket response(MqttPacket::TypeSuback, packet.packetId());
|
||||
QByteArray payload;
|
||||
MqttSubscriptions effectiveSubscriptions;
|
||||
foreach (MqttSubscription subscription, packet.subscriptions()) {
|
||||
if (userValidator && !userValidator->validateSubscribe(subscription.topicFilter(), ctx->clientId, ctx->username)) {
|
||||
if (authorizer && !authorizer->authorizeSubscribe(servers.key(clientServerMap.value(client)), ctx->clientId, subscription.topicFilter())) {
|
||||
qCWarning(dbgServer).nospace().noquote() << "Subscription topic filter not allowed for client \"" << ctx->clientId << "\": \"" << subscription.topicFilter() << '\"';
|
||||
response.addSubscribeReturnCode(Mqtt::SubscribeReturnCodeFailure);
|
||||
continue;
|
||||
@ -437,6 +519,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
ctx->subscriptions.append(subscription);
|
||||
}
|
||||
qCDebug(dbgServer).noquote().nospace() << "Subscribed client \"" << ctx->clientId << "\" to topic filter: \"" << subscription.topicFilter() << "\" with QoS " << subscription.qoS();
|
||||
effectiveSubscriptions << subscription;
|
||||
emit q_ptr->clientSubscribed(ctx->clientId, subscription.topicFilter(), subscription.qoS());
|
||||
switch (subscription.qoS()) {
|
||||
case Mqtt::QoS0:
|
||||
@ -453,7 +536,7 @@ void MqttServerPrivate::processPacket(const MqttPacket &packet, QTcpSocket *clie
|
||||
client->write(response.serialize());
|
||||
|
||||
// Deliver any retained messages for this topic
|
||||
foreach (MqttSubscription subscription, packet.subscriptions()) {
|
||||
foreach (MqttSubscription subscription, effectiveSubscriptions) {
|
||||
foreach (const QString &topic, retainedMessages.keys()) {
|
||||
if (matchTopic(subscription.topicFilter(), topic)) {
|
||||
foreach (MqttPacket packet, retainedMessages.value(topic)) {
|
||||
@ -560,40 +643,40 @@ quint16 MqttServerPrivate::newPacketId(ClientContext *ctx)
|
||||
return packetId;
|
||||
}
|
||||
|
||||
void MqttServerPrivate::onClientReadyRead()
|
||||
void SslServer::incomingConnection(qintptr socketDescriptor)
|
||||
{
|
||||
QTcpSocket* client = static_cast<QTcpSocket*>(sender());
|
||||
QSslSocket *sslSocket = new QSslSocket(this);
|
||||
|
||||
clientBuffers[client].append(client->readAll());
|
||||
qCDebug(dbgServer) << "New client socket connection:" << sslSocket;
|
||||
|
||||
do {
|
||||
MqttPacket packet;
|
||||
int ret = packet.parse(clientBuffers[client]);
|
||||
if (ret == 0) {
|
||||
qCDebug(dbgServer) << "Packet too short... Waiting for more...";
|
||||
return;
|
||||
}
|
||||
connect(sslSocket, &QSslSocket::encrypted, [this, sslSocket](){ emit clientConnected(sslSocket); });
|
||||
connect(sslSocket, &QSslSocket::readyRead, this, &SslServer::onSocketReadyRead);
|
||||
connect(sslSocket, &QSslSocket::disconnected, this, &SslServer::onClientDisconnected);
|
||||
|
||||
// Ok, we've got a full packet (or garbage data). If this client is still pending
|
||||
// we can stop the timer, the protocol will take it from here.
|
||||
if (pendingConnections.contains(client)) {
|
||||
pendingConnections.take(client)->deleteLater();
|
||||
}
|
||||
|
||||
if (ret == -1) {
|
||||
qCWarning(dbgServer) << "Bad MQTT packet data, Dropping connection";
|
||||
cleanupClient(client);
|
||||
return;
|
||||
}
|
||||
|
||||
clientBuffers[client].remove(0, ret);
|
||||
|
||||
processPacket(packet, client);
|
||||
|
||||
} while (!clientBuffers.value(client).isEmpty());
|
||||
if (!sslSocket->setSocketDescriptor(socketDescriptor)) {
|
||||
qCWarning(dbgServer) << "Failed to set SSL socket descriptor.";
|
||||
delete sslSocket;
|
||||
return;
|
||||
}
|
||||
if (!m_config.isNull()) {
|
||||
sslSocket->setSslConfiguration(m_config);
|
||||
sslSocket->startServerEncryption();
|
||||
} else {
|
||||
emit clientConnected(sslSocket);
|
||||
}
|
||||
}
|
||||
|
||||
void MqttServerPrivate::onClientError(QAbstractSocket::SocketError error)
|
||||
void SslServer::onClientDisconnected()
|
||||
{
|
||||
// qCWarning(dbgServer) << "Client error:" << error;
|
||||
QSslSocket *socket = static_cast<QSslSocket*>(sender());
|
||||
qCDebug(dbgServer) << "Client socket disconnected:" << socket;
|
||||
emit clientDisconnected(socket);
|
||||
socket->deleteLater();
|
||||
}
|
||||
|
||||
void SslServer::onSocketReadyRead()
|
||||
{
|
||||
QSslSocket *socket = static_cast<QSslSocket*>(sender());
|
||||
QByteArray data = socket->readAll();
|
||||
emit dataAvailable(socket, data);
|
||||
}
|
||||
|
||||
@ -26,17 +26,19 @@
|
||||
#include <QTcpSocket>
|
||||
#include <QTimer>
|
||||
#include <QLoggingCategory>
|
||||
#include <QSslConfiguration>
|
||||
|
||||
#include "mqttpacket.h"
|
||||
|
||||
class MqttServerPrivate;
|
||||
class Subscription;
|
||||
|
||||
class MqttUserValidator {
|
||||
class MqttAuthorizer {
|
||||
public:
|
||||
virtual ~MqttUserValidator() = default;
|
||||
virtual Mqtt::ConnectReturnCode validateConnect(const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) = 0;
|
||||
virtual bool validateSubscribe(const QString &topicFilter, const QString &clientId, const QString &username) = 0;
|
||||
virtual ~MqttAuthorizer() = default;
|
||||
virtual Mqtt::ConnectReturnCode authorizeConnect(int serverAddressId, const QString &clientId, const QString &username, const QString &password, const QHostAddress &peerAddress) = 0;
|
||||
virtual bool authorizeSubscribe(int serverAddressId, const QString &clientId, const QString &topicFilter) = 0;
|
||||
virtual bool authorizePublish(int serverAddressId, const QString &clientId, const QString &topic) = 0;
|
||||
};
|
||||
|
||||
class MqttServer : public QObject
|
||||
@ -48,18 +50,23 @@ public:
|
||||
Mqtt::QoS maximumSubscriptionsQoS() const;
|
||||
void setMaximumSubscriptionsQoS(Mqtt::QoS maximumSubscriptionQoS);
|
||||
|
||||
bool listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, MqttUserValidator *userValidator = nullptr);
|
||||
bool isListening() const;
|
||||
void close();
|
||||
void setAuthorizer(MqttAuthorizer *authorizer);
|
||||
|
||||
int listen(const QHostAddress &address = QHostAddress::Any, quint16 port = 1883, const QSslConfiguration &sslConfiguration = QSslConfiguration());
|
||||
QList<int> listeningAddressIds() const;
|
||||
QPair<QHostAddress, quint16> listeningAddress(int addressId);
|
||||
void close(int addressId);
|
||||
bool isListening(const QHostAddress &address, quint16 port) const;
|
||||
|
||||
QStringList clients() const;
|
||||
void disconnectClient(const QString &clientId);
|
||||
|
||||
// allows publishing from the server, including topcis starting with $
|
||||
QHash<QString, quint16> publish(const QString &topic, const QByteArray &payload = QByteArray());
|
||||
|
||||
signals:
|
||||
// emitted whenever a client connects, after the mqtt connect handshake has been done.
|
||||
void clientConnected(const QString &clientId, const QString &username, const QHostAddress &clientAddress);
|
||||
void clientConnected(int serverAddressId, const QString &clientId, const QString &username, const QHostAddress &clientAddress);
|
||||
// emitted whenever a client disconnects, that is, when a DISCONNECT message has been received or the keep alive timeout has been reached.
|
||||
void clientDisconnected(const QString &clientId);
|
||||
// emitted whenever a client has been seen, that is, a control message or a keep alive message has been received.
|
||||
@ -69,7 +76,7 @@ signals:
|
||||
// emitted whenever a client unsubscribes from a topic
|
||||
void clientUnsubscribed(const QString &clientId, const QString &topicFiltr);
|
||||
// emitted whenever a publish message is received from a client before the message is relayed to other clients. Topics starting with $ will be received here, but not relayed to other clients.
|
||||
void publishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload, bool dup);
|
||||
void publishReceived(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload);
|
||||
// emitted whenever a publish message is sent to a client. Note: this might be fired often if many clients are connected and subsribed to matching topic filters.
|
||||
void published(const QString &clientId, quint16 packetId, const QString &topic, const QByteArray &payload);
|
||||
|
||||
|
||||
@ -34,6 +34,7 @@ Q_DECLARE_LOGGING_CATEGORY(dbgServer)
|
||||
|
||||
class ClientContext;
|
||||
class Subscription;
|
||||
class SslServer;
|
||||
|
||||
class MqttServerPrivate: public QObject
|
||||
{
|
||||
@ -52,16 +53,15 @@ public:
|
||||
quint16 newPacketId(ClientContext *ctx);
|
||||
|
||||
public slots:
|
||||
void onNewConnection();
|
||||
void onClientReadyRead();
|
||||
void onClientError(QAbstractSocket::SocketError);
|
||||
void onClientDisconnected();
|
||||
void onClientConnected(QSslSocket *client);
|
||||
void onDataAvailable(QSslSocket *client, const QByteArray &data);
|
||||
void onClientDisconnected(QSslSocket *client);
|
||||
|
||||
public:
|
||||
MqttServer *q_ptr;
|
||||
|
||||
QTcpServer *server = nullptr;
|
||||
MqttUserValidator *userValidator = nullptr;
|
||||
QHash<int, SslServer*> servers;
|
||||
MqttAuthorizer *authorizer = nullptr;
|
||||
|
||||
Mqtt::QoS maximumSubscriptionQoS = Mqtt::QoS2;
|
||||
|
||||
@ -69,6 +69,7 @@ public:
|
||||
QHash<QTcpSocket*, ClientContext*> clientList;
|
||||
QHash<QTcpSocket*, QByteArray> clientBuffers;
|
||||
QHash<QString, MqttPackets> retainedMessages;
|
||||
QHash<QTcpSocket*, SslServer*> clientServerMap;
|
||||
};
|
||||
|
||||
class ClientContext {
|
||||
@ -90,4 +91,31 @@ public:
|
||||
QHash<quint16, MqttPacket> unackedPackets;
|
||||
};
|
||||
|
||||
class SslServer: public QTcpServer
|
||||
{
|
||||
Q_OBJECT
|
||||
public:
|
||||
SslServer(const QSslConfiguration &config, QObject *parent = nullptr):
|
||||
QTcpServer(parent),
|
||||
m_config(config)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
signals:
|
||||
void clientConnected(QSslSocket *socket);
|
||||
void clientDisconnected(QSslSocket *socket);
|
||||
void dataAvailable(QSslSocket *socket, const QByteArray &data);
|
||||
|
||||
protected:
|
||||
void incomingConnection(qintptr socketDescriptor) override;
|
||||
|
||||
private slots:
|
||||
void onClientDisconnected();
|
||||
void onSocketReadyRead();
|
||||
|
||||
private:
|
||||
QSslConfiguration m_config;
|
||||
};
|
||||
|
||||
#endif // MQTTSERVER_P_H
|
||||
|
||||
@ -178,7 +178,7 @@ void OperationTests::connectAndDisconnect()
|
||||
MqttClient* client = connectAndWait(clientId);
|
||||
|
||||
QVERIFY2(serverSpy.count() == 1, "Server didn't emit clientConnected");
|
||||
QVERIFY2(serverSpy.at(0).first() == clientId, "ClientId not matching on server side.");
|
||||
QVERIFY2(serverSpy.at(0).at(1) == clientId, "ClientId not matching on server side.");
|
||||
|
||||
QSignalSpy serverSpyDisconnect(m_server, &MqttServer::clientDisconnected);
|
||||
QSignalSpy clientSpy(client, &MqttClient::disconnected);
|
||||
@ -382,13 +382,11 @@ void OperationTests::testQoS1Retransmissions()
|
||||
QCOMPARE(serverSpy.at(0).at(1).toInt(), packetId);
|
||||
QCOMPARE(serverSpy.at(0).at(2).toString(), QString("/testtopic"));
|
||||
QCOMPARE(serverSpy.at(0).at(3).toString(), QString("Hello world"));
|
||||
QCOMPARE(serverSpy.at(0).at(4).toBool(), false);
|
||||
|
||||
QCOMPARE(serverSpy.at(1).at(0).toString(), QString("client1"));
|
||||
QCOMPARE(serverSpy.at(1).at(1).toInt(), packetId);
|
||||
QCOMPARE(serverSpy.at(1).at(2).toString(), QString("/testtopic"));
|
||||
QCOMPARE(serverSpy.at(1).at(3).toString(), QString("Hello world"));
|
||||
QCOMPARE(serverSpy.at(1).at(4).toBool(), true);
|
||||
}
|
||||
|
||||
void OperationTests::testMultiSubscription()
|
||||
@ -811,6 +809,7 @@ void OperationTests::testEmptyClientId()
|
||||
QPair<MqttClient*, QSignalSpy*> client3 = connectToServer("", false);
|
||||
QTRY_VERIFY2(client3.second->count() == 1, "Client did not emit connected signal");
|
||||
QTRY_COMPARE(client3.second->first().at(0).value<Mqtt::ConnectReturnCode>(), Mqtt::ConnectReturnCodeIdentifierRejected);
|
||||
QTRY_VERIFY2(client3.first->isConnected() == false, "Connection should have been dropped");
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user