aws-crt-cpp
C++ wrapper around the aws-c-* libraries. Provides Cross-Platform Transport Protocols and SSL/TLS implementations for C++.
MqttClient.h
Go to the documentation of this file.
1#pragma once
6#include <aws/crt/Exports.h>
8#include <aws/crt/Types.h>
12
13#include <aws/mqtt/client.h>
14
15#include <atomic>
16#include <functional>
17#include <memory>
18
19namespace Aws
20{
21 namespace Crt
22 {
23 namespace Io
24 {
25 class ClientBootstrap;
26 }
27
28 namespace Http
29 {
30 class HttpRequest;
31 }
32
33 namespace Mqtt
34 {
35 class MqttClient;
36 class MqttConnection;
37
41 using OnConnectionInterruptedHandler = std::function<void(MqttConnection &connection, int error)>;
42
47 std::function<void(MqttConnection &connection, ReturnCode connectCode, bool sessionPresent)>;
48
52 using OnConnectionCompletedHandler = std::function<
53 void(MqttConnection &connection, int errorCode, ReturnCode returnCode, bool sessionPresent)>;
54
58 using OnSubAckHandler = std::function<
59 void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)>;
60
64 using OnMultiSubAckHandler = std::function<void(
65 MqttConnection &connection,
66 uint16_t packetId,
67 const Vector<String> &topics,
68 QOS qos,
69 int errorCode)>;
70
74 using OnDisconnectHandler = std::function<void(MqttConnection &connection)>;
75
87 using OnMessageReceivedHandler = std::function<void(
88 MqttConnection &connection,
89 const String &topic,
90 const ByteBuf &payload,
91 bool dup,
92 QOS qos,
93 bool retain)>;
94
99 std::function<void(MqttConnection &connection, const String &topic, const ByteBuf &payload)>;
100
102 std::function<void(MqttConnection &connection, uint16_t packetId, int errorCode)>;
103
109 std::function<void(const std::shared_ptr<Http::HttpRequest> &, int errorCode)>;
110
117 using OnWebSocketHandshakeIntercept = std::function<
118 void(std::shared_ptr<Http::HttpRequest> req, const OnWebSocketHandshakeInterceptComplete &onComplete)>;
119
127 {
128 friend class MqttClient;
129
130 public:
139 operator bool() const noexcept;
143 int LastError() const noexcept;
144
148 bool SetWill(const char *topic, QOS qos, bool retain, const ByteBuf &payload) noexcept;
149
154 bool SetLogin(const char *userName, const char *password) noexcept;
155
159 bool SetWebsocketProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;
160
166 bool SetHttpProxyOptions(const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept;
167
174 bool SetReconnectTimeout(uint64_t min_seconds, uint64_t max_seconds) noexcept;
175
180 bool Connect(
181 const char *clientId,
182 bool cleanSession,
183 uint16_t keepAliveTimeSecs = 0,
184 uint32_t pingTimeoutMs = 0,
185 uint32_t protocolOperationTimeoutMs = 0) noexcept;
186
190 bool Disconnect() noexcept;
191
195 aws_mqtt_client_connection *GetUnderlyingConnection() noexcept;
196
202 uint16_t Subscribe(
203 const char *topicFilter,
204 QOS qos,
205 OnMessageReceivedHandler &&onMessage,
206 OnSubAckHandler &&onSubAck) noexcept;
207
211 uint16_t Subscribe(
212 const char *topicFilter,
213 QOS qos,
214 OnPublishReceivedHandler &&onPublish,
215 OnSubAckHandler &&onSubAck) noexcept;
216
222 uint16_t Subscribe(
223 const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
224 QOS qos,
225 OnMultiSubAckHandler &&onOpComplete) noexcept;
226
230 uint16_t Subscribe(
231 const Vector<std::pair<const char *, OnPublishReceivedHandler>> &topicFilters,
232 QOS qos,
233 OnMultiSubAckHandler &&onOpComplete) noexcept;
234
239 bool SetOnMessageHandler(OnMessageReceivedHandler &&onMessage) noexcept;
240
244 bool SetOnMessageHandler(OnPublishReceivedHandler &&onPublish) noexcept;
245
250 uint16_t Unsubscribe(const char *topicFilter, OnOperationCompleteHandler &&onOpComplete) noexcept;
251
256 uint16_t Publish(
257 const char *topic,
258 QOS qos,
259 bool retain,
260 const ByteBuf &payload,
261 OnOperationCompleteHandler &&onOpComplete) noexcept;
262
263 OnConnectionInterruptedHandler OnConnectionInterrupted;
264 OnConnectionResumedHandler OnConnectionResumed;
265 OnConnectionCompletedHandler OnConnectionCompleted;
267 OnWebSocketHandshakeIntercept WebsocketInterceptor;
268
269 private:
270 aws_mqtt_client *m_owningClient;
271 aws_mqtt_client_connection *m_underlyingConnection;
272 String m_hostName;
273 uint16_t m_port;
274 Crt::Io::TlsContext m_tlsContext;
275 Io::TlsConnectionOptions m_tlsOptions;
276 Io::SocketOptions m_socketOptions;
277 Crt::Optional<Http::HttpClientConnectionProxyOptions> m_proxyOptions;
278 void *m_onAnyCbData;
279 bool m_useTls;
280 bool m_useWebsocket;
281
283 aws_mqtt_client *client,
284 const char *hostName,
285 uint16_t port,
286 const Io::SocketOptions &socketOptions,
287 const Crt::Io::TlsContext &tlsContext,
288 bool useWebsocket) noexcept;
289
291 aws_mqtt_client *client,
292 const char *hostName,
293 uint16_t port,
294 const Io::SocketOptions &socketOptions,
295 bool useWebsocket) noexcept;
296
297 static void s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData);
298 static void s_onConnectionCompleted(
299 aws_mqtt_client_connection *,
300 int errorCode,
301 enum aws_mqtt_connect_return_code returnCode,
302 bool sessionPresent,
303 void *userData);
304 static void s_onConnectionResumed(
305 aws_mqtt_client_connection *,
306 ReturnCode returnCode,
307 bool sessionPresent,
308 void *userData);
309
310 static void s_onDisconnect(aws_mqtt_client_connection *connection, void *userData);
311 static void s_onPublish(
312 aws_mqtt_client_connection *connection,
313 const aws_byte_cursor *topic,
314 const aws_byte_cursor *payload,
315 bool dup,
316 enum aws_mqtt_qos qos,
317 bool retain,
318 void *user_data);
319
320 static void s_onSubAck(
321 aws_mqtt_client_connection *connection,
322 uint16_t packetId,
323 const struct aws_byte_cursor *topic,
324 enum aws_mqtt_qos qos,
325 int error_code,
326 void *userdata);
327 static void s_onMultiSubAck(
328 aws_mqtt_client_connection *connection,
329 uint16_t packetId,
330 const struct aws_array_list *topic_subacks,
331 int error_code,
332 void *userdata);
333 static void s_onOpComplete(
334 aws_mqtt_client_connection *connection,
335 uint16_t packetId,
336 int errorCode,
337 void *userdata);
338
339 static void s_onWebsocketHandshake(
340 struct aws_http_message *request,
341 void *user_data,
342 aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
343 void *complete_ctx);
344
345 static void s_connectionInit(
346 MqttConnection *self,
347 const char *hostName,
348 uint16_t port,
349 const Io::SocketOptions &socketOptions);
350 };
351
358 {
359 public:
363 MqttClient(Io::ClientBootstrap &bootstrap, Allocator *allocator = g_allocator) noexcept;
364
365 ~MqttClient();
366 MqttClient(const MqttClient &) = delete;
367 MqttClient(MqttClient &&) noexcept;
368 MqttClient &operator=(const MqttClient &) = delete;
369 MqttClient &operator=(MqttClient &&) noexcept;
373 operator bool() const noexcept;
377 int LastError() const noexcept;
378
383 std::shared_ptr<MqttConnection> NewConnection(
384 const char *hostName,
385 uint16_t port,
386 const Io::SocketOptions &socketOptions,
387 const Crt::Io::TlsContext &tlsContext,
388 bool useWebsocket = false) noexcept;
393 std::shared_ptr<MqttConnection> NewConnection(
394 const char *hostName,
395 uint16_t port,
396 const Io::SocketOptions &socketOptions,
397 bool useWebsocket = false) noexcept;
398
399 private:
400 aws_mqtt_client *m_client;
401 };
402 } // namespace Mqtt
403 } // namespace Crt
404} // namespace Aws
#define AWS_CRT_CPP_API
Definition: Exports.h:37
Definition: Bootstrap.h:35
Definition: MqttClient.h:358
MqttClient(const MqttClient &)=delete
Definition: MqttClient.h:127
MqttConnection & operator=(MqttConnection &&)=delete
MqttConnection(MqttConnection &&)=delete
MqttConnection(const MqttConnection &)=delete
MqttConnection & operator=(const MqttConnection &)=delete
Definition: Optional.h:13
std::function< void(MqttConnection &connection, uint16_t packetId, const Vector< String > &topics, QOS qos, int errorCode)> OnMultiSubAckHandler
Definition: MqttClient.h:69
std::function< void(std::shared_ptr< Http::HttpRequest > req, const OnWebSocketHandshakeInterceptComplete &onComplete)> OnWebSocketHandshakeIntercept
Definition: MqttClient.h:118
std::function< void(MqttConnection &connection, ReturnCode connectCode, bool sessionPresent)> OnConnectionResumedHandler
Definition: MqttClient.h:47
std::function< void(MqttConnection &connection, const String &topic, const ByteBuf &payload, bool dup, QOS qos, bool retain)> OnMessageReceivedHandler
Definition: MqttClient.h:93
std::function< void(MqttConnection &connection, uint16_t packetId, int errorCode)> OnOperationCompleteHandler
Definition: MqttClient.h:102
std::function< void(MqttConnection &connection, const String &topic, const ByteBuf &payload)> OnPublishReceivedHandler
Definition: MqttClient.h:99
std::function< void(MqttConnection &connection, int error)> OnConnectionInterruptedHandler
Definition: MqttClient.h:41
std::function< void(const std::shared_ptr< Http::HttpRequest > &, int errorCode)> OnWebSocketHandshakeInterceptComplete
Definition: MqttClient.h:109
aws_mqtt_qos QOS
Definition: Types.h:42
std::function< void(MqttConnection &connection)> OnDisconnectHandler
Definition: MqttClient.h:74
aws_mqtt_connect_return_code ReturnCode
Definition: Types.h:43
std::function< void(MqttConnection &connection, uint16_t packetId, const String &topic, QOS qos, int errorCode)> OnSubAckHandler
Definition: MqttClient.h:59
std::function< void(MqttConnection &connection, int errorCode, ReturnCode returnCode, bool sessionPresent)> OnConnectionCompletedHandler
Definition: MqttClient.h:53
aws_allocator Allocator
Definition: StlAllocator.h:17
AWS_CRT_CPP_API Allocator * g_allocator
Definition: Api.cpp:21
std::basic_string< char, std::char_traits< char >, StlAllocator< char > > String
Definition: Types.h:47
aws_byte_buf ByteBuf
Definition: Types.h:32
std::vector< T, StlAllocator< T > > Vector
Definition: Types.h:55
AWS_CRT_CPP_API int LastError() noexcept
Definition: Api.cpp:315
Definition: Api.h:17
Definition: StringView.h:846
Definition: cJSON.cpp:105