Commit bc3b55d572d3bdfcadec54356fd66ab8a83a6b8c

Authored by Andrew Shvayka
1 parent 037e65b3

RPC refactoring

Showing 33 changed files with 575 additions and 511 deletions
... ... @@ -26,11 +26,13 @@ import org.thingsboard.server.common.data.id.DeviceId;
26 26 import org.thingsboard.server.common.data.id.SessionId;
27 27 import org.thingsboard.server.common.data.kv.AttributeKey;
28 28 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  29 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
29 30 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
30 31 import org.thingsboard.server.common.msg.cluster.ServerAddress;
31 32 import org.thingsboard.server.common.msg.core.*;
32 33 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
33 34 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
  35 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
34 36 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
35 37 import org.thingsboard.server.common.msg.session.MsgType;
36 38 import org.thingsboard.server.common.msg.session.SessionType;
... ...
... ... @@ -36,9 +36,11 @@ import org.thingsboard.server.common.data.page.TextPageLink;
36 36 import org.thingsboard.server.common.data.plugin.PluginMetaData;
37 37 import org.thingsboard.server.common.data.relation.EntityRelation;
38 38 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
  39 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
39 40 import org.thingsboard.server.common.data.rule.RuleChain;
40 41 import org.thingsboard.server.common.data.rule.RuleMetaData;
41 42 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  43 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
42 44 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
43 45 import org.thingsboard.server.extensions.api.plugins.PluginApiCallSecurityContext;
44 46 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
... ... @@ -348,7 +350,7 @@ public final class PluginProcessingContext implements PluginContext {
348 350 throw new IllegalStateException("Not Implemented!");
349 351 }
350 352 } else {
351   - callback.onSuccess(this, ValidationResult.ok());
  353 + callback.onSuccess(this, ValidationResult.ok(null));
352 354 }
353 355 }
354 356
... ... @@ -366,7 +368,7 @@ public final class PluginProcessingContext implements PluginContext {
366 368 } else if (ctx.isCustomerUser() && !device.getCustomerId().equals(ctx.getCustomerId())) {
367 369 return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
368 370 } else {
369   - return ValidationResult.ok();
  371 + return ValidationResult.ok(null);
370 372 }
371 373 }
372 374 }));
... ... @@ -387,7 +389,7 @@ public final class PluginProcessingContext implements PluginContext {
387 389 } else if (ctx.isCustomerUser() && !asset.getCustomerId().equals(ctx.getCustomerId())) {
388 390 return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
389 391 } else {
390   - return ValidationResult.ok();
  392 + return ValidationResult.ok(null);
391 393 }
392 394 }
393 395 }));
... ... @@ -408,7 +410,7 @@ public final class PluginProcessingContext implements PluginContext {
408 410 } else if (ctx.isSystemAdmin() && !rule.getTenantId().isNullUid()) {
409 411 return ValidationResult.accessDenied("Rule is not in system scope!");
410 412 } else {
411   - return ValidationResult.ok();
  413 + return ValidationResult.ok(null);
412 414 }
413 415 }
414 416 }));
... ... @@ -429,7 +431,7 @@ public final class PluginProcessingContext implements PluginContext {
429 431 } else if (ctx.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
430 432 return ValidationResult.accessDenied("Rule chain is not in system scope!");
431 433 } else {
432   - return ValidationResult.ok();
  434 + return ValidationResult.ok(null);
433 435 }
434 436 }
435 437 }));
... ... @@ -451,7 +453,7 @@ public final class PluginProcessingContext implements PluginContext {
451 453 } else if (ctx.isSystemAdmin() && !plugin.getTenantId().isNullUid()) {
452 454 return ValidationResult.accessDenied("Plugin is not in system scope!");
453 455 } else {
454   - return ValidationResult.ok();
  456 + return ValidationResult.ok(null);
455 457 }
456 458 }
457 459 }));
... ... @@ -472,7 +474,7 @@ public final class PluginProcessingContext implements PluginContext {
472 474 } else if (ctx.isCustomerUser() && !customer.getId().equals(ctx.getCustomerId())) {
473 475 return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
474 476 } else {
475   - return ValidationResult.ok();
  477 + return ValidationResult.ok(null);
476 478 }
477 479 }
478 480 }));
... ... @@ -483,7 +485,7 @@ public final class PluginProcessingContext implements PluginContext {
483 485 if (ctx.isCustomerUser()) {
484 486 callback.onSuccess(this, ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
485 487 } else if (ctx.isSystemAdmin()) {
486   - callback.onSuccess(this, ValidationResult.ok());
  488 + callback.onSuccess(this, ValidationResult.ok(null));
487 489 } else {
488 490 ListenableFuture<Tenant> tenantFuture = pluginCtx.tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
489 491 Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
... ... @@ -492,7 +494,7 @@ public final class PluginProcessingContext implements PluginContext {
492 494 } else if (!tenant.getId().equals(ctx.getTenantId())) {
493 495 return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
494 496 } else {
495   - return ValidationResult.ok();
  497 + return ValidationResult.ok(null);
496 498 }
497 499 }));
498 500 }
... ...
... ... @@ -18,9 +18,7 @@ package org.thingsboard.server.actors.plugin;
18 18 import akka.actor.ActorRef;
19 19 import lombok.extern.slf4j.Slf4j;
20 20 import org.thingsboard.server.actors.ActorSystemContext;
21   -import org.thingsboard.server.common.data.Device;
22 21 import org.thingsboard.server.common.data.id.DeviceId;
23   -import org.thingsboard.server.common.data.id.EntityId;
24 22 import org.thingsboard.server.common.data.id.TenantId;
25 23 import org.thingsboard.server.common.msg.cluster.ServerAddress;
26 24 import org.thingsboard.server.controller.plugin.PluginWebSocketMsgEndpoint;
... ... @@ -38,7 +36,7 @@ import org.thingsboard.server.dao.tenant.TenantService;
38 36 import org.thingsboard.server.dao.timeseries.TimeseriesService;
39 37 import org.thingsboard.server.extensions.api.device.DeviceAttributesEventNotificationMsg;
40 38 import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
41   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
  39 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
42 40 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
43 41 import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
44 42 import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
... ...
... ... @@ -20,29 +20,30 @@ import lombok.Data;
20 20
21 21 @Data
22 22 @AllArgsConstructor
23   -public class ValidationResult {
  23 +public class ValidationResult<V> {
24 24
25 25 private final ValidationResultCode resultCode;
26 26 private final String message;
  27 + private final V v;
27 28
28   - public static ValidationResult ok() {
29   - return new ValidationResult(ValidationResultCode.OK, "Ok");
  29 + public static <V> ValidationResult<V> ok(V v) {
  30 + return new ValidationResult<>(ValidationResultCode.OK, "Ok", v);
30 31 }
31 32
32   - public static ValidationResult accessDenied(String message) {
33   - return new ValidationResult(ValidationResultCode.ACCESS_DENIED, message);
  33 + public static <V> ValidationResult<V> accessDenied(String message) {
  34 + return new ValidationResult<>(ValidationResultCode.ACCESS_DENIED, message, null);
34 35 }
35 36
36   - public static ValidationResult entityNotFound(String message) {
37   - return new ValidationResult(ValidationResultCode.ENTITY_NOT_FOUND, message);
  37 + public static <V> ValidationResult<V> entityNotFound(String message) {
  38 + return new ValidationResult<>(ValidationResultCode.ENTITY_NOT_FOUND, message, null);
38 39 }
39 40
40   - public static ValidationResult unauthorized(String message) {
41   - return new ValidationResult(ValidationResultCode.UNAUTHORIZED, message);
  41 + public static <V> ValidationResult<V> unauthorized(String message) {
  42 + return new ValidationResult<>(ValidationResultCode.UNAUTHORIZED, message, null);
42 43 }
43 44
44   - public static ValidationResult internalError(String message) {
45   - return new ValidationResult(ValidationResultCode.INTERNAL_ERROR, message);
  45 + public static <V> ValidationResult<V> internalError(String message) {
  46 + return new ValidationResult<>(ValidationResultCode.INTERNAL_ERROR, message, null);
46 47 }
47 48
48 49 }
... ...
... ... @@ -24,10 +24,12 @@ import org.thingsboard.server.actors.service.ActorService;
24 24 import org.thingsboard.server.common.data.id.DeviceId;
25 25 import org.thingsboard.server.common.data.id.PluginId;
26 26 import org.thingsboard.server.common.data.id.TenantId;
  27 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
27 28 import org.thingsboard.server.common.msg.cluster.ServerAddress;
28 29 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
29 30 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
30 31 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
  32 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
31 33 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
32 34 import org.thingsboard.server.extensions.api.plugins.msg.*;
33 35 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
... ... @@ -35,6 +37,7 @@ import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
35 37 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
36 38 import org.thingsboard.server.service.cluster.rpc.GrpcSession;
37 39 import org.thingsboard.server.service.cluster.rpc.GrpcSessionListener;
  40 +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
38 41
39 42 import java.io.Serializable;
40 43 import java.util.UUID;
... ... @@ -139,28 +142,20 @@ public class BasicRpcSessionListener implements GrpcSessionListener {
139 142 return new UUID(uid.getPluginUuidMsb(), uid.getPluginUuidLsb());
140 143 }
141 144
142   - private static ToDeviceRpcRequestPluginMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
143   - ClusterAPIProtos.PluginAddress address = msg.getAddress();
144   - TenantId pluginTenantId = new TenantId(toUUID(address.getTenantId()));
145   - PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
146   -
  145 + private static ToDeviceRpcRequestMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToDeviceRpcRequestRpcMessage msg) {
147 146 TenantId deviceTenantId = new TenantId(toUUID(msg.getDeviceTenantId()));
148 147 DeviceId deviceId = new DeviceId(toUUID(msg.getDeviceId()));
149 148
150 149 ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(msg.getMethod(), msg.getParams());
151   - ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), null, deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
  150 + ToDeviceRpcRequest request = new ToDeviceRpcRequest(toUUID(msg.getMsgId()), deviceTenantId, deviceId, msg.getOneway(), msg.getExpTime(), requestBody);
152 151
153   - return new ToDeviceRpcRequestPluginMsg(serverAddress, pluginId, pluginTenantId, request);
  152 + return new ToDeviceRpcRequestMsg(serverAddress, request);
154 153 }
155 154
156 155 private static ToPluginRpcResponseDeviceMsg deserialize(ServerAddress serverAddress, ClusterAPIProtos.ToPluginRpcResponseRpcMessage msg) {
157   - ClusterAPIProtos.PluginAddress address = msg.getAddress();
158   - TenantId pluginTenantId = new TenantId(toUUID(address.getTenantId()));
159   - PluginId pluginId = new PluginId(toUUID(address.getPluginId()));
160   -
161 156 RpcError error = !StringUtils.isEmpty(msg.getError()) ? RpcError.valueOf(msg.getError()) : null;
162 157 FromDeviceRpcResponse response = new FromDeviceRpcResponse(toUUID(msg.getMsgId()), msg.getResponse(), error);
163   - return new ToPluginRpcResponseDeviceMsg(pluginId, pluginTenantId, response);
  158 + return new ToPluginRpcResponseDeviceMsg(null, null, response);
164 159 }
165 160
166 161 @SuppressWarnings("unchecked")
... ...
... ... @@ -602,7 +602,7 @@ public abstract class BaseController {
602 602 auditLogService.logEntityAction(user.getTenantId(), customerId, user.getId(), user.getName(), entityId, entity, actionType, e, additionalInfo);
603 603 }
604 604
605   - protected static Exception toException(Throwable error) {
  605 + public static Exception toException(Throwable error) {
606 606 return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
607 607 }
608 608
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.controller;
  17 +
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import com.fasterxml.jackson.databind.ObjectMapper;
  20 +import com.google.common.util.concurrent.FutureCallback;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.http.HttpStatus;
  24 +import org.springframework.http.ResponseEntity;
  25 +import org.springframework.security.access.prepost.PreAuthorize;
  26 +import org.springframework.web.bind.annotation.PathVariable;
  27 +import org.springframework.web.bind.annotation.RequestBody;
  28 +import org.springframework.web.bind.annotation.RequestMapping;
  29 +import org.springframework.web.bind.annotation.RequestMethod;
  30 +import org.springframework.web.bind.annotation.ResponseBody;
  31 +import org.springframework.web.bind.annotation.RestController;
  32 +import org.springframework.web.context.request.async.DeferredResult;
  33 +import org.thingsboard.server.actors.plugin.ValidationResult;
  34 +import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
  35 +import org.thingsboard.server.common.data.exception.ThingsboardException;
  36 +import org.thingsboard.server.common.data.id.DeviceId;
  37 +import org.thingsboard.server.common.data.id.TenantId;
  38 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  39 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  40 +import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
  41 +import org.thingsboard.server.extensions.api.plugins.PluginConstants;
  42 +import org.thingsboard.server.common.data.rpc.RpcRequest;
  43 +import org.thingsboard.server.service.rpc.LocalRequestMetaData;
  44 +import org.thingsboard.server.service.rpc.RpcService;
  45 +import org.thingsboard.server.service.security.AccessValidator;
  46 +import org.thingsboard.server.service.security.model.SecurityUser;
  47 +
  48 +import javax.annotation.Nullable;
  49 +import javax.annotation.PostConstruct;
  50 +import javax.annotation.PreDestroy;
  51 +import java.io.IOException;
  52 +import java.util.Optional;
  53 +import java.util.UUID;
  54 +import java.util.concurrent.ExecutorService;
  55 +import java.util.concurrent.Executors;
  56 +
  57 +/**
  58 + * Created by ashvayka on 22.03.18.
  59 + */
  60 +@RestController
  61 +@RequestMapping(PluginConstants.RPC_URL_PREFIX)
  62 +@Slf4j
  63 +public class RpcController extends BaseController {
  64 +
  65 + public static final int DEFAULT_TIMEOUT = 10000;
  66 + protected final ObjectMapper jsonMapper = new ObjectMapper();
  67 +
  68 + @Autowired
  69 + private RpcService rpcService;
  70 +
  71 + @Autowired
  72 + private AccessValidator accessValidator;
  73 +
  74 + private ExecutorService executor;
  75 +
  76 + @PostConstruct
  77 + public void initExecutor() {
  78 + executor = Executors.newSingleThreadExecutor();
  79 + }
  80 +
  81 + @PreDestroy
  82 + public void shutdownExecutor() {
  83 + if (executor != null) {
  84 + executor.shutdownNow();
  85 + }
  86 + }
  87 +
  88 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  89 + @RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST)
  90 + @ResponseBody
  91 + public DeferredResult<ResponseEntity> handleOneWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException {
  92 + return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
  93 + }
  94 +
  95 + @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
  96 + @RequestMapping(value = "/twoway/{deviceId}", method = RequestMethod.POST)
  97 + @ResponseBody
  98 + public DeferredResult<ResponseEntity> handleTwoWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException {
  99 + return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody);
  100 + }
  101 +
  102 +
  103 + private DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody) throws ThingsboardException {
  104 + try {
  105 + JsonNode rpcRequestBody = jsonMapper.readTree(requestBody);
  106 + RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
  107 + jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
  108 +
  109 + if (rpcRequestBody.has("timeout")) {
  110 + cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
  111 + }
  112 + SecurityUser currentUser = getCurrentUser();
  113 + TenantId tenantId = currentUser.getTenantId();
  114 + final DeferredResult<ResponseEntity> response = new DeferredResult<>();
  115 + long timeout = System.currentTimeMillis() + (cmd.getTimeout() != null ? cmd.getTimeout() : DEFAULT_TIMEOUT);
  116 + ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
  117 + accessValidator.validate(currentUser, deviceId, new FutureCallback<ValidationResult>() {
  118 + @Override
  119 + public void onSuccess(@Nullable ValidationResult result) {
  120 + ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
  121 + tenantId,
  122 + deviceId,
  123 + oneWay,
  124 + timeout,
  125 + body
  126 + );
  127 + rpcService.process(rpcRequest, new LocalRequestMetaData(rpcRequest, currentUser, response));
  128 + }
  129 +
  130 + @Override
  131 + public void onFailure(Throwable e) {
  132 + ResponseEntity entity;
  133 + if (e instanceof ToErrorResponseEntity) {
  134 + entity = ((ToErrorResponseEntity) e).toErrorResponseEntity();
  135 + } else {
  136 + entity = new ResponseEntity(HttpStatus.UNAUTHORIZED);
  137 + }
  138 + rpcService.logRpcCall(currentUser, deviceId, body, oneWay, Optional.empty(), e);
  139 + response.setResult(entity);
  140 + }
  141 + });
  142 + return response;
  143 + } catch (IOException ioe) {
  144 + throw new ThingsboardException("Invalid request body", ioe, ThingsboardErrorCode.BAD_REQUEST_PARAMS);
  145 + }
  146 + }
  147 +
  148 +}
... ...
... ... @@ -19,7 +19,6 @@ import com.google.protobuf.ByteString;
19 19 import io.grpc.Server;
20 20 import io.grpc.ServerBuilder;
21 21 import io.grpc.stub.StreamObserver;
22   -import lombok.Setter;
23 22 import lombok.extern.slf4j.Slf4j;
24 23 import org.springframework.beans.factory.annotation.Autowired;
25 24 import org.springframework.stereotype.Service;
... ... @@ -27,7 +26,6 @@ import org.springframework.util.SerializationUtils;
27 26 import org.thingsboard.server.actors.rpc.RpcBroadcastMsg;
28 27 import org.thingsboard.server.actors.rpc.RpcSessionCreateRequestMsg;
29 28 import org.thingsboard.server.actors.rpc.RpcSessionTellMsg;
30   -import org.thingsboard.server.actors.service.ActorService;
31 29 import org.thingsboard.server.common.data.id.EntityId;
32 30 import org.thingsboard.server.common.msg.cluster.ServerAddress;
33 31 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
... ... @@ -35,21 +33,18 @@ import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
35 33 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
36 34 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
37 35 import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
38   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
  36 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
39 37 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
40 38 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
41 39 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
42 40 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
43 41 import org.thingsboard.server.gen.cluster.ClusterRpcServiceGrpc;
44   -import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
45 42 import org.thingsboard.server.service.cluster.discovery.ServerInstance;
46 43 import org.thingsboard.server.service.cluster.discovery.ServerInstanceService;
47   -import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  44 +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
48 45
49   -import javax.annotation.PostConstruct;
50 46 import javax.annotation.PreDestroy;
51 47 import java.io.IOException;
52   -import java.util.Set;
53 48 import java.util.UUID;
54 49 import java.util.concurrent.ConcurrentHashMap;
55 50 import java.util.concurrent.ConcurrentMap;
... ... @@ -138,7 +133,7 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
138 133 }
139 134
140 135 @Override
141   - public void tell(ServerAddress serverAddress, ToDeviceRpcRequestPluginMsg toForward) {
  136 + public void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward) {
142 137 ClusterAPIProtos.ToRpcServerMessage msg = ClusterAPIProtos.ToRpcServerMessage.newBuilder()
143 138 .setToDeviceRpcRequestRpcMsg(toProtoMsg(toForward)).build();
144 139 tell(serverAddress, msg);
... ... @@ -202,15 +197,10 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
202 197 ).build();
203 198 }
204 199
205   - private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestPluginMsg msg) {
  200 + private static ClusterAPIProtos.ToDeviceRpcRequestRpcMessage toProtoMsg(ToDeviceRpcRequestMsg msg) {
206 201 ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.Builder builder = ClusterAPIProtos.ToDeviceRpcRequestRpcMessage.newBuilder();
207 202 ToDeviceRpcRequest request = msg.getMsg();
208 203
209   - builder.setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
210   - .setTenantId(toUid(msg.getPluginTenantId().getId()))
211   - .setPluginId(toUid(msg.getPluginId().getId()))
212   - .build());
213   -
214 204 builder.setDeviceTenantId(toUid(msg.getTenantId()));
215 205 builder.setDeviceId(toUid(msg.getDeviceId()));
216 206
... ... @@ -227,11 +217,6 @@ public class ClusterGrpcService extends ClusterRpcServiceGrpc.ClusterRpcServiceI
227 217 ClusterAPIProtos.ToPluginRpcResponseRpcMessage.Builder builder = ClusterAPIProtos.ToPluginRpcResponseRpcMessage.newBuilder();
228 218 FromDeviceRpcResponse request = msg.getResponse();
229 219
230   - builder.setAddress(ClusterAPIProtos.PluginAddress.newBuilder()
231   - .setTenantId(toUid(msg.getPluginTenantId().getId()))
232   - .setPluginId(toUid(msg.getPluginId().getId()))
233   - .build());
234   -
235 220 builder.setMsgId(toUid(request.getId()));
236 221 request.getResponse().ifPresent(builder::setResponse);
237 222 request.getError().ifPresent(e -> builder.setError(e.name()));
... ...
... ... @@ -20,11 +20,13 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
20 20 import org.thingsboard.server.common.msg.cluster.ToAllNodesMsg;
21 21 import org.thingsboard.server.common.msg.core.ToDeviceSessionActorMsg;
22 22 import org.thingsboard.server.common.msg.device.ToDeviceActorMsg;
  23 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
23 24 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
24 25 import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestPluginMsg;
25 26 import org.thingsboard.server.extensions.api.plugins.msg.ToPluginRpcResponseDeviceMsg;
26 27 import org.thingsboard.server.extensions.api.plugins.rpc.PluginRpcMsg;
27 28 import org.thingsboard.server.gen.cluster.ClusterAPIProtos;
  29 +import org.thingsboard.server.service.rpc.ToDeviceRpcRequestMsg;
28 30
29 31 import java.util.UUID;
30 32
... ... @@ -41,7 +43,7 @@ public interface ClusterRpcService {
41 43
42 44 void tell(ServerAddress serverAddress, ToDeviceActorNotificationMsg toForward);
43 45
44   - void tell(ServerAddress serverAddress, ToDeviceRpcRequestPluginMsg toForward);
  46 + void tell(ServerAddress serverAddress, ToDeviceRpcRequestMsg toForward);
45 47
46 48 void tell(ServerAddress serverAddress, ToPluginRpcResponseDeviceMsg toForward);
47 49
... ... @@ -50,4 +52,5 @@ public interface ClusterRpcService {
50 52 void broadcast(ToAllNodesMsg msg);
51 53
52 54 void onSessionCreated(UUID msgUid, StreamObserver<ClusterAPIProtos.ToRpcServerMessage> inputStream);
  55 +
53 56 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.rpc;
  17 +
  18 +import com.fasterxml.jackson.databind.ObjectMapper;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.beans.factory.annotation.Autowired;
  21 +import org.springframework.http.HttpEntity;
  22 +import org.springframework.http.HttpStatus;
  23 +import org.springframework.http.ResponseEntity;
  24 +import org.springframework.stereotype.Service;
  25 +import org.springframework.util.StringUtils;
  26 +import org.springframework.web.context.request.async.DeferredResult;
  27 +import org.thingsboard.server.actors.service.ActorService;
  28 +import org.thingsboard.server.common.data.audit.ActionType;
  29 +import org.thingsboard.server.common.data.id.DeviceId;
  30 +import org.thingsboard.server.common.data.id.EntityId;
  31 +import org.thingsboard.server.common.data.id.UUIDBased;
  32 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  33 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  34 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  35 +import org.thingsboard.server.controller.BaseController;
  36 +import org.thingsboard.server.dao.audit.AuditLogService;
  37 +import org.thingsboard.server.extensions.api.plugins.PluginContext;
  38 +import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
  39 +import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
  40 +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  41 +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
  42 +import org.thingsboard.server.service.security.model.SecurityUser;
  43 +
  44 +import javax.annotation.PostConstruct;
  45 +import javax.annotation.PreDestroy;
  46 +import java.io.IOException;
  47 +import java.util.Optional;
  48 +import java.util.UUID;
  49 +import java.util.concurrent.ConcurrentHashMap;
  50 +import java.util.concurrent.ConcurrentMap;
  51 +import java.util.concurrent.Executors;
  52 +import java.util.concurrent.ScheduledExecutorService;
  53 +import java.util.concurrent.TimeUnit;
  54 +import java.util.function.BiConsumer;
  55 +
  56 +/**
  57 + * Created by ashvayka on 27.03.18.
  58 + */
  59 +@Service
  60 +@Slf4j
  61 +public class DefaultRpcService implements RpcService {
  62 +
  63 + private static final ObjectMapper jsonMapper = new ObjectMapper();
  64 +
  65 + @Autowired
  66 + private ClusterRoutingService routingService;
  67 +
  68 + @Autowired
  69 + private ClusterRpcService rpcService;
  70 +
  71 + @Autowired
  72 + private ActorService actorService;
  73 +
  74 + @Autowired
  75 + private AuditLogService auditLogService;
  76 +
  77 + private ScheduledExecutorService rpcCallBackExecutor;
  78 +
  79 + private final ConcurrentMap<UUID, LocalRequestMetaData> localRpcRequests = new ConcurrentHashMap<>();
  80 +
  81 +
  82 + @PostConstruct
  83 + public void initExecutor() {
  84 + rpcCallBackExecutor = Executors.newSingleThreadScheduledExecutor();
  85 + }
  86 +
  87 + @PreDestroy
  88 + public void shutdownExecutor() {
  89 + if (rpcCallBackExecutor != null) {
  90 + rpcCallBackExecutor.shutdownNow();
  91 + }
  92 + }
  93 +
  94 + @Override
  95 + public void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData) {
  96 + log.trace("[{}] Processing local rpc call for device [{}]", request.getTenantId(), request.getDeviceId());
  97 + sendRpcRequest(request);
  98 + UUID requestId = request.getId();
  99 + localRpcRequests.put(requestId, metaData);
  100 + long timeout = Math.max(0, System.currentTimeMillis() - request.getExpirationTime());
  101 + rpcCallBackExecutor.schedule(() -> {
  102 + LocalRequestMetaData localMetaData = localRpcRequests.remove(requestId);
  103 + if (localMetaData != null) {
  104 + reply(localMetaData, new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT));
  105 + }
  106 + }, timeout, TimeUnit.MILLISECONDS);
  107 + }
  108 +
  109 + @Override
  110 + public void process(FromDeviceRpcResponse response) {
  111 + UUID requestId = response.getId();
  112 + LocalRequestMetaData md = localRpcRequests.remove(requestId);
  113 + if (md != null) {
  114 + log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
  115 + reply(md, response);
  116 + } else {
  117 + log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
  118 + }
  119 + }
  120 +
  121 + public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response) {
  122 + Optional<RpcError> rpcError = response.getError();
  123 + DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter();
  124 + if (rpcError.isPresent()) {
  125 + logRpcCall(rpcRequest, rpcError, null);
  126 + RpcError error = rpcError.get();
  127 + switch (error) {
  128 + case TIMEOUT:
  129 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
  130 + break;
  131 + case NO_ACTIVE_CONNECTION:
  132 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
  133 + break;
  134 + default:
  135 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
  136 + break;
  137 + }
  138 + } else {
  139 + Optional<String> responseData = response.getResponse();
  140 + if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
  141 + String data = responseData.get();
  142 + try {
  143 + logRpcCall(rpcRequest, rpcError, null);
  144 + responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
  145 + } catch (IOException e) {
  146 + log.debug("Failed to decode device response: {}", data, e);
  147 + logRpcCall(rpcRequest, rpcError, e);
  148 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
  149 + }
  150 + } else {
  151 + logRpcCall(rpcRequest, rpcError, null);
  152 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
  153 + }
  154 + }
  155 + }
  156 +
  157 + private void sendRpcRequest(ToDeviceRpcRequest msg) {
  158 + log.trace("[{}] Forwarding msg {} to device actor!", msg.getDeviceId(), msg);
  159 + ToDeviceRpcRequestMsg rpcMsg = new ToDeviceRpcRequestMsg(msg);
  160 + forward(msg.getDeviceId(), rpcMsg, rpcService::tell);
  161 + }
  162 +
  163 + private void forward(DeviceId deviceId, ToDeviceRpcRequestMsg msg, BiConsumer<ServerAddress, ToDeviceRpcRequestMsg> rpcFunction) {
  164 + Optional<ServerAddress> instance = routingService.resolveById(deviceId);
  165 + if (instance.isPresent()) {
  166 + log.trace("[{}] Forwarding msg {} to remote device actor!", msg.getTenantId(), msg);
  167 + rpcFunction.accept(instance.get(), msg);
  168 + } else {
  169 + log.trace("[{}] Forwarding msg {} to local device actor!", msg.getTenantId(), msg);
  170 + actorService.onMsg(msg);
  171 + }
  172 + }
  173 +
  174 + private void logRpcCall(LocalRequestMetaData rpcRequest, Optional<RpcError> rpcError, Throwable e) {
  175 + logRpcCall(rpcRequest.getUser(), rpcRequest.getRequest().getDeviceId(), rpcRequest.getRequest().getBody(), rpcRequest.getRequest().isOneway(), rpcError, null);
  176 + }
  177 +
  178 + @Override
  179 + public void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e) {
  180 + String rpcErrorStr = "";
  181 + if (rpcError.isPresent()) {
  182 + rpcErrorStr = "RPC Error: " + rpcError.get().name();
  183 + }
  184 + String method = body.getMethod();
  185 + String params = body.getParams();
  186 +
  187 + auditLogService.logEntityAction(
  188 + user.getTenantId(),
  189 + user.getCustomerId(),
  190 + user.getId(),
  191 + user.getName(),
  192 + (UUIDBased & EntityId) entityId,
  193 + null,
  194 + ActionType.RPC_CALL,
  195 + BaseController.toException(e),
  196 + rpcErrorStr,
  197 + oneWay,
  198 + method,
  199 + params);
  200 + }
  201 +}
... ...
application/src/main/java/org/thingsboard/server/service/rpc/LocalRequestMetaData.java renamed from extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/LocalRequestMetaData.java
... ... @@ -13,18 +13,20 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.extensions.core.plugin.rpc;
  16 +package org.thingsboard.server.service.rpc;
17 17
18 18 import lombok.Data;
19 19 import org.springframework.http.ResponseEntity;
20 20 import org.springframework.web.context.request.async.DeferredResult;
21   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
  21 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  22 +import org.thingsboard.server.service.security.model.SecurityUser;
22 23
23 24 /**
24   - * @author Andrew Shvayka
  25 + * Created by ashvayka on 16.04.18.
25 26 */
26 27 @Data
27 28 public class LocalRequestMetaData {
28 29 private final ToDeviceRpcRequest request;
  30 + private final SecurityUser user;
29 31 private final DeferredResult<ResponseEntity> responseWriter;
30 32 }
... ...
application/src/main/java/org/thingsboard/server/service/rpc/RpcService.java renamed from extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/RpcPluginConfiguration.java
... ... @@ -13,14 +13,25 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.extensions.core.plugin.rpc;
  16 +package org.thingsboard.server.service.rpc;
17 17
18   -import lombok.Data;
  18 +import org.thingsboard.server.common.data.id.EntityId;
  19 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  20 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  21 +import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
  22 +import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
  23 +import org.thingsboard.server.service.security.model.SecurityUser;
  24 +
  25 +import java.util.Optional;
19 26
20 27 /**
21   - * @author Andrew Shvayka
  28 + * Created by ashvayka on 16.04.18.
22 29 */
23   -@Data
24   -public class RpcPluginConfiguration {
25   - private long defaultTimeout;
  30 +public interface RpcService {
  31 +
  32 + void process(ToDeviceRpcRequest request, LocalRequestMetaData metaData);
  33 +
  34 + void process(FromDeviceRpcResponse response);
  35 +
  36 + void logRpcCall(SecurityUser user, EntityId entityId, ToDeviceRpcRequestBody body, boolean oneWay, Optional<RpcError> rpcError, Throwable e);
26 37 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.rpc;
  17 +
  18 +import lombok.Getter;
  19 +import lombok.RequiredArgsConstructor;
  20 +import lombok.ToString;
  21 +import org.thingsboard.server.common.data.id.DeviceId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  24 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
  25 +import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
  26 +
  27 +import java.util.Optional;
  28 +
  29 +/**
  30 + * Created by ashvayka on 16.04.18.
  31 + */
  32 +@ToString
  33 +@RequiredArgsConstructor
  34 +public class ToDeviceRpcRequestMsg implements ToDeviceActorNotificationMsg {
  35 +
  36 + private final ServerAddress serverAddress;
  37 + @Getter
  38 + private final ToDeviceRpcRequest msg;
  39 +
  40 + public ToDeviceRpcRequestMsg(ToDeviceRpcRequest msg) {
  41 + this(null, msg);
  42 + }
  43 +
  44 + public Optional<ServerAddress> getServerAddress() {
  45 + return Optional.ofNullable(serverAddress);
  46 + }
  47 +
  48 + @Override
  49 + public DeviceId getDeviceId() {
  50 + return msg.getDeviceId();
  51 + }
  52 +
  53 + @Override
  54 + public TenantId getTenantId() {
  55 + return msg.getTenantId();
  56 + }
  57 +}
... ...
... ... @@ -25,6 +25,7 @@ import org.springframework.http.ResponseEntity;
25 25 import org.springframework.stereotype.Component;
26 26 import org.springframework.web.context.request.async.DeferredResult;
27 27 import org.thingsboard.server.actors.plugin.ValidationResult;
  28 +import org.thingsboard.server.common.data.BaseData;
28 29 import org.thingsboard.server.common.data.Customer;
29 30 import org.thingsboard.server.common.data.Device;
30 31 import org.thingsboard.server.common.data.Tenant;
... ... @@ -35,8 +36,10 @@ import org.thingsboard.server.common.data.id.DeviceId;
35 36 import org.thingsboard.server.common.data.id.EntityId;
36 37 import org.thingsboard.server.common.data.id.EntityIdFactory;
37 38 import org.thingsboard.server.common.data.id.RuleChainId;
  39 +import org.thingsboard.server.common.data.id.RuleNodeId;
38 40 import org.thingsboard.server.common.data.id.TenantId;
39 41 import org.thingsboard.server.common.data.rule.RuleChain;
  42 +import org.thingsboard.server.common.data.rule.RuleNode;
40 43 import org.thingsboard.server.controller.HttpValidationCallback;
41 44 import org.thingsboard.server.dao.alarm.AlarmService;
42 45 import org.thingsboard.server.dao.asset.AssetService;
... ... @@ -140,7 +143,7 @@ public class AccessValidator {
140 143 return response;
141 144 }
142 145
143   - public <T> void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
  146 + public void validate(SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
144 147 switch (entityId.getEntityType()) {
145 148 case DEVICE:
146 149 validateDevice(currentUser, entityId, callback);
... ... @@ -177,14 +180,14 @@ public class AccessValidator {
177 180 } else if (currentUser.isCustomerUser() && !device.getCustomerId().equals(currentUser.getCustomerId())) {
178 181 return ValidationResult.accessDenied("Device doesn't belong to the current Customer!");
179 182 } else {
180   - return ValidationResult.ok();
  183 + return ValidationResult.ok(device);
181 184 }
182 185 }
183 186 }), executor);
184 187 }
185 188 }
186 189
187   - private <T> void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
  190 + private void validateAsset(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
188 191 if (currentUser.isSystemAdmin()) {
189 192 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
190 193 } else {
... ... @@ -198,15 +201,14 @@ public class AccessValidator {
198 201 } else if (currentUser.isCustomerUser() && !asset.getCustomerId().equals(currentUser.getCustomerId())) {
199 202 return ValidationResult.accessDenied("Asset doesn't belong to the current Customer!");
200 203 } else {
201   - return ValidationResult.ok();
  204 + return ValidationResult.ok(asset);
202 205 }
203 206 }
204 207 }), executor);
205 208 }
206 209 }
207 210
208   -
209   - private <T> void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
  211 + private void validateRuleChain(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
210 212 if (currentUser.isCustomerUser()) {
211 213 callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
212 214 } else {
... ... @@ -220,14 +222,40 @@ public class AccessValidator {
220 222 } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
221 223 return ValidationResult.accessDenied("Rule chain is not in system scope!");
222 224 } else {
223   - return ValidationResult.ok();
  225 + return ValidationResult.ok(ruleChain);
  226 + }
  227 + }
  228 + }), executor);
  229 + }
  230 + }
  231 +
  232 + private void validateRule(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
  233 + if (currentUser.isCustomerUser()) {
  234 + callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
  235 + } else {
  236 + ListenableFuture<RuleNode> ruleNodeFuture = ruleChainService.findRuleNodeByIdAsync(new RuleNodeId(entityId.getId()));
  237 + Futures.addCallback(ruleNodeFuture, getCallback(callback, ruleNodeTmp -> {
  238 + RuleNode ruleNode = ruleNodeTmp;
  239 + if (ruleNode == null) {
  240 + return ValidationResult.entityNotFound("Rule node with requested id wasn't found!");
  241 + } else if (ruleNode.getRuleChainId() == null) {
  242 + return ValidationResult.entityNotFound("Rule chain with requested node id wasn't found!");
  243 + } else {
  244 + //TODO: make async
  245 + RuleChain ruleChain = ruleChainService.findRuleChainById(ruleNode.getRuleChainId());
  246 + if (currentUser.isTenantAdmin() && !ruleChain.getTenantId().equals(currentUser.getTenantId())) {
  247 + return ValidationResult.accessDenied("Rule chain doesn't belong to the current Tenant!");
  248 + } else if (currentUser.isSystemAdmin() && !ruleChain.getTenantId().isNullUid()) {
  249 + return ValidationResult.accessDenied("Rule chain is not in system scope!");
  250 + } else {
  251 + return ValidationResult.ok(ruleNode);
224 252 }
225 253 }
226 254 }), executor);
227 255 }
228 256 }
229 257
230   - private <T> void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
  258 + private void validateCustomer(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
231 259 if (currentUser.isSystemAdmin()) {
232 260 callback.onSuccess(ValidationResult.accessDenied(SYSTEM_ADMINISTRATOR_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
233 261 } else {
... ... @@ -241,18 +269,18 @@ public class AccessValidator {
241 269 } else if (currentUser.isCustomerUser() && !customer.getId().equals(currentUser.getCustomerId())) {
242 270 return ValidationResult.accessDenied("Customer doesn't relate to the currently authorized customer user!");
243 271 } else {
244   - return ValidationResult.ok();
  272 + return ValidationResult.ok(customer);
245 273 }
246 274 }
247 275 }), executor);
248 276 }
249 277 }
250 278
251   - private <T> void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
  279 + private void validateTenant(final SecurityUser currentUser, EntityId entityId, FutureCallback<ValidationResult> callback) {
252 280 if (currentUser.isCustomerUser()) {
253 281 callback.onSuccess(ValidationResult.accessDenied(CUSTOMER_USER_IS_NOT_ALLOWED_TO_PERFORM_THIS_OPERATION));
254 282 } else if (currentUser.isSystemAdmin()) {
255   - callback.onSuccess(ValidationResult.ok());
  283 + callback.onSuccess(ValidationResult.ok(null));
256 284 } else {
257 285 ListenableFuture<Tenant> tenantFuture = tenantService.findTenantByIdAsync(new TenantId(entityId.getId()));
258 286 Futures.addCallback(tenantFuture, getCallback(callback, tenant -> {
... ... @@ -261,13 +289,13 @@ public class AccessValidator {
261 289 } else if (!tenant.getId().equals(currentUser.getTenantId())) {
262 290 return ValidationResult.accessDenied("Tenant doesn't relate to the currently authorized user!");
263 291 } else {
264   - return ValidationResult.ok();
  292 + return ValidationResult.ok(tenant);
265 293 }
266 294 }), executor);
267 295 }
268 296 }
269 297
270   - private <T> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult> transformer) {
  298 + private <T, V> FutureCallback<T> getCallback(FutureCallback<ValidationResult> callback, Function<T, ValidationResult<V>> transformer) {
271 299 return new FutureCallback<T>() {
272 300 @Override
273 301 public void onSuccess(@Nullable T result) {
... ...
... ... @@ -61,7 +61,6 @@ message ConnectRpcMessage {
61 61 }
62 62
63 63 message ToDeviceRpcRequestRpcMessage {
64   - PluginAddress address = 1;
65 64 Uid deviceTenantId = 2;
66 65 Uid deviceId = 3;
67 66
... ... @@ -73,8 +72,6 @@ message ToDeviceRpcRequestRpcMessage {
73 72 }
74 73
75 74 message ToPluginRpcResponseRpcMessage {
76   - PluginAddress address = 1;
77   -
78 75 Uid msgId = 2;
79 76 string response = 3;
80 77 string error = 4;
... ...
common/data/src/main/java/org/thingsboard/server/common/data/rpc/RpcRequest.java renamed from extensions-core/src/main/java/org/thingsboard/server/extensions/core/plugin/rpc/cmd/RpcRequest.java
... ... @@ -13,7 +13,7 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.extensions.core.plugin.rpc.cmd;
  16 +package org.thingsboard.server.common.data.rpc;
17 17
18 18 import lombok.Data;
19 19
... ...
common/data/src/main/java/org/thingsboard/server/common/data/rpc/ToDeviceRpcRequestBody.java renamed from extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequestBody.java
... ... @@ -13,7 +13,7 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.extensions.api.plugins.msg;
  16 +package org.thingsboard.server.common.data.rpc;
17 17
18 18 import lombok.Data;
19 19
... ...
... ... @@ -22,6 +22,7 @@ import lombok.EqualsAndHashCode;
22 22 import lombok.extern.slf4j.Slf4j;
23 23 import org.thingsboard.server.common.data.HasName;
24 24 import org.thingsboard.server.common.data.SearchTextBasedWithAdditionalInfo;
  25 +import org.thingsboard.server.common.data.id.RuleChainId;
25 26 import org.thingsboard.server.common.data.id.RuleNodeId;
26 27 import org.thingsboard.server.common.data.id.TenantId;
27 28
... ... @@ -32,6 +33,7 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
32 33
33 34 private static final long serialVersionUID = -5656679015121235465L;
34 35
  36 + private RuleChainId ruleChainId;
35 37 private String type;
36 38 private String name;
37 39 private boolean debugMode;
... ... @@ -49,8 +51,10 @@ public class RuleNode extends SearchTextBasedWithAdditionalInfo<RuleNodeId> impl
49 51
50 52 public RuleNode(RuleNode ruleNode) {
51 53 super(ruleNode);
  54 + this.ruleChainId = ruleNode.getRuleChainId();
52 55 this.type = ruleNode.getType();
53 56 this.name = ruleNode.getName();
  57 + this.debugMode = ruleNode.isDebugMode();
54 58 this.setConfiguration(ruleNode.getConfiguration());
55 59 }
56 60
... ...
common/message/src/main/java/org/thingsboard/server/common/msg/rpc/ToDeviceRpcRequest.java renamed from extensions-api/src/main/java/org/thingsboard/server/extensions/api/plugins/msg/ToDeviceRpcRequest.java
... ... @@ -13,12 +13,12 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.extensions.api.plugins.msg;
  16 +package org.thingsboard.server.common.msg.rpc;
17 17
18 18 import lombok.Data;
19 19 import org.thingsboard.server.common.data.id.DeviceId;
20 20 import org.thingsboard.server.common.data.id.TenantId;
21   -import org.thingsboard.server.extensions.api.plugins.PluginApiCallSecurityContext;
  21 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
22 22
23 23 import java.io.Serializable;
24 24 import java.util.UUID;
... ... @@ -29,7 +29,6 @@ import java.util.UUID;
29 29 @Data
30 30 public class ToDeviceRpcRequest implements Serializable {
31 31 private final UUID id;
32   - private final PluginApiCallSecurityContext securityCtx;
33 32 private final TenantId tenantId;
34 33 private final DeviceId deviceId;
35 34 private final boolean oneway;
... ...
... ... @@ -350,6 +350,7 @@ public class ModelConstants {
350 350 * Cassandra rule node constants.
351 351 */
352 352 public static final String RULE_NODE_COLUMN_FAMILY_NAME = "rule_node";
  353 + public static final String RULE_NODE_CHAIN_ID_PROPERTY = "rule_chain_id";
353 354 public static final String RULE_NODE_TYPE_PROPERTY = "type";
354 355 public static final String RULE_NODE_NAME_PROPERTY = "name";
355 356 public static final String RULE_NODE_CONFIGURATION_PROPERTY = "configuration";
... ...
... ... @@ -24,6 +24,7 @@ import lombok.EqualsAndHashCode;
24 24 import lombok.Getter;
25 25 import lombok.Setter;
26 26 import lombok.ToString;
  27 +import org.thingsboard.server.common.data.id.RuleChainId;
27 28 import org.thingsboard.server.common.data.id.RuleNodeId;
28 29 import org.thingsboard.server.common.data.rule.RuleNode;
29 30 import org.thingsboard.server.dao.model.SearchTextEntity;
... ... @@ -41,6 +42,8 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
41 42 @PartitionKey
42 43 @Column(name = ID_PROPERTY)
43 44 private UUID id;
  45 + @Column(name = RULE_NODE_CHAIN_ID_PROPERTY)
  46 + private UUID ruleChainId;
44 47 @Column(name = RULE_NODE_TYPE_PROPERTY)
45 48 private String type;
46 49 @Column(name = RULE_NODE_NAME_PROPERTY)
... ... @@ -56,7 +59,6 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
56 59 @Column(name = DEBUG_MODE)
57 60 private boolean debugMode;
58 61
59   -
60 62 public RuleNodeEntity() {
61 63 }
62 64
... ... @@ -64,6 +66,9 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
64 66 if (ruleNode.getId() != null) {
65 67 this.id = ruleNode.getUuidId();
66 68 }
  69 + if (ruleNode.getRuleChainId() != null) {
  70 + this.ruleChainId = ruleNode.getRuleChainId().getId();
  71 + }
67 72 this.type = ruleNode.getType();
68 73 this.name = ruleNode.getName();
69 74 this.debugMode = ruleNode.isDebugMode();
... ... @@ -92,6 +97,14 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
92 97 this.id = id;
93 98 }
94 99
  100 + public UUID getRuleChainId() {
  101 + return ruleChainId;
  102 + }
  103 +
  104 + public void setRuleChainId(UUID ruleChainId) {
  105 + this.ruleChainId = ruleChainId;
  106 + }
  107 +
95 108 public String getType() {
96 109 return type;
97 110 }
... ... @@ -132,6 +145,9 @@ public class RuleNodeEntity implements SearchTextEntity<RuleNode> {
132 145 public RuleNode toData() {
133 146 RuleNode ruleNode = new RuleNode(new RuleNodeId(id));
134 147 ruleNode.setCreatedTime(UUIDs.unixTimestamp(id));
  148 + if (this.ruleChainId != null) {
  149 + ruleNode.setRuleChainId(new RuleChainId(this.ruleChainId));
  150 + }
135 151 ruleNode.setType(this.type);
136 152 ruleNode.setName(this.name);
137 153 ruleNode.setDebugMode(this.debugMode);
... ...
... ... @@ -21,8 +21,10 @@ import lombok.Data;
21 21 import lombok.EqualsAndHashCode;
22 22 import org.hibernate.annotations.Type;
23 23 import org.hibernate.annotations.TypeDef;
  24 +import org.thingsboard.server.common.data.id.RuleChainId;
24 25 import org.thingsboard.server.common.data.id.RuleNodeId;
25 26 import org.thingsboard.server.common.data.rule.RuleNode;
  27 +import org.thingsboard.server.dao.DaoUtil;
26 28 import org.thingsboard.server.dao.model.BaseSqlEntity;
27 29 import org.thingsboard.server.dao.model.ModelConstants;
28 30 import org.thingsboard.server.dao.model.SearchTextEntity;
... ... @@ -39,6 +41,9 @@ import javax.persistence.Table;
39 41 @Table(name = ModelConstants.RULE_NODE_COLUMN_FAMILY_NAME)
40 42 public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTextEntity<RuleNode> {
41 43
  44 + @Column(name = ModelConstants.RULE_NODE_CHAIN_ID_PROPERTY)
  45 + private String ruleChainId;
  46 +
42 47 @Column(name = ModelConstants.RULE_NODE_TYPE_PROPERTY)
43 48 private String type;
44 49
... ... @@ -66,6 +71,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
66 71 if (ruleNode.getId() != null) {
67 72 this.setId(ruleNode.getUuidId());
68 73 }
  74 + if (ruleNode.getRuleChainId() != null) {
  75 + this.ruleChainId = toString(DaoUtil.getId(ruleNode.getRuleChainId()));
  76 + }
69 77 this.type = ruleNode.getType();
70 78 this.name = ruleNode.getName();
71 79 this.debugMode = ruleNode.isDebugMode();
... ... @@ -88,6 +96,9 @@ public class RuleNodeEntity extends BaseSqlEntity<RuleNode> implements SearchTex
88 96 public RuleNode toData() {
89 97 RuleNode ruleNode = new RuleNode(new RuleNodeId(getId()));
90 98 ruleNode.setCreatedTime(UUIDs.unixTimestamp(getId()));
  99 + if (ruleChainId != null) {
  100 + ruleNode.setRuleChainId(new RuleChainId(toUUID(ruleChainId)));
  101 + }
91 102 ruleNode.setType(type);
92 103 ruleNode.setName(name);
93 104 ruleNode.setDebugMode(debugMode);
... ...
... ... @@ -13,7 +13,6 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -
17 16 package org.thingsboard.server.dao.rule;
18 17
19 18 import com.google.common.util.concurrent.ListenableFuture;
... ... @@ -124,6 +123,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
124 123 }
125 124 }
126 125 for (RuleNode node : toAddOrUpdate) {
  126 + node.setRuleChainId(ruleChain.getId());
127 127 RuleNode savedNode = ruleNodeDao.save(node);
128 128 try {
129 129 createRelation(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),
... ... @@ -137,7 +137,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
137 137 nodes.set(index, savedNode);
138 138 ruleNodeIndexMap.put(savedNode.getId(), index);
139 139 }
140   - for (RuleNode node: toDelete) {
  140 + for (RuleNode node : toDelete) {
141 141 deleteRuleNode(node.getId());
142 142 }
143 143 RuleNodeId firstRuleNodeId = null;
... ... @@ -234,6 +234,12 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
234 234 }
235 235
236 236 @Override
  237 + public ListenableFuture<RuleNode> findRuleNodeByIdAsync(RuleNodeId ruleNodeId) {
  238 + Validator.validateId(ruleNodeId, "Incorrect rule node id for search request.");
  239 + return ruleNodeDao.findByIdAsync(ruleNodeId.getId());
  240 + }
  241 +
  242 + @Override
237 243 public RuleChain getRootTenantRuleChain(TenantId tenantId) {
238 244 Validator.validateId(tenantId, "Incorrect tenant id for search request.");
239 245 List<EntityRelation> relations = relationService.findByFrom(tenantId, RelationTypeGroup.RULE_CHAIN);
... ...
... ... @@ -46,6 +46,8 @@ public interface RuleChainService {
46 46
47 47 ListenableFuture<RuleChain> findRuleChainByIdAsync(RuleChainId ruleChainId);
48 48
  49 + ListenableFuture<RuleNode> findRuleNodeByIdAsync(RuleNodeId ruleNodeId);
  50 +
49 51 RuleChain getRootTenantRuleChain(TenantId tenantId);
50 52
51 53 List<RuleNode> getRuleChainNodes(RuleChainId ruleChainId);
... ...
... ... @@ -684,6 +684,7 @@ CREATE MATERIALIZED VIEW IF NOT EXISTS thingsboard.rule_chain_by_tenant_and_sear
684 684
685 685 CREATE TABLE IF NOT EXISTS thingsboard.rule_node (
686 686 id uuid,
  687 + rule_chain_id uuid,
687 688 type text,
688 689 name text,
689 690 debug_mode boolean,
... ...
... ... @@ -270,6 +270,7 @@ CREATE TABLE IF NOT EXISTS rule_chain (
270 270
271 271 CREATE TABLE IF NOT EXISTS rule_node (
272 272 id varchar(31) NOT NULL CONSTRAINT rule_node_pkey PRIMARY KEY,
  273 + rule_chain_id varchar(31),
273 274 additional_info varchar,
274 275 configuration varchar(10000000),
275 276 type varchar(255),
... ...
... ... @@ -22,8 +22,9 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry;
22 22 import org.thingsboard.server.common.data.kv.TsKvEntry;
23 23 import org.thingsboard.server.common.data.kv.TsKvQuery;
24 24 import org.thingsboard.server.common.data.relation.EntityRelation;
25   -import org.thingsboard.server.common.data.relation.RelationTypeGroup;
  25 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
26 26 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  27 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
27 28 import org.thingsboard.server.extensions.api.plugins.msg.*;
28 29 import org.thingsboard.server.extensions.api.plugins.rpc.RpcMsg;
29 30 import org.thingsboard.server.extensions.api.plugins.ws.PluginWebsocketSessionRef;
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.DeviceId;
22 22 import org.thingsboard.server.common.data.id.PluginId;
23 23 import org.thingsboard.server.common.data.id.TenantId;
24 24 import org.thingsboard.server.common.msg.cluster.ServerAddress;
  25 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
25 26 import org.thingsboard.server.extensions.api.device.ToDeviceActorNotificationMsg;
26 27
27 28 import java.util.Optional;
... ...
... ... @@ -26,15 +26,24 @@ import org.thingsboard.server.common.data.id.CustomerId;
26 26 import org.thingsboard.server.common.data.id.DeviceId;
27 27 import org.thingsboard.server.common.data.id.RuleId;
28 28 import org.thingsboard.server.common.data.id.TenantId;
  29 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
29 30 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
30 31 import org.thingsboard.server.common.msg.core.ToServerRpcResponseMsg;
  32 +import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
31 33 import org.thingsboard.server.extensions.api.plugins.PluginCallback;
32 34 import org.thingsboard.server.extensions.api.plugins.PluginContext;
33 35 import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
34   -import org.thingsboard.server.extensions.api.plugins.msg.*;
  36 +import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
  37 +import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
  38 +import org.thingsboard.server.extensions.api.plugins.msg.RpcResponsePluginToRuleMsg;
  39 +import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
35 40 import org.thingsboard.server.extensions.api.rules.RuleException;
36 41
37   -import java.util.*;
  42 +import java.util.HashMap;
  43 +import java.util.List;
  44 +import java.util.Map;
  45 +import java.util.Optional;
  46 +import java.util.UUID;
38 47
39 48 /**
40 49 * @author Andrew Shvayka
... ... @@ -152,7 +161,7 @@ public class DeviceMessagingRuleMsgHandler implements RuleMsgHandler {
152 161 pendingMsgs.put(uid, requestMd);
153 162 log.trace("[{}] Forwarding {} to [{}]", uid, params, targetDeviceId);
154 163 ToDeviceRpcRequestBody requestBody = new ToDeviceRpcRequestBody(ON_MSG_METHOD_NAME, GSON.toJson(params.get("body")));
155   - ctx.sendRpcRequest(new ToDeviceRpcRequest(uid, null, targetDevice.getTenantId(), targetDeviceId, oneWay, System.currentTimeMillis() + timeout, requestBody));
  164 + ctx.sendRpcRequest(new ToDeviceRpcRequest(uid, targetDevice.getTenantId(), targetDeviceId, oneWay, System.currentTimeMillis() + timeout, requestBody));
156 165 } else {
157 166 replyWithError(ctx, requestMd, RpcError.FORBIDDEN);
158 167 }
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.extensions.core.plugin.rpc;
17   -
18   -import lombok.Setter;
19   -import lombok.extern.slf4j.Slf4j;
20   -import org.thingsboard.server.extensions.api.plugins.PluginContext;
21   -import org.thingsboard.server.extensions.api.plugins.msg.*;
22   -import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRestMsgHandler;
23   -
24   -import java.util.HashMap;
25   -import java.util.Map;
26   -import java.util.UUID;
27   -
28   -/**
29   - * @author Andrew Shvayka
30   - */
31   -@Slf4j
32   -public class RpcManager {
33   -
34   - @Setter
35   - private RpcRestMsgHandler restHandler;
36   -
37   - private Map<UUID, LocalRequestMetaData> localRpcRequests = new HashMap<>();
38   -
39   - public void process(PluginContext ctx, LocalRequestMetaData requestMd) {
40   - ToDeviceRpcRequest request = requestMd.getRequest();
41   - log.trace("[{}] Processing local rpc call for device [{}]", request.getId(), request.getDeviceId());
42   - ctx.sendRpcRequest(request);
43   - localRpcRequests.put(request.getId(), requestMd);
44   - ctx.scheduleTimeoutMsg(new TimeoutUUIDMsg(request.getId(), request.getExpirationTime() - System.currentTimeMillis()));
45   - }
46   -
47   - public void process(PluginContext ctx, FromDeviceRpcResponse response) {
48   - UUID requestId = response.getId();
49   - LocalRequestMetaData md = localRpcRequests.remove(requestId);
50   - if (md != null) {
51   - log.trace("[{}] Processing local rpc response from device [{}]", requestId, md.getRequest().getDeviceId());
52   - restHandler.reply(ctx, md.getRequest(), md.getResponseWriter(), response);
53   - } else {
54   - log.trace("[{}] Unknown or stale rpc response received [{}]", requestId, response);
55   - }
56   - }
57   -
58   - public void process(PluginContext ctx, TimeoutMsg msg) {
59   - if (msg instanceof TimeoutUUIDMsg) {
60   - UUID requestId = ((TimeoutUUIDMsg) msg).getId();
61   - FromDeviceRpcResponse timeoutReponse = new FromDeviceRpcResponse(requestId, null, RpcError.TIMEOUT);
62   - LocalRequestMetaData md = localRpcRequests.remove(requestId);
63   - if (md != null) {
64   - log.trace("[{}] Processing rpc timeout for local device [{}]", requestId, md.getRequest().getDeviceId());
65   - restHandler.reply(ctx, md.getRequest(), md.getResponseWriter(), timeoutReponse);
66   - }
67   - }
68   - }
69   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.extensions.core.plugin.rpc;
17   -
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.thingsboard.server.extensions.api.component.Plugin;
20   -import org.thingsboard.server.extensions.api.plugins.AbstractPlugin;
21   -import org.thingsboard.server.extensions.api.plugins.PluginContext;
22   -import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRuleMsgHandler;
23   -import org.thingsboard.server.extensions.api.plugins.handlers.RestMsgHandler;
24   -import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
25   -import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
26   -import org.thingsboard.server.extensions.api.plugins.msg.TimeoutMsg;
27   -import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallAction;
28   -import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRestMsgHandler;
29   -import org.thingsboard.server.extensions.core.plugin.rpc.handlers.RpcRuleMsgHandler;
30   -
31   -/**
32   - * @author Andrew Shvayka
33   - */
34   -@Plugin(name = "RPC Plugin", actions = {ServerSideRpcCallAction.class}, descriptor = "RpcPluginDescriptor.json", configuration = RpcPluginConfiguration.class)
35   -@Slf4j
36   -public class RpcPlugin extends AbstractPlugin<RpcPluginConfiguration> {
37   -
38   - private final RpcManager rpcManager;
39   - private final RpcRestMsgHandler restMsgHandler;
40   -
41   - public RpcPlugin() {
42   - this.rpcManager = new RpcManager();
43   - this.restMsgHandler = new RpcRestMsgHandler(rpcManager);
44   - this.rpcManager.setRestHandler(restMsgHandler);
45   - }
46   -
47   - @Override
48   - public void process(PluginContext ctx, FromDeviceRpcResponse msg) {
49   - rpcManager.process(ctx, msg);
50   - }
51   -
52   - @Override
53   - public void process(PluginContext ctx, TimeoutMsg<?> msg) {
54   - rpcManager.process(ctx, msg);
55   - }
56   -
57   - @Override
58   - protected RestMsgHandler getRestMsgHandler() {
59   - return restMsgHandler;
60   - }
61   -
62   - @Override
63   - public void init(RpcPluginConfiguration configuration) {
64   - restMsgHandler.setDefaultTimeout(configuration.getDefaultTimeout());
65   - }
66   -
67   - @Override
68   - protected RuleMsgHandler getRuleMsgHandler() {
69   - return new RpcRuleMsgHandler();
70   - }
71   -
72   - @Override
73   - public void resume(PluginContext ctx) {
74   - //Do nothing
75   - }
76   -
77   - @Override
78   - public void suspend(PluginContext ctx) {
79   - //Do nothing
80   - }
81   -
82   - @Override
83   - public void stop(PluginContext ctx) {
84   - //Do nothing
85   - }
86   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.extensions.core.plugin.rpc.handlers;
17   -
18   -import com.fasterxml.jackson.core.JsonProcessingException;
19   -import com.fasterxml.jackson.databind.JsonNode;
20   -import lombok.RequiredArgsConstructor;
21   -import lombok.Setter;
22   -import lombok.extern.slf4j.Slf4j;
23   -import org.springframework.http.HttpStatus;
24   -import org.springframework.http.ResponseEntity;
25   -import org.springframework.util.StringUtils;
26   -import org.springframework.web.context.request.async.DeferredResult;
27   -import org.thingsboard.server.common.data.DataConstants;
28   -import org.thingsboard.server.common.data.id.DeviceId;
29   -import org.thingsboard.server.common.data.id.TenantId;
30   -import org.thingsboard.server.extensions.api.exception.ToErrorResponseEntity;
31   -import org.thingsboard.server.extensions.api.plugins.PluginApiCallSecurityContext;
32   -import org.thingsboard.server.extensions.api.plugins.PluginCallback;
33   -import org.thingsboard.server.extensions.api.plugins.PluginContext;
34   -import org.thingsboard.server.extensions.api.plugins.handlers.DefaultRestMsgHandler;
35   -import org.thingsboard.server.extensions.api.plugins.msg.FromDeviceRpcResponse;
36   -import org.thingsboard.server.extensions.api.plugins.msg.RpcError;
37   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
38   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestBody;
39   -import org.thingsboard.server.extensions.api.plugins.rest.PluginRestMsg;
40   -import org.thingsboard.server.extensions.api.plugins.rest.RestRequest;
41   -import org.thingsboard.server.extensions.core.plugin.rpc.LocalRequestMetaData;
42   -import org.thingsboard.server.extensions.core.plugin.rpc.RpcManager;
43   -import org.thingsboard.server.extensions.core.plugin.rpc.cmd.RpcRequest;
44   -
45   -import javax.servlet.ServletException;
46   -import java.io.IOException;
47   -import java.util.Optional;
48   -import java.util.UUID;
49   -
50   -/**
51   - * @author Andrew Shvayka
52   - */
53   -@Slf4j
54   -@RequiredArgsConstructor
55   -public class RpcRestMsgHandler extends DefaultRestMsgHandler {
56   -
57   - private final RpcManager rpcManager;
58   - @Setter
59   - private long defaultTimeout;
60   -
61   - @Override
62   - public void handleHttpPostRequest(PluginContext ctx, PluginRestMsg msg) throws ServletException {
63   - boolean valid = false;
64   - RestRequest request = msg.getRequest();
65   - try {
66   - String[] pathParams = request.getPathParams();
67   - if (pathParams.length == 2) {
68   - String method = pathParams[0].toUpperCase();
69   - if (DataConstants.ONEWAY.equals(method) || DataConstants.TWOWAY.equals(method)) {
70   - final TenantId tenantId = ctx.getSecurityCtx().orElseThrow(() -> new IllegalStateException("Security context is empty!")).getTenantId();
71   - JsonNode rpcRequestBody = jsonMapper.readTree(request.getRequestBody());
72   -
73   - RpcRequest cmd = new RpcRequest(rpcRequestBody.get("method").asText(),
74   - jsonMapper.writeValueAsString(rpcRequestBody.get("params")));
75   - if (rpcRequestBody.has("timeout")) {
76   - cmd.setTimeout(rpcRequestBody.get("timeout").asLong());
77   - }
78   -
79   - boolean oneWay = DataConstants.ONEWAY.equals(method);
80   -
81   - DeviceId deviceId = DeviceId.fromString(pathParams[1]);
82   - valid = handleDeviceRPCRequest(ctx, msg, tenantId, deviceId, cmd, oneWay);
83   - }
84   - }
85   - } catch (IOException e) {
86   - log.debug("Failed to process POST request due to IO exception", e);
87   - } catch (RuntimeException e) {
88   - log.debug("Failed to process POST request due to Runtime exception", e);
89   - }
90   - if (!valid) {
91   - msg.getResponseHolder().setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
92   - }
93   - }
94   -
95   - private boolean handleDeviceRPCRequest(PluginContext ctx, final PluginRestMsg msg, TenantId tenantId, DeviceId deviceId, RpcRequest cmd, boolean oneWay) throws JsonProcessingException {
96   - long timeout = System.currentTimeMillis() + (cmd.getTimeout() != null ? cmd.getTimeout() : defaultTimeout);
97   - ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(cmd.getMethodName(), cmd.getRequestData());
98   - ctx.checkAccess(deviceId, new PluginCallback<Void>() {
99   - @Override
100   - public void onSuccess(PluginContext ctx, Void value) {
101   - ToDeviceRpcRequest rpcRequest = new ToDeviceRpcRequest(UUID.randomUUID(),
102   - msg.getSecurityCtx(),
103   - tenantId,
104   - deviceId,
105   - oneWay,
106   - timeout,
107   - body
108   - );
109   - rpcManager.process(ctx, new LocalRequestMetaData(rpcRequest, msg.getResponseHolder()));
110   - }
111   -
112   - @Override
113   - public void onFailure(PluginContext ctx, Exception e) {
114   - ResponseEntity response;
115   - if (e instanceof ToErrorResponseEntity) {
116   - response = ((ToErrorResponseEntity)e).toErrorResponseEntity();
117   - } else {
118   - response = new ResponseEntity(HttpStatus.UNAUTHORIZED);
119   - }
120   - ctx.logRpcRequest(msg.getSecurityCtx(), deviceId, body, oneWay, Optional.empty(), e);
121   - msg.getResponseHolder().setResult(response);
122   - }
123   - });
124   - return true;
125   - }
126   -
127   - public void reply(PluginContext ctx, ToDeviceRpcRequest rpcRequest, DeferredResult<ResponseEntity> responseWriter, FromDeviceRpcResponse response) {
128   - Optional<RpcError> rpcError = response.getError();
129   - if (rpcError.isPresent()) {
130   - ctx.logRpcRequest(rpcRequest.getSecurityCtx(), rpcRequest.getDeviceId(), rpcRequest.getBody(), rpcRequest.isOneway(), rpcError, null);
131   - RpcError error = rpcError.get();
132   - switch (error) {
133   - case TIMEOUT:
134   - responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
135   - break;
136   - case NO_ACTIVE_CONNECTION:
137   - responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT));
138   - break;
139   - default:
140   - responseWriter.setResult(new ResponseEntity<>(HttpStatus.REQUEST_TIMEOUT));
141   - break;
142   - }
143   - } else {
144   - Optional<String> responseData = response.getResponse();
145   - if (responseData.isPresent() && !StringUtils.isEmpty(responseData.get())) {
146   - String data = responseData.get();
147   - try {
148   - ctx.logRpcRequest(rpcRequest.getSecurityCtx(), rpcRequest.getDeviceId(), rpcRequest.getBody(), rpcRequest.isOneway(), rpcError, null);
149   - responseWriter.setResult(new ResponseEntity<>(jsonMapper.readTree(data), HttpStatus.OK));
150   - } catch (IOException e) {
151   - log.debug("Failed to decode device response: {}", data, e);
152   - ctx.logRpcRequest(rpcRequest.getSecurityCtx(), rpcRequest.getDeviceId(), rpcRequest.getBody(), rpcRequest.isOneway(), rpcError, e);
153   - responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_ACCEPTABLE));
154   - }
155   - } else {
156   - ctx.logRpcRequest(rpcRequest.getSecurityCtx(), rpcRequest.getDeviceId(), rpcRequest.getBody(), rpcRequest.isOneway(), rpcError, null);
157   - responseWriter.setResult(new ResponseEntity<>(HttpStatus.OK));
158   - }
159   - }
160   - }
161   -}
\ No newline at end of file
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.extensions.core.plugin.rpc.handlers;
17   -
18   -import lombok.extern.slf4j.Slf4j;
19   -import org.springframework.util.StringUtils;
20   -import org.thingsboard.server.common.data.id.DeviceId;
21   -import org.thingsboard.server.common.data.id.EntityId;
22   -import org.thingsboard.server.common.data.id.RuleId;
23   -import org.thingsboard.server.common.data.id.TenantId;
24   -import org.thingsboard.server.common.data.relation.EntityRelation;
25   -import org.thingsboard.server.extensions.api.plugins.PluginCallback;
26   -import org.thingsboard.server.extensions.api.plugins.PluginContext;
27   -import org.thingsboard.server.extensions.api.plugins.handlers.RuleMsgHandler;
28   -import org.thingsboard.server.extensions.api.plugins.msg.RuleToPluginMsg;
29   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequest;
30   -import org.thingsboard.server.extensions.api.plugins.msg.ToDeviceRpcRequestBody;
31   -import org.thingsboard.server.extensions.api.rules.RuleException;
32   -import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallActionMsg;
33   -import org.thingsboard.server.extensions.core.action.rpc.ServerSideRpcCallRuleToPluginActionMsg;
34   -
35   -import java.util.Collections;
36   -import java.util.List;
37   -import java.util.UUID;
38   -import java.util.concurrent.TimeUnit;
39   -import java.util.stream.Collectors;
40   -
41   -/**
42   - * Created by ashvayka on 14.09.17.
43   - */
44   -@Slf4j
45   -public class RpcRuleMsgHandler implements RuleMsgHandler {
46   -
47   - @Override
48   - public void process(PluginContext ctx, TenantId tenantId, RuleId ruleId, RuleToPluginMsg<?> msg) throws RuleException {
49   - if (msg instanceof ServerSideRpcCallRuleToPluginActionMsg) {
50   - handle(ctx, tenantId, ruleId, ((ServerSideRpcCallRuleToPluginActionMsg) msg).getPayload());
51   - } else {
52   - throw new RuntimeException("Not supported msg: " + msg + "!");
53   - }
54   - }
55   -
56   - private void handle(final PluginContext ctx, TenantId tenantId, RuleId ruleId, ServerSideRpcCallActionMsg msg) {
57   - DeviceId deviceId = new DeviceId(UUID.fromString(msg.getDeviceId()));
58   - ctx.checkAccess(deviceId, new PluginCallback<Void>() {
59   - @Override
60   - public void onSuccess(PluginContext dummy, Void value) {
61   - try {
62   - List<EntityId> deviceIds;
63   - if (StringUtils.isEmpty(msg.getFromDeviceRelation()) && StringUtils.isEmpty(msg.getToDeviceRelation())) {
64   - deviceIds = Collections.singletonList(deviceId);
65   - } else if (!StringUtils.isEmpty(msg.getFromDeviceRelation())) {
66   - List<EntityRelation> relations = ctx.findByFromAndType(deviceId, msg.getFromDeviceRelation()).get();
67   - deviceIds = relations.stream().map(EntityRelation::getTo).collect(Collectors.toList());
68   - } else {
69   - List<EntityRelation> relations = ctx.findByToAndType(deviceId, msg.getFromDeviceRelation()).get();
70   - deviceIds = relations.stream().map(EntityRelation::getFrom).collect(Collectors.toList());
71   - }
72   - ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(msg.getRpcCallMethod(), msg.getRpcCallBody());
73   - long expirationTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(msg.getRpcCallTimeoutInSec());
74   - for (EntityId address : deviceIds) {
75   - DeviceId tmpId = new DeviceId(address.getId());
76   - ctx.checkAccess(tmpId, new PluginCallback<Void>() {
77   - @Override
78   - public void onSuccess(PluginContext ctx, Void value) {
79   - ctx.sendRpcRequest(new ToDeviceRpcRequest(UUID.randomUUID(),
80   - null, tenantId, tmpId, true, expirationTime, body)
81   - );
82   - log.trace("[{}] Sent RPC Call Action msg", tmpId);
83   - }
84   -
85   - @Override
86   - public void onFailure(PluginContext ctx, Exception e) {
87   - log.info("[{}] Failed to process RPC Call Action msg", tmpId, e);
88   - }
89   - });
90   - }
91   - } catch (Exception e) {
92   - log.info("Failed to process RPC Call Action msg", e);
93   - }
94   - }
95   -
96   - @Override
97   - public void onFailure(PluginContext dummy, Exception e) {
98   - log.info("[{}] Failed to process RPC Call Action msg", deviceId, e);
99   - }
100   - });
101   - }
102   -}