Commit 30c0f42f8699549a9c1c2e03cadf28620af511cb
Committed by
GitHub
Merge pull request #137 from volodymyr-babak/feature/mqtt-plugin
Added MQTT plugin
Showing
15 changed files
with
582 additions
and
2 deletions
@@ -91,8 +91,6 @@ | @@ -91,8 +91,6 @@ | ||
91 | <dependency> | 91 | <dependency> |
92 | <groupId>org.eclipse.paho</groupId> | 92 | <groupId>org.eclipse.paho</groupId> |
93 | <artifactId>org.eclipse.paho.client.mqttv3</artifactId> | 93 | <artifactId>org.eclipse.paho.client.mqttv3</artifactId> |
94 | - <version>1.1.0</version> | ||
95 | - <scope>test</scope> | ||
96 | </dependency> | 94 | </dependency> |
97 | <dependency> | 95 | <dependency> |
98 | <groupId>org.cassandraunit</groupId> | 96 | <groupId>org.cassandraunit</groupId> |
@@ -437,6 +435,11 @@ | @@ -437,6 +435,11 @@ | ||
437 | <artifactId>extension-kafka</artifactId> | 435 | <artifactId>extension-kafka</artifactId> |
438 | <classifier>extension</classifier> | 436 | <classifier>extension</classifier> |
439 | </artifactItem> | 437 | </artifactItem> |
438 | + <artifactItem> | ||
439 | + <groupId>org.thingsboard.extensions</groupId> | ||
440 | + <artifactId>extension-mqtt</artifactId> | ||
441 | + <classifier>extension</classifier> | ||
442 | + </artifactItem> | ||
440 | </artifactItems> | 443 | </artifactItems> |
441 | </configuration> | 444 | </configuration> |
442 | </execution> | 445 | </execution> |
@@ -68,6 +68,7 @@ public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> { | @@ -68,6 +68,7 @@ public class KafkaPlugin extends AbstractPlugin<KafkaPluginConfiguration> { | ||
68 | this.producer.close(); | 68 | this.producer.close(); |
69 | } catch (Exception e) { | 69 | } catch (Exception e) { |
70 | log.error("Failed to close producer during destroy()", e); | 70 | log.error("Failed to close producer during destroy()", e); |
71 | + throw new RuntimeException(e); | ||
71 | } | 72 | } |
72 | } | 73 | } |
73 | 74 |
extensions/extension-mqtt/pom.xml
0 → 100644
1 | +<?xml version="1.0" encoding="UTF-8"?> | ||
2 | +<!-- | ||
3 | + | ||
4 | + Copyright © 2016-2017 The Thingsboard Authors | ||
5 | + | ||
6 | + Licensed under the Apache License, Version 2.0 (the "License"); | ||
7 | + you may not use this file except in compliance with the License. | ||
8 | + You may obtain a copy of the License at | ||
9 | + | ||
10 | + http://www.apache.org/licenses/LICENSE-2.0 | ||
11 | + | ||
12 | + Unless required by applicable law or agreed to in writing, software | ||
13 | + distributed under the License is distributed on an "AS IS" BASIS, | ||
14 | + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
15 | + See the License for the specific language governing permissions and | ||
16 | + limitations under the License. | ||
17 | + | ||
18 | +--> | ||
19 | +<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
20 | + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
21 | + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
22 | + <modelVersion>4.0.0</modelVersion> | ||
23 | + <parent> | ||
24 | + <groupId>org.thingsboard</groupId> | ||
25 | + <version>1.3.0-SNAPSHOT</version> | ||
26 | + <artifactId>extensions</artifactId> | ||
27 | + </parent> | ||
28 | + <groupId>org.thingsboard.extensions</groupId> | ||
29 | + <artifactId>extension-mqtt</artifactId> | ||
30 | + <packaging>jar</packaging> | ||
31 | + | ||
32 | + <name>Thingsboard Server MQTT Extension</name> | ||
33 | + <url>http://thingsboard.org</url> | ||
34 | + | ||
35 | + <properties> | ||
36 | + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||
37 | + <main.dir>${basedir}/../..</main.dir> | ||
38 | + </properties> | ||
39 | + | ||
40 | + <dependencies> | ||
41 | + <dependency> | ||
42 | + <groupId>org.eclipse.paho</groupId> | ||
43 | + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> | ||
44 | + <scope>provided</scope> | ||
45 | + </dependency> | ||
46 | + <dependency> | ||
47 | + <groupId>ch.qos.logback</groupId> | ||
48 | + <artifactId>logback-core</artifactId> | ||
49 | + <scope>provided</scope> | ||
50 | + </dependency> | ||
51 | + <dependency> | ||
52 | + <groupId>ch.qos.logback</groupId> | ||
53 | + <artifactId>logback-classic</artifactId> | ||
54 | + <scope>provided</scope> | ||
55 | + </dependency> | ||
56 | + <dependency> | ||
57 | + <groupId>org.thingsboard</groupId> | ||
58 | + <artifactId>extensions-api</artifactId> | ||
59 | + <scope>provided</scope> | ||
60 | + </dependency> | ||
61 | + <dependency> | ||
62 | + <groupId>org.thingsboard</groupId> | ||
63 | + <artifactId>extensions-core</artifactId> | ||
64 | + <scope>provided</scope> | ||
65 | + </dependency> | ||
66 | + <dependency> | ||
67 | + <groupId>org.apache.velocity</groupId> | ||
68 | + <artifactId>velocity</artifactId> | ||
69 | + <scope>provided</scope> | ||
70 | + </dependency> | ||
71 | + <dependency> | ||
72 | + <groupId>org.apache.velocity</groupId> | ||
73 | + <artifactId>velocity-tools</artifactId> | ||
74 | + <scope>provided</scope> | ||
75 | + </dependency> | ||
76 | + </dependencies> | ||
77 | + <build> | ||
78 | + <plugins> | ||
79 | + <plugin> | ||
80 | + <artifactId>maven-assembly-plugin</artifactId> | ||
81 | + <configuration> | ||
82 | + <descriptors> | ||
83 | + <descriptor>src/assembly/extension.xml</descriptor> | ||
84 | + </descriptors> | ||
85 | + </configuration> | ||
86 | + <executions> | ||
87 | + <execution> | ||
88 | + <id>make-assembly</id> | ||
89 | + <phase>package</phase> | ||
90 | + <goals> | ||
91 | + <goal>single</goal> | ||
92 | + </goals> | ||
93 | + </execution> | ||
94 | + </executions> | ||
95 | + </plugin> | ||
96 | + </plugins> | ||
97 | + </build> | ||
98 | +</project> |
1 | +<!-- | ||
2 | + | ||
3 | + Copyright © 2016-2017 The Thingsboard Authors | ||
4 | + | ||
5 | + Licensed under the Apache License, Version 2.0 (the "License"); | ||
6 | + you may not use this file except in compliance with the License. | ||
7 | + You may obtain a copy of the License at | ||
8 | + | ||
9 | + http://www.apache.org/licenses/LICENSE-2.0 | ||
10 | + | ||
11 | + Unless required by applicable law or agreed to in writing, software | ||
12 | + distributed under the License is distributed on an "AS IS" BASIS, | ||
13 | + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
14 | + See the License for the specific language governing permissions and | ||
15 | + limitations under the License. | ||
16 | + | ||
17 | +--> | ||
18 | +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" | ||
19 | + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
20 | + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> | ||
21 | + <id>extension</id> | ||
22 | + <formats> | ||
23 | + <format>jar</format> | ||
24 | + </formats> | ||
25 | + <includeBaseDirectory>false</includeBaseDirectory> | ||
26 | + <dependencySets> | ||
27 | + <dependencySet> | ||
28 | + <outputDirectory>/</outputDirectory> | ||
29 | + <useProjectArtifact>true</useProjectArtifact> | ||
30 | + <unpack>true</unpack> | ||
31 | + <scope>runtime</scope> | ||
32 | + </dependencySet> | ||
33 | + </dependencySets> | ||
34 | +</assembly> |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.action; | ||
17 | + | ||
18 | +import org.thingsboard.server.common.data.id.CustomerId; | ||
19 | +import org.thingsboard.server.common.data.id.DeviceId; | ||
20 | +import org.thingsboard.server.common.data.id.TenantId; | ||
21 | +import org.thingsboard.server.extensions.api.plugins.msg.AbstractRuleToPluginMsg; | ||
22 | + | ||
23 | +public class MqttActionMsg extends AbstractRuleToPluginMsg<MqttActionPayload> { | ||
24 | + | ||
25 | + public MqttActionMsg(TenantId tenantId, CustomerId customerId, DeviceId deviceId, MqttActionPayload payload) { | ||
26 | + super(tenantId, customerId, deviceId, payload); | ||
27 | + } | ||
28 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.action; | ||
17 | + | ||
18 | +import lombok.Builder; | ||
19 | +import lombok.Data; | ||
20 | +import org.thingsboard.server.common.msg.session.MsgType; | ||
21 | + | ||
22 | +import java.io.Serializable; | ||
23 | + | ||
24 | +@Data | ||
25 | +@Builder | ||
26 | +public class MqttActionPayload implements Serializable { | ||
27 | + | ||
28 | + private final boolean sync; | ||
29 | + private final String topic; | ||
30 | + private final String msgBody; | ||
31 | + | ||
32 | + private final Integer requestId; | ||
33 | + private final MsgType msgType; | ||
34 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.action; | ||
17 | + | ||
18 | +import org.thingsboard.server.common.msg.device.ToDeviceActorMsg; | ||
19 | +import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg; | ||
20 | +import org.thingsboard.server.extensions.api.component.Action; | ||
21 | +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; | ||
22 | +import org.thingsboard.server.extensions.api.rules.RuleContext; | ||
23 | +import org.thingsboard.server.extensions.core.action.template.AbstractTemplatePluginAction; | ||
24 | + | ||
25 | +import java.util.Optional; | ||
26 | + | ||
27 | +@Action(name = "Mqtt Plugin Action", descriptor = "MqttActionDescriptor.json", configuration = MqttPluginActionConfiguration.class) | ||
28 | +public class MqttPluginAction extends AbstractTemplatePluginAction<MqttPluginActionConfiguration> { | ||
29 | + | ||
30 | + @Override | ||
31 | + protected Optional<RuleToPluginMsg<?>> buildRuleToPluginMsg(RuleContext ctx, ToDeviceActorMsg msg, FromDeviceRequestMsg payload) { | ||
32 | + MqttActionPayload.MqttActionPayloadBuilder builder = MqttActionPayload.builder(); | ||
33 | + builder.sync(configuration.isSync()); | ||
34 | + builder.msgType(payload.getMsgType()); | ||
35 | + builder.requestId(payload.getRequestId()); | ||
36 | + builder.topic(configuration.getTopic()); | ||
37 | + builder.msgBody(getMsgBody(ctx, msg)); | ||
38 | + return Optional.of(new MqttActionMsg(msg.getTenantId(), | ||
39 | + msg.getCustomerId(), | ||
40 | + msg.getDeviceId(), | ||
41 | + builder.build())); | ||
42 | + } | ||
43 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.action; | ||
17 | + | ||
18 | +import lombok.Data; | ||
19 | +import org.thingsboard.server.extensions.core.action.template.TemplateActionConfiguration; | ||
20 | + | ||
21 | +@Data | ||
22 | +public class MqttPluginActionConfiguration implements TemplateActionConfiguration { | ||
23 | + private boolean sync; | ||
24 | + private String topic; | ||
25 | + private String template; | ||
26 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.plugin; | ||
17 | + | ||
18 | +import lombok.RequiredArgsConstructor; | ||
19 | +import lombok.extern.slf4j.Slf4j; | ||
20 | +import org.eclipse.paho.client.mqttv3.*; | ||
21 | +import org.thingsboard.server.common.data.id.RuleId; | ||
22 | +import org.thingsboard.server.common.data.id.TenantId; | ||
23 | +import org.thingsboard.server.common.msg.core.BasicStatusCodeResponse; | ||
24 | +import org.thingsboard.server.extensions.api.plugins.PluginContext; | ||
25 | +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; | ||
26 | +import org.thingsboard.server.extensions.api.plugins.msg.ResponsePluginToRuleMsg; | ||
27 | +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg; | ||
28 | +import org.thingsboard.server.extensions.api.rules.RuleException; | ||
29 | +import org.thingsboard.server.extensions.mqtt.action.MqttActionMsg; | ||
30 | +import org.thingsboard.server.extensions.mqtt.action.MqttActionPayload; | ||
31 | + | ||
32 | +import java.nio.charset.StandardCharsets; | ||
33 | + | ||
34 | +@RequiredArgsConstructor | ||
35 | +@Slf4j | ||
36 | +public class MqttMsgHandler implements RuleMsgHandler { | ||
37 | + | ||
38 | + private final MqttAsyncClient mqttClient; | ||
39 | + | ||
40 | + @Override | ||
41 | + public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException { | ||
42 | + if (!(msg instanceof MqttActionMsg)) { | ||
43 | + throw new RuleException("Unsupported message type " + msg.getClass().getName() + "!"); | ||
44 | + } | ||
45 | + MqttActionPayload payload = ((MqttActionMsg) msg).getPayload(); | ||
46 | + MqttMessage mqttMsg = new MqttMessage(payload.getMsgBody().getBytes(StandardCharsets.UTF_8)); | ||
47 | + try { | ||
48 | + mqttClient.publish(payload.getTopic(), mqttMsg, null, new IMqttActionListener() { | ||
49 | + @Override | ||
50 | + public void onSuccess(IMqttToken asyncActionToken) { | ||
51 | + log.debug("Message [{}] was successfully delivered to topic [{}]!", msg.toString(), payload.getTopic()); | ||
52 | + if (payload.isSync()) { | ||
53 | + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, | ||
54 | + BasicStatusCodeResponse.onSuccess(payload.getMsgType(), payload.getRequestId()))); | ||
55 | + } | ||
56 | + } | ||
57 | + @Override | ||
58 | + public void onFailure(IMqttToken asyncActionToken, Throwable e) { | ||
59 | + log.warn("Failed to deliver message [{}] to topic [{}]!", msg.toString(), payload.getTopic()); | ||
60 | + if (payload.isSync()) { | ||
61 | + ctx.reply(new ResponsePluginToRuleMsg(msg.getUid(), tenantId, ruleId, | ||
62 | + BasicStatusCodeResponse.onError(payload.getMsgType(), payload.getRequestId(), new Exception(e)))); | ||
63 | + } | ||
64 | + } | ||
65 | + }); | ||
66 | + } catch (MqttException e) { | ||
67 | + throw new RuntimeException(e.getMessage(), e); | ||
68 | + } | ||
69 | + } | ||
70 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.plugin; | ||
17 | + | ||
18 | +import lombok.extern.slf4j.Slf4j; | ||
19 | +import org.apache.commons.lang.StringUtils; | ||
20 | +import org.eclipse.paho.client.mqttv3.*; | ||
21 | +import org.thingsboard.server.extensions.api.component.Plugin; | ||
22 | +import org.thingsboard.server.extensions.api.plugins.AbstractPlugin; | ||
23 | +import org.thingsboard.server.extensions.api.plugins.PluginContext; | ||
24 | +import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler; | ||
25 | +import org.thingsboard.server.extensions.mqtt.action.MqttPluginAction; | ||
26 | + | ||
27 | +import java.util.UUID; | ||
28 | + | ||
29 | +@Plugin(name = "Mqtt Plugin", actions = {MqttPluginAction.class}, | ||
30 | + descriptor = "MqttPluginDescriptor.json", configuration = MqttPluginConfiguration.class) | ||
31 | +@Slf4j | ||
32 | +public class MqttPlugin extends AbstractPlugin<MqttPluginConfiguration> { | ||
33 | + | ||
34 | + private MqttMsgHandler handler; | ||
35 | + | ||
36 | + private MqttAsyncClient mqttClient; | ||
37 | + private MqttConnectOptions mqttClientOptions; | ||
38 | + | ||
39 | + private int retryInterval; | ||
40 | + | ||
41 | + private final Object connectLock = new Object(); | ||
42 | + | ||
43 | + @Override | ||
44 | + public void init(MqttPluginConfiguration configuration) { | ||
45 | + retryInterval = configuration.getRetryInterval(); | ||
46 | + | ||
47 | + mqttClientOptions = new MqttConnectOptions(); | ||
48 | + mqttClientOptions.setCleanSession(false); | ||
49 | + mqttClientOptions.setMaxInflight(configuration.getMaxInFlight()); | ||
50 | + mqttClientOptions.setAutomaticReconnect(true); | ||
51 | + String clientId = configuration.getClientId(); | ||
52 | + if (StringUtils.isEmpty(clientId)) { | ||
53 | + clientId = UUID.randomUUID().toString(); | ||
54 | + } | ||
55 | + if (!StringUtils.isEmpty(configuration.getAccessToken())) { | ||
56 | + mqttClientOptions.setUserName(configuration.getAccessToken()); | ||
57 | + } | ||
58 | + try { | ||
59 | + mqttClient = new MqttAsyncClient("tcp://" + configuration.getHost() + ":" + configuration.getPort(), clientId); | ||
60 | + } catch (Exception e) { | ||
61 | + log.error("Failed to create mqtt client", e); | ||
62 | + throw new RuntimeException(e); | ||
63 | + } | ||
64 | + connect(); | ||
65 | + } | ||
66 | + | ||
67 | + private void connect() { | ||
68 | + if (!mqttClient.isConnected()) { | ||
69 | + synchronized (connectLock) { | ||
70 | + while (!mqttClient.isConnected()) { | ||
71 | + log.debug("Attempt to connect to requested mqtt host [{}]!", mqttClient.getServerURI()); | ||
72 | + try { | ||
73 | + mqttClient.connect(mqttClientOptions, null, new IMqttActionListener() { | ||
74 | + @Override | ||
75 | + public void onSuccess(IMqttToken iMqttToken) { | ||
76 | + log.info("Connected to requested mqtt host [{}]!", mqttClient.getServerURI()); | ||
77 | + } | ||
78 | + | ||
79 | + @Override | ||
80 | + public void onFailure(IMqttToken iMqttToken, Throwable e) { | ||
81 | + } | ||
82 | + }).waitForCompletion(); | ||
83 | + } catch (MqttException e) { | ||
84 | + log.warn("Failed to connect to requested mqtt host [{}]!", mqttClient.getServerURI(), e); | ||
85 | + if (!mqttClient.isConnected()) { | ||
86 | + try { | ||
87 | + Thread.sleep(retryInterval); | ||
88 | + } catch (InterruptedException e1) { | ||
89 | + log.trace("Failed to wait for retry interval!", e); | ||
90 | + } | ||
91 | + } | ||
92 | + } | ||
93 | + } | ||
94 | + } | ||
95 | + } | ||
96 | + this.handler = new MqttMsgHandler(mqttClient); | ||
97 | + } | ||
98 | + | ||
99 | + private void destroy() { | ||
100 | + try { | ||
101 | + this.handler = null; | ||
102 | + this.mqttClient.disconnect(); | ||
103 | + } catch (MqttException e) { | ||
104 | + log.error("Failed to close mqtt client connection during destroy()", e); | ||
105 | + throw new RuntimeException(e); | ||
106 | + } | ||
107 | + } | ||
108 | + | ||
109 | + @Override | ||
110 | + protected RuleMsgHandler getRuleMsgHandler() { | ||
111 | + return handler; | ||
112 | + } | ||
113 | + | ||
114 | + @Override | ||
115 | + public void resume(PluginContext ctx) { | ||
116 | + connect(); | ||
117 | + } | ||
118 | + | ||
119 | + @Override | ||
120 | + public void suspend(PluginContext ctx) { | ||
121 | + destroy(); | ||
122 | + } | ||
123 | + | ||
124 | + @Override | ||
125 | + public void stop(PluginContext ctx) { | ||
126 | + destroy(); | ||
127 | + } | ||
128 | +} |
1 | +/** | ||
2 | + * Copyright © 2016-2017 The Thingsboard Authors | ||
3 | + * | ||
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | ||
5 | + * you may not use this file except in compliance with the License. | ||
6 | + * You may obtain a copy of the License at | ||
7 | + * | ||
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | ||
9 | + * | ||
10 | + * Unless required by applicable law or agreed to in writing, software | ||
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | ||
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
13 | + * See the License for the specific language governing permissions and | ||
14 | + * limitations under the License. | ||
15 | + */ | ||
16 | +package org.thingsboard.server.extensions.mqtt.plugin; | ||
17 | + | ||
18 | +import lombok.Data; | ||
19 | + | ||
20 | +@Data | ||
21 | +public class MqttPluginConfiguration { | ||
22 | + private String host; | ||
23 | + private int port; | ||
24 | + private int maxInFlight; | ||
25 | + private int retryInterval; | ||
26 | + private String clientId; | ||
27 | + private String accessToken; | ||
28 | +} |
1 | +{ | ||
2 | + "schema": { | ||
3 | + "title": "Mqtt Action Configuration", | ||
4 | + "type": "object", | ||
5 | + "properties": { | ||
6 | + "sync": { | ||
7 | + "title": "Requires delivery confirmation", | ||
8 | + "type": "boolean" | ||
9 | + }, | ||
10 | + "topic": { | ||
11 | + "title": "Topic Name", | ||
12 | + "type": "string" | ||
13 | + }, | ||
14 | + "template": { | ||
15 | + "title": "Body Template", | ||
16 | + "type": "string" | ||
17 | + } | ||
18 | + }, | ||
19 | + "required": [ | ||
20 | + "topic", | ||
21 | + "template" | ||
22 | + ] | ||
23 | + }, | ||
24 | + "form": [ | ||
25 | + "topic", | ||
26 | + { | ||
27 | + "key": "template", | ||
28 | + "type": "textarea", | ||
29 | + "rows": 5 | ||
30 | + } | ||
31 | + ] | ||
32 | +} |
1 | +{ | ||
2 | + "schema": { | ||
3 | + "title": "Mqtt Plugin Configuration", | ||
4 | + "type": "object", | ||
5 | + "properties": { | ||
6 | + "host": { | ||
7 | + "title": "Specify the host to connect to", | ||
8 | + "type": "string", | ||
9 | + "default": "localhost" | ||
10 | + }, | ||
11 | + "port": { | ||
12 | + "title": "Connect to the port specified", | ||
13 | + "type": "integer", | ||
14 | + "default": 1883 | ||
15 | + }, | ||
16 | + "accessToken": { | ||
17 | + "title": "Username (Access Token) to be used for authenticating.", | ||
18 | + "type": "string" | ||
19 | + }, | ||
20 | + "clientId": { | ||
21 | + "title": "The id to use for this client.", | ||
22 | + "type": "string" | ||
23 | + }, | ||
24 | + "maxInFlight": { | ||
25 | + "title": "How many messages can be send without acknowledgments.", | ||
26 | + "type": "integer", | ||
27 | + "default": 1000 | ||
28 | + }, | ||
29 | + "retryInterval": { | ||
30 | + "title": "Interval to wait between connect attempts to host.", | ||
31 | + "type": "integer", | ||
32 | + "default": 3000 | ||
33 | + } | ||
34 | + }, | ||
35 | + "required": [ | ||
36 | + "host", | ||
37 | + "port" | ||
38 | + ] | ||
39 | + }, | ||
40 | + "form": [ | ||
41 | + "host", | ||
42 | + "port", | ||
43 | + "accessToken", | ||
44 | + "clientId", | ||
45 | + "maxInFlight", | ||
46 | + "retryInterval" | ||
47 | + ] | ||
48 | +} |
@@ -38,6 +38,7 @@ | @@ -38,6 +38,7 @@ | ||
38 | <module>extension-rabbitmq</module> | 38 | <module>extension-rabbitmq</module> |
39 | <module>extension-rest-api-call</module> | 39 | <module>extension-rest-api-call</module> |
40 | <module>extension-kafka</module> | 40 | <module>extension-kafka</module> |
41 | + <module>extension-mqtt</module> | ||
41 | </modules> | 42 | </modules> |
42 | 43 | ||
43 | </project> | 44 | </project> |
@@ -341,6 +341,12 @@ | @@ -341,6 +341,12 @@ | ||
341 | <version>${project.version}</version> | 341 | <version>${project.version}</version> |
342 | </dependency> | 342 | </dependency> |
343 | <dependency> | 343 | <dependency> |
344 | + <groupId>org.thingsboard.extensions</groupId> | ||
345 | + <artifactId>extension-mqtt</artifactId> | ||
346 | + <classifier>extension</classifier> | ||
347 | + <version>${project.version}</version> | ||
348 | + </dependency> | ||
349 | + <dependency> | ||
344 | <groupId>org.thingsboard.common</groupId> | 350 | <groupId>org.thingsboard.common</groupId> |
345 | <artifactId>data</artifactId> | 351 | <artifactId>data</artifactId> |
346 | <version>${project.version}</version> | 352 | <version>${project.version}</version> |