aws-crt-cpp
C++ wrapper around the aws-c-* libraries. Provides Cross-Platform Transport Protocols and SSL/TLS implementations for C++.
MqttClient.h
Go to the documentation of this file.
1#pragma once
6#include <aws/crt/Exports.h>
8#include <aws/crt/Types.h>
12
13#include <aws/mqtt/client.h>
14
15#include <atomic>
16#include <functional>
17#include <memory>
18
19namespace Aws
20{
21 namespace Crt
22 {
23 namespace Io
24 {
25 class ClientBootstrap;
26 }
27
28 namespace Http
29 {
30 class HttpRequest;
31 }
32
33 namespace Mqtt
34 {
35 class MqttClient;
36 class MqttConnection;
37
41 using OnConnectionInterruptedHandler = std::function<void(MqttConnection &connection, int error)>;
42
47 std::function<void(MqttConnection &connection, ReturnCode connectCode, bool sessionPresent)>;
48
52 using OnConnectionCompletedHandler = std::function<
53 void(MqttConnection &connection, int errorCode, ReturnCode returnCode, bool sessionPresent)>;
54
58 using OnSubAckHandler = std::function<
59 void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)>;
60
64 using OnMultiSubAckHandler = std::function<void(
65 MqttConnection &connection,
66 uint16_t packetId,
67 const Vector<String> &topics,
68 QOS qos,
69 int errorCode)>;
70
74 using OnDisconnectHandler = std::function<void(MqttConnection &connection)>;
75
87 using OnMessageReceivedHandler = std::function<void(
88 MqttConnection &connection,
89 const String &topic,
90 const ByteBuf &payload,
91 bool dup,
92 QOS qos,
93 bool retain)>;
94
99 std::function<void(MqttConnection &connection, const String &topic, const ByteBuf &payload)>;
100
106 std::function<void(MqttConnection &connection, uint16_t packetId, int errorCode)>;
107
113 std::function<void(const std::shared_ptr<Http::HttpRequest> &, int errorCode)>;
114
121 using OnWebSocketHandshakeIntercept = std::function<
122 void(std::shared_ptr<Http::HttpRequest> req, const OnWebSocketHandshakeInterceptComplete &onComplete)>;
123
131 {
132 friend class MqttClient;
133
134 public:
140
144 operator bool() const noexcept;
145
149 int LastError() const noexcept;
150
159 bool SetWill(const char *topic, QOS qos, bool retain, const ByteBuf &payload) noexcept;
160
168 bool SetLogin(const char *userName, const char *password) noexcept;
169
173 bool SetWebsocketProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;
174
184 bool SetHttpProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;
185
197 bool SetReconnectTimeout(uint64_t min_seconds, uint64_t max_seconds) noexcept;
198
215 bool Connect(
216 const char *clientId,
217 bool cleanSession,
218 uint16_t keepAliveTimeSecs = 0,
219 uint32_t pingTimeoutMs = 0,
220 uint32_t protocolOperationTimeoutMs = 0) noexcept;
221
226 bool Disconnect() noexcept;
227
229 aws_mqtt_client_connection *GetUnderlyingConnection() noexcept;
230
243 uint16_t Subscribe(
244 const char *topicFilter,
245 QOS qos,
246 OnMessageReceivedHandler &&onMessage,
247 OnSubAckHandler &&onSubAck) noexcept;
248
252 uint16_t Subscribe(
253 const char *topicFilter,
254 QOS qos,
255 OnPublishReceivedHandler &&onPublish,
256 OnSubAckHandler &&onSubAck) noexcept;
257
270 uint16_t Subscribe(
271 const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
272 QOS qos,
273 OnMultiSubAckHandler &&onOpComplete) noexcept;
274
278 uint16_t Subscribe(
279 const Vector<std::pair<const char *, OnPublishReceivedHandler>> &topicFilters,
280 QOS qos,
281 OnMultiSubAckHandler &&onOpComplete) noexcept;
282
290 bool SetOnMessageHandler(OnMessageReceivedHandler &&onMessage) noexcept;
291
295 bool SetOnMessageHandler(OnPublishReceivedHandler &&onPublish) noexcept;
296
306 uint16_t Unsubscribe(const char *topicFilter, OnOperationCompleteHandler &&onOpComplete) noexcept;
307
321 uint16_t Publish(
322 const char *topic,
323 QOS qos,
324 bool retain,
325 const ByteBuf &payload,
326 OnOperationCompleteHandler &&onOpComplete) noexcept;
327
328 OnConnectionInterruptedHandler OnConnectionInterrupted;
329 OnConnectionResumedHandler OnConnectionResumed;
330 OnConnectionCompletedHandler OnConnectionCompleted;
332 OnWebSocketHandshakeIntercept WebsocketInterceptor;
333
334 private:
335 aws_mqtt_client *m_owningClient;
336 aws_mqtt_client_connection *m_underlyingConnection;
337 String m_hostName;
338 uint16_t m_port;
339 Crt::Io::TlsContext m_tlsContext;
340 Io::TlsConnectionOptions m_tlsOptions;
341 Io::SocketOptions m_socketOptions;
342 Crt::Optional<Http::HttpClientConnectionProxyOptions> m_proxyOptions;
343 void *m_onAnyCbData;
344 bool m_useTls;
345 bool m_useWebsocket;
346
348 aws_mqtt_client *client,
349 const char *hostName,
350 uint16_t port,
351 const Io::SocketOptions &socketOptions,
352 const Crt::Io::TlsContext &tlsContext,
353 bool useWebsocket) noexcept;
354
356 aws_mqtt_client *client,
357 const char *hostName,
358 uint16_t port,
359 const Io::SocketOptions &socketOptions,
360 bool useWebsocket) noexcept;
361
362 static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
363 static void s_onConnectionCompleted(
364 aws_mqtt_client_connection *,
365 int errorCode,
366 enum aws_mqtt_connect_return_code returnCode,
367 bool sessionPresent,
368 void *userData);
369 static void s_onConnectionResumed(
370 aws_mqtt_client_connection *,
371 ReturnCode returnCode,
372 bool sessionPresent,
373 void *userData);
374
375 static void s_onDisconnect(aws_mqtt_client_connection *connection, void *userData);
376 static void s_onPublish(
377 aws_mqtt_client_connection *connection,
378 const aws_byte_cursor *topic,
379 const aws_byte_cursor *payload,
380 bool dup,
381 enum aws_mqtt_qos qos,
382 bool retain,
383 void *user_data);
384
385 static void s_onSubAck(
386 aws_mqtt_client_connection *connection,
387 uint16_t packetId,
388 const struct aws_byte_cursor *topic,
389 enum aws_mqtt_qos qos,
390 int error_code,
391 void *userdata);
392 static void s_onMultiSubAck(
393 aws_mqtt_client_connection *connection,
394 uint16_t packetId,
395 const struct aws_array_list *topic_subacks,
396 int error_code,
397 void *userdata);
398 static void s_onOpComplete(
399 aws_mqtt_client_connection *connection,
400 uint16_t packetId,
401 int errorCode,
402 void *userdata);
403
404 static void s_onWebsocketHandshake(
405 struct aws_http_message *request,
406 void *user_data,
407 aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
408 void *complete_ctx);
409
410 static void s_connectionInit(
411 MqttConnection *self,
412 const char *hostName,
413 uint16_t port,
414 const Io::SocketOptions &socketOptions);
415 };
416
423 {
424 public:
428 MqttClient(Io::ClientBootstrap &bootstrap, Allocator *allocator = g_allocator) noexcept;
429
436 MqttClient(Allocator *allocator = g_allocator) noexcept;
437
438 ~MqttClient();
439 MqttClient(const MqttClient &) = delete;
440 MqttClient(MqttClient &&) noexcept;
441 MqttClient &operator=(const MqttClient &) = delete;
442 MqttClient &operator=(MqttClient &&) noexcept;
443
447 operator bool() const noexcept;
448
452 int LastError() const noexcept;
453
467 std::shared_ptr<MqttConnection> NewConnection(
468 const char *hostName,
469 uint16_t port,
470 const Io::SocketOptions &socketOptions,
471 const Crt::Io::TlsContext &tlsContext,
472 bool useWebsocket = false) noexcept;
473
485 std::shared_ptr<MqttConnection> NewConnection(
486 const char *hostName,
487 uint16_t port,
488 const Io::SocketOptions &socketOptions,
489 bool useWebsocket = false) noexcept;
490
491 private:
492 aws_mqtt_client *m_client;
493 };
494 } // namespace Mqtt
495 } // namespace Crt
496} // namespace Aws
#define AWS_CRT_CPP_API
Definition: Exports.h:37
Definition: Bootstrap.h:35
Definition: MqttClient.h:423
MqttClient(const MqttClient &)=delete
Definition: MqttClient.h:131
MqttConnection & operator=(MqttConnection &&)=delete
MqttConnection(MqttConnection &&)=delete
MqttConnection(const MqttConnection &)=delete
MqttConnection & operator=(const MqttConnection &)=delete
Definition: Optional.h:17
std::function< void(MqttConnection &connection, uint16_t packetId, const Vector< String > &topics, QOS qos, int errorCode)> OnMultiSubAckHandler
Definition: MqttClient.h:69
std::function< void(std::shared_ptr< Http::HttpRequest > req, const OnWebSocketHandshakeInterceptComplete &onComplete)> OnWebSocketHandshakeIntercept
Definition: MqttClient.h:122
std::function< void(MqttConnection &connection, ReturnCode connectCode, bool sessionPresent)> OnConnectionResumedHandler
Definition: MqttClient.h:47
std::function< void(MqttConnection &connection, const String &topic, const ByteBuf &payload, bool dup, QOS qos, bool retain)> OnMessageReceivedHandler
Definition: MqttClient.h:93
std::function< void(MqttConnection &connection, uint16_t packetId, int errorCode)> OnOperationCompleteHandler
Definition: MqttClient.h:106
std::function< void(MqttConnection &connection, const String &topic, const ByteBuf &payload)> OnPublishReceivedHandler
Definition: MqttClient.h:99
std::function< void(MqttConnection &connection, int error)> OnConnectionInterruptedHandler
Definition: MqttClient.h:41
std::function< void(const std::shared_ptr< Http::HttpRequest > &, int errorCode)> OnWebSocketHandshakeInterceptComplete
Definition: MqttClient.h:113
aws_mqtt_qos QOS
Definition: Types.h:42
std::function< void(MqttConnection &connection)> OnDisconnectHandler
Definition: MqttClient.h:74
aws_mqtt_connect_return_code ReturnCode
Definition: Types.h:43
std::function< void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)> OnSubAckHandler
Definition: MqttClient.h:59
std::function< void(MqttConnection &connection, int errorCode, ReturnCode returnCode, bool sessionPresent)> OnConnectionCompletedHandler
Definition: MqttClient.h:53
aws_allocator Allocator
Definition: StlAllocator.h:17
AWS_CRT_CPP_API Allocator * g_allocator
Definition: Api.cpp:23
std::basic_string< char, std::char_traits< char >, StlAllocator< char > > String
Definition: Types.h:47
aws_byte_buf ByteBuf
Definition: Types.h:32
std::vector< T, StlAllocator< T > > Vector
Definition: Types.h:55
AWS_CRT_CPP_API int LastError() noexcept
Definition: Api.cpp:391
Definition: Api.h:14
Definition: StringView.h:851
Definition: cJSON.cpp:105