Commit c276e67d5e42677eb76550d40de496de06a43311

Authored by 芯火源
1 parent 0bf8b966

refactor: 命令下发逻辑调整

1、命令下发内容编辑统一在TbSendRPCRequestNode节点处理
... ... @@ -72,6 +72,7 @@ import org.thingsboard.server.dao.tenant.TenantProfileService;
72 72 import org.thingsboard.server.dao.tenant.TenantService;
73 73 import org.thingsboard.server.dao.timeseries.TimeseriesService;
74 74 import org.thingsboard.server.dao.user.UserService;
  75 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
75 76 import org.thingsboard.server.queue.discovery.PartitionService;
76 77 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
77 78 import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
... ... @@ -645,4 +646,10 @@ public class ActorSystemContext {
645 646 }
646 647 }
647 648
  649 +
  650 + //Thingskit function
  651 + @Autowired
  652 + @Getter
  653 + private TkDeviceService tkDeviceService;
  654 +
648 655 }
... ...
1 1 /**
2 2 * Copyright © 2016-2022 The Thingsboard Authors
3 3 *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
  4 + * <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
  5 + * except in compliance with the License. You may obtain a copy of the License at
7 6 *
8   - * http://www.apache.org/licenses/LICENSE-2.0
  7 + * <p>http://www.apache.org/licenses/LICENSE-2.0
9 8 *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
  9 + * <p>Unless required by applicable law or agreed to in writing, software distributed under the
  10 + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  11 + * express or implied. See the License for the specific language governing permissions and
14 12 * limitations under the License.
15 13 */
16 14 package org.thingsboard.server.actors.device;
... ... @@ -22,6 +20,21 @@ import com.google.common.util.concurrent.Futures;
22 20 import com.google.common.util.concurrent.ListenableFuture;
23 21 import com.google.common.util.concurrent.MoreExecutors;
24 22 import com.google.protobuf.InvalidProtocolBufferException;
  23 +import java.util.ArrayList;
  24 +import java.util.Arrays;
  25 +import java.util.Collections;
  26 +import java.util.HashMap;
  27 +import java.util.HashSet;
  28 +import java.util.LinkedHashMap;
  29 +import java.util.List;
  30 +import java.util.Map;
  31 +import java.util.Objects;
  32 +import java.util.Optional;
  33 +import java.util.Set;
  34 +import java.util.UUID;
  35 +import java.util.function.Consumer;
  36 +import java.util.stream.Collectors;
  37 +import javax.annotation.Nullable;
25 38 import lombok.extern.slf4j.Slf4j;
26 39 import org.apache.commons.collections.CollectionUtils;
27 40 import org.thingsboard.common.util.JacksonUtil;
... ... @@ -35,7 +48,6 @@ import org.thingsboard.server.actors.TbActorCtx;
35 48 import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
36 49 import org.thingsboard.server.common.data.DataConstants;
37 50 import org.thingsboard.server.common.data.Device;
38   -import org.thingsboard.server.common.data.DeviceTransportType;
39 51 import org.thingsboard.server.common.data.StringUtils;
40 52 import org.thingsboard.server.common.data.edge.EdgeEvent;
41 53 import org.thingsboard.server.common.data.edge.EdgeEventActionType;
... ... @@ -59,8 +71,6 @@ import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
59 71 import org.thingsboard.server.common.data.security.DeviceCredentials;
60 72 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
61 73 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
62   -import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
63   -import org.thingsboard.server.common.data.yunteng.enums.CmdTypeEnum;
64 74 import org.thingsboard.server.common.msg.TbActorMsg;
65 75 import org.thingsboard.server.common.msg.TbMsgMetaData;
66 76 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -97,921 +107,1109 @@ import org.thingsboard.server.service.rpc.RemoveRpcActorMsg;
97 107 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
98 108 import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
99 109
100   -import javax.annotation.Nullable;
101   -import java.util.ArrayList;
102   -import java.util.Arrays;
103   -import java.util.Collections;
104   -import java.util.ConcurrentModificationException;
105   -import java.util.HashMap;
106   -import java.util.HashSet;
107   -import java.util.LinkedHashMap;
108   -import java.util.List;
109   -import java.util.Map;
110   -import java.util.Objects;
111   -import java.util.Optional;
112   -import java.util.Set;
113   -import java.util.UUID;
114   -import java.util.function.Consumer;
115   -import java.util.stream.Collectors;
116   -
117   -
118 110 /**
119 111 * @author Andrew Shvayka
120 112 */
121 113 @Slf4j
122 114 class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
123 115
124   - static final String SESSION_TIMEOUT_MESSAGE = "session timeout!";
125   - final TenantId tenantId;
126   - final DeviceId deviceId;
127   - final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
128   - private final Map<UUID, SessionInfo> attributeSubscriptions;
129   - private final Map<UUID, SessionInfo> rpcSubscriptions;
130   - private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
131   - private final boolean rpcSequential;
132   -
133   - private int rpcSeq = 0;
134   - private String deviceName;
135   - private String deviceType;
136   - private TbMsgMetaData defaultMetaData;
137   - private EdgeId edgeId;
138   -
139   - DeviceActorMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
140   - super(systemContext);
141   - this.tenantId = tenantId;
142   - this.deviceId = deviceId;
143   - this.rpcSequential = systemContext.isRpcSequential();
144   - this.attributeSubscriptions = new HashMap<>();
145   - this.rpcSubscriptions = new HashMap<>();
146   - this.toDeviceRpcPendingMap = new LinkedHashMap<>();
147   - this.sessions = new LinkedHashMapRemoveEldest<>(systemContext.getMaxConcurrentSessionsPerDevice(), this::notifyTransportAboutClosedSessionMaxSessionsLimit);
148   - if (initAttributes()) {
149   - restoreSessions();
150   - }
  116 + static final String SESSION_TIMEOUT_MESSAGE = "session timeout!";
  117 + final TenantId tenantId;
  118 + final DeviceId deviceId;
  119 + final LinkedHashMapRemoveEldest<UUID, SessionInfoMetaData> sessions;
  120 + private final Map<UUID, SessionInfo> attributeSubscriptions;
  121 + private final Map<UUID, SessionInfo> rpcSubscriptions;
  122 + private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
  123 + private final boolean rpcSequential;
  124 +
  125 + private int rpcSeq = 0;
  126 + private String deviceName;
  127 + private String deviceType;
  128 + private TbMsgMetaData defaultMetaData;
  129 + private EdgeId edgeId;
  130 +
  131 + DeviceActorMessageProcessor(
  132 + ActorSystemContext systemContext, TenantId tenantId, DeviceId deviceId) {
  133 + super(systemContext);
  134 + this.tenantId = tenantId;
  135 + this.deviceId = deviceId;
  136 + this.rpcSequential = systemContext.isRpcSequential();
  137 + this.attributeSubscriptions = new HashMap<>();
  138 + this.rpcSubscriptions = new HashMap<>();
  139 + this.toDeviceRpcPendingMap = new LinkedHashMap<>();
  140 + this.sessions =
  141 + new LinkedHashMapRemoveEldest<>(
  142 + systemContext.getMaxConcurrentSessionsPerDevice(),
  143 + this::notifyTransportAboutClosedSessionMaxSessionsLimit);
  144 + if (initAttributes()) {
  145 + restoreSessions();
151 146 }
152   -
153   - boolean initAttributes() {
154   - Device device = systemContext.getDeviceService().findDeviceById(tenantId, deviceId);
155   - if (device != null) {
156   - this.deviceName = device.getName();
157   - this.deviceType = device.getType();
158   - this.defaultMetaData = new TbMsgMetaData();
159   - this.defaultMetaData.putValue("deviceName", deviceName);
160   - this.defaultMetaData.putValue("deviceType", deviceType);
161   - if (systemContext.isEdgesEnabled()) {
162   - this.edgeId = findRelatedEdgeId();
163   - }
164   - return true;
165   - } else {
166   - return false;
167   - }
  147 + }
  148 +
  149 + boolean initAttributes() {
  150 + Device device = systemContext.getDeviceService().findDeviceById(tenantId, deviceId);
  151 + if (device != null) {
  152 + this.deviceName = device.getName();
  153 + this.deviceType = device.getType();
  154 + this.defaultMetaData = new TbMsgMetaData();
  155 + this.defaultMetaData.putValue("deviceName", deviceName);
  156 + this.defaultMetaData.putValue("deviceType", deviceType);
  157 + if (systemContext.isEdgesEnabled()) {
  158 + this.edgeId = findRelatedEdgeId();
  159 + }
  160 + return true;
  161 + } else {
  162 + return false;
168 163 }
169   -
170   - private EdgeId findRelatedEdgeId() {
171   - List<EntityRelation> result =
172   - systemContext.getRelationService().findByToAndType(tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
173   - if (result != null && result.size() > 0) {
174   - EntityRelation relationToEdge = result.get(0);
175   - if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
176   - log.trace("[{}][{}] found edge [{}] for device", tenantId, deviceId, relationToEdge.getFrom().getId());
177   - return new EdgeId(relationToEdge.getFrom().getId());
178   - } else {
179   - log.trace("[{}][{}] edge relation is empty {}", tenantId, deviceId, relationToEdge);
180   - }
181   - } else {
182   - log.trace("[{}][{}] device doesn't have any related edge", tenantId, deviceId);
183   - }
184   - return null;
  164 + }
  165 +
  166 + private EdgeId findRelatedEdgeId() {
  167 + List<EntityRelation> result =
  168 + systemContext
  169 + .getRelationService()
  170 + .findByToAndType(
  171 + tenantId, deviceId, EntityRelation.EDGE_TYPE, RelationTypeGroup.COMMON);
  172 + if (result != null && result.size() > 0) {
  173 + EntityRelation relationToEdge = result.get(0);
  174 + if (relationToEdge.getFrom() != null && relationToEdge.getFrom().getId() != null) {
  175 + log.trace(
  176 + "[{}][{}] found edge [{}] for device",
  177 + tenantId,
  178 + deviceId,
  179 + relationToEdge.getFrom().getId());
  180 + return new EdgeId(relationToEdge.getFrom().getId());
  181 + } else {
  182 + log.trace("[{}][{}] edge relation is empty {}", tenantId, deviceId, relationToEdge);
  183 + }
  184 + } else {
  185 + log.trace("[{}][{}] device doesn't have any related edge", tenantId, deviceId);
  186 + }
  187 + return null;
  188 + }
  189 +
  190 + void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
  191 + ToDeviceRpcRequest request = msg.getMsg();
  192 + ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
  193 +
  194 + long timeout = request.getExpirationTime() - System.currentTimeMillis();
  195 + boolean persisted = request.isPersisted();
  196 +
  197 + if (timeout <= 0) {
  198 + log.debug(
  199 + "[{}][{}] Ignoring message due to exp time reached, {}",
  200 + deviceId,
  201 + request.getId(),
  202 + request.getExpirationTime());
  203 + if (persisted) {
  204 + createRpc(request, RpcStatus.EXPIRED);
  205 + }
  206 + return;
  207 + } else if (persisted) {
  208 + createRpc(request, RpcStatus.QUEUED);
185 209 }
186 210
187   - void processRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg) {
188   - ToDeviceRpcRequest request = msg.getMsg();
189   - ToDeviceRpcRequestMsg rpcRequest = creteToDeviceRpcRequestMsg(request);
190   -
191   - long timeout = request.getExpirationTime() - System.currentTimeMillis();
192   - boolean persisted = request.isPersisted();
193   -
194   - if (timeout <= 0) {
195   - log.debug("[{}][{}] Ignoring message due to exp time reached, {}", deviceId, request.getId(), request.getExpirationTime());
196   - if (persisted) {
197   - createRpc(request, RpcStatus.EXPIRED);
  211 + boolean sent = false;
  212 + if (systemContext.isEdgesEnabled() && edgeId != null) {
  213 + log.debug(
  214 + "[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue",
  215 + tenantId,
  216 + deviceId,
  217 + edgeId.getId());
  218 + saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
  219 + sent = true;
  220 + } else if (isSendNewRpcAvailable()) {
  221 + sent = rpcSubscriptions.size() > 0;
  222 + Set<UUID> syncSessionSet = new HashSet<>();
  223 + rpcSubscriptions.forEach(
  224 + (key, value) -> {
  225 + sendToTransport(rpcRequest, key, value.getNodeId());
  226 + if (SessionType.SYNC == value.getType()) {
  227 + syncSessionSet.add(key);
198 228 }
199   - return;
200   - } else if (persisted) {
201   - createRpc(request, RpcStatus.QUEUED);
202   - }
203   -
204   - boolean sent = false;
205   - if (systemContext.isEdgesEnabled() && edgeId != null) {
206   - log.debug("[{}][{}] device is related to edge [{}]. Saving RPC request to edge queue", tenantId, deviceId, edgeId.getId());
207   - saveRpcRequestToEdgeQueue(request, rpcRequest.getRequestId());
208   - sent = true;
209   - } else if (isSendNewRpcAvailable()) {
210   - sent = rpcSubscriptions.size() > 0;
211   - Set<UUID> syncSessionSet = new HashSet<>();
212   - rpcSubscriptions.forEach((key, value) -> {
213   - sendToTransport(rpcRequest, key, value.getNodeId());
214   - if (SessionType.SYNC == value.getType()) {
215   - syncSessionSet.add(key);
216   - }
217   - });
218   - log.trace("Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions);
219   - syncSessionSet.forEach(rpcSubscriptions::remove);
220   - }
221   -
222   - if (persisted) {
223   - ObjectNode response = JacksonUtil.newObjectNode();
224   - response.put("rpcId", request.getId().toString());
225   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), JacksonUtil.toString(response), null));
226   - }
227   -
228   - if (!persisted && request.isOneway() && sent) {
229   - log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
230   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
231   - } else {
232   - registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
233   - }
234   - if (sent) {
235   - log.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
236   - } else {
237   - log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
238   - }
  229 + });
  230 + log.trace(
  231 + "Rpc syncSessionSet [{}] subscription after sent [{}]", syncSessionSet, rpcSubscriptions);
  232 + syncSessionSet.forEach(rpcSubscriptions::remove);
239 233 }
240 234
241   - private boolean isSendNewRpcAvailable() {
242   - return !rpcSequential || toDeviceRpcPendingMap.values().stream().filter(md -> !md.isDelivered()).findAny().isEmpty();
  235 + if (persisted) {
  236 + ObjectNode response = JacksonUtil.newObjectNode();
  237 + response.put("rpcId", request.getId().toString());
  238 + systemContext
  239 + .getTbCoreDeviceRpcService()
  240 + .processRpcResponseFromDeviceActor(
  241 + new FromDeviceRpcResponse(
  242 + msg.getMsg().getId(), JacksonUtil.toString(response), null));
243 243 }
244 244
245   - private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
246   - //Thingskit function
247   - JsonNode old = JacksonUtil.toJsonNode(request.getAdditionalInfo());
248   - ObjectNode additional = (old == null || old.isEmpty()) ?mapper.createObjectNode():(ObjectNode)old;
249   - if(!additional.has(ModelConstants.TablePropertyMapping.COMMAND_TYPE)){
250   - additional.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, CmdTypeEnum.DIY.ordinal());
251   - }
252   - DeviceId saveDeviceId = deviceId;
253   - if(additional.has(FastIotConstants.Rpc.TARGET_ID)){
254   - saveDeviceId = new DeviceId(UUID.fromString(additional.get(FastIotConstants.Rpc.TARGET_ID).asText()));
255   - }
256   -
257   - Rpc rpc = new Rpc(new RpcId(request.getId()));
258   - rpc.setCreatedTime(System.currentTimeMillis());
259   - rpc.setTenantId(tenantId);
260   - rpc.setDeviceId(saveDeviceId);
261   - rpc.setExpirationTime(request.getExpirationTime());
262   - rpc.setRequest(JacksonUtil.valueToTree(request));
263   - rpc.setStatus(status);
264   - rpc.setAdditionalInfo(JacksonUtil.toJsonNode(request.getAdditionalInfo()));
265   -
266   - rpc.setAdditionalInfo(additional);
267   -
268   - return systemContext.getTbRpcService().save(tenantId, rpc);
269   - }
270   -
271   - private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
272   - ToDeviceRpcRequestBody body = request.getBody();
273   - return ToDeviceRpcRequestMsg.newBuilder()
274   - .setRequestId(rpcSeq++)
275   - .setMethodName(body.getMethod())
276   - .setParams(body.getParams())
277   - .setExpirationTime(request.getExpirationTime())
278   - .setRequestIdMSB(request.getId().getMostSignificantBits())
279   - .setRequestIdLSB(request.getId().getLeastSignificantBits())
280   - .setOneway(request.isOneway())
281   - .setPersisted(request.isPersisted())
282   - .build();
283   - }
284   -
285   - void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
286   - log.debug("[{}] Processing rpc command response from edge session", deviceId);
287   - ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
288   - boolean success = requestMd != null;
289   - if (success) {
290   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(responseMsg.getMsg());
291   - } else {
292   - log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
293   - }
  245 + if (!persisted && request.isOneway() && sent) {
  246 + log.debug("[{}] Rpc command response sent [{}]!", deviceId, request.getId());
  247 + systemContext
  248 + .getTbCoreDeviceRpcService()
  249 + .processRpcResponseFromDeviceActor(
  250 + new FromDeviceRpcResponse(msg.getMsg().getId(), null, null));
  251 + } else {
  252 + registerPendingRpcRequest(context, msg, sent, rpcRequest, timeout);
294 253 }
295   -
296   - void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
297   - log.debug("[{}] Processing remove rpc command", msg.getRequestId());
298   - Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
299   - for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
300   - if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
301   - entry = e;
302   - break;
303   - }
304   - }
305   -
306   - if (entry != null) {
307   - if (entry.getValue().isDelivered()) {
308   - toDeviceRpcPendingMap.remove(entry.getKey());
309   - } else {
310   - Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
311   - if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) {
312   - toDeviceRpcPendingMap.remove(entry.getKey());
313   - sendNextPendingRequest(context);
314   - } else {
315   - toDeviceRpcPendingMap.remove(entry.getKey());
316   - }
317   - }
318   - }
  254 + if (sent) {
  255 + log.debug("[{}] RPC request {} is sent!", deviceId, request.getId());
  256 + } else {
  257 + log.debug("[{}] RPC request {} is NOT sent!", deviceId, request.getId());
319 258 }
320   -
321   - private void registerPendingRpcRequest(TbActorCtx context, ToDeviceRpcRequestActorMsg msg, boolean sent, ToDeviceRpcRequestMsg rpcRequest, long timeout) {
322   - toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
323   - DeviceActorServerSideRpcTimeoutMsg timeoutMsg = new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
324   - scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
  259 + }
  260 +
  261 + private boolean isSendNewRpcAvailable() {
  262 + return !rpcSequential
  263 + || toDeviceRpcPendingMap.values().stream()
  264 + .filter(md -> !md.isDelivered())
  265 + .findAny()
  266 + .isEmpty();
  267 + }
  268 +
  269 + private Rpc createRpc(ToDeviceRpcRequest request, RpcStatus status) {
  270 + // Thingskit function
  271 + ObjectNode additional = (ObjectNode) JacksonUtil.toJsonNode(request.getAdditionalInfo());
  272 + DeviceId saveDeviceId = deviceId;
  273 + if (additional.has(FastIotConstants.Rpc.TARGET_ID)) {
  274 + saveDeviceId =
  275 + new DeviceId(UUID.fromString(additional.get(FastIotConstants.Rpc.TARGET_ID).asText()));
325 276 }
326 277
327   - void processServerSideRpcTimeout(TbActorCtx context, DeviceActorServerSideRpcTimeoutMsg msg) {
328   - ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
329   - if (requestMd != null) {
330   - log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
331   - if (requestMd.getMsg().getMsg().isPersisted()) {
332   - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null);
333   - }
334   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
335   - null, requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
336   - if (!requestMd.isDelivered()) {
337   - sendNextPendingRequest(context);
338   - }
339   - }
  278 + Rpc rpc = new Rpc(new RpcId(request.getId()));
  279 + rpc.setCreatedTime(System.currentTimeMillis());
  280 + rpc.setTenantId(tenantId);
  281 + rpc.setDeviceId(saveDeviceId);
  282 + rpc.setExpirationTime(request.getExpirationTime());
  283 + rpc.setRequest(JacksonUtil.valueToTree(request));
  284 + rpc.setStatus(status);
  285 + rpc.setAdditionalInfo(JacksonUtil.toJsonNode(request.getAdditionalInfo()));
  286 +
  287 + rpc.setAdditionalInfo(additional);
  288 +
  289 + return systemContext.getTbRpcService().save(tenantId, rpc);
  290 + }
  291 +
  292 + private ToDeviceRpcRequestMsg creteToDeviceRpcRequestMsg(ToDeviceRpcRequest request) {
  293 + ToDeviceRpcRequestBody body = request.getBody();
  294 + return ToDeviceRpcRequestMsg.newBuilder()
  295 + .setRequestId(rpcSeq++)
  296 + .setMethodName(body.getMethod())
  297 + .setParams(body.getParams())
  298 + .setExpirationTime(request.getExpirationTime())
  299 + .setRequestIdMSB(request.getId().getMostSignificantBits())
  300 + .setRequestIdLSB(request.getId().getLeastSignificantBits())
  301 + .setOneway(request.isOneway())
  302 + .setPersisted(request.isPersisted())
  303 + .build();
  304 + }
  305 +
  306 + void processRpcResponsesFromEdge(TbActorCtx context, FromDeviceRpcResponseActorMsg responseMsg) {
  307 + log.debug("[{}] Processing rpc command response from edge session", deviceId);
  308 + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  309 + boolean success = requestMd != null;
  310 + if (success) {
  311 + systemContext
  312 + .getTbCoreDeviceRpcService()
  313 + .processRpcResponseFromDeviceActor(responseMsg.getMsg());
  314 + } else {
  315 + log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
  316 + }
  317 + }
  318 +
  319 + void processRemoveRpc(TbActorCtx context, RemoveRpcActorMsg msg) {
  320 + log.debug("[{}] Processing remove rpc command", msg.getRequestId());
  321 + Map.Entry<Integer, ToDeviceRpcRequestMetadata> entry = null;
  322 + for (Map.Entry<Integer, ToDeviceRpcRequestMetadata> e : toDeviceRpcPendingMap.entrySet()) {
  323 + if (e.getValue().getMsg().getMsg().getId().equals(msg.getRequestId())) {
  324 + entry = e;
  325 + break;
  326 + }
340 327 }
341 328
342   - private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) {
343   - SessionType sessionType = getSessionType(sessionId);
344   - if (!toDeviceRpcPendingMap.isEmpty()) {
345   - log.debug("[{}] Pushing {} pending RPC messages to new async session [{}]", deviceId, toDeviceRpcPendingMap.size(), sessionId);
346   - if (sessionType == SessionType.SYNC) {
347   - log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
348   - rpcSubscriptions.remove(sessionId);
349   - }
350   - } else {
351   - log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
352   - }
353   - Set<Integer> sentOneWayIds = new HashSet<>();
354   -
355   - if (rpcSequential) {
356   - getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
357   - } else if (sessionType == SessionType.ASYNC) {
358   - toDeviceRpcPendingMap.entrySet().forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  329 + if (entry != null) {
  330 + if (entry.getValue().isDelivered()) {
  331 + toDeviceRpcPendingMap.remove(entry.getKey());
  332 + } else {
  333 + Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> firstRpc = getFirstRpc();
  334 + if (firstRpc.isPresent() && entry.getKey().equals(firstRpc.get().getKey())) {
  335 + toDeviceRpcPendingMap.remove(entry.getKey());
  336 + sendNextPendingRequest(context);
359 337 } else {
360   - toDeviceRpcPendingMap.entrySet().stream().findFirst().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  338 + toDeviceRpcPendingMap.remove(entry.getKey());
361 339 }
362   -
363   - sentOneWayIds.stream().filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted()).forEach(toDeviceRpcPendingMap::remove);
  340 + }
364 341 }
365   -
366   - private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
367   - return toDeviceRpcPendingMap.entrySet().stream().filter(e -> !e.getValue().isDelivered()).findFirst();
  342 + }
  343 +
  344 + private void registerPendingRpcRequest(
  345 + TbActorCtx context,
  346 + ToDeviceRpcRequestActorMsg msg,
  347 + boolean sent,
  348 + ToDeviceRpcRequestMsg rpcRequest,
  349 + long timeout) {
  350 + toDeviceRpcPendingMap.put(rpcRequest.getRequestId(), new ToDeviceRpcRequestMetadata(msg, sent));
  351 + DeviceActorServerSideRpcTimeoutMsg timeoutMsg =
  352 + new DeviceActorServerSideRpcTimeoutMsg(rpcRequest.getRequestId(), timeout);
  353 + scheduleMsgWithDelay(context, timeoutMsg, timeoutMsg.getTimeout());
  354 + }
  355 +
  356 + void processServerSideRpcTimeout(TbActorCtx context, DeviceActorServerSideRpcTimeoutMsg msg) {
  357 + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(msg.getId());
  358 + if (requestMd != null) {
  359 + log.debug("[{}] RPC request [{}] timeout detected!", deviceId, msg.getId());
  360 + if (requestMd.getMsg().getMsg().isPersisted()) {
  361 + systemContext
  362 + .getTbRpcService()
  363 + .save(
  364 + tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), RpcStatus.EXPIRED, null);
  365 + }
  366 + systemContext
  367 + .getTbCoreDeviceRpcService()
  368 + .processRpcResponseFromDeviceActor(
  369 + new FromDeviceRpcResponse(
  370 + requestMd.getMsg().getMsg().getId(),
  371 + null,
  372 + requestMd.isSent() ? RpcError.TIMEOUT : RpcError.NO_ACTIVE_CONNECTION));
  373 + if (!requestMd.isDelivered()) {
  374 + sendNextPendingRequest(context);
  375 + }
368 376 }
369   -
370   - private void sendNextPendingRequest(TbActorCtx context) {
371   - if (rpcSequential) {
372   - rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
373   - }
  377 + }
  378 +
  379 + private void sendPendingRequests(TbActorCtx context, UUID sessionId, String nodeId) {
  380 + SessionType sessionType = getSessionType(sessionId);
  381 + if (!toDeviceRpcPendingMap.isEmpty()) {
  382 + log.debug(
  383 + "[{}] Pushing {} pending RPC messages to new async session [{}]",
  384 + deviceId,
  385 + toDeviceRpcPendingMap.size(),
  386 + sessionId);
  387 + if (sessionType == SessionType.SYNC) {
  388 + log.debug("[{}] Cleanup sync rpc session [{}]", deviceId, sessionId);
  389 + rpcSubscriptions.remove(sessionId);
  390 + }
  391 + } else {
  392 + log.debug("[{}] No pending RPC messages for new async session [{}]", deviceId, sessionId);
374 393 }
375   -
376   - private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
377   - return entry -> {
378   - ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
379   - ToDeviceRpcRequestBody body = request.getBody();
380   - if (request.isOneway() && !rpcSequential) {
381   - sentOneWayIds.add(entry.getKey());
382   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(request.getId(), null, null));
383   - }
384   - ToDeviceRpcRequestMsg rpcRequest = ToDeviceRpcRequestMsg.newBuilder()
385   - .setRequestId(entry.getKey())
386   - .setMethodName(body.getMethod())
387   - .setParams(body.getParams())
388   - .setExpirationTime(request.getExpirationTime())
389   - .setRequestIdMSB(request.getId().getMostSignificantBits())
390   - .setRequestIdLSB(request.getId().getLeastSignificantBits())
391   - .setOneway(request.isOneway())
392   - .setPersisted(request.isPersisted())
393   - .build();
394   - sendToTransport(rpcRequest, sessionId, nodeId);
395   - };
396   - }
397   -
398   - void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) {
399   - TransportToDeviceActorMsg msg = wrapper.getMsg();
400   - TbCallback callback = wrapper.getCallback();
401   - var sessionInfo = msg.getSessionInfo();
402   -
403   - if (msg.hasSessionEvent()) {
404   - processSessionStateMsgs(sessionInfo, msg.getSessionEvent());
405   - }
406   - if (msg.hasSubscribeToAttributes()) {
407   - processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes());
408   - }
409   - if (msg.hasSubscribeToRPC()) {
410   - processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
411   - }
412   - if (msg.hasSendPendingRPC()) {
413   - sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
414   - }
415   - if (msg.hasGetAttributes()) {
416   - handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
417   - }
418   - if (msg.hasToDeviceRPCCallResponse()) {
419   - processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse());
420   - }
421   - if (msg.hasSubscriptionInfo()) {
422   - handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo());
423   - }
424   - if (msg.hasClaimDevice()) {
425   - handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
426   - }
427   - if (msg.hasRpcResponseStatusMsg()) {
428   - processRpcResponseStatus(context, sessionInfo, msg.getRpcResponseStatusMsg());
429   - }
430   - if (msg.hasUplinkNotificationMsg()) {
431   - processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
432   - }
433   - callback.onSuccess();
  394 + Set<Integer> sentOneWayIds = new HashSet<>();
  395 +
  396 + if (rpcSequential) {
  397 + getFirstRpc().ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  398 + } else if (sessionType == SessionType.ASYNC) {
  399 + toDeviceRpcPendingMap
  400 + .entrySet()
  401 + .forEach(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
  402 + } else {
  403 + toDeviceRpcPendingMap.entrySet().stream()
  404 + .findFirst()
  405 + .ifPresent(processPendingRpc(context, sessionId, nodeId, sentOneWayIds));
434 406 }
435 407
436   - private void processUplinkNotificationMsg(TbActorCtx context, SessionInfoProto sessionInfo, TransportProtos.UplinkNotificationMsg uplinkNotificationMsg) {
437   - String nodeId = sessionInfo.getNodeId();
438   - sessions.entrySet().stream()
439   - .filter(kv -> kv.getValue().getSessionInfo().getNodeId().equals(nodeId) && (kv.getValue().isSubscribedToAttributes() || kv.getValue().isSubscribedToRPC()))
440   - .forEach(kv -> {
441   - ToTransportMsg msg = ToTransportMsg.newBuilder()
442   - .setSessionIdMSB(kv.getKey().getMostSignificantBits())
443   - .setSessionIdLSB(kv.getKey().getLeastSignificantBits())
444   - .setUplinkNotificationMsg(uplinkNotificationMsg)
445   - .build();
446   - systemContext.getTbCoreToTransportService().process(kv.getValue().getSessionInfo().getNodeId(), msg);
447   - });
448   - }
  408 + sentOneWayIds.stream()
  409 + .filter(id -> !toDeviceRpcPendingMap.get(id).getMsg().getMsg().isPersisted())
  410 + .forEach(toDeviceRpcPendingMap::remove);
  411 + }
449 412
450   - private void handleClaimDeviceMsg(TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) {
451   - DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
452   - systemContext.getClaimDevicesService().registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs());
453   - }
  413 + private Optional<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> getFirstRpc() {
  414 + return toDeviceRpcPendingMap.entrySet().stream()
  415 + .filter(e -> !e.getValue().isDelivered())
  416 + .findFirst();
  417 + }
454 418
455   - private void reportSessionOpen() {
456   - systemContext.getDeviceStateService().onDeviceConnect(tenantId, deviceId);
  419 + private void sendNextPendingRequest(TbActorCtx context) {
  420 + if (rpcSequential) {
  421 + rpcSubscriptions.forEach((id, s) -> sendPendingRequests(context, id, s.getNodeId()));
457 422 }
458   -
459   - private void reportSessionClose() {
460   - systemContext.getDeviceStateService().onDeviceDisconnect(tenantId, deviceId);
  423 + }
  424 +
  425 + private Consumer<Map.Entry<Integer, ToDeviceRpcRequestMetadata>> processPendingRpc(
  426 + TbActorCtx context, UUID sessionId, String nodeId, Set<Integer> sentOneWayIds) {
  427 + return entry -> {
  428 + ToDeviceRpcRequest request = entry.getValue().getMsg().getMsg();
  429 + ToDeviceRpcRequestBody body = request.getBody();
  430 + if (request.isOneway() && !rpcSequential) {
  431 + sentOneWayIds.add(entry.getKey());
  432 + systemContext
  433 + .getTbCoreDeviceRpcService()
  434 + .processRpcResponseFromDeviceActor(
  435 + new FromDeviceRpcResponse(request.getId(), null, null));
  436 + }
  437 + ToDeviceRpcRequestMsg rpcRequest =
  438 + ToDeviceRpcRequestMsg.newBuilder()
  439 + .setRequestId(entry.getKey())
  440 + .setMethodName(body.getMethod())
  441 + .setParams(body.getParams())
  442 + .setExpirationTime(request.getExpirationTime())
  443 + .setRequestIdMSB(request.getId().getMostSignificantBits())
  444 + .setRequestIdLSB(request.getId().getLeastSignificantBits())
  445 + .setOneway(request.isOneway())
  446 + .setPersisted(request.isPersisted())
  447 + .build();
  448 + sendToTransport(rpcRequest, sessionId, nodeId);
  449 + };
  450 + }
  451 +
  452 + void process(TbActorCtx context, TransportToDeviceActorMsgWrapper wrapper) {
  453 + TransportToDeviceActorMsg msg = wrapper.getMsg();
  454 + TbCallback callback = wrapper.getCallback();
  455 + var sessionInfo = msg.getSessionInfo();
  456 +
  457 + if (msg.hasSessionEvent()) {
  458 + processSessionStateMsgs(sessionInfo, msg.getSessionEvent());
461 459 }
462   -
463   - private void handleGetAttributesRequest(TbActorCtx context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
464   - int requestId = request.getRequestId();
465   - if (request.getOnlyShared()) {
466   - Futures.addCallback(findAllAttributesByScope(DataConstants.SHARED_SCOPE), new FutureCallback<>() {
467   - @Override
468   - public void onSuccess(@Nullable List<AttributeKvEntry> result) {
469   - GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
470   - .setRequestId(requestId)
471   - .setSharedStateMsg(true)
472   - .addAllSharedAttributeList(toTsKvProtos(result))
473   - .setIsMultipleAttributesRequest(request.getSharedAttributeNamesCount() > 1)
474   - .build();
475   - sendToTransport(responseMsg, sessionInfo);
476   - }
477   -
478   - @Override
479   - public void onFailure(Throwable t) {
480   - GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
481   - .setError(t.getMessage())
482   - .setSharedStateMsg(true)
483   - .build();
484   - sendToTransport(responseMsg, sessionInfo);
485   - }
486   - }, MoreExecutors.directExecutor());
487   - } else {
488   - Futures.addCallback(getAttributesKvEntries(request), new FutureCallback<>() {
489   - @Override
490   - public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
491   - GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
492   - .setRequestId(requestId)
493   - .addAllClientAttributeList(toTsKvProtos(result.get(0)))
494   - .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
495   - .setIsMultipleAttributesRequest(
496   - request.getSharedAttributeNamesCount() + request.getClientAttributeNamesCount() > 1)
497   - .build();
498   - sendToTransport(responseMsg, sessionInfo);
499   - }
500   -
501   - @Override
502   - public void onFailure(Throwable t) {
503   - GetAttributeResponseMsg responseMsg = GetAttributeResponseMsg.newBuilder()
504   - .setError(t.getMessage())
505   - .build();
506   - sendToTransport(responseMsg, sessionInfo);
507   - }
508   - }, MoreExecutors.directExecutor());
509   - }
  460 + if (msg.hasSubscribeToAttributes()) {
  461 + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToAttributes());
510 462 }
511   -
512   - private ListenableFuture<List<List<AttributeKvEntry>>> getAttributesKvEntries(GetAttributeRequestMsg request) {
513   - ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture;
514   - ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture;
515   - if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
516   - clientAttributesFuture = findAllAttributesByScope(DataConstants.CLIENT_SCOPE);
517   - sharedAttributesFuture = findAllAttributesByScope(DataConstants.SHARED_SCOPE);
518   - } else if (!CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
519   - clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
520   - sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
521   - } else if (CollectionUtils.isEmpty(request.getClientAttributeNamesList()) && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
522   - clientAttributesFuture = Futures.immediateFuture(Collections.emptyList());
523   - sharedAttributesFuture = findAttributesByScope(toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
524   - } else {
525   - sharedAttributesFuture = Futures.immediateFuture(Collections.emptyList());
526   - clientAttributesFuture = findAttributesByScope(toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
527   - }
528   - return Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture));
  463 + if (msg.hasSubscribeToRPC()) {
  464 + processSubscriptionCommands(context, sessionInfo, msg.getSubscribeToRPC());
529 465 }
530   -
531   - private ListenableFuture<List<AttributeKvEntry>> findAllAttributesByScope(String scope) {
532   - return systemContext.getAttributesService().findAll(tenantId, deviceId, scope);
  466 + if (msg.hasSendPendingRPC()) {
  467 + sendPendingRequests(context, getSessionId(sessionInfo), sessionInfo.getNodeId());
533 468 }
534   -
535   - private ListenableFuture<List<AttributeKvEntry>> findAttributesByScope(Set<String> attributesSet, String scope) {
536   - return systemContext.getAttributesService().find(tenantId, deviceId, scope, attributesSet);
  469 + if (msg.hasGetAttributes()) {
  470 + handleGetAttributesRequest(context, sessionInfo, msg.getGetAttributes());
537 471 }
538   -
539   - private Set<String> toSet(List<String> strings) {
540   - return new HashSet<>(strings);
  472 + if (msg.hasToDeviceRPCCallResponse()) {
  473 + processRpcResponses(context, sessionInfo, msg.getToDeviceRPCCallResponse());
541 474 }
542   -
543   - private SessionType getSessionType(UUID sessionId) {
544   - return sessions.containsKey(sessionId) ? SessionType.ASYNC : SessionType.SYNC;
  475 + if (msg.hasSubscriptionInfo()) {
  476 + handleSessionActivity(context, sessionInfo, msg.getSubscriptionInfo());
545 477 }
546   -
547   - void processAttributesUpdate(TbActorCtx context, DeviceAttributesEventNotificationMsg msg) {
548   - if (attributeSubscriptions.size() > 0) {
549   - boolean hasNotificationData = false;
550   - AttributeUpdateNotificationMsg.Builder notification = AttributeUpdateNotificationMsg.newBuilder();
551   - if (msg.isDeleted()) {
552   - List<String> sharedKeys = msg.getDeletedKeys().stream()
553   - .filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
554   - .map(AttributeKey::getAttributeKey)
555   - .collect(Collectors.toList());
556   - if (!sharedKeys.isEmpty()) {
557   - notification.addAllSharedDeleted(sharedKeys);
558   - hasNotificationData = true;
559   - }
560   - } else {
561   - if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
562   - List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
563   - if (attributes.size() > 0) {
564   - List<TsKvProto> sharedUpdated = msg.getValues().stream().map(this::toTsKvProto)
565   - .collect(Collectors.toList());
566   - if (!sharedUpdated.isEmpty()) {
567   - notification.addAllSharedUpdated(sharedUpdated);
568   - hasNotificationData = true;
569   - }
570   - } else {
571   - log.debug("[{}] No public shared side attributes changed!", deviceId);
572   - }
573   - }
574   - }
575   - if (hasNotificationData) {
576   - AttributeUpdateNotificationMsg finalNotification = notification.build();
577   - attributeSubscriptions.forEach((key, value) -> sendToTransport(finalNotification, key, value.getNodeId()));
578   - }
579   - } else {
580   - log.debug("[{}] No registered attributes subscriptions to process!", deviceId);
581   - }
  478 + if (msg.hasClaimDevice()) {
  479 + handleClaimDeviceMsg(context, sessionInfo, msg.getClaimDevice());
582 480 }
583   -
584   - private void processRpcResponses(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
585   - UUID sessionId = getSessionId(sessionInfo);
586   - log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
587   - ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
588   - boolean success = requestMd != null;
589   - if (success) {
590   - boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
591   - try {
592   - String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
593   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(
594   - new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(),
595   - payload, null));
596   - if (requestMd.getMsg().getMsg().isPersisted()) {
597   - RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
598   - JsonNode response;
599   - try {
600   - response = JacksonUtil.toJsonNode(payload);
601   - } catch (IllegalArgumentException e) {
602   - response = JacksonUtil.newObjectNode().put("error", payload);
603   - }
604   - systemContext.getTbRpcService().save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
605   - }
606   - } finally {
607   - if (hasError && !requestMd.isDelivered()) {
608   - sendNextPendingRequest(context);
609   - }
610   - }
611   - } else {
612   - log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
613   - }
  481 + if (msg.hasRpcResponseStatusMsg()) {
  482 + processRpcResponseStatus(context, sessionInfo, msg.getRpcResponseStatusMsg());
614 483 }
615   -
616   - private void processRpcResponseStatus(TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
617   - UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
618   - RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
619   - ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
620   -
621   - if (md != null) {
622   - JsonNode response = null;
623   - if (status.equals(RpcStatus.DELIVERED)) {
624   - if (md.getMsg().getMsg().isOneway()) {
625   - toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
626   - if (rpcSequential) {
627   - systemContext.getTbCoreDeviceRpcService().processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
628   - }
629   - } else {
630   - md.setDelivered(true);
631   - }
632   - } else if (status.equals(RpcStatus.TIMEOUT)) {
633   - Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
634   - maxRpcRetries = maxRpcRetries == null ? systemContext.getMaxRpcRetries() : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
635   - if (maxRpcRetries <= md.getRetries()) {
636   - toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
637   - status = RpcStatus.FAILED;
638   - response = JacksonUtil.newObjectNode().put("error", "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: " + maxRpcRetries);
639   - } else {
640   - md.setRetries(md.getRetries() + 1);
641   - }
  484 + if (msg.hasUplinkNotificationMsg()) {
  485 + processUplinkNotificationMsg(context, sessionInfo, msg.getUplinkNotificationMsg());
  486 + }
  487 + callback.onSuccess();
  488 + }
  489 +
  490 + private void processUplinkNotificationMsg(
  491 + TbActorCtx context,
  492 + SessionInfoProto sessionInfo,
  493 + TransportProtos.UplinkNotificationMsg uplinkNotificationMsg) {
  494 + String nodeId = sessionInfo.getNodeId();
  495 + sessions.entrySet().stream()
  496 + .filter(
  497 + kv ->
  498 + kv.getValue().getSessionInfo().getNodeId().equals(nodeId)
  499 + && (kv.getValue().isSubscribedToAttributes()
  500 + || kv.getValue().isSubscribedToRPC()))
  501 + .forEach(
  502 + kv -> {
  503 + ToTransportMsg msg =
  504 + ToTransportMsg.newBuilder()
  505 + .setSessionIdMSB(kv.getKey().getMostSignificantBits())
  506 + .setSessionIdLSB(kv.getKey().getLeastSignificantBits())
  507 + .setUplinkNotificationMsg(uplinkNotificationMsg)
  508 + .build();
  509 + systemContext
  510 + .getTbCoreToTransportService()
  511 + .process(kv.getValue().getSessionInfo().getNodeId(), msg);
  512 + });
  513 + }
  514 +
  515 + private void handleClaimDeviceMsg(
  516 + TbActorCtx context, SessionInfoProto sessionInfo, ClaimDeviceMsg msg) {
  517 + DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
  518 + systemContext
  519 + .getClaimDevicesService()
  520 + .registerClaimingInfo(tenantId, deviceId, msg.getSecretKey(), msg.getDurationMs());
  521 + }
  522 +
  523 + private void reportSessionOpen() {
  524 + systemContext.getDeviceStateService().onDeviceConnect(tenantId, deviceId);
  525 + }
  526 +
  527 + private void reportSessionClose() {
  528 + systemContext.getDeviceStateService().onDeviceDisconnect(tenantId, deviceId);
  529 + }
  530 +
  531 + private void handleGetAttributesRequest(
  532 + TbActorCtx context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
  533 + int requestId = request.getRequestId();
  534 + if (request.getOnlyShared()) {
  535 + Futures.addCallback(
  536 + findAllAttributesByScope(DataConstants.SHARED_SCOPE),
  537 + new FutureCallback<>() {
  538 + @Override
  539 + public void onSuccess(@Nullable List<AttributeKvEntry> result) {
  540 + GetAttributeResponseMsg responseMsg =
  541 + GetAttributeResponseMsg.newBuilder()
  542 + .setRequestId(requestId)
  543 + .setSharedStateMsg(true)
  544 + .addAllSharedAttributeList(toTsKvProtos(result))
  545 + .setIsMultipleAttributesRequest(request.getSharedAttributeNamesCount() > 1)
  546 + .build();
  547 + sendToTransport(responseMsg, sessionInfo);
642 548 }
643 549
644   - if (md.getMsg().getMsg().isPersisted()) {
645   - systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response);
  550 + @Override
  551 + public void onFailure(Throwable t) {
  552 + GetAttributeResponseMsg responseMsg =
  553 + GetAttributeResponseMsg.newBuilder()
  554 + .setError(t.getMessage())
  555 + .setSharedStateMsg(true)
  556 + .build();
  557 + sendToTransport(responseMsg, sessionInfo);
646 558 }
647   - if (status != RpcStatus.SENT) {
648   - sendNextPendingRequest(context);
  559 + },
  560 + MoreExecutors.directExecutor());
  561 + } else {
  562 + Futures.addCallback(
  563 + getAttributesKvEntries(request),
  564 + new FutureCallback<>() {
  565 + @Override
  566 + public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
  567 + GetAttributeResponseMsg responseMsg =
  568 + GetAttributeResponseMsg.newBuilder()
  569 + .setRequestId(requestId)
  570 + .addAllClientAttributeList(toTsKvProtos(result.get(0)))
  571 + .addAllSharedAttributeList(toTsKvProtos(result.get(1)))
  572 + .setIsMultipleAttributesRequest(
  573 + request.getSharedAttributeNamesCount()
  574 + + request.getClientAttributeNamesCount()
  575 + > 1)
  576 + .build();
  577 + sendToTransport(responseMsg, sessionInfo);
649 578 }
650   - } else {
651   - log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId);
652   - }
653   - }
654 579
655   - private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToAttributeUpdatesMsg subscribeCmd) {
656   - UUID sessionId = getSessionId(sessionInfo);
657   - if (subscribeCmd.getUnsubscribe()) {
658   - log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
659   - attributeSubscriptions.remove(sessionId);
660   - } else {
661   - SessionInfoMetaData sessionMD = sessions.get(sessionId);
662   - if (sessionMD == null) {
663   - sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
  580 + @Override
  581 + public void onFailure(Throwable t) {
  582 + GetAttributeResponseMsg responseMsg =
  583 + GetAttributeResponseMsg.newBuilder().setError(t.getMessage()).build();
  584 + sendToTransport(responseMsg, sessionInfo);
664 585 }
665   - sessionMD.setSubscribedToAttributes(true);
666   - log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
667   - attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
668   - dumpSessions();
669   - }
  586 + },
  587 + MoreExecutors.directExecutor());
670 588 }
671   -
672   - private UUID getSessionId(SessionInfoProto sessionInfo) {
673   - return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  589 + }
  590 +
  591 + private ListenableFuture<List<List<AttributeKvEntry>>> getAttributesKvEntries(
  592 + GetAttributeRequestMsg request) {
  593 + ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture;
  594 + ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture;
  595 + if (CollectionUtils.isEmpty(request.getClientAttributeNamesList())
  596 + && CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
  597 + clientAttributesFuture = findAllAttributesByScope(DataConstants.CLIENT_SCOPE);
  598 + sharedAttributesFuture = findAllAttributesByScope(DataConstants.SHARED_SCOPE);
  599 + } else if (!CollectionUtils.isEmpty(request.getClientAttributeNamesList())
  600 + && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
  601 + clientAttributesFuture =
  602 + findAttributesByScope(
  603 + toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
  604 + sharedAttributesFuture =
  605 + findAttributesByScope(
  606 + toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
  607 + } else if (CollectionUtils.isEmpty(request.getClientAttributeNamesList())
  608 + && !CollectionUtils.isEmpty(request.getSharedAttributeNamesList())) {
  609 + clientAttributesFuture = Futures.immediateFuture(Collections.emptyList());
  610 + sharedAttributesFuture =
  611 + findAttributesByScope(
  612 + toSet(request.getSharedAttributeNamesList()), DataConstants.SHARED_SCOPE);
  613 + } else {
  614 + sharedAttributesFuture = Futures.immediateFuture(Collections.emptyList());
  615 + clientAttributesFuture =
  616 + findAttributesByScope(
  617 + toSet(request.getClientAttributeNamesList()), DataConstants.CLIENT_SCOPE);
674 618 }
675   -
676   - private void processSubscriptionCommands(TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
677   - UUID sessionId = getSessionId(sessionInfo);
678   - if (subscribeCmd.getUnsubscribe()) {
679   - log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
680   - rpcSubscriptions.remove(sessionId);
681   - } else {
682   - SessionInfoMetaData sessionMD = sessions.get(sessionId);
683   - if (sessionMD == null) {
684   - sessionMD = new SessionInfoMetaData(new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
685   - }
686   - sessionMD.setSubscribedToRPC(true);
687   - log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
688   - rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
689   - sendPendingRequests(context, sessionId, sessionInfo.getNodeId());
690   - dumpSessions();
  619 + return Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture));
  620 + }
  621 +
  622 + private ListenableFuture<List<AttributeKvEntry>> findAllAttributesByScope(String scope) {
  623 + return systemContext.getAttributesService().findAll(tenantId, deviceId, scope);
  624 + }
  625 +
  626 + private ListenableFuture<List<AttributeKvEntry>> findAttributesByScope(
  627 + Set<String> attributesSet, String scope) {
  628 + return systemContext.getAttributesService().find(tenantId, deviceId, scope, attributesSet);
  629 + }
  630 +
  631 + private Set<String> toSet(List<String> strings) {
  632 + return new HashSet<>(strings);
  633 + }
  634 +
  635 + private SessionType getSessionType(UUID sessionId) {
  636 + return sessions.containsKey(sessionId) ? SessionType.ASYNC : SessionType.SYNC;
  637 + }
  638 +
  639 + void processAttributesUpdate(TbActorCtx context, DeviceAttributesEventNotificationMsg msg) {
  640 + if (attributeSubscriptions.size() > 0) {
  641 + boolean hasNotificationData = false;
  642 + AttributeUpdateNotificationMsg.Builder notification =
  643 + AttributeUpdateNotificationMsg.newBuilder();
  644 + if (msg.isDeleted()) {
  645 + List<String> sharedKeys =
  646 + msg.getDeletedKeys().stream()
  647 + .filter(key -> DataConstants.SHARED_SCOPE.equals(key.getScope()))
  648 + .map(AttributeKey::getAttributeKey)
  649 + .collect(Collectors.toList());
  650 + if (!sharedKeys.isEmpty()) {
  651 + notification.addAllSharedDeleted(sharedKeys);
  652 + hasNotificationData = true;
691 653 }
692   - }
693   -
694   - private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
695   - UUID sessionId = getSessionId(sessionInfo);
696   - Objects.requireNonNull(sessionId);
697   - if (msg.getEvent() == SessionEvent.OPEN) {
698   - if (sessions.containsKey(sessionId)) {
699   - log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
700   - return;
701   - }
702   - log.debug("[{}] Processing new session [{}]. Current sessions size {}", deviceId, sessionId, sessions.size());
703   -
704   - sessions.put(sessionId, new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
705   - if (sessions.size() == 1) {
706   - reportSessionOpen();
  654 + } else {
  655 + if (DataConstants.SHARED_SCOPE.equals(msg.getScope())) {
  656 + List<AttributeKvEntry> attributes = new ArrayList<>(msg.getValues());
  657 + if (attributes.size() > 0) {
  658 + List<TsKvProto> sharedUpdated =
  659 + msg.getValues().stream().map(this::toTsKvProto).collect(Collectors.toList());
  660 + if (!sharedUpdated.isEmpty()) {
  661 + notification.addAllSharedUpdated(sharedUpdated);
  662 + hasNotificationData = true;
707 663 }
708   - systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
709   - dumpSessions();
710   - } else if (msg.getEvent() == SessionEvent.CLOSED) {
711   - log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
712   - sessions.remove(sessionId);
713   - attributeSubscriptions.remove(sessionId);
714   - rpcSubscriptions.remove(sessionId);
715   - if (sessions.isEmpty()) {
716   - reportSessionClose();
717   - }
718   - dumpSessions();
  664 + } else {
  665 + log.debug("[{}] No public shared side attributes changed!", deviceId);
  666 + }
719 667 }
  668 + }
  669 + if (hasNotificationData) {
  670 + AttributeUpdateNotificationMsg finalNotification = notification.build();
  671 + attributeSubscriptions.forEach(
  672 + (key, value) -> sendToTransport(finalNotification, key, value.getNodeId()));
  673 + }
  674 + } else {
  675 + log.debug("[{}] No registered attributes subscriptions to process!", deviceId);
720 676 }
721   -
722   - private void handleSessionActivity(TbActorCtx context, SessionInfoProto sessionInfoProto, SubscriptionInfoProto subscriptionInfo) {
723   - UUID sessionId = getSessionId(sessionInfoProto);
724   - Objects.requireNonNull(sessionId);
725   -
726   - SessionInfoMetaData sessionMD = sessions.get(sessionId);
727   - if (sessionMD != null) {
728   - sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
729   - sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
730   - sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
731   - if (subscriptionInfo.getAttributeSubscription()) {
732   - attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
733   - }
734   - if (subscriptionInfo.getRpcSubscription()) {
735   - rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
736   - }
  677 + }
  678 +
  679 + private void processRpcResponses(
  680 + TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseMsg responseMsg) {
  681 + UUID sessionId = getSessionId(sessionInfo);
  682 + log.debug("[{}] Processing rpc command response [{}]", deviceId, sessionId);
  683 + ToDeviceRpcRequestMetadata requestMd = toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  684 + boolean success = requestMd != null;
  685 + if (success) {
  686 + boolean hasError = StringUtils.isNotEmpty(responseMsg.getError());
  687 + try {
  688 + String payload = hasError ? responseMsg.getError() : responseMsg.getPayload();
  689 + systemContext
  690 + .getTbCoreDeviceRpcService()
  691 + .processRpcResponseFromDeviceActor(
  692 + new FromDeviceRpcResponse(requestMd.getMsg().getMsg().getId(), payload, null));
  693 + if (requestMd.getMsg().getMsg().isPersisted()) {
  694 + RpcStatus status = hasError ? RpcStatus.FAILED : RpcStatus.SUCCESSFUL;
  695 + JsonNode response;
  696 + try {
  697 + response = JacksonUtil.toJsonNode(payload);
  698 + } catch (IllegalArgumentException e) {
  699 + response = JacksonUtil.newObjectNode().put("error", payload);
  700 + }
  701 + systemContext
  702 + .getTbRpcService()
  703 + .save(tenantId, new RpcId(requestMd.getMsg().getMsg().getId()), status, response);
737 704 }
738   - systemContext.getDeviceStateService().onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime());
739   - if (sessionMD != null) {
740   - dumpSessions();
  705 + } finally {
  706 + if (hasError && !requestMd.isDelivered()) {
  707 + sendNextPendingRequest(context);
741 708 }
  709 + }
  710 + } else {
  711 + log.debug("[{}] Rpc command response [{}] is stale!", deviceId, responseMsg.getRequestId());
742 712 }
743   -
744   - void processCredentialsUpdate(TbActorMsg msg) {
745   - if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType() == DeviceCredentialsType.LWM2M_CREDENTIALS) {
746   - sessions.forEach((k, v) -> {
747   - notifyTransportAboutDeviceCredentialsUpdate(k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials());
748   - });
  713 + }
  714 +
  715 + private void processRpcResponseStatus(
  716 + TbActorCtx context, SessionInfoProto sessionInfo, ToDeviceRpcResponseStatusMsg responseMsg) {
  717 + UUID rpcId = new UUID(responseMsg.getRequestIdMSB(), responseMsg.getRequestIdLSB());
  718 + RpcStatus status = RpcStatus.valueOf(responseMsg.getStatus());
  719 + ToDeviceRpcRequestMetadata md = toDeviceRpcPendingMap.get(responseMsg.getRequestId());
  720 +
  721 + if (md != null) {
  722 + JsonNode response = null;
  723 + if (status.equals(RpcStatus.DELIVERED)) {
  724 + if (md.getMsg().getMsg().isOneway()) {
  725 + toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  726 + if (rpcSequential) {
  727 + systemContext
  728 + .getTbCoreDeviceRpcService()
  729 + .processRpcResponseFromDeviceActor(new FromDeviceRpcResponse(rpcId, null, null));
  730 + }
749 731 } else {
750   - sessions.forEach((sessionId, sessionMd) -> notifyTransportAboutClosedSession(sessionId, sessionMd, "device credentials updated!"));
751   - attributeSubscriptions.clear();
752   - rpcSubscriptions.clear();
753   - dumpSessions();
754   -
  732 + md.setDelivered(true);
755 733 }
756   - }
757   -
758   - private void notifyTransportAboutClosedSessionMaxSessionsLimit(UUID sessionId, SessionInfoMetaData sessionMd) {
759   - log.debug("remove eldest session (max concurrent sessions limit reached per device) sessionId [{}] sessionMd [{}]", sessionId, sessionMd);
760   - notifyTransportAboutClosedSession(sessionId, sessionMd, "max concurrent sessions limit reached per device!");
761   - }
762   -
763   - private void notifyTransportAboutClosedSession(UUID sessionId, SessionInfoMetaData sessionMd, String message) {
764   - SessionCloseNotificationProto sessionCloseNotificationProto = SessionCloseNotificationProto
765   - .newBuilder()
766   - .setMessage(message).build();
767   - ToTransportMsg msg = ToTransportMsg.newBuilder()
768   - .setSessionIdMSB(sessionId.getMostSignificantBits())
769   - .setSessionIdLSB(sessionId.getLeastSignificantBits())
770   - .setSessionCloseNotification(sessionCloseNotificationProto)
771   - .build();
772   - systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
773   - }
774   -
775   - void notifyTransportAboutDeviceCredentialsUpdate(UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) {
776   - ToTransportUpdateCredentialsProto.Builder notification = ToTransportUpdateCredentialsProto.newBuilder();
777   - notification.addCredentialsId(deviceCredentials.getCredentialsId());
778   - notification.addCredentialsValue(deviceCredentials.getCredentialsValue());
779   - ToTransportMsg msg = ToTransportMsg.newBuilder()
780   - .setSessionIdMSB(sessionId.getMostSignificantBits())
781   - .setSessionIdLSB(sessionId.getLeastSignificantBits())
782   - .setToTransportUpdateCredentialsNotification(notification).build();
783   - systemContext.getTbCoreToTransportService().process(sessionMd.getSessionInfo().getNodeId(), msg);
784   - }
785   -
786   - void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
787   - this.deviceName = msg.getDeviceName();
788   - this.deviceType = msg.getDeviceType();
789   - this.defaultMetaData = new TbMsgMetaData();
790   - this.defaultMetaData.putValue("deviceName", deviceName);
791   - this.defaultMetaData.putValue("deviceType", deviceType);
792   - }
793   -
794   - void processEdgeUpdate(DeviceEdgeUpdateMsg msg) {
795   - log.trace("[{}] Processing edge update {}", deviceId, msg);
796   - this.edgeId = msg.getEdgeId();
797   - }
798   -
799   - private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
800   - ToTransportMsg msg = ToTransportMsg.newBuilder()
801   - .setSessionIdMSB(sessionInfo.getSessionIdMSB())
802   - .setSessionIdLSB(sessionInfo.getSessionIdLSB())
803   - .setGetAttributesResponse(responseMsg).build();
804   - systemContext.getTbCoreToTransportService().process(sessionInfo.getNodeId(), msg);
805   - }
806   -
807   - private void sendToTransport(AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
808   - ToTransportMsg msg = ToTransportMsg.newBuilder()
809   - .setSessionIdMSB(sessionId.getMostSignificantBits())
810   - .setSessionIdLSB(sessionId.getLeastSignificantBits())
811   - .setAttributeUpdateNotification(notificationMsg).build();
812   - systemContext.getTbCoreToTransportService().process(nodeId, msg);
813   - }
814   -
815   - private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
816   - ToTransportMsg msg = ToTransportMsg.newBuilder()
817   - .setSessionIdMSB(sessionId.getMostSignificantBits())
818   - .setSessionIdLSB(sessionId.getLeastSignificantBits())
819   - .setToDeviceRequest(rpcMsg).build();
820   - systemContext.getTbCoreToTransportService().process(nodeId, msg);
821   - }
822   -
823   - private void sendToTransport(ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
824   - ToTransportMsg msg = ToTransportMsg.newBuilder()
825   - .setSessionIdMSB(sessionId.getMostSignificantBits())
826   - .setSessionIdLSB(sessionId.getLeastSignificantBits())
827   - .setToServerResponse(rpcMsg).build();
828   - systemContext.getTbCoreToTransportService().process(nodeId, msg);
829   - }
830   -
831   - private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
832   - EdgeEvent edgeEvent = new EdgeEvent();
833   - edgeEvent.setTenantId(tenantId);
834   - edgeEvent.setAction(EdgeEventActionType.RPC_CALL);
835   - edgeEvent.setEntityId(deviceId.getId());
836   - edgeEvent.setType(EdgeEventType.DEVICE);
837   -
838   - ObjectNode body = mapper.createObjectNode();
839   - body.put("requestId", requestId);
840   - body.put("requestUUID", msg.getId().toString());
841   - body.put("oneway", msg.isOneway());
842   - body.put("expirationTime", msg.getExpirationTime());
843   - body.put("method", msg.getBody().getMethod());
844   - body.put("params", msg.getBody().getParams());
845   - edgeEvent.setBody(body);
846   -
847   - edgeEvent.setEdgeId(edgeId);
848   - systemContext.getEdgeEventService().save(edgeEvent);
849   - systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
850   - }
851   -
852   - private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
853   - List<TsKvProto> clientAttributes;
854   - if (result == null || result.isEmpty()) {
855   - clientAttributes = Collections.emptyList();
  734 + } else if (status.equals(RpcStatus.TIMEOUT)) {
  735 + Integer maxRpcRetries = md.getMsg().getMsg().getRetries();
  736 + maxRpcRetries =
  737 + maxRpcRetries == null
  738 + ? systemContext.getMaxRpcRetries()
  739 + : Math.min(maxRpcRetries, systemContext.getMaxRpcRetries());
  740 + if (maxRpcRetries <= md.getRetries()) {
  741 + toDeviceRpcPendingMap.remove(responseMsg.getRequestId());
  742 + status = RpcStatus.FAILED;
  743 + response =
  744 + JacksonUtil.newObjectNode()
  745 + .put(
  746 + "error",
  747 + "There was a Timeout and all retry attempts have been exhausted. Retry attempts set: "
  748 + + maxRpcRetries);
856 749 } else {
857   - clientAttributes = new ArrayList<>(result.size());
858   - for (AttributeKvEntry attrEntry : result) {
859   - clientAttributes.add(toTsKvProto(attrEntry));
860   - }
861   - }
862   - return clientAttributes;
863   - }
864   -
865   - private TsKvProto toTsKvProto(AttributeKvEntry attrEntry) {
866   - return TsKvProto.newBuilder().setTs(attrEntry.getLastUpdateTs())
867   - .setKv(toKeyValueProto(attrEntry)).build();
868   - }
869   -
870   - private KeyValueProto toKeyValueProto(KvEntry kvEntry) {
871   - KeyValueProto.Builder builder = KeyValueProto.newBuilder();
872   - builder.setKey(kvEntry.getKey());
873   - switch (kvEntry.getDataType()) {
874   - case BOOLEAN:
875   - builder.setType(KeyValueType.BOOLEAN_V);
876   - builder.setBoolV(kvEntry.getBooleanValue().get());
877   - break;
878   - case DOUBLE:
879   - builder.setType(KeyValueType.DOUBLE_V);
880   - builder.setDoubleV(kvEntry.getDoubleValue().get());
881   - break;
882   - case LONG:
883   - builder.setType(KeyValueType.LONG_V);
884   - builder.setLongV(kvEntry.getLongValue().get());
885   - break;
886   - case STRING:
887   - builder.setType(KeyValueType.STRING_V);
888   - builder.setStringV(kvEntry.getStrValue().get());
889   - break;
890   - case JSON:
891   - builder.setType(KeyValueType.JSON_V);
892   - builder.setJsonV(kvEntry.getJsonValue().get());
893   - break;
  750 + md.setRetries(md.getRetries() + 1);
894 751 }
895   - return builder.build();
  752 + }
  753 +
  754 + if (md.getMsg().getMsg().isPersisted()) {
  755 + systemContext.getTbRpcService().save(tenantId, new RpcId(rpcId), status, response);
  756 + }
  757 + if (status != RpcStatus.SENT) {
  758 + sendNextPendingRequest(context);
  759 + }
  760 + } else {
  761 + log.info("[{}][{}] Rpc has already removed from pending map.", deviceId, rpcId);
896 762 }
897   -
898   - void restoreSessions() {
899   - if (systemContext.isLocalCacheType()) {
900   - return;
901   - }
902   - log.debug("[{}] Restoring sessions from cache", deviceId);
903   - DeviceSessionsCacheEntry sessionsDump = null;
904   - try {
905   - sessionsDump = DeviceSessionsCacheEntry.parseFrom(systemContext.getDeviceSessionCacheService().get(deviceId));
906   - } catch (InvalidProtocolBufferException e) {
907   - log.warn("[{}] Failed to decode device sessions from cache", deviceId);
908   - return;
909   - }
910   - if (sessionsDump.getSessionsCount() == 0) {
911   - log.debug("[{}] No session information found", deviceId);
912   - return;
913   - }
914   - // TODO: Take latest max allowed sessions size from cache
915   - for (SessionSubscriptionInfoProto sessionSubscriptionInfoProto : sessionsDump.getSessionsList()) {
916   - SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
917   - UUID sessionId = getSessionId(sessionInfoProto);
918   - SessionInfo sessionInfo = new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId());
919   - SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
920   - SessionInfoMetaData sessionMD = new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
921   - sessions.put(sessionId, sessionMD);
922   - if (subInfo.getAttributeSubscription()) {
923   - attributeSubscriptions.put(sessionId, sessionInfo);
924   - sessionMD.setSubscribedToAttributes(true);
925   - }
926   - if (subInfo.getRpcSubscription()) {
927   - rpcSubscriptions.put(sessionId, sessionInfo);
928   - sessionMD.setSubscribedToRPC(true);
929   - }
930   - log.debug("[{}] Restored session: {}", deviceId, sessionMD);
931   - }
932   - log.debug("[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
  763 + }
  764 +
  765 + private void processSubscriptionCommands(
  766 + TbActorCtx context,
  767 + SessionInfoProto sessionInfo,
  768 + SubscribeToAttributeUpdatesMsg subscribeCmd) {
  769 + UUID sessionId = getSessionId(sessionInfo);
  770 + if (subscribeCmd.getUnsubscribe()) {
  771 + log.debug("[{}] Canceling attributes subscription for session [{}]", deviceId, sessionId);
  772 + attributeSubscriptions.remove(sessionId);
  773 + } else {
  774 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  775 + if (sessionMD == null) {
  776 + sessionMD =
  777 + new SessionInfoMetaData(
  778 + new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
  779 + }
  780 + sessionMD.setSubscribedToAttributes(true);
  781 + log.debug("[{}] Registering attributes subscription for session [{}]", deviceId, sessionId);
  782 + attributeSubscriptions.put(sessionId, sessionMD.getSessionInfo());
  783 + dumpSessions();
  784 + }
  785 + }
  786 +
  787 + private UUID getSessionId(SessionInfoProto sessionInfo) {
  788 + return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  789 + }
  790 +
  791 + private void processSubscriptionCommands(
  792 + TbActorCtx context, SessionInfoProto sessionInfo, SubscribeToRPCMsg subscribeCmd) {
  793 + UUID sessionId = getSessionId(sessionInfo);
  794 + if (subscribeCmd.getUnsubscribe()) {
  795 + log.debug("[{}] Canceling rpc subscription for session [{}]", deviceId, sessionId);
  796 + rpcSubscriptions.remove(sessionId);
  797 + } else {
  798 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  799 + if (sessionMD == null) {
  800 + sessionMD =
  801 + new SessionInfoMetaData(
  802 + new SessionInfo(subscribeCmd.getSessionType(), sessionInfo.getNodeId()));
  803 + }
  804 + sessionMD.setSubscribedToRPC(true);
  805 + log.debug("[{}] Registering rpc subscription for session [{}]", deviceId, sessionId);
  806 + rpcSubscriptions.put(sessionId, sessionMD.getSessionInfo());
  807 + sendPendingRequests(context, sessionId, sessionInfo.getNodeId());
  808 + dumpSessions();
  809 + }
  810 + }
  811 +
  812 + private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
  813 + UUID sessionId = getSessionId(sessionInfo);
  814 + Objects.requireNonNull(sessionId);
  815 + if (msg.getEvent() == SessionEvent.OPEN) {
  816 + if (sessions.containsKey(sessionId)) {
  817 + log.debug("[{}] Received duplicate session open event [{}]", deviceId, sessionId);
  818 + return;
  819 + }
  820 + log.debug(
  821 + "[{}] Processing new session [{}]. Current sessions size {}",
  822 + deviceId,
  823 + sessionId,
  824 + sessions.size());
  825 +
  826 + sessions.put(
  827 + sessionId,
  828 + new SessionInfoMetaData(new SessionInfo(SessionType.ASYNC, sessionInfo.getNodeId())));
  829 + if (sessions.size() == 1) {
  830 + reportSessionOpen();
  831 + }
  832 + systemContext
  833 + .getDeviceStateService()
  834 + .onDeviceActivity(tenantId, deviceId, System.currentTimeMillis());
  835 + dumpSessions();
  836 + } else if (msg.getEvent() == SessionEvent.CLOSED) {
  837 + log.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
  838 + sessions.remove(sessionId);
  839 + attributeSubscriptions.remove(sessionId);
  840 + rpcSubscriptions.remove(sessionId);
  841 + if (sessions.isEmpty()) {
  842 + reportSessionClose();
  843 + }
  844 + dumpSessions();
  845 + }
  846 + }
  847 +
  848 + private void handleSessionActivity(
  849 + TbActorCtx context,
  850 + SessionInfoProto sessionInfoProto,
  851 + SubscriptionInfoProto subscriptionInfo) {
  852 + UUID sessionId = getSessionId(sessionInfoProto);
  853 + Objects.requireNonNull(sessionId);
  854 +
  855 + SessionInfoMetaData sessionMD = sessions.get(sessionId);
  856 + if (sessionMD != null) {
  857 + sessionMD.setLastActivityTime(subscriptionInfo.getLastActivityTime());
  858 + sessionMD.setSubscribedToAttributes(subscriptionInfo.getAttributeSubscription());
  859 + sessionMD.setSubscribedToRPC(subscriptionInfo.getRpcSubscription());
  860 + if (subscriptionInfo.getAttributeSubscription()) {
  861 + attributeSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  862 + }
  863 + if (subscriptionInfo.getRpcSubscription()) {
  864 + rpcSubscriptions.putIfAbsent(sessionId, sessionMD.getSessionInfo());
  865 + }
933 866 }
  867 + systemContext
  868 + .getDeviceStateService()
  869 + .onDeviceActivity(tenantId, deviceId, subscriptionInfo.getLastActivityTime());
  870 + if (sessionMD != null) {
  871 + dumpSessions();
  872 + }
  873 + }
  874 +
  875 + void processCredentialsUpdate(TbActorMsg msg) {
  876 + if (((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials().getCredentialsType()
  877 + == DeviceCredentialsType.LWM2M_CREDENTIALS) {
  878 + sessions.forEach(
  879 + (k, v) -> {
  880 + notifyTransportAboutDeviceCredentialsUpdate(
  881 + k, v, ((DeviceCredentialsUpdateNotificationMsg) msg).getDeviceCredentials());
  882 + });
  883 + } else {
  884 + sessions.forEach(
  885 + (sessionId, sessionMd) ->
  886 + notifyTransportAboutClosedSession(
  887 + sessionId, sessionMd, "device credentials updated!"));
  888 + attributeSubscriptions.clear();
  889 + rpcSubscriptions.clear();
  890 + dumpSessions();
  891 + }
  892 + }
  893 +
  894 + private void notifyTransportAboutClosedSessionMaxSessionsLimit(
  895 + UUID sessionId, SessionInfoMetaData sessionMd) {
  896 + log.debug(
  897 + "remove eldest session (max concurrent sessions limit reached per device) sessionId [{}] sessionMd [{}]",
  898 + sessionId,
  899 + sessionMd);
  900 + notifyTransportAboutClosedSession(
  901 + sessionId, sessionMd, "max concurrent sessions limit reached per device!");
  902 + }
  903 +
  904 + private void notifyTransportAboutClosedSession(
  905 + UUID sessionId, SessionInfoMetaData sessionMd, String message) {
  906 + SessionCloseNotificationProto sessionCloseNotificationProto =
  907 + SessionCloseNotificationProto.newBuilder().setMessage(message).build();
  908 + ToTransportMsg msg =
  909 + ToTransportMsg.newBuilder()
  910 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  911 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  912 + .setSessionCloseNotification(sessionCloseNotificationProto)
  913 + .build();
  914 + systemContext
  915 + .getTbCoreToTransportService()
  916 + .process(sessionMd.getSessionInfo().getNodeId(), msg);
  917 + }
  918 +
  919 + void notifyTransportAboutDeviceCredentialsUpdate(
  920 + UUID sessionId, SessionInfoMetaData sessionMd, DeviceCredentials deviceCredentials) {
  921 + ToTransportUpdateCredentialsProto.Builder notification =
  922 + ToTransportUpdateCredentialsProto.newBuilder();
  923 + notification.addCredentialsId(deviceCredentials.getCredentialsId());
  924 + notification.addCredentialsValue(deviceCredentials.getCredentialsValue());
  925 + ToTransportMsg msg =
  926 + ToTransportMsg.newBuilder()
  927 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  928 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  929 + .setToTransportUpdateCredentialsNotification(notification)
  930 + .build();
  931 + systemContext
  932 + .getTbCoreToTransportService()
  933 + .process(sessionMd.getSessionInfo().getNodeId(), msg);
  934 + }
  935 +
  936 + void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
  937 + this.deviceName = msg.getDeviceName();
  938 + this.deviceType = msg.getDeviceType();
  939 + this.defaultMetaData = new TbMsgMetaData();
  940 + this.defaultMetaData.putValue("deviceName", deviceName);
  941 + this.defaultMetaData.putValue("deviceType", deviceType);
  942 + }
  943 +
  944 + void processEdgeUpdate(DeviceEdgeUpdateMsg msg) {
  945 + log.trace("[{}] Processing edge update {}", deviceId, msg);
  946 + this.edgeId = msg.getEdgeId();
  947 + }
  948 +
  949 + private void sendToTransport(GetAttributeResponseMsg responseMsg, SessionInfoProto sessionInfo) {
  950 + ToTransportMsg msg =
  951 + ToTransportMsg.newBuilder()
  952 + .setSessionIdMSB(sessionInfo.getSessionIdMSB())
  953 + .setSessionIdLSB(sessionInfo.getSessionIdLSB())
  954 + .setGetAttributesResponse(responseMsg)
  955 + .build();
  956 + systemContext.getTbCoreToTransportService().process(sessionInfo.getNodeId(), msg);
  957 + }
  958 +
  959 + private void sendToTransport(
  960 + AttributeUpdateNotificationMsg notificationMsg, UUID sessionId, String nodeId) {
  961 + ToTransportMsg msg =
  962 + ToTransportMsg.newBuilder()
  963 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  964 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  965 + .setAttributeUpdateNotification(notificationMsg)
  966 + .build();
  967 + systemContext.getTbCoreToTransportService().process(nodeId, msg);
  968 + }
  969 +
  970 + private void sendToTransport(ToDeviceRpcRequestMsg rpcMsg, UUID sessionId, String nodeId) {
  971 + ToTransportMsg msg =
  972 + ToTransportMsg.newBuilder()
  973 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  974 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  975 + .setToDeviceRequest(rpcMsg)
  976 + .build();
  977 + systemContext.getTbCoreToTransportService().process(nodeId, msg);
  978 + }
  979 +
  980 + private void sendToTransport(ToServerRpcResponseMsg rpcMsg, UUID sessionId, String nodeId) {
  981 + ToTransportMsg msg =
  982 + ToTransportMsg.newBuilder()
  983 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  984 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  985 + .setToServerResponse(rpcMsg)
  986 + .build();
  987 + systemContext.getTbCoreToTransportService().process(nodeId, msg);
  988 + }
  989 +
  990 + private void saveRpcRequestToEdgeQueue(ToDeviceRpcRequest msg, Integer requestId) {
  991 + EdgeEvent edgeEvent = new EdgeEvent();
  992 + edgeEvent.setTenantId(tenantId);
  993 + edgeEvent.setAction(EdgeEventActionType.RPC_CALL);
  994 + edgeEvent.setEntityId(deviceId.getId());
  995 + edgeEvent.setType(EdgeEventType.DEVICE);
  996 +
  997 + ObjectNode body = mapper.createObjectNode();
  998 + body.put("requestId", requestId);
  999 + body.put("requestUUID", msg.getId().toString());
  1000 + body.put("oneway", msg.isOneway());
  1001 + body.put("expirationTime", msg.getExpirationTime());
  1002 + body.put("method", msg.getBody().getMethod());
  1003 + body.put("params", msg.getBody().getParams());
  1004 + edgeEvent.setBody(body);
  1005 +
  1006 + edgeEvent.setEdgeId(edgeId);
  1007 + systemContext.getEdgeEventService().save(edgeEvent);
  1008 + systemContext.getClusterService().onEdgeEventUpdate(tenantId, edgeId);
  1009 + }
  1010 +
  1011 + private List<TsKvProto> toTsKvProtos(@Nullable List<AttributeKvEntry> result) {
  1012 + List<TsKvProto> clientAttributes;
  1013 + if (result == null || result.isEmpty()) {
  1014 + clientAttributes = Collections.emptyList();
  1015 + } else {
  1016 + clientAttributes = new ArrayList<>(result.size());
  1017 + for (AttributeKvEntry attrEntry : result) {
  1018 + clientAttributes.add(toTsKvProto(attrEntry));
  1019 + }
  1020 + }
  1021 + return clientAttributes;
  1022 + }
  1023 +
  1024 + private TsKvProto toTsKvProto(AttributeKvEntry attrEntry) {
  1025 + return TsKvProto.newBuilder()
  1026 + .setTs(attrEntry.getLastUpdateTs())
  1027 + .setKv(toKeyValueProto(attrEntry))
  1028 + .build();
  1029 + }
  1030 +
  1031 + private KeyValueProto toKeyValueProto(KvEntry kvEntry) {
  1032 + KeyValueProto.Builder builder = KeyValueProto.newBuilder();
  1033 + builder.setKey(kvEntry.getKey());
  1034 + switch (kvEntry.getDataType()) {
  1035 + case BOOLEAN:
  1036 + builder.setType(KeyValueType.BOOLEAN_V);
  1037 + builder.setBoolV(kvEntry.getBooleanValue().get());
  1038 + break;
  1039 + case DOUBLE:
  1040 + builder.setType(KeyValueType.DOUBLE_V);
  1041 + builder.setDoubleV(kvEntry.getDoubleValue().get());
  1042 + break;
  1043 + case LONG:
  1044 + builder.setType(KeyValueType.LONG_V);
  1045 + builder.setLongV(kvEntry.getLongValue().get());
  1046 + break;
  1047 + case STRING:
  1048 + builder.setType(KeyValueType.STRING_V);
  1049 + builder.setStringV(kvEntry.getStrValue().get());
  1050 + break;
  1051 + case JSON:
  1052 + builder.setType(KeyValueType.JSON_V);
  1053 + builder.setJsonV(kvEntry.getJsonValue().get());
  1054 + break;
  1055 + }
  1056 + return builder.build();
  1057 + }
934 1058
935   - private void dumpSessions() {
936   - if (systemContext.isLocalCacheType()) {
  1059 + void restoreSessions() {
  1060 + if (systemContext.isLocalCacheType()) {
  1061 + return;
  1062 + }
  1063 + log.debug("[{}] Restoring sessions from cache", deviceId);
  1064 + DeviceSessionsCacheEntry sessionsDump = null;
  1065 + try {
  1066 + sessionsDump =
  1067 + DeviceSessionsCacheEntry.parseFrom(
  1068 + systemContext.getDeviceSessionCacheService().get(deviceId));
  1069 + } catch (InvalidProtocolBufferException e) {
  1070 + log.warn("[{}] Failed to decode device sessions from cache", deviceId);
  1071 + return;
  1072 + }
  1073 + if (sessionsDump.getSessionsCount() == 0) {
  1074 + log.debug("[{}] No session information found", deviceId);
  1075 + return;
  1076 + }
  1077 + // TODO: Take latest max allowed sessions size from cache
  1078 + for (SessionSubscriptionInfoProto sessionSubscriptionInfoProto :
  1079 + sessionsDump.getSessionsList()) {
  1080 + SessionInfoProto sessionInfoProto = sessionSubscriptionInfoProto.getSessionInfo();
  1081 + UUID sessionId = getSessionId(sessionInfoProto);
  1082 + SessionInfo sessionInfo = new SessionInfo(SessionType.ASYNC, sessionInfoProto.getNodeId());
  1083 + SubscriptionInfoProto subInfo = sessionSubscriptionInfoProto.getSubscriptionInfo();
  1084 + SessionInfoMetaData sessionMD =
  1085 + new SessionInfoMetaData(sessionInfo, subInfo.getLastActivityTime());
  1086 + sessions.put(sessionId, sessionMD);
  1087 + if (subInfo.getAttributeSubscription()) {
  1088 + attributeSubscriptions.put(sessionId, sessionInfo);
  1089 + sessionMD.setSubscribedToAttributes(true);
  1090 + }
  1091 + if (subInfo.getRpcSubscription()) {
  1092 + rpcSubscriptions.put(sessionId, sessionInfo);
  1093 + sessionMD.setSubscribedToRPC(true);
  1094 + }
  1095 + log.debug("[{}] Restored session: {}", deviceId, sessionMD);
  1096 + }
  1097 + log.debug(
  1098 + "[{}] Restored sessions: {}, rpc subscriptions: {}, attribute subscriptions: {}",
  1099 + deviceId,
  1100 + sessions.size(),
  1101 + rpcSubscriptions.size(),
  1102 + attributeSubscriptions.size());
  1103 + }
  1104 +
  1105 + private void dumpSessions() {
  1106 + if (systemContext.isLocalCacheType()) {
  1107 + return;
  1108 + }
  1109 + log.debug(
  1110 + "[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache",
  1111 + deviceId,
  1112 + sessions.size(),
  1113 + rpcSubscriptions.size(),
  1114 + attributeSubscriptions.size());
  1115 + List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
  1116 + sessions.forEach(
  1117 + (uuid, sessionMD) -> {
  1118 + if (sessionMD.getSessionInfo().getType() == SessionType.SYNC) {
937 1119 return;
938   - }
939   - log.debug("[{}] Dumping sessions: {}, rpc subscriptions: {}, attribute subscriptions: {} to cache", deviceId, sessions.size(), rpcSubscriptions.size(), attributeSubscriptions.size());
940   - List<SessionSubscriptionInfoProto> sessionsList = new ArrayList<>(sessions.size());
941   - sessions.forEach((uuid, sessionMD) -> {
942   - if (sessionMD.getSessionInfo().getType() == SessionType.SYNC) {
943   - return;
944   - }
945   - SessionInfo sessionInfo = sessionMD.getSessionInfo();
946   - SubscriptionInfoProto subscriptionInfoProto = SubscriptionInfoProto.newBuilder()
947   - .setLastActivityTime(sessionMD.getLastActivityTime())
948   - .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
949   - .setRpcSubscription(sessionMD.isSubscribedToRPC()).build();
950   - SessionInfoProto sessionInfoProto = SessionInfoProto.newBuilder()
951   - .setSessionIdMSB(uuid.getMostSignificantBits())
952   - .setSessionIdLSB(uuid.getLeastSignificantBits())
953   - .setNodeId(sessionInfo.getNodeId()).build();
954   - sessionsList.add(SessionSubscriptionInfoProto.newBuilder()
955   - .setSessionInfo(sessionInfoProto)
956   - .setSubscriptionInfo(subscriptionInfoProto).build());
957   - log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
  1120 + }
  1121 + SessionInfo sessionInfo = sessionMD.getSessionInfo();
  1122 + SubscriptionInfoProto subscriptionInfoProto =
  1123 + SubscriptionInfoProto.newBuilder()
  1124 + .setLastActivityTime(sessionMD.getLastActivityTime())
  1125 + .setAttributeSubscription(sessionMD.isSubscribedToAttributes())
  1126 + .setRpcSubscription(sessionMD.isSubscribedToRPC())
  1127 + .build();
  1128 + SessionInfoProto sessionInfoProto =
  1129 + SessionInfoProto.newBuilder()
  1130 + .setSessionIdMSB(uuid.getMostSignificantBits())
  1131 + .setSessionIdLSB(uuid.getLeastSignificantBits())
  1132 + .setNodeId(sessionInfo.getNodeId())
  1133 + .build();
  1134 + sessionsList.add(
  1135 + SessionSubscriptionInfoProto.newBuilder()
  1136 + .setSessionInfo(sessionInfoProto)
  1137 + .setSubscriptionInfo(subscriptionInfoProto)
  1138 + .build());
  1139 + log.debug("[{}] Dumping session: {}", deviceId, sessionMD);
958 1140 });
959   - systemContext.getDeviceSessionCacheService()
960   - .put(deviceId, DeviceSessionsCacheEntry.newBuilder()
961   - .addAllSessions(sessionsList).build().toByteArray());
962   - }
963   -
964   - void init(TbActorCtx ctx) {
965   - PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
966   - PageData<Rpc> pageData;
967   - do {
968   - pageData = systemContext.getTbRpcService().findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
969   - pageData.getData().forEach(rpc -> {
970   - ToDeviceRpcRequest msg = JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
  1141 + systemContext
  1142 + .getDeviceSessionCacheService()
  1143 + .put(
  1144 + deviceId,
  1145 + DeviceSessionsCacheEntry.newBuilder()
  1146 + .addAllSessions(sessionsList)
  1147 + .build()
  1148 + .toByteArray());
  1149 + }
  1150 +
  1151 + void init(TbActorCtx ctx) {
  1152 + PageLink pageLink = new PageLink(1024, 0, null, new SortOrder("createdTime"));
  1153 + PageData<Rpc> pageData;
  1154 + do {
  1155 + pageData =
  1156 + systemContext
  1157 + .getTbRpcService()
  1158 + .findAllByDeviceIdAndStatus(tenantId, deviceId, RpcStatus.QUEUED, pageLink);
  1159 + pageData
  1160 + .getData()
  1161 + .forEach(
  1162 + rpc -> {
  1163 + ToDeviceRpcRequest msg =
  1164 + JacksonUtil.convertValue(rpc.getRequest(), ToDeviceRpcRequest.class);
971 1165 long timeout = rpc.getExpirationTime() - System.currentTimeMillis();
972 1166 if (timeout <= 0) {
973   - rpc.setStatus(RpcStatus.EXPIRED);
974   - systemContext.getTbRpcService().save(tenantId, rpc);
  1167 + rpc.setStatus(RpcStatus.EXPIRED);
  1168 + systemContext.getTbRpcService().save(tenantId, rpc);
975 1169 } else {
976   - registerPendingRpcRequest(ctx, new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg), false, creteToDeviceRpcRequestMsg(msg), timeout);
977   - }
978   - });
979   - if (pageData.hasNext()) {
980   - pageLink = pageLink.nextPageLink();
981   - }
982   - } while (pageData.hasNext());
983   - }
984   -
985   - void checkSessionsTimeout() {
986   - final long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
987   - List<UUID> expiredIds = null;
988   -
989   - for (Map.Entry<UUID, SessionInfoMetaData> kv : sessions.entrySet()) { //entry set are cached for stable sessions
990   - if (kv.getValue().getLastActivityTime() < expTime) {
991   - final UUID id = kv.getKey();
992   - if (expiredIds == null) {
993   - expiredIds = new ArrayList<>(1); //most of the expired sessions is a single event
  1170 + registerPendingRpcRequest(
  1171 + ctx,
  1172 + new ToDeviceRpcRequestActorMsg(systemContext.getServiceId(), msg),
  1173 + false,
  1174 + creteToDeviceRpcRequestMsg(msg),
  1175 + timeout);
994 1176 }
995   - expiredIds.add(id);
996   - }
  1177 + });
  1178 + if (pageData.hasNext()) {
  1179 + pageLink = pageLink.nextPageLink();
  1180 + }
  1181 + } while (pageData.hasNext());
  1182 + }
  1183 +
  1184 + void checkSessionsTimeout() {
  1185 + final long expTime = System.currentTimeMillis() - systemContext.getSessionInactivityTimeout();
  1186 + List<UUID> expiredIds = null;
  1187 +
  1188 + for (Map.Entry<UUID, SessionInfoMetaData> kv :
  1189 + sessions.entrySet()) { // entry set are cached for stable sessions
  1190 + if (kv.getValue().getLastActivityTime() < expTime) {
  1191 + final UUID id = kv.getKey();
  1192 + if (expiredIds == null) {
  1193 + expiredIds = new ArrayList<>(1); // most of the expired sessions is a single event
997 1194 }
  1195 + expiredIds.add(id);
  1196 + }
  1197 + }
998 1198
999   - if (expiredIds != null) {
1000   - int removed = 0;
1001   - for (UUID id : expiredIds) {
1002   - final SessionInfoMetaData session = sessions.remove(id);
1003   - rpcSubscriptions.remove(id);
1004   - attributeSubscriptions.remove(id);
1005   - if (session != null) {
1006   - removed++;
1007   - notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE);
1008   - }
1009   - }
1010   - if (removed != 0) {
1011   - dumpSessions();
1012   - }
  1199 + if (expiredIds != null) {
  1200 + int removed = 0;
  1201 + for (UUID id : expiredIds) {
  1202 + final SessionInfoMetaData session = sessions.remove(id);
  1203 + rpcSubscriptions.remove(id);
  1204 + attributeSubscriptions.remove(id);
  1205 + if (session != null) {
  1206 + removed++;
  1207 + notifyTransportAboutClosedSession(id, session, SESSION_TIMEOUT_MESSAGE);
1013 1208 }
1014   -
  1209 + }
  1210 + if (removed != 0) {
  1211 + dumpSessions();
  1212 + }
1015 1213 }
1016   -
  1214 + }
1017 1215 }
... ...
... ... @@ -78,6 +78,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
78 78 import org.thingsboard.server.dao.tenant.TenantService;
79 79 import org.thingsboard.server.dao.timeseries.TimeseriesService;
80 80 import org.thingsboard.server.dao.user.UserService;
  81 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
81 82 import org.thingsboard.server.gen.transport.TransportProtos;
82 83 import org.thingsboard.server.queue.TbQueueCallback;
83 84 import org.thingsboard.server.queue.TbQueueMsgMetadata;
... ... @@ -689,4 +690,10 @@ class DefaultTbContext implements TbContext {
689 690 }
690 691 }
691 692 }
  693 +
  694 + //Thingskit function
  695 + @Override
  696 + public TkDeviceService getTkDeviceService() {
  697 + return mainCtx.getTkDeviceService();
  698 + }
692 699 }
... ...
... ... @@ -21,27 +21,19 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
21 21 import lombok.extern.slf4j.Slf4j;
22 22 import org.springframework.beans.factory.annotation.Autowired;
23 23 import org.springframework.stereotype.Service;
24   -import org.thingsboard.common.util.JacksonUtil;
25 24 import org.thingsboard.common.util.ThingsBoardThreadFactory;
26 25 import org.thingsboard.server.actors.ActorSystemContext;
27 26 import org.thingsboard.server.cluster.TbClusterService;
28 27 import org.thingsboard.server.common.data.DataConstants;
29 28 import org.thingsboard.server.common.data.Device;
30   -import org.thingsboard.server.common.data.StringUtils;
31 29 import org.thingsboard.server.common.data.User;
32   -import org.thingsboard.server.common.data.id.DeviceId;
33 30 import org.thingsboard.server.common.data.rpc.RpcError;
34   -import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
35   -import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
36   -import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
37   -import org.thingsboard.server.common.data.yunteng.enums.DeviceTypeEnum;
38 31 import org.thingsboard.server.common.msg.TbMsg;
39 32 import org.thingsboard.server.common.msg.TbMsgDataType;
40 33 import org.thingsboard.server.common.msg.TbMsgMetaData;
41 34 import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
42 35 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
43 36 import org.thingsboard.server.dao.device.DeviceService;
44   -import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
45 37 import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
46 38 import org.thingsboard.server.queue.util.TbCoreComponent;
47 39 import org.thingsboard.server.service.security.model.SecurityUser;
... ... @@ -79,8 +71,6 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
79 71 private ScheduledExecutorService scheduler;
80 72 private String serviceId;
81 73
82   - @Autowired
83   - protected TkDeviceService tkDeviceService;
84 74 public DefaultTbCoreDeviceRpcService(DeviceService deviceService, TbClusterService clusterService, TbServiceInfoProvider serviceInfoProvider,
85 75 ActorSystemContext actorContext) {
86 76 this.deviceService = deviceService;
... ... @@ -110,32 +100,6 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService {
110 100 @Override
111 101 public void processRestApiRpcRequest(ToDeviceRpcRequest request, Consumer<FromDeviceRpcResponse> responseConsumer, SecurityUser currentUser) {
112 102 log.trace("[{}][{}] Processing REST API call to rule engine [{}]", request.getTenantId(), request.getId(), request.getDeviceId());
113   -
114   - //Thingskit function 修改RPC三要素:设备ID、参数、附加信息
115   - //只有命令下发的时候走此逻辑,其他method的时候不走以下逻辑
116   - ToDeviceRpcRequestBody body = request.getBody();
117   - if(body.getMethod().contains("method"))
118   - {
119   - DeviceId targetId = request.getDeviceId();
120   - DeviceDTO targetDevice = tkDeviceService.findDeviceInfoByTbDeviceId(currentUser.getCurrentTenantId(), targetId.getId().toString());
121   - DeviceId netEnableId = DeviceTypeEnum.SENSOR == targetDevice.getDeviceType()?new DeviceId(UUID.fromString(targetDevice.getGatewayId())):targetId;
122   - String paramStr = body.getParams();
123   - if(DeviceTypeEnum.SENSOR == targetDevice.getDeviceType() && paramStr.contains("}")){
124   - ObjectNode methodParams = (ObjectNode) JacksonUtil.toJsonNode(paramStr);
125   - methodParams.put(FastIotConstants.Rpc.TARGET_NAME,targetDevice.getName());
126   - body = new ToDeviceRpcRequestBody(body.getMethod(),JacksonUtil.toString(methodParams));
127   - }
128   -
129   - ObjectNode additional = JacksonUtil.newObjectNode();
130   - if(StringUtils.isNotBlank(request.getAdditionalInfo())){
131   - additional = (ObjectNode) JacksonUtil.toJsonNode(request.getAdditionalInfo());
132   - }
133   - if(!additional.has(FastIotConstants.Rpc.TARGET_ID)){
134   - additional.put(FastIotConstants.Rpc.TARGET_ID, targetId.getId().toString());
135   - }
136   - request = new ToDeviceRpcRequest(request.getId(),request.getTenantId(),netEnableId,request.isOneway(),request.getExpirationTime(),body,request.isPersisted(),request.getRetries(),JacksonUtil.toString(additional));
137   -
138   - }
139 103 UUID requestId = request.getId();
140 104 localToRuleEngineRpcRequests.put(requestId, responseConsumer);
141 105 sendRpcRequestToRuleEngine(request, currentUser);
... ...
... ... @@ -44,8 +44,8 @@
44 44 <scope>provided</scope>
45 45 </dependency>
46 46 <dependency>
47   - <groupId>org.thingsboard.common</groupId>
48   - <artifactId>dao-api</artifactId>
  47 + <groupId>org.thingsboard</groupId>
  48 + <artifactId>dao</artifactId>
49 49 <scope>provided</scope>
50 50 </dependency>
51 51 <dependency>
... ...
... ... @@ -56,6 +56,7 @@ import org.thingsboard.server.dao.rule.RuleChainService;
56 56 import org.thingsboard.server.dao.tenant.TenantService;
57 57 import org.thingsboard.server.dao.timeseries.TimeseriesService;
58 58 import org.thingsboard.server.dao.user.UserService;
  59 +import org.thingsboard.server.dao.yunteng.service.TkDeviceService;
59 60
60 61 import java.util.Set;
61 62 import java.util.function.BiConsumer;
... ... @@ -285,4 +286,8 @@ public interface TbContext {
285 286 void removeListeners();
286 287
287 288 TenantProfile getTenantProfile();
  289 +
  290 +
  291 + //Thingskit function
  292 + TkDeviceService getTkDeviceService();
288 293 }
... ...
... ... @@ -16,12 +16,13 @@
16 16 package org.thingsboard.rule.engine.rpc;
17 17
18 18 import com.datastax.oss.driver.api.core.uuid.Uuids;
  19 +import com.fasterxml.jackson.databind.node.ObjectNode;
19 20 import com.google.gson.Gson;
20 21 import com.google.gson.JsonElement;
21 22 import com.google.gson.JsonObject;
22 23 import com.google.gson.JsonParser;
23 24 import lombok.extern.slf4j.Slf4j;
24   -import org.springframework.util.StringUtils;
  25 +import org.thingsboard.common.util.JacksonUtil;
25 26 import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest;
26 27 import org.thingsboard.rule.engine.api.RuleNode;
27 28 import org.thingsboard.rule.engine.api.TbContext;
... ... @@ -31,9 +32,17 @@ import org.thingsboard.rule.engine.api.TbNodeException;
31 32 import org.thingsboard.rule.engine.api.TbRelationTypes;
32 33 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
33 34 import org.thingsboard.server.common.data.DataConstants;
  35 +import org.thingsboard.server.common.data.Device;
34 36 import org.thingsboard.server.common.data.EntityType;
  37 +import org.thingsboard.server.common.data.StringUtils;
35 38 import org.thingsboard.server.common.data.id.DeviceId;
36 39 import org.thingsboard.server.common.data.plugin.ComponentType;
  40 +import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
  41 +import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  42 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
  43 +import org.thingsboard.server.common.data.yunteng.dto.DeviceDTO;
  44 +import org.thingsboard.server.common.data.yunteng.enums.CmdTypeEnum;
  45 +import org.thingsboard.server.common.data.yunteng.enums.DeviceTypeEnum;
37 46 import org.thingsboard.server.common.msg.TbMsg;
38 47
39 48 import java.util.Random;
... ... @@ -98,12 +107,41 @@ public class TbSendRPCRequestNode implements TbNode {
98 107 String params = parseJsonData(json.get("params"));
99 108 String additionalInfo = parseJsonData(json.get(DataConstants.ADDITIONAL_INFO));
100 109
  110 +
  111 + /** Thingskit function
  112 + * 1.1、目标设备为网关子设备时,表单数据增加属性deviceName。
  113 + * 1.2、目标设备为网关子设备时,附加信息增加属性target,便于命令下发记录挂靠。
  114 + * 1.3、目标设备为网关子设备时,将RuleEngineDeviceRpcRequest对象的deviceId改为网关设备。
  115 + * 2、附加信息增加属性cmdType,记录命令下发类型,例如:定时任务。
  116 + */
  117 + UUID orginatorId = msg.getOriginator().getId();
  118 + DeviceId targetId = new DeviceId(orginatorId);
  119 + DeviceDTO targetDevice = ctx.getTkDeviceService().findDeviceInfoByTbDeviceId(ctx.getTenantId().getId().toString(), orginatorId.toString());
  120 + DeviceId netEnableId = DeviceTypeEnum.SENSOR == targetDevice.getDeviceType()?new DeviceId(UUID.fromString(targetDevice.getGatewayId())):targetId;
  121 + if(DeviceTypeEnum.SENSOR == targetDevice.getDeviceType() && params.contains("}")){
  122 + ObjectNode methodParams = (ObjectNode) JacksonUtil.toJsonNode(params);
  123 + methodParams.put(FastIotConstants.Rpc.TARGET_NAME,targetDevice.getName());
  124 + params = JacksonUtil.toString(methodParams);
  125 + }
  126 + ObjectNode additional = JacksonUtil.newObjectNode();
  127 + if(StringUtils.isNotBlank(additionalInfo)){
  128 + additional = (ObjectNode) JacksonUtil.toJsonNode(additionalInfo);
  129 + }
  130 + if(!additional.has(FastIotConstants.Rpc.TARGET_ID)){
  131 + additional.put(FastIotConstants.Rpc.TARGET_ID, orginatorId.toString());
  132 + }
  133 + if(!additional.has(ModelConstants.TablePropertyMapping.COMMAND_TYPE)){
  134 + additional.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, CmdTypeEnum.DIY.ordinal());
  135 + }
  136 + additionalInfo = JacksonUtil.toString(additional);
  137 +
  138 +
101 139 RuleEngineDeviceRpcRequest request = RuleEngineDeviceRpcRequest.builder()
102 140 .oneway(oneway)
103 141 .method(json.get("method").getAsString())
104 142 .body(params)
105 143 .tenantId(ctx.getTenantId())
106   - .deviceId(new DeviceId(msg.getOriginator().getId()))
  144 + .deviceId(netEnableId)
107 145 .requestId(requestId)
108 146 .requestUUID(requestUUID)
109 147 .originServiceId(originServiceId)
... ...