Commit 257ca5db5f15bebc26187c198ec7edf74d45ced7

Authored by Andrew Shvayka
1 parent 316178e4

Implementation of RPC call action

... ... @@ -44,6 +44,7 @@ import org.thingsboard.server.dao.customer.CustomerService;
44 44 import org.thingsboard.server.dao.device.DeviceService;
45 45 import org.thingsboard.server.dao.event.EventService;
46 46 import org.thingsboard.server.dao.plugin.PluginService;
  47 +import org.thingsboard.server.dao.relation.RelationService;
47 48 import org.thingsboard.server.dao.rule.RuleService;
48 49 import org.thingsboard.server.dao.tenant.TenantService;
49 50 import org.thingsboard.server.dao.timeseries.TimeseriesService;
... ... @@ -110,6 +111,9 @@ public class ActorSystemContext {
110 111 @Getter private AlarmService alarmService;
111 112
112 113 @Autowired
  114 + @Getter private RelationService relationService;
  115 +
  116 + @Autowired
113 117 @Getter @Setter private PluginWebSocketMsgEndpoint wsMsgEndpoint;
114 118
115 119 @Value("${actors.session.sync.timeout}")
... ...
... ... @@ -33,6 +33,8 @@ import org.thingsboard.server.common.data.kv.TsKvEntry;
33 33 import org.thingsboard.server.common.data.kv.TsKvQuery;
34 34 import org.thingsboard.server.common.data.page.TextPageLink;
35 35 import org.thingsboard.server.common.data.plugin.PluginMetaData;
  36 +import org.thingsboard.server.common.data.relation.EntityRelation;
  37 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
36 38 import org.thingsboard.server.common.data.rule.RuleMetaData;
37 39 import org.thingsboard.server.common.msg.cluster.ServerAddress;
38 40 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
... ... @@ -395,6 +397,16 @@ public final class PluginProcessingContext implements PluginContext {
395 397 }
396 398
397 399 @Override
  400 + public ListenableFuture<List<EntityRelation>> findByFromAndType(EntityId from, String relationType) {
  401 + return this.pluginCtx.relationService.findByFromAndType(from, relationType, RelationTypeGroup.COMMON);
  402 + }
  403 +
  404 + @Override
  405 + public ListenableFuture<List<EntityRelation>> findByToAndType(EntityId from, String relationType) {
  406 + return this.pluginCtx.relationService.findByToAndType(from, relationType, RelationTypeGroup.COMMON);
  407 + }
  408 +
  409 + @Override
398 410 public Optional<ServerAddress> resolve(EntityId entityId) {
399 411 return pluginCtx.routingService.resolveById(entityId);
400 412 }
... ...
... ... @@ -30,6 +30,7 @@ import org.thingsboard.server.dao.attributes.AttributesService;
30 30 import org.thingsboard.server.dao.customer.CustomerService;
31 31 import org.thingsboard.server.dao.device.DeviceService;
32 32 import org.thingsboard.server.dao.plugin.PluginService;
  33 +import org.thingsboard.server.dao.relation.RelationService;
33 34 import org.thingsboard.server.dao.rule.RuleService;
34 35 import org.thingsboard.server.dao.tenant.TenantService;
35 36 import org.thingsboard.server.dao.timeseries.TimeseriesService;
... ... @@ -61,6 +62,7 @@ public final class SharedPluginProcessingContext {
61 62 final AttributesService attributesService;
62 63 final ClusterRpcService rpcService;
63 64 final ClusterRoutingService routingService;
  65 + final RelationService relationService;
64 66 final PluginId pluginId;
65 67 final TenantId tenantId;
66 68
... ... @@ -83,6 +85,7 @@ public final class SharedPluginProcessingContext {
83 85 this.pluginService = sysContext.getPluginService();
84 86 this.customerService = sysContext.getCustomerService();
85 87 this.tenantService = sysContext.getTenantService();
  88 + this.relationService = sysContext.getRelationService();
86 89 }
87 90
88 91 public PluginId getPluginId() {
... ...
... ... @@ -15,11 +15,14 @@
15 15 */
16 16 package org.thingsboard.server.extensions.api.plugins;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 19 import org.thingsboard.server.common.data.Device;
19 20 import org.thingsboard.server.common.data.id.*;
20 21 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
21 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
22 23 import org.thingsboard.server.common.data.kv.TsKvQuery;
  24 +import org.thingsboard.server.common.data.relation.EntityRelation;
  25 +import org.thingsboard.server.common.data.relation.RelationTypeGroup;
23 26 import org.thingsboard.server.common.msg.cluster.ServerAddress;
24 27 import org.thingsboard.server.extensions.api.plugins.msg.PluginToRuleMsg;
25 28 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
... ... @@ -109,4 +112,12 @@ public interface PluginContext {
109 112
110 113 void getCustomerDevices(TenantId tenantId, CustomerId customerId, int limit, PluginCallback<List<Device>> callback);
111 114
  115 +
  116 + /*
  117 + * Relations API
  118 + * */
  119 +
  120 + ListenableFuture<List<EntityRelation>> findByFromAndType(EntityId from, String relationType);
  121 +
  122 + ListenableFuture<List<EntityRelation>> findByToAndType(EntityId from, String relationType);
112 123 }
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.extensions.core.action.rpc;
2 17
3 18 import lombok.extern.slf4j.Slf4j;
... ... @@ -28,7 +43,8 @@ public class ServerSideRpcCallAction extends SimpleRuleLifecycleComponent implem
28 43
29 44 private ServerSideRpcCallActionConfiguration configuration;
30 45 private Optional<Template> deviceIdTemplate;
31   - private Optional<Template> deviceRelationTemplate;
  46 + private Optional<Template> fromDeviceRelationTemplate;
  47 + private Optional<Template> toDeviceRelationTemplate;
32 48 private Optional<Template> rpcCallMethodTemplate;
33 49 private Optional<Template> rpcCallBodyTemplate;
34 50
... ... @@ -37,7 +53,8 @@ public class ServerSideRpcCallAction extends SimpleRuleLifecycleComponent implem
37 53 this.configuration = configuration;
38 54 try {
39 55 deviceIdTemplate = toTemplate(configuration.getDeviceIdTemplate(), "Device Id Template");
40   - deviceRelationTemplate = toTemplate(configuration.getDeviceRelationTemplate(), "Device Relation Template");
  56 + fromDeviceRelationTemplate = toTemplate(configuration.getFromDeviceRelationTemplate(), "From Device Relation Template");
  57 + toDeviceRelationTemplate = toTemplate(configuration.getToDeviceRelationTemplate(), "To Device Relation Template");
41 58 rpcCallMethodTemplate = toTemplate(configuration.getRpcCallMethodTemplate(), "RPC Call Method Template");
42 59 rpcCallBodyTemplate = toTemplate(configuration.getRpcCallBodyTemplate(), "RPC Call Body Template");
43 60 } catch (ParseException e) {
... ... @@ -55,7 +72,8 @@ public class ServerSideRpcCallAction extends SimpleRuleLifecycleComponent implem
55 72 ServerSideRpcCallActionMsg.ServerSideRpcCallActionMsgBuilder builder = ServerSideRpcCallActionMsg.builder();
56 73
57 74 deviceIdTemplate.ifPresent(t -> builder.deviceId(VelocityUtils.merge(t, context)));
58   - deviceRelationTemplate.ifPresent(t -> builder.deviceRelation(VelocityUtils.merge(t, context)));
  75 + fromDeviceRelationTemplate.ifPresent(t -> builder.fromDeviceRelation(VelocityUtils.merge(t, context)));
  76 + toDeviceRelationTemplate.ifPresent(t -> builder.toDeviceRelation(VelocityUtils.merge(t, context)));
59 77 rpcCallMethodTemplate.ifPresent(t -> builder.rpcCallMethod(VelocityUtils.merge(t, context)));
60 78 rpcCallBodyTemplate.ifPresent(t -> builder.rpcCallBody(VelocityUtils.merge(t, context)));
61 79 return Optional.of(new ServerSideRpcCallRuleToPluginActionMsg(toDeviceActorMsg.getTenantId(), toDeviceActorMsg.getCustomerId(), toDeviceActorMsg.getDeviceId(),
... ...
... ... @@ -26,7 +26,9 @@ public class ServerSideRpcCallActionConfiguration {
26 26 private String sendFlag;
27 27
28 28 private String deviceIdTemplate;
29   - private String deviceRelationTemplate;
30 29 private String rpcCallMethodTemplate;
31 30 private String rpcCallBodyTemplate;
  31 + private long rpcCallTimeoutInSec;
  32 + private String fromDeviceRelationTemplate;
  33 + private String toDeviceRelationTemplate;
32 34 }
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.extensions.core.action.rpc;
2 17
3 18 import lombok.Builder;
... ... @@ -13,8 +28,11 @@ import java.io.Serializable;
13 28 public class ServerSideRpcCallActionMsg implements Serializable {
14 29
15 30 private String deviceId;
16   - private String deviceRelation;
17 31 private String rpcCallMethod;
18 32 private String rpcCallBody;
  33 + private long rpcCallTimeoutInSec;
  34 +
  35 + private String fromDeviceRelation;
  36 + private String toDeviceRelation;
19 37
20 38 }
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.extensions.core.action.rpc;
2 17
3 18 import org.thingsboard.server.common.data.id.CustomerId;
... ...
1 1 /**
2 2 * Copyright © 2016-2017 The Thingsboard Authors
3   - * <p>
  3 + *
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - * <p>
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - * <p>
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
26 26 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
27 27 import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallAction;
28 28 import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRestMsgHandler;
  29 +import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRuleMsgHandler;
29 30
30 31 /**
31 32 * @author Andrew Shvayka
... ... @@ -65,7 +66,7 @@ public class RpcPlugin extends AbstractPlugin<RpcPluginConfiguration> {
65 66
66 67 @Override
67 68 protected RuleMsgHandler getRuleMsgHandler() {
68   - return new DefaultRuleMsgHandler();
  69 + return new RpcRuleMsgHandler();
69 70 }
70 71
71 72 @Override
... ...
  1 +/**
  2 + * Copyright © 2016-2017 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
1 16 package org.thingsboard.server.extensions.core.plugin.rpc.handlers;
2 17
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.springframework.util.StringUtils;
  20 +import org.thingsboard.server.common.data.id.DeviceId;
  21 +import org.thingsboard.server.common.data.id.EntityId;
3 22 import org.thingsboard.server.common.data.id.RuleId;
4 23 import org.thingsboard.server.common.data.id.TenantId;
  24 +import org.thingsboard.server.common.data.relation.EntityRelation;
  25 +import org.thingsboard.server.extensions.api.plugins.PluginCallback;
5 26 import org.thingsboard.server.extensions.api.plugins.PluginContext;
6 27 import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
7 28 import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
8 29 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
  30 +import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestBody;
9 31 import org.thingsboard.server.extensions.api.rules.RuleException;
10 32 import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallActionMsg;
  33 +import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallRuleToPluginActionMsg;
  34 +
  35 +import java.util.Collections;
  36 +import java.util.List;
  37 +import java.util.UUID;
  38 +import java.util.concurrent.TimeUnit;
  39 +import java.util.stream.Collectors;
11 40
12 41 /**
13 42 * Created by ashvayka on 14.09.17.
14 43 */
  44 +@Slf4j
15 45 public class RpcRuleMsgHandler implements RuleMsgHandler {
16 46
17 47 @Override
18 48 public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
19   - if (msg instanceof ServerSideRpcCallActionMsg) {
20   - handle(ctx, tenantId, ruleId, (ServerSideRpcCallActionMsg) msg);
  49 + if (msg instanceof ServerSideRpcCallRuleToPluginActionMsg) {
  50 + handle(ctx, tenantId, ruleId, ((ServerSideRpcCallRuleToPluginActionMsg) msg).getPayload());
21 51 } else {
22 52 throw new RuntimeException("Not supported msg: " + msg + "!");
23 53 }
24 54 }
25 55
26   - private void handle(PluginContext ctx, TenantId tenantId, RuleId ruleId, ServerSideRpcCallActionMsg msg) {
27   -// TODO: implement
28   -// ToDeviceRpcRequest request = new ToDeviceRpcRequest();
29   -// ctx.sendRpcRequest(request);
  56 + private void handle(final PluginContext ctx, TenantId tenantId, RuleId ruleId, ServerSideRpcCallActionMsg msg) {
  57 + DeviceId deviceId = new DeviceId(UUID.fromString(msg.getDeviceId()));
  58 + ctx.checkAccess(deviceId, new PluginCallback<Void>() {
  59 + @Override
  60 + public void onSuccess(PluginContext dummy, Void value) {
  61 + try {
  62 + List<EntityId> deviceIds;
  63 + if (StringUtils.isEmpty(msg.getFromDeviceRelation()) && StringUtils.isEmpty(msg.getToDeviceRelation())) {
  64 + deviceIds = Collections.singletonList(deviceId);
  65 + } else if (!StringUtils.isEmpty(msg.getFromDeviceRelation())) {
  66 + List<EntityRelation> relations = ctx.findByFromAndType(deviceId, msg.getFromDeviceRelation()).get();
  67 + deviceIds = relations.stream().map(EntityRelation::getTo).collect(Collectors.toList());
  68 + } else {
  69 + List<EntityRelation> relations = ctx.findByToAndType(deviceId, msg.getFromDeviceRelation()).get();
  70 + deviceIds = relations.stream().map(EntityRelation::getFrom).collect(Collectors.toList());
  71 + }
  72 + ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(msg.getRpcCallMethod(), msg.getRpcCallBody());
  73 + long expirationTime = System.currentTimeMillis() + msg.getRpcCallTimeoutInSec();
  74 + for (EntityId address : deviceIds) {
  75 + DeviceId tmpId = new DeviceId(address.getId());
  76 + ctx.checkAccess(tmpId, new PluginCallback<Void>() {
  77 + @Override
  78 + public void onSuccess(PluginContext ctx, Void value) {
  79 + ctx.sendRpcRequest(new ToDeviceRpcRequest(UUID.randomUUID(),
  80 + tenantId, tmpId, true, expirationTime, body)
  81 + );
  82 + log.trace("[{}] Sent RPC Call Action msg", tmpId);
  83 + }
  84 +
  85 + @Override
  86 + public void onFailure(PluginContext ctx, Exception e) {
  87 + log.info("[{}] Failed to process RPC Call Action msg", tmpId, e);
  88 + }
  89 + });
  90 + }
  91 + } catch (Exception e) {
  92 + log.info("Failed to process RPC Call Action msg", e);
  93 + }
  94 + }
  95 +
  96 + @Override
  97 + public void onFailure(PluginContext dummy, Exception e) {
  98 + log.info("[{}] Failed to process RPC Call Action msg", deviceId, e);
  99 + }
  100 + });
30 101 }
31 102 }
... ...
... ... @@ -4,7 +4,7 @@
4 4 "type": "object",
5 5 "properties": {
6 6 "sendFlag": {
7   - "title": "Send flag",
  7 + "title": "Send flag (empty or 'isNewAlarm', 'isExistingAlarm', 'isClearedAlarm', 'isNewOrClearedAlarm')",
8 8 "type": "string"
9 9 },
10 10 "fromTemplate": {
... ...
... ... @@ -4,16 +4,13 @@
4 4 "type": "object",
5 5 "properties": {
6 6 "sendFlag": {
7   - "title": "Send flag",
  7 + "title": "Send flag (empty or 'isNewAlarm', 'isExistingAlarm', 'isClearedAlarm', 'isNewOrClearedAlarm')",
8 8 "type": "string"
9 9 },
10 10 "deviceIdTemplate": {
11 11 "title": "Device ID template",
12   - "type": "string"
13   - },
14   - "deviceRelationTemplate": {
15   - "title": "Device Relation template",
16   - "type": "string"
  12 + "type": "string",
  13 + "default": "$deviceId"
17 14 },
18 15 "rpcCallMethodTemplate": {
19 16 "title": "RPC Call template",
... ... @@ -22,24 +19,39 @@
22 19 "rpcCallBodyTemplate": {
23 20 "title": "RPC Call Body template",
24 21 "type": "string"
  22 + },
  23 + "rpcCallTimeoutInSec": {
  24 + "title": "RPC Call timeout in seconds",
  25 + "type": "integer",
  26 + "default": 60
  27 + },
  28 + "fromDeviceRelationTemplate": {
  29 + "title": "From Device Relation template",
  30 + "type": "string"
  31 + },
  32 + "toDeviceRelationTemplate": {
  33 + "title": "To Device Relation template",
  34 + "type": "string"
25 35 }
26 36 },
27 37 "required": [
28 38 "deviceIdTemplate",
29   - "deviceRelationTemplate",
30 39 "rpcCallMethodTemplate",
31   - "rpcCallBodyTemplate"
  40 + "rpcCallBodyTemplate",
  41 + "rpcCallTimeoutInSec"
32 42 ]
33 43 },
34 44 "form": [
35 45 "sendFlag",
36 46 "deviceIdTemplate",
37   - "deviceRelationTemplate",
38 47 "rpcCallMethodTemplate",
39 48 {
40 49 "key": "rpcCallBodyTemplate",
41 50 "type": "textarea",
42 51 "rows": 5
43   - }
  52 + },
  53 + "rpcCallTimeoutInSec",
  54 + "fromDeviceRelationTemplate",
  55 + "toDeviceRelationTemplate"
44 56 ]
45 57 }
\ No newline at end of file
... ...