forked from shamblett/mqtt_client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mqtt_client_publish_qos2.dart
104 lines (90 loc) · 3.61 KB
/
mqtt_client_publish_qos2.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/*
* Package : mqtt_client
* Author : S. Hamblett <[email protected]>
* Date : 27/09/2018
* Copyright : S.Hamblett
*/
import 'dart:async';
import 'dart:io';
import 'package:mqtt_client/mqtt_client.dart';
/// Demonstrates publishing at QoS 2 (exactly once).
///
/// Connects to `test.mosquitto.org`, subscribes to two plain (non-wildcard)
/// topics at QoS 2, publishes one message to each in quick succession, pauses
/// while the resulting traffic is reported, then unsubscribes and disconnects.
///
/// Completes with `0` when the run reaches the end; a failed connection ends
/// the process through `exit(-1)` instead.
Future<int> main() async {
  // Client configuration, including the callbacks declared below in this file.
  final client = MqttClient('test.mosquitto.org', '')
    ..logging(on: true)
    ..keepAlivePeriod = 20
    ..onDisconnected = onDisconnected
    ..onSubscribed = onSubscribed;

  final connectMessage = MqttConnectMessage()
      .withClientIdentifier('Mqtt_MyClientUniqueIdQ2')
      .keepAliveFor(20) // Keep in step with keepAlivePeriod above, or omit
      .withWillTopic('willtopic') // A will topic requires a will message too
      .withWillMessage('My Will message')
      .startClean() // Non persistent session for testing
      .withWillQos(MqttQos.atLeastOnce);
  print('EXAMPLE::Mosquitto client connecting....');
  client.connectionMessage = connectMessage;

  try {
    await client.connect();
  } on Exception catch (e) {
    print('EXAMPLE::client exception - $e');
    client.disconnect();
  }

  // Bail out unless the client reports that it is connected.
  if (client.connectionStatus.state != MqttConnectionState.connected) {
    print('EXAMPLE::ERROR Mosquitto client connection failed - disconnecting, state is ${client.connectionStatus.state}');
    client.disconnect();
    exit(-1);
  }
  print('EXAMPLE::Mosquitto client connected');

  // Subscribe to both topics at QoS 2; neither is a wildcard topic.
  print('EXAMPLE:: <<<< SUBCRIBE 1 >>>>');
  const firstTopic = 'SJHTopic1';
  client.subscribe(firstTopic, MqttQos.exactlyOnce);
  print('EXAMPLE:: <<<< SUBCRIBE 2 >>>>');
  const secondTopic = 'SJHTopic2';
  client.subscribe(secondTopic, MqttQos.exactlyOnce);

  // Reports the first entry of each update batch: its topic and its payload
  // decoded to a string.
  void reportUpdate(dynamic c) {
    final MqttPublishMessage received = c[0].payload;
    final String pt =
        MqttPublishPayload.bytesToStringAsString(received.payload.message);
    print('EXAMPLE::Change notification:: topic is <${c[0].topic}>, payload is <-- $pt -->');
    print('');
  }

  client.updates.listen(reportUpdate);

  // Optional: messages arriving on the published stream have completed their
  // publishing handshake with the broker (the handshake is QoS dependent).
  void reportPublished(MqttPublishMessage message) {
    print(
        'EXAMPLE::Published notification:: topic is ${message.variableHeader.topicName}, with Qos ${message.header.qos}');
  }

  client.published.listen(reportPublished);

  // Publish one message per topic, back to back.
  final firstBuilder = MqttClientPayloadBuilder()
    ..addString('Hello from mqtt_client topic 1');
  print('EXAMPLE:: <<<< PUBLISH 1 >>>>');
  client.publishMessage(firstTopic, MqttQos.exactlyOnce, firstBuilder.payload);

  final secondBuilder = MqttClientPayloadBuilder()
    ..addString('Hello from mqtt_client topic 2');
  print('EXAMPLE:: <<<< PUBLISH 2 >>>>');
  client.publishMessage(
      secondTopic, MqttQos.exactlyOnce, secondBuilder.payload);

  // Leave time for the exchanges to be reported before tearing down.
  print('EXAMPLE::Sleeping....');
  await MqttUtilities.asyncSleep(60);

  print('EXAMPLE::Unsubscribing');
  client.unsubscribe(firstTopic);
  client.unsubscribe(secondTopic);

  await MqttUtilities.asyncSleep(2);
  print('EXAMPLE::Disconnecting');
  client.disconnect();
  return 0;
}
/// Subscription callback assigned to the client's `onSubscribed` in `main`.
///
/// Prints a confirmation line naming the [topic] it was called with.
void onSubscribed(String topic) =>
    print('EXAMPLE::Subscription confirmed for topic $topic');
/// Disconnection callback assigned to the client's `onDisconnected` in `main`.
///
/// Prints a notice and then terminates the process by calling `exit(-1)`.
void onDisconnected() {
  const notice =
      'EXAMPLE::OnDisconnected client callback - Client disconnection';
  print(notice);
  exit(-1);
}