Showing
12 changed files
with
238 additions
and
43 deletions
1 | 1 | /** |
2 | 2 | * Copyright © 2016-2018 The Thingsboard Authors |
3 | - * | |
3 | + * <p> | |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | - * | |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | - * | |
7 | + * <p> | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * <p> | |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
... | ... | @@ -16,8 +16,10 @@ |
16 | 16 | package org.thingsboard.server.actors.ruleChain; |
17 | 17 | |
18 | 18 | import akka.actor.ActorRef; |
19 | +import akka.actor.Cancellable; | |
19 | 20 | import com.google.common.base.Function; |
20 | 21 | import org.thingsboard.rule.engine.api.ListeningExecutor; |
22 | +import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; | |
21 | 23 | import org.thingsboard.rule.engine.api.TbContext; |
22 | 24 | import org.thingsboard.server.actors.ActorSystemContext; |
23 | 25 | import org.thingsboard.server.common.data.id.RuleNodeId; |
... | ... | @@ -165,6 +167,11 @@ class DefaultTbContext implements TbContext { |
165 | 167 | } |
166 | 168 | |
167 | 169 | @Override |
170 | + public RuleEngineTelemetryService getTelemetryService() { | |
171 | + return mainCtx.getTsSubService(); | |
172 | + } | |
173 | + | |
174 | + @Override | |
168 | 175 | public RelationService getRelationService() { |
169 | 176 | return mainCtx.getRelationService(); |
170 | 177 | } | ... | ... |
1 | 1 | /** |
2 | 2 | * Copyright © 2016-2018 The Thingsboard Authors |
3 | - * | |
3 | + * <p> | |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | - * | |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | - * | |
7 | + * <p> | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * <p> | |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
... | ... | @@ -115,7 +115,7 @@ public class TelemetryController extends BaseController { |
115 | 115 | } |
116 | 116 | |
117 | 117 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
118 | - @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES", method = RequestMethod.GET) | |
118 | + @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes", method = RequestMethod.GET) | |
119 | 119 | @ResponseBody |
120 | 120 | public DeferredResult<ResponseEntity> getAttributeKeys( |
121 | 121 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { |
... | ... | @@ -123,7 +123,7 @@ public class TelemetryController extends BaseController { |
123 | 123 | } |
124 | 124 | |
125 | 125 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
126 | - @RequestMapping(value = "/{entityType}/{entityId}/keys/ATTRIBUTES/{scope}", method = RequestMethod.GET) | |
126 | + @RequestMapping(value = "/{entityType}/{entityId}/keys/attributes/{scope}", method = RequestMethod.GET) | |
127 | 127 | @ResponseBody |
128 | 128 | public DeferredResult<ResponseEntity> getAttributeKeysByScope( |
129 | 129 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr |
... | ... | @@ -133,7 +133,7 @@ public class TelemetryController extends BaseController { |
133 | 133 | } |
134 | 134 | |
135 | 135 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
136 | - @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES", method = RequestMethod.GET) | |
136 | + @RequestMapping(value = "/{entityType}/{entityId}/values/attributes", method = RequestMethod.GET) | |
137 | 137 | @ResponseBody |
138 | 138 | public DeferredResult<ResponseEntity> getAttributes( |
139 | 139 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
... | ... | @@ -144,7 +144,7 @@ public class TelemetryController extends BaseController { |
144 | 144 | } |
145 | 145 | |
146 | 146 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
147 | - @RequestMapping(value = "/{entityType}/{entityId}/values/ATTRIBUTES/{scope}", method = RequestMethod.GET) | |
147 | + @RequestMapping(value = "/{entityType}/{entityId}/values/attributes/{scope}", method = RequestMethod.GET) | |
148 | 148 | @ResponseBody |
149 | 149 | public DeferredResult<ResponseEntity> getAttributesByScope( |
150 | 150 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
... | ... | @@ -156,7 +156,7 @@ public class TelemetryController extends BaseController { |
156 | 156 | } |
157 | 157 | |
158 | 158 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
159 | - @RequestMapping(value = "/{entityType}/{entityId}/keys/TIMESERIES", method = RequestMethod.GET) | |
159 | + @RequestMapping(value = "/{entityType}/{entityId}/keys/timeseries", method = RequestMethod.GET) | |
160 | 160 | @ResponseBody |
161 | 161 | public DeferredResult<ResponseEntity> getTimeseriesKeys( |
162 | 162 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr) throws ThingsboardException { |
... | ... | @@ -167,7 +167,7 @@ public class TelemetryController extends BaseController { |
167 | 167 | } |
168 | 168 | |
169 | 169 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
170 | - @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET) | |
170 | + @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET) | |
171 | 171 | @ResponseBody |
172 | 172 | public DeferredResult<ResponseEntity> getLatestTimeseries( |
173 | 173 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
... | ... | @@ -180,7 +180,7 @@ public class TelemetryController extends BaseController { |
180 | 180 | |
181 | 181 | |
182 | 182 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
183 | - @RequestMapping(value = "/{entityType}/{entityId}/values/TIMESERIES", method = RequestMethod.GET) | |
183 | + @RequestMapping(value = "/{entityType}/{entityId}/values/timeseries", method = RequestMethod.GET, params = {"keys", "startTs", "endTs"}) | |
184 | 184 | @ResponseBody |
185 | 185 | public DeferredResult<ResponseEntity> getTimeseries( |
186 | 186 | @PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
... | ... | @@ -222,7 +222,7 @@ public class TelemetryController extends BaseController { |
222 | 222 | } |
223 | 223 | |
224 | 224 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
225 | - @RequestMapping(value = "/{entityType}/{entityId}/ATTRIBUTES/{scope}", method = RequestMethod.POST) | |
225 | + @RequestMapping(value = "/{entityType}/{entityId}/attributes/{scope}", method = RequestMethod.POST) | |
226 | 226 | @ResponseBody |
227 | 227 | public DeferredResult<ResponseEntity> saveEntityAttributesV2(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
228 | 228 | @PathVariable("scope") String scope, |
... | ... | @@ -232,7 +232,7 @@ public class TelemetryController extends BaseController { |
232 | 232 | } |
233 | 233 | |
234 | 234 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
235 | - @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}", method = RequestMethod.POST) | |
235 | + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}", method = RequestMethod.POST) | |
236 | 236 | @ResponseBody |
237 | 237 | public DeferredResult<ResponseEntity> saveEntityTelemetry(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
238 | 238 | @PathVariable("scope") String scope, |
... | ... | @@ -242,7 +242,7 @@ public class TelemetryController extends BaseController { |
242 | 242 | } |
243 | 243 | |
244 | 244 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
245 | - @RequestMapping(value = "/{entityType}/{entityId}/TIMESERIES/{scope}/{ttl}", method = RequestMethod.POST) | |
245 | + @RequestMapping(value = "/{entityType}/{entityId}/timeseries/{scope}/{ttl}", method = RequestMethod.POST) | |
246 | 246 | @ResponseBody |
247 | 247 | public DeferredResult<ResponseEntity> saveEntityTelemetryWithTTL(@PathVariable("entityType") String entityType, @PathVariable("entityId") String entityIdStr, |
248 | 248 | @PathVariable("scope") String scope, @PathVariable("ttl") Long ttl, | ... | ... |
... | ... | @@ -155,7 +155,9 @@ public class TbWebSocketHandler extends TextWebSocketHandler implements PluginWe |
155 | 155 | if (internalId != null) { |
156 | 156 | SessionMetaData sessionMd = internalSessionMap.get(internalId); |
157 | 157 | if (sessionMd != null) { |
158 | - sessionMd.session.sendMessage(new TextMessage(msg)); | |
158 | + synchronized (sessionMd) { | |
159 | + sessionMd.session.sendMessage(new TextMessage(msg)); | |
160 | + } | |
159 | 161 | } else { |
160 | 162 | log.warn("[{}][{}] Failed to find session by internal id", externalId, internalId); |
161 | 163 | } | ... | ... |
application/src/main/java/org/thingsboard/server/service/telemetry/TelemetrySubscriptionService.java
... | ... | @@ -15,21 +15,14 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.service.telemetry; |
17 | 17 | |
18 | -import com.google.common.util.concurrent.FutureCallback; | |
18 | +import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; | |
19 | 19 | import org.thingsboard.server.common.data.id.EntityId; |
20 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
21 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
22 | -import org.thingsboard.server.common.data.kv.TsKvEntry; | |
23 | 20 | import org.thingsboard.server.extensions.core.plugin.telemetry.sub.SubscriptionState; |
24 | 21 | |
25 | -import java.util.List; | |
26 | -import java.util.Map; | |
27 | -import java.util.Set; | |
28 | - | |
29 | 22 | /** |
30 | 23 | * Created by ashvayka on 27.03.18. |
31 | 24 | */ |
32 | -public interface TelemetrySubscriptionService { | |
25 | +public interface TelemetrySubscriptionService extends RuleEngineTelemetryService { | |
33 | 26 | |
34 | 27 | void addLocalWsSubscription(String sessionId, EntityId entityId, SubscriptionState sub); |
35 | 28 | |
... | ... | @@ -37,9 +30,4 @@ public interface TelemetrySubscriptionService { |
37 | 30 | |
38 | 31 | void removeSubscription(String sessionId, int cmdId); |
39 | 32 | |
40 | - void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); | |
41 | - | |
42 | - void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback); | |
43 | - | |
44 | - void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback); | |
45 | 33 | } | ... | ... |
1 | +package org.thingsboard.rule.engine.api; | |
2 | + | |
3 | +import com.google.common.util.concurrent.FutureCallback; | |
4 | +import org.thingsboard.server.common.data.id.EntityId; | |
5 | +import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
6 | +import org.thingsboard.server.common.data.kv.TsKvEntry; | |
7 | + | |
8 | +import java.util.List; | |
9 | + | |
10 | +/** | |
11 | + * Created by ashvayka on 02.04.18. | |
12 | + */ | |
13 | +public interface RuleEngineTelemetryService { | |
14 | + | |
15 | + void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); | |
16 | + | |
17 | + void saveAndNotify(EntityId entityId, List<TsKvEntry> ts, long ttl, FutureCallback<Void> callback); | |
18 | + | |
19 | + void saveAndNotify(EntityId entityId, String scope, List<AttributeKvEntry> attributes, FutureCallback<Void> callback); | |
20 | + | |
21 | +} | ... | ... |
... | ... | @@ -44,6 +44,11 @@ |
44 | 44 | <scope>provided</scope> |
45 | 45 | </dependency> |
46 | 46 | <dependency> |
47 | + <groupId>org.thingsboard.common</groupId> | |
48 | + <artifactId>transport</artifactId> | |
49 | + <scope>provided</scope> | |
50 | + </dependency> | |
51 | + <dependency> | |
47 | 52 | <groupId>ch.qos.logback</groupId> |
48 | 53 | <artifactId>logback-core</artifactId> |
49 | 54 | <scope>provided</scope> |
... | ... | @@ -88,6 +93,10 @@ |
88 | 93 | <artifactId>mockito-all</artifactId> |
89 | 94 | <scope>test</scope> |
90 | 95 | </dependency> |
96 | + <dependency> | |
97 | + <groupId>org.thingsboard.common</groupId> | |
98 | + <artifactId>transport</artifactId> | |
99 | + </dependency> | |
91 | 100 | |
92 | 101 | <!--<dependency>--> |
93 | 102 | <!--<groupId>org.springframework.boot</groupId>--> | ... | ... |
1 | 1 | /** |
2 | 2 | * Copyright © 2016-2018 The Thingsboard Authors |
3 | - * | |
3 | + * <p> | |
4 | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | - * | |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | - * | |
7 | + * <p> | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * <p> | |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
12 | 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.debug; |
17 | 17 | |
18 | 18 | import com.datastax.driver.core.utils.UUIDs; |
19 | 19 | import lombok.extern.slf4j.Slf4j; |
20 | +import org.springframework.util.StringUtils; | |
20 | 21 | import org.thingsboard.rule.engine.TbNodeUtils; |
21 | 22 | import org.thingsboard.rule.engine.api.ListeningExecutor; |
22 | 23 | import org.thingsboard.rule.engine.api.RuleNode; |
... | ... | @@ -26,6 +27,8 @@ import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
26 | 27 | import org.thingsboard.rule.engine.api.TbNodeException; |
27 | 28 | import org.thingsboard.rule.engine.filter.TbJsFilterNodeConfiguration; |
28 | 29 | import org.thingsboard.rule.engine.js.NashornJsEngine; |
30 | +import org.thingsboard.server.common.data.id.EntityId; | |
31 | +import org.thingsboard.server.common.data.id.EntityIdFactory; | |
29 | 32 | import org.thingsboard.server.common.data.plugin.ComponentType; |
30 | 33 | import org.thingsboard.server.common.msg.TbMsg; |
31 | 34 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
... | ... | @@ -33,6 +36,7 @@ import org.thingsboard.server.common.msg.TbMsgMetaData; |
33 | 36 | import javax.script.Bindings; |
34 | 37 | |
35 | 38 | import java.nio.charset.StandardCharsets; |
39 | +import java.util.UUID; | |
36 | 40 | import java.util.concurrent.TimeUnit; |
37 | 41 | |
38 | 42 | import static org.thingsboard.rule.engine.DonAsynchron.withCallback; |
... | ... | @@ -53,30 +57,40 @@ public class TbMsgGeneratorNode implements TbNode { |
53 | 57 | |
54 | 58 | private TbMsgGeneratorNodeConfiguration config; |
55 | 59 | private long delay; |
60 | + private EntityId originatorId; | |
61 | + private UUID nextTickId; | |
56 | 62 | |
57 | 63 | @Override |
58 | 64 | public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
59 | 65 | this.config = TbNodeUtils.convert(configuration, TbMsgGeneratorNodeConfiguration.class); |
60 | 66 | this.delay = TimeUnit.SECONDS.toMillis(config.getPeriodInSeconds()); |
61 | - ctx.tellSelf(newTickMsg(ctx), delay); | |
67 | + if (!StringUtils.isEmpty(config.getOriginatorId())) { | |
68 | + originatorId = EntityIdFactory.getByTypeAndUuid(config.getOriginatorType(), config.getOriginatorId()); | |
69 | + } else { | |
70 | + originatorId = ctx.getSelfId(); | |
71 | + } | |
72 | + sentTickMsg(ctx); | |
62 | 73 | } |
63 | 74 | |
64 | 75 | @Override |
65 | 76 | public void onMsg(TbContext ctx, TbMsg msg) { |
66 | - if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG)) { | |
77 | + if (msg.getType().equals(TB_MSG_GENERATOR_NODE_MSG) && msg.getId().equals(nextTickId)) { | |
67 | 78 | TbMsgMetaData metaData = new TbMsgMetaData(); |
68 | 79 | if (config.getMsgMetaData() != null) { |
69 | 80 | config.getMsgMetaData().forEach(metaData::putValue); |
70 | 81 | } |
71 | - ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), ctx.getSelfId(), metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8))); | |
72 | - ctx.tellSelf(newTickMsg(ctx), delay); | |
82 | + ctx.tellNext(new TbMsg(UUIDs.timeBased(), config.getMsgType(), originatorId, metaData, config.getMsgBody().getBytes(StandardCharsets.UTF_8))); | |
83 | + sentTickMsg(ctx); | |
73 | 84 | } |
74 | 85 | } |
75 | 86 | |
76 | - private TbMsg newTickMsg(TbContext ctx) { | |
77 | - return new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{}); | |
87 | + private void sentTickMsg(TbContext ctx) { | |
88 | + TbMsg tickMsg = new TbMsg(UUIDs.timeBased(), TB_MSG_GENERATOR_NODE_MSG, ctx.getSelfId(), new TbMsgMetaData(), new byte[]{}); | |
89 | + nextTickId = tickMsg.getId(); | |
90 | + ctx.tellSelf(tickMsg, delay); | |
78 | 91 | } |
79 | 92 | |
93 | + | |
80 | 94 | @Override |
81 | 95 | public void destroy() { |
82 | 96 | } | ... | ... |
... | ... | @@ -17,6 +17,8 @@ package org.thingsboard.rule.engine.debug; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | 19 | import org.thingsboard.rule.engine.api.NodeConfiguration; |
20 | +import org.thingsboard.server.common.data.EntityType; | |
21 | + | |
20 | 22 | import java.util.Map; |
21 | 23 | |
22 | 24 | @Data |
... | ... | @@ -24,6 +26,8 @@ public class TbMsgGeneratorNodeConfiguration implements NodeConfiguration<TbMsgG |
24 | 26 | |
25 | 27 | private int msgCount; |
26 | 28 | private int periodInSeconds; |
29 | + private String originatorId; | |
30 | + private EntityType originatorType; | |
27 | 31 | private String msgType; |
28 | 32 | private String msgBody; |
29 | 33 | private Map<String, String> msgMetaData; | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * <p> | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * <p> | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * <p> | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.rule.engine.telemetry; | |
17 | + | |
18 | +import com.google.gson.JsonParser; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.springframework.util.StringUtils; | |
21 | +import org.thingsboard.rule.engine.TbNodeUtils; | |
22 | +import org.thingsboard.rule.engine.api.RuleNode; | |
23 | +import org.thingsboard.rule.engine.api.TbContext; | |
24 | +import org.thingsboard.rule.engine.api.TbNode; | |
25 | +import org.thingsboard.rule.engine.api.TbNodeConfiguration; | |
26 | +import org.thingsboard.rule.engine.api.TbNodeException; | |
27 | +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; | |
28 | +import org.thingsboard.server.common.data.kv.KvEntry; | |
29 | +import org.thingsboard.server.common.data.kv.TsKvEntry; | |
30 | +import org.thingsboard.server.common.data.plugin.ComponentType; | |
31 | +import org.thingsboard.server.common.msg.TbMsg; | |
32 | +import org.thingsboard.server.common.msg.core.TelemetryUploadRequest; | |
33 | +import org.thingsboard.server.common.transport.adaptor.JsonConverter; | |
34 | + | |
35 | +import java.nio.charset.StandardCharsets; | |
36 | +import java.util.ArrayList; | |
37 | +import java.util.List; | |
38 | +import java.util.Map; | |
39 | + | |
40 | +@Slf4j | |
41 | +@RuleNode( | |
42 | + type = ComponentType.ACTION, | |
43 | + name = "save timeseries data", | |
44 | + configClazz = TbMsgTelemetryNodeConfiguration.class, | |
45 | + nodeDescription = "Saves timeseries data", | |
46 | + nodeDetails = "Saves timeseries telemetry data based on configurable TTL parameter. Expects messages with 'POST_TELEMETRY' message type" | |
47 | +) | |
48 | + | |
49 | +public class TbMsgTelemetryNode implements TbNode { | |
50 | + | |
51 | + private TbMsgTelemetryNodeConfiguration config; | |
52 | + | |
53 | + @Override | |
54 | + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { | |
55 | + this.config = TbNodeUtils.convert(configuration, TbMsgTelemetryNodeConfiguration.class); | |
56 | + } | |
57 | + | |
58 | + @Override | |
59 | + public void onMsg(TbContext ctx, TbMsg msg) { | |
60 | + if (!msg.getType().equals("POST_TELEMETRY")) { | |
61 | + ctx.tellError(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); | |
62 | + return; | |
63 | + } | |
64 | + | |
65 | + String src = new String(msg.getData(), StandardCharsets.UTF_8); | |
66 | + TelemetryUploadRequest telemetryUploadRequest = JsonConverter.convertToTelemetry(new JsonParser().parse(src)); | |
67 | + Map<Long, List<KvEntry>> tsKvMap = telemetryUploadRequest.getData(); | |
68 | + if (tsKvMap == null) { | |
69 | + ctx.tellError(msg, new IllegalArgumentException("Msg body us empty: " + src)); | |
70 | + return; | |
71 | + } | |
72 | + List<TsKvEntry> tsKvEntryList = new ArrayList<>(); | |
73 | + for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) { | |
74 | + for (KvEntry kvEntry : tsKvEntry.getValue()) { | |
75 | + tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); | |
76 | + } | |
77 | + } | |
78 | + String ttlValue = msg.getMetaData().getValue("TTL"); | |
79 | + long ttl = !StringUtils.isEmpty(ttlValue) ? Long.valueOf(ttlValue) : config.getDefaultTTL(); | |
80 | + ctx.getTelemetryService().saveAndNotify(msg.getOriginator(), tsKvEntryList, ttl, new TelemetryNodeCallback(ctx, msg)); | |
81 | + } | |
82 | + | |
83 | + @Override | |
84 | + public void destroy() { | |
85 | + } | |
86 | + | |
87 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2018 The Thingsboard Authors | |
3 | + * <p> | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * <p> | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * <p> | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.rule.engine.telemetry; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.rule.engine.api.NodeConfiguration; | |
20 | + | |
21 | +import java.util.Map; | |
22 | + | |
23 | +@Data | |
24 | +public class TbMsgTelemetryNodeConfiguration implements NodeConfiguration<TbMsgTelemetryNodeConfiguration> { | |
25 | + | |
26 | + private long defaultTTL; | |
27 | + | |
28 | + @Override | |
29 | + public TbMsgTelemetryNodeConfiguration defaultConfiguration() { | |
30 | + TbMsgTelemetryNodeConfiguration configuration = new TbMsgTelemetryNodeConfiguration(); | |
31 | + configuration.setDefaultTTL(0L); | |
32 | + return configuration; | |
33 | + } | |
34 | +} | ... | ... |
1 | +package org.thingsboard.rule.engine.telemetry; | |
2 | + | |
3 | +import com.google.common.util.concurrent.FutureCallback; | |
4 | +import lombok.Data; | |
5 | +import org.thingsboard.rule.engine.api.TbContext; | |
6 | +import org.thingsboard.server.common.msg.TbMsg; | |
7 | + | |
8 | +import javax.annotation.Nullable; | |
9 | + | |
10 | +/** | |
11 | + * Created by ashvayka on 02.04.18. | |
12 | + */ | |
13 | +@Data | |
14 | +class TelemetryNodeCallback implements FutureCallback<Void> { | |
15 | + private final TbContext ctx; | |
16 | + private final TbMsg msg; | |
17 | + | |
18 | + @Override | |
19 | + public void onSuccess(@Nullable Void result) { | |
20 | + ctx.tellNext(msg); | |
21 | + } | |
22 | + | |
23 | + @Override | |
24 | + public void onFailure(Throwable t) { | |
25 | + ctx.tellError(msg, t); | |
26 | + } | |
27 | +} | ... | ... |