Showing
5 changed files
with
116 additions
and
104 deletions
... | ... | @@ -5,7 +5,7 @@ |
5 | 5 | * you may not use this file except in compliance with the License. |
6 | 6 | * You may obtain a copy of the License at |
7 | 7 | * |
8 | - * http://www.apache.org/licenses/LICENSE-2.0 | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | 9 | * |
10 | 10 | * Unless required by applicable law or agreed to in writing, software |
11 | 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
... | ... | @@ -79,7 +79,7 @@ public abstract class AbstractRpcController extends BaseController { |
79 | 79 | @Value("${server.rest.server_side_rpc.default_timeout:10000}") |
80 | 80 | protected long defaultTimeout; |
81 | 81 | |
82 | - protected DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody, HttpStatus timeoutStatus) throws ThingsboardException { | |
82 | + protected DeferredResult<ResponseEntity> handleDeviceRPCRequest(boolean oneWay, DeviceId deviceId, String requestBody, HttpStatus timeoutStatus, HttpStatus noActiveConnectionStatus) throws ThingsboardException { | |
83 | 83 | try { |
84 | 84 | JsonNode rpcRequestBody = JacksonUtil.toJsonNode(requestBody); |
85 | 85 | ToDeviceRpcRequestBody body = new ToDeviceRpcRequestBody(rpcRequestBody.get("method").asText(), JacksonUtil.toString(rpcRequestBody.get("params"))); |
... | ... | @@ -101,7 +101,7 @@ public abstract class AbstractRpcController extends BaseController { |
101 | 101 | body, |
102 | 102 | persisted |
103 | 103 | ); |
104 | - deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus), currentUser); | |
104 | + deviceRpcService.processRestApiRpcRequest(rpcRequest, fromDeviceRpcResponse -> reply(new LocalRequestMetaData(rpcRequest, currentUser, result), fromDeviceRpcResponse, timeoutStatus, noActiveConnectionStatus), currentUser); | |
105 | 105 | } |
106 | 106 | |
107 | 107 | @Override |
... | ... | @@ -122,7 +122,7 @@ public abstract class AbstractRpcController extends BaseController { |
122 | 122 | } |
123 | 123 | } |
124 | 124 | |
125 | - public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response, HttpStatus timeoutStatus) { | |
125 | + public void reply(LocalRequestMetaData rpcRequest, FromDeviceRpcResponse response, HttpStatus timeoutStatus, HttpStatus noActiveConnectionStatus) { | |
126 | 126 | Optional<RpcError> rpcError = response.getError(); |
127 | 127 | DeferredResult<ResponseEntity> responseWriter = rpcRequest.getResponseWriter(); |
128 | 128 | if (rpcError.isPresent()) { |
... | ... | @@ -133,7 +133,7 @@ public abstract class AbstractRpcController extends BaseController { |
133 | 133 | responseWriter.setResult(new ResponseEntity<>(timeoutStatus)); |
134 | 134 | break; |
135 | 135 | case NO_ACTIVE_CONNECTION: |
136 | - responseWriter.setResult(new ResponseEntity<>(HttpStatus.CONFLICT)); | |
136 | + responseWriter.setResult(new ResponseEntity<>(noActiveConnectionStatus)); | |
137 | 137 | break; |
138 | 138 | default: |
139 | 139 | responseWriter.setResult(new ResponseEntity<>(timeoutStatus)); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
1 | 16 | package org.thingsboard.server.controller; |
2 | 17 | |
3 | 18 | import lombok.extern.slf4j.Slf4j; |
... | ... | @@ -27,14 +42,14 @@ public class RpcV1Controller extends AbstractRpcController { |
27 | 42 | @RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST) |
28 | 43 | @ResponseBody |
29 | 44 | public DeferredResult<ResponseEntity> handleOneWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException { |
30 | - return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.REQUEST_TIMEOUT); | |
45 | + return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.REQUEST_TIMEOUT, HttpStatus.CONFLICT); | |
31 | 46 | } |
32 | 47 | |
33 | 48 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
34 | 49 | @RequestMapping(value = "/twoway/{deviceId}", method = RequestMethod.POST) |
35 | 50 | @ResponseBody |
36 | 51 | public DeferredResult<ResponseEntity> handleTwoWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException { |
37 | - return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.REQUEST_TIMEOUT); | |
52 | + return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.REQUEST_TIMEOUT, HttpStatus.CONFLICT); | |
38 | 53 | } |
39 | 54 | |
40 | 55 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
1 | 16 | package org.thingsboard.server.controller; |
2 | 17 | |
3 | 18 | import lombok.extern.slf4j.Slf4j; |
... | ... | @@ -35,14 +50,14 @@ public class RpcV2Controller extends AbstractRpcController { |
35 | 50 | @RequestMapping(value = "/oneway/{deviceId}", method = RequestMethod.POST) |
36 | 51 | @ResponseBody |
37 | 52 | public DeferredResult<ResponseEntity> handleOneWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException { |
38 | - return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.GATEWAY_TIMEOUT); | |
53 | + return handleDeviceRPCRequest(true, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.GATEWAY_TIMEOUT, HttpStatus.GATEWAY_TIMEOUT); | |
39 | 54 | } |
40 | 55 | |
41 | 56 | @PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')") |
42 | 57 | @RequestMapping(value = "/twoway/{deviceId}", method = RequestMethod.POST) |
43 | 58 | @ResponseBody |
44 | 59 | public DeferredResult<ResponseEntity> handleTwoWayDeviceRPCRequest(@PathVariable("deviceId") String deviceIdStr, @RequestBody String requestBody) throws ThingsboardException { |
45 | - return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.GATEWAY_TIMEOUT); | |
60 | + return handleDeviceRPCRequest(false, new DeviceId(UUID.fromString(deviceIdStr)), requestBody, HttpStatus.GATEWAY_TIMEOUT, HttpStatus.GATEWAY_TIMEOUT); | |
46 | 61 | } |
47 | 62 | |
48 | 63 | @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") | ... | ... |
... | ... | @@ -27,13 +27,13 @@ import java.util.Arrays; |
27 | 27 | @RunWith(ClasspathSuite.class) |
28 | 28 | @ClasspathSuite.ClassnameFilters({ |
29 | 29 | "org.thingsboard.server.transport.*.rpc.sql.*Test", |
30 | - "org.thingsboard.server.transport.*.telemetry.timeseries.sql.*Test", | |
31 | - "org.thingsboard.server.transport.*.telemetry.attributes.sql.*Test", | |
32 | - "org.thingsboard.server.transport.*.attributes.updates.sql.*Test", | |
33 | - "org.thingsboard.server.transport.*.attributes.request.sql.*Test", | |
34 | - "org.thingsboard.server.transport.*.claim.sql.*Test", | |
35 | - "org.thingsboard.server.transport.*.provision.sql.*Test", | |
36 | - "org.thingsboard.server.transport.lwm2m.*Test" | |
30 | +// "org.thingsboard.server.transport.*.telemetry.timeseries.sql.*Test", | |
31 | +// "org.thingsboard.server.transport.*.telemetry.attributes.sql.*Test", | |
32 | +// "org.thingsboard.server.transport.*.attributes.updates.sql.*Test", | |
33 | +// "org.thingsboard.server.transport.*.attributes.request.sql.*Test", | |
34 | +// "org.thingsboard.server.transport.*.claim.sql.*Test", | |
35 | +// "org.thingsboard.server.transport.*.provision.sql.*Test", | |
36 | +// "org.thingsboard.server.transport.lwm2m.*Test" | |
37 | 37 | }) |
38 | 38 | public class TransportSqlTestSuite { |
39 | 39 | ... | ... |
... | ... | @@ -77,96 +77,89 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { |
77 | 77 | private final LwM2mUplinkMsgHandler uplinkHandler; |
78 | 78 | private final LwM2mDownlinkMsgHandler downlinkHandler; |
79 | 79 | private final LwM2MTelemetryLogService logService; |
80 | - private final Map<UUID, Long> rpcSubscriptions = new ConcurrentHashMap<>(); | |
81 | 80 | |
82 | 81 | @Override |
83 | 82 | public void onToDeviceRpcRequest(TransportProtos.ToDeviceRpcRequestMsg rpcRequest, TransportProtos.SessionInfoProto sessionInfo) { |
84 | - this.cleanupOldSessions(); | |
85 | - UUID requestUUID = new UUID(rpcRequest.getRequestIdMSB(), rpcRequest.getRequestIdLSB()); | |
86 | 83 | log.debug("Received params: {}", rpcRequest.getParams()); |
87 | - // We use this map to protect from browser issue that the same command is sent twice. | |
88 | - // TODO: This is probably not the best place and should be moved to DeviceActor | |
89 | - if (!this.rpcSubscriptions.containsKey(requestUUID)) { | |
90 | - LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName()); | |
91 | - if (operationType == null) { | |
92 | - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName()); | |
93 | - return; | |
94 | - } | |
95 | - LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo); | |
96 | - if (client.getRegistration() == null) { | |
97 | - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty"); | |
98 | - return; | |
99 | - } | |
100 | - try { | |
101 | - if (operationType.isHasObjectId()) { | |
102 | - String objectId = getIdFromParameters(client, rpcRequest); | |
84 | + LwM2mOperationType operationType = LwM2mOperationType.fromType(rpcRequest.getMethodName()); | |
85 | + if (operationType == null) { | |
86 | + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.METHOD_NOT_ALLOWED, "Unsupported operation type: " + rpcRequest.getMethodName()); | |
87 | + return; | |
88 | + } | |
89 | + LwM2mClient client = clientContext.getClientBySessionInfo(sessionInfo); | |
90 | + if (client.getRegistration() == null) { | |
91 | + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.INTERNAL_SERVER_ERROR, "Registration is empty"); | |
92 | + return; | |
93 | + } | |
94 | + try { | |
95 | + if (operationType.isHasObjectId()) { | |
96 | + String objectId = getIdFromParameters(client, rpcRequest); | |
97 | + switch (operationType) { | |
98 | + case READ: | |
99 | + sendReadRequest(client, rpcRequest, objectId); | |
100 | + break; | |
101 | + case OBSERVE: | |
102 | + sendObserveRequest(client, rpcRequest, objectId); | |
103 | + break; | |
104 | + case DISCOVER: | |
105 | + sendDiscoverRequest(client, rpcRequest, objectId); | |
106 | + break; | |
107 | + case EXECUTE: | |
108 | + sendExecuteRequest(client, rpcRequest, objectId); | |
109 | + break; | |
110 | + case WRITE_ATTRIBUTES: | |
111 | + sendWriteAttributesRequest(client, rpcRequest, objectId); | |
112 | + break; | |
113 | + case OBSERVE_CANCEL: | |
114 | + sendCancelObserveRequest(client, rpcRequest, objectId); | |
115 | + break; | |
116 | + case DELETE: | |
117 | + sendDeleteRequest(client, rpcRequest, objectId); | |
118 | + break; | |
119 | + case WRITE_UPDATE: | |
120 | + sendWriteUpdateRequest(client, rpcRequest, objectId); | |
121 | + break; | |
122 | + case WRITE_REPLACE: | |
123 | + sendWriteReplaceRequest(client, rpcRequest, objectId); | |
124 | + break; | |
125 | + default: | |
126 | + throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); | |
127 | + } | |
128 | + } else if (operationType.isComposite()) { | |
129 | + if (clientContext.isComposite(client)) { | |
103 | 130 | switch (operationType) { |
104 | - case READ: | |
105 | - sendReadRequest(client, rpcRequest, objectId); | |
106 | - break; | |
107 | - case OBSERVE: | |
108 | - sendObserveRequest(client, rpcRequest, objectId); | |
109 | - break; | |
110 | - case DISCOVER: | |
111 | - sendDiscoverRequest(client, rpcRequest, objectId); | |
112 | - break; | |
113 | - case EXECUTE: | |
114 | - sendExecuteRequest(client, rpcRequest, objectId); | |
115 | - break; | |
116 | - case WRITE_ATTRIBUTES: | |
117 | - sendWriteAttributesRequest(client, rpcRequest, objectId); | |
118 | - break; | |
119 | - case OBSERVE_CANCEL: | |
120 | - sendCancelObserveRequest(client, rpcRequest, objectId); | |
121 | - break; | |
122 | - case DELETE: | |
123 | - sendDeleteRequest(client, rpcRequest, objectId); | |
124 | - break; | |
125 | - case WRITE_UPDATE: | |
126 | - sendWriteUpdateRequest(client, rpcRequest, objectId); | |
131 | + case READ_COMPOSITE: | |
132 | + sendReadCompositeRequest(client, rpcRequest); | |
127 | 133 | break; |
128 | - case WRITE_REPLACE: | |
129 | - sendWriteReplaceRequest(client, rpcRequest, objectId); | |
134 | + case WRITE_COMPOSITE: | |
135 | + sendWriteCompositeRequest(client, rpcRequest); | |
130 | 136 | break; |
131 | 137 | default: |
132 | 138 | throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); |
133 | 139 | } |
134 | - } else if (operationType.isComposite()) { | |
135 | - if (clientContext.isComposite(client)) { | |
136 | - switch (operationType) { | |
137 | - case READ_COMPOSITE: | |
138 | - sendReadCompositeRequest(client, rpcRequest); | |
139 | - break; | |
140 | - case WRITE_COMPOSITE: | |
141 | - sendWriteCompositeRequest(client, rpcRequest); | |
142 | - break; | |
143 | - default: | |
144 | - throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); | |
145 | - } | |
146 | - } else { | |
147 | - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), | |
148 | - ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation"); | |
149 | - } | |
150 | 140 | } else { |
151 | - switch (operationType) { | |
152 | - case OBSERVE_CANCEL_ALL: | |
153 | - sendCancelAllObserveRequest(client, rpcRequest); | |
154 | - break; | |
155 | - case OBSERVE_READ_ALL: | |
156 | - sendObserveAllRequest(client, rpcRequest); | |
157 | - break; | |
158 | - case DISCOVER_ALL: | |
159 | - sendDiscoverAllRequest(client, rpcRequest); | |
160 | - break; | |
161 | - case FW_UPDATE: | |
162 | - //TODO: implement and add break statement | |
163 | - default: | |
164 | - throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); | |
165 | - } | |
141 | + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), | |
142 | + ResponseCode.INTERNAL_SERVER_ERROR, "This device does not support Composite Operation"); | |
143 | + } | |
144 | + } else { | |
145 | + switch (operationType) { | |
146 | + case OBSERVE_CANCEL_ALL: | |
147 | + sendCancelAllObserveRequest(client, rpcRequest); | |
148 | + break; | |
149 | + case OBSERVE_READ_ALL: | |
150 | + sendObserveAllRequest(client, rpcRequest); | |
151 | + break; | |
152 | + case DISCOVER_ALL: | |
153 | + sendDiscoverAllRequest(client, rpcRequest); | |
154 | + break; | |
155 | + case FW_UPDATE: | |
156 | + //TODO: implement and add break statement | |
157 | + default: | |
158 | + throw new IllegalArgumentException("Unsupported operation: " + operationType.name()); | |
166 | 159 | } |
167 | - } catch (IllegalArgumentException e) { | |
168 | - this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage()); | |
169 | 160 | } |
161 | + } catch (IllegalArgumentException e) { | |
162 | + this.sendErrorRpcResponse(sessionInfo, rpcRequest.getRequestId(), ResponseCode.BAD_REQUEST, e.getMessage()); | |
170 | 163 | } |
171 | 164 | } |
172 | 165 | |
... | ... | @@ -318,17 +311,6 @@ public class DefaultLwM2MRpcRequestHandler implements LwM2MRpcRequestHandler { |
318 | 311 | transportService.process(sessionInfo, msg, null); |
319 | 312 | } |
320 | 313 | |
321 | - private void cleanupOldSessions() { | |
322 | - log.debug("Before rpcSubscriptions.size(): [{}]", rpcSubscriptions.size()); | |
323 | - if (rpcSubscriptions.size() > 0) { | |
324 | - long currentTime = System.currentTimeMillis(); | |
325 | - Set<UUID> rpcSubscriptionsToRemove = rpcSubscriptions.entrySet().stream().filter(kv -> currentTime > kv.getValue()).map(Map.Entry::getKey).collect(Collectors.toSet()); | |
326 | - log.debug("RpcSubscriptionsToRemove: [{}]", rpcSubscriptionsToRemove); | |
327 | - rpcSubscriptionsToRemove.forEach(rpcSubscriptions::remove); | |
328 | - } | |
329 | - log.debug("After rpcSubscriptions.size(): [{}]", rpcSubscriptions.size()); | |
330 | - } | |
331 | - | |
332 | 314 | @Override |
333 | 315 | public void onToDeviceRpcResponse(TransportProtos.ToDeviceRpcResponseMsg toDeviceResponse, TransportProtos.SessionInfoProto sessionInfo) { |
334 | 316 | log.debug("OnToDeviceRpcResponse: [{}], sessionUUID: [{}]", toDeviceResponse, new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB())); | ... | ... |