Commit f0bccc7cfdfd92b35c586be9e6b364a1e20678d1

Authored by Andrew Shvayka
1 parent 6c611694

WIP: Implementation

Showing 32 changed files with 877 additions and 1140 deletions
... ... @@ -31,6 +31,7 @@ import lombok.Setter;
31 31 import lombok.extern.slf4j.Slf4j;
32 32 import org.springframework.beans.factory.annotation.Autowired;
33 33 import org.springframework.beans.factory.annotation.Value;
  34 +import org.springframework.context.annotation.Lazy;
34 35 import org.springframework.stereotype.Component;
35 36 import org.thingsboard.rule.engine.api.MailService;
36 37 import org.thingsboard.server.actors.service.ActorService;
... ... @@ -69,6 +70,7 @@ import org.thingsboard.server.service.script.JsExecutorService;
69 70 import org.thingsboard.server.service.script.JsInvokeService;
70 71 import org.thingsboard.server.service.state.DeviceStateService;
71 72 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
  73 +import org.thingsboard.server.service.transport.RuleEngineTransportService;
72 74
73 75 import javax.annotation.Nullable;
74 76 import java.io.IOException;
... ... @@ -204,6 +206,11 @@ public class ActorSystemContext {
204 206 @Getter
205 207 private DeviceStateService deviceStateService;
206 208
  209 + @Lazy
  210 + @Autowired
  211 + @Getter
  212 + private RuleEngineTransportService ruleEngineTransportService;
  213 +
207 214 @Value("${cluster.partition_id}")
208 215 @Getter
209 216 private long queuePartitionId;
... ...
... ... @@ -39,7 +39,6 @@ import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
39 39 import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
40 40 import org.thingsboard.server.common.msg.cluster.ServerAddress;
41 41 import org.thingsboard.server.common.msg.core.BasicActorSystemToDeviceSessionActorMsg;
42   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
43 42 import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
44 43 import org.thingsboard.server.common.msg.system.ServiceToRuleEngineMsg;
45 44 import org.thingsboard.server.dao.model.ModelConstants;
... ... @@ -105,7 +104,7 @@ public class AppActor extends RuleChainManagerActor {
105 104 case SERVICE_TO_RULE_ENGINE_MSG:
106 105 onServiceToRuleEngineMsg((ServiceToRuleEngineMsg) msg);
107 106 break;
108   - case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
  107 + case TRANSPORT_TO_DEVICE_ACTOR_MSG:
109 108 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
110 109 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
111 110 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
... ... @@ -169,16 +168,6 @@ public class AppActor extends RuleChainManagerActor {
169 168 getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender());
170 169 }
171 170
172   - private void processDeviceMsg(DeviceToDeviceActorMsg deviceToDeviceActorMsg) {
173   - TenantId tenantId = deviceToDeviceActorMsg.getTenantId();
174   - ActorRef tenantActor = getOrCreateTenantActor(tenantId);
175   - if (deviceToDeviceActorMsg.getPayload().getMsgType().requiresRulesProcessing()) {
176   -// tenantActor.tell(new RuleChainDeviceMsg(deviceToDeviceActorMsg, ruleManager.getRuleChain(this.context())), context().self());
177   - } else {
178   - tenantActor.tell(deviceToDeviceActorMsg, context().self());
179   - }
180   - }
181   -
182 171 private ActorRef getOrCreateTenantActor(TenantId tenantId) {
183 172 return tenantActors.computeIfAbsent(tenantId, k -> context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId))
184 173 .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()));
... ...
... ... @@ -26,12 +26,12 @@ import org.thingsboard.server.common.data.id.DeviceId;
26 26 import org.thingsboard.server.common.data.id.TenantId;
27 27 import org.thingsboard.server.common.msg.TbActorMsg;
28 28 import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
29   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
30 29 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
31 30 import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
32 31 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
33 32 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
34 33 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
  34 +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
35 35
36 36 public class DeviceActor extends ContextAwareActor {
37 37
... ... @@ -50,8 +50,8 @@ public class DeviceActor extends ContextAwareActor {
50 50 case CLUSTER_EVENT_MSG:
51 51 processor.processClusterEventMsg((ClusterEventMsg) msg);
52 52 break;
53   - case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
54   - processor.process(context(), (DeviceToDeviceActorMsg) msg);
  53 + case TRANSPORT_TO_DEVICE_ACTOR_MSG:
  54 + processor.process(context(), (TransportToDeviceActorMsgWrapper) msg);
55 55 break;
56 56 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
57 57 processor.processAttributesUpdate(context(), (DeviceAttributesEventNotificationMsg) msg);
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -61,7 +61,6 @@ import org.thingsboard.server.common.msg.core.TelemetryUploadRequest;
61 61 import org.thingsboard.server.common.msg.core.ToDeviceRpcRequestMsg;
62 62 import org.thingsboard.server.common.msg.core.ToDeviceRpcResponseMsg;
63 63 import org.thingsboard.server.common.msg.core.ToServerRpcRequestMsg;
64   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
65 64 import org.thingsboard.server.common.msg.kv.BasicAttributeKVMsg;
66 65 import org.thingsboard.server.common.msg.rpc.ToDeviceRpcRequest;
67 66 import org.thingsboard.server.common.msg.session.FromDeviceMsg;
... ... @@ -71,9 +70,11 @@ import org.thingsboard.server.common.msg.session.ToDeviceMsg;
71 70 import org.thingsboard.server.common.msg.timeout.DeviceActorClientSideRpcTimeoutMsg;
72 71 import org.thingsboard.server.common.msg.timeout.DeviceActorQueueTimeoutMsg;
73 72 import org.thingsboard.server.common.msg.timeout.DeviceActorServerSideRpcTimeoutMsg;
  73 +import org.thingsboard.server.gen.transport.TransportProtos;
74 74 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
75 75 import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
76 76 import org.thingsboard.server.service.rpc.ToServerRpcResponseActorMsg;
  77 +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
77 78
78 79 import javax.annotation.Nullable;
79 80 import java.util.ArrayList;
... ... @@ -92,6 +93,8 @@ import java.util.function.Consumer;
92 93 import java.util.function.Predicate;
93 94 import java.util.stream.Collectors;
94 95
  96 +import org.thingsboard.server.gen.transport.TransportProtos.*;
  97 +
95 98 /**
96 99 * @author Andrew Shvayka
97 100 */
... ... @@ -99,12 +102,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
99 102
100 103 private final TenantId tenantId;
101 104 private final DeviceId deviceId;
102   - private final Map<SessionId, SessionInfo> sessions;
103   - private final Map<SessionId, SessionInfo> attributeSubscriptions;
104   - private final Map<SessionId, SessionInfo> rpcSubscriptions;
  105 + private final Map<UUID, SessionInfo> sessions;
  106 + private final Map<UUID, SessionInfo> attributeSubscriptions;
  107 + private final Map<UUID, SessionInfo> rpcSubscriptions;
105 108 private final Map<Integer, ToDeviceRpcRequestMetadata> toDeviceRpcPendingMap;
106 109 private final Map<Integer, ToServerRpcRequestMetadata> toServerRpcPendingMap;
107   - private final Map<UUID, PendingSessionMsgData> pendingMsgs;
108 110
109 111 private final Gson gson = new Gson();
110 112 private final JsonParser jsonParser = new JsonParser();
... ... @@ -123,7 +125,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
123 125 this.rpcSubscriptions = new HashMap<>();
124 126 this.toDeviceRpcPendingMap = new HashMap<>();
125 127 this.toServerRpcPendingMap = new HashMap<>();
126   - this.pendingMsgs = new HashMap<>();
127 128 initAttributes();
128 129 }
129 130
... ... @@ -154,11 +155,11 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
154 155 boolean sent = rpcSubscriptions.size() > 0;
155 156 Set<SessionId> syncSessionSet = new HashSet<>();
156 157 rpcSubscriptions.entrySet().forEach(sub -> {
157   - ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
158   - sendMsgToSessionActor(response, sub.getValue().getServer());
159   - if (SessionType.SYNC == sub.getValue().getType()) {
160   - syncSessionSet.add(sub.getKey());
161   - }
  158 +// ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(rpcRequest, sub.getKey());
  159 +// sendMsgToSessionActor(response, sub.getValue().getServer());
  160 +// if (SessionType.SYNC == sub.getValue().getType()) {
  161 +// syncSessionSet.add(sub.getKey());
  162 +// }
162 163 });
163 164 syncSessionSet.forEach(rpcSubscriptions::remove);
164 165
... ... @@ -191,15 +192,6 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
191 192 }
192 193 }
193 194
194   - void processQueueTimeout(ActorContext context, DeviceActorQueueTimeoutMsg msg) {
195   - PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
196   - if (data != null) {
197   - logger.debug("[{}] Queue put [{}] timeout detected!", deviceId, msg.getId());
198   - ToDeviceMsg toDeviceMsg = new RuleEngineErrorMsg(data.getSessionMsgType(), RuleEngineError.QUEUE_PUT_TIMEOUT);
199   - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, data.getSessionId()), data.getServerAddress());
200   - }
201   - }
202   -
203 195 void processQueueAck(ActorContext context, RuleEngineQueuePutAckMsg msg) {
204 196 PendingSessionMsgData data = pendingMsgs.remove(msg.getId());
205 197 if (data != null && data.isReplyOnQueueAck()) {
... ... @@ -252,31 +244,37 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
252 244 };
253 245 }
254 246
255   - void process(ActorContext context, DeviceToDeviceActorMsg msg) {
256   - processSubscriptionCommands(context, msg);
257   - processRpcResponses(context, msg);
258   - processSessionStateMsgs(msg);
  247 + void process(ActorContext context, TransportToDeviceActorMsgWrapper wrapper) {
  248 + TransportToDeviceActorMsg msg = wrapper.getMsg();
  249 +// processSubscriptionCommands(context, msg);
  250 +// processRpcResponses(context, msg);
  251 + if (msg.hasSessionEvent()) {
  252 + processSessionStateMsgs(msg.getSessionInfo(), msg.getSessionEvent());
  253 + }
259 254
260   - SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
261   - if (sessionMsgType.requiresRulesProcessing()) {
262   - switch (sessionMsgType) {
263   - case GET_ATTRIBUTES_REQUEST:
264   - handleGetAttributesRequest(msg);
265   - break;
266   - case POST_ATTRIBUTES_REQUEST:
267   - handlePostAttributesRequest(context, msg);
268   - reportActivity();
269   - break;
270   - case POST_TELEMETRY_REQUEST:
271   - handlePostTelemetryRequest(context, msg);
272   - reportActivity();
273   - break;
274   - case TO_SERVER_RPC_REQUEST:
275   - handleClientSideRPCRequest(context, msg);
276   - reportActivity();
277   - break;
278   - }
  255 + if (msg.hasPostAttributes()) {
  256 + handlePostAttributesRequest(context, msg.getSessionInfo(), msg.getPostAttributes());
  257 + reportActivity();
279 258 }
  259 + if (msg.hasPostTelemetry()) {
  260 + handlePostTelemetryRequest(context, msg.getSessionInfo(), msg.getPostTelemetry());
  261 + reportActivity();
  262 + }
  263 + if (msg.hasGetAttributes()) {
  264 + handleGetAttributesRequest(context, msg.getSessionInfo(), msg.getGetAttributes());
  265 + }
  266 +// SessionMsgType sessionMsgType = msg.getPayload().getMsgType();
  267 +// if (sessionMsgType.requiresRulesProcessing()) {
  268 +// switch (sessionMsgType) {
  269 +// case GET_ATTRIBUTES_REQUEST:
  270 +// handleGetAttributesRequest(msg);
  271 +// break;
  272 +// case TO_SERVER_RPC_REQUEST:
  273 +// handleClientSideRPCRequest(context, msg);
  274 +// reportActivity();
  275 +// break;
  276 +// }
  277 +// }
280 278 }
281 279
282 280 private void reportActivity() {
... ... @@ -291,6 +289,39 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
291 289 systemContext.getDeviceStateService().onDeviceDisconnect(deviceId);
292 290 }
293 291
  292 + private void handleGetAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, GetAttributeRequestMsg request) {
  293 + ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, toOptionalSet(request.getClientAttributeNamesList()));
  294 + ListenableFuture<List<AttributeKvEntry>> sharedAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.SHARED_SCOPE, toOptionalSet(request.getSharedAttributeNamesList()));
  295 +
  296 + Futures.addCallback(Futures.allAsList(Arrays.asList(clientAttributesFuture, sharedAttributesFuture)), new FutureCallback<List<List<AttributeKvEntry>>>() {
  297 + @Override
  298 + public void onSuccess(@Nullable List<List<AttributeKvEntry>> result) {
  299 + systemContext.getRuleEngineTransportService().process();
  300 + BasicGetAttributesResponse response = BasicGetAttributesResponse.onSuccess(request.getMsgType(),
  301 + request.getRequestId(), BasicAttributeKVMsg.from(result.get(0), result.get(1)));
  302 + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(response, src.getSessionId()), src.getServerAddress());
  303 + }
  304 +
  305 + @Override
  306 + public void onFailure(Throwable t) {
  307 + if (t instanceof Exception) {
  308 + ToDeviceMsg toDeviceMsg = BasicStatusCodeResponse.onError(SessionMsgType.GET_ATTRIBUTES_REQUEST, request.getRequestId(), (Exception) t);
  309 + sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(toDeviceMsg, src.getSessionId()), src.getServerAddress());
  310 + } else {
  311 + logger.error("[{}] Failed to process attributes request", deviceId, t);
  312 + }
  313 + }
  314 + });
  315 + }
  316 +
  317 + private Optional<Set<String>> toOptionalSet(List<String> strings) {
  318 + if (strings == null || strings.isEmpty()) {
  319 + return Optional.empty();
  320 + } else {
  321 + return Optional.of(new HashSet<>(strings));
  322 + }
  323 + }
  324 +
294 325 private void handleGetAttributesRequest(DeviceToDeviceActorMsg src) {
295 326 GetAttributesRequest request = (GetAttributesRequest) src.getPayload();
296 327 ListenableFuture<List<AttributeKvEntry>> clientAttributesFuture = getAttributeKvEntries(deviceId, DataConstants.CLIENT_SCOPE, request.getClientAttributeNames());
... ... @@ -328,43 +359,20 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
328 359 }
329 360 }
330 361
331   - private void handlePostAttributesRequest(ActorContext context, DeviceToDeviceActorMsg src) {
332   - AttributesUpdateRequest request = (AttributesUpdateRequest) src.getPayload();
333   -
334   - JsonObject json = new JsonObject();
335   - for (AttributeKvEntry kv : request.getAttributes()) {
336   - kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
337   - kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
338   - kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
339   - kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
340   - }
341   -
342   - TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(), TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
343   - PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
344   - SessionMsgType.POST_ATTRIBUTES_REQUEST, request.getRequestId(), true, 1);
345   - pushToRuleEngineWithTimeout(context, tbMsg, msgData);
  362 + private void handlePostAttributesRequest(ActorContext context, SessionInfoProto sessionInfo, PostAttributeMsg postAttributes) {
  363 + JsonObject json = getJsonObject(postAttributes.getKvList());
  364 + TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), deviceId, defaultMetaData.copy(),
  365 + TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
  366 + pushToRuleEngine(context, tbMsg);
346 367 }
347 368
348   - private void handlePostTelemetryRequest(ActorContext context, DeviceToDeviceActorMsg src) {
349   - TelemetryUploadRequest request = (TelemetryUploadRequest) src.getPayload();
350   -
351   - Map<Long, List<KvEntry>> tsData = request.getData();
352   -
353   - PendingSessionMsgData msgData = new PendingSessionMsgData(src.getSessionId(), src.getServerAddress(),
354   - SessionMsgType.POST_TELEMETRY_REQUEST, request.getRequestId(), true, tsData.size());
355   -
356   - for (Map.Entry<Long, List<KvEntry>> entry : tsData.entrySet()) {
357   - JsonObject json = new JsonObject();
358   - for (KvEntry kv : entry.getValue()) {
359   - kv.getBooleanValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
360   - kv.getLongValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
361   - kv.getDoubleValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
362   - kv.getStrValue().ifPresent(v -> json.addProperty(kv.getKey(), v));
363   - }
  369 + private void handlePostTelemetryRequest(ActorContext context, SessionInfoProto sessionInfo, PostTelemetryMsg postTelemetry) {
  370 + for (TsKvListProto tsKv : postTelemetry.getTsKvListList()) {
  371 + JsonObject json = getJsonObject(tsKv.getKvList());
364 372 TbMsgMetaData metaData = defaultMetaData.copy();
365   - metaData.putValue("ts", entry.getKey() + "");
  373 + metaData.putValue("ts", tsKv.getTs() + "");
366 374 TbMsg tbMsg = new TbMsg(UUIDs.timeBased(), SessionMsgType.POST_TELEMETRY_REQUEST.name(), deviceId, metaData, TbMsgDataType.JSON, gson.toJson(json), null, null, 0L);
367   - pushToRuleEngineWithTimeout(context, tbMsg, msgData);
  375 + pushToRuleEngine(context, tbMsg);
368 376 }
369 377 }
370 378
... ... @@ -401,16 +409,7 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
401 409 }
402 410 }
403 411
404   - private void pushToRuleEngineWithTimeout(ActorContext context, TbMsg tbMsg, PendingSessionMsgData pendingMsgData) {
405   - SessionMsgType sessionMsgType = pendingMsgData.getSessionMsgType();
406   - int requestId = pendingMsgData.getRequestId();
407   - if (systemContext.isQueuePersistenceEnabled()) {
408   - pendingMsgs.put(tbMsg.getId(), pendingMsgData);
409   - scheduleMsgWithDelay(context, new DeviceActorQueueTimeoutMsg(tbMsg.getId(), systemContext.getQueuePersistenceTimeout()), systemContext.getQueuePersistenceTimeout());
410   - } else {
411   - ActorSystemToDeviceSessionActorMsg response = new BasicActorSystemToDeviceSessionActorMsg(BasicStatusCodeResponse.onSuccess(sessionMsgType, requestId), pendingMsgData.getSessionId());
412   - sendMsgToSessionActor(response, pendingMsgData.getServerAddress());
413   - }
  412 + private void pushToRuleEngine(ActorContext context, TbMsg tbMsg) {
414 413 context.parent().tell(new DeviceActorToRuleEngineMsg(context.self(), tbMsg), context.self());
415 414 }
416 415
... ... @@ -497,13 +496,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
497 496 }
498 497 }
499 498
500   - private void processSessionStateMsgs(DeviceToDeviceActorMsg msg) {
501   - SessionId sessionId = msg.getSessionId();
502   - FromDeviceMsg inMsg = msg.getPayload();
503   - if (inMsg instanceof SessionOpenMsg) {
  499 + private void processSessionStateMsgs(SessionInfoProto sessionInfo, SessionEventMsg msg) {
  500 + UUID sessionId = new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  501 + if (msg.getEvent() == SessionEvent.OPEN) {
504 502 logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
505 503 if (sessions.size() >= systemContext.getMaxConcurrentSessionsPerDevice()) {
506   - SessionId sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
  504 + UUID sessionIdToRemove = sessions.keySet().stream().findFirst().orElse(null);
507 505 if (sessionIdToRemove != null) {
508 506 closeSession(sessionIdToRemove, sessions.remove(sessionIdToRemove));
509 507 }
... ... @@ -512,6 +510,10 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
512 510 if (sessions.size() == 1) {
513 511 reportSessionOpen();
514 512 }
  513 + }
  514 + FromDeviceMsg inMsg = msg.getPayload();
  515 + if (inMsg instanceof SessionOpenMsg) {
  516 + logger.debug("[{}] Processing new session [{}]", deviceId, sessionId);
515 517 } else if (inMsg instanceof SessionCloseMsg) {
516 518 logger.debug("[{}] Canceling subscriptions for closed session [{}]", deviceId, sessionId);
517 519 sessions.remove(sessionId);
... ... @@ -540,8 +542,12 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
540 542 rpcSubscriptions.clear();
541 543 }
542 544
543   - private void closeSession(SessionId sessionId, SessionInfo sessionInfo) {
544   - sendMsgToSessionActor(new BasicActorSystemToDeviceSessionActorMsg(new SessionCloseNotification(), sessionId), sessionInfo.getServer());
  545 + private void closeSession(UUID sessionId, SessionInfo sessionInfo) {
  546 + DeviceActorToTransportMsg msg = DeviceActorToTransportMsg.newBuilder()
  547 + .setSessionIdMSB(sessionId.getMostSignificantBits())
  548 + .setSessionIdLSB(sessionId.getLeastSignificantBits())
  549 + .setSessionCloseNotification(SessionCloseNotificationProto.getDefaultInstance()).build();
  550 + systemContext.getRuleEngineTransportService().process(sessionInfo.getNodeId(), msg);
545 551 }
546 552
547 553 void processNameOrTypeUpdate(DeviceNameOrTypeUpdateMsg msg) {
... ... @@ -552,4 +558,24 @@ public class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcesso
552 558 this.defaultMetaData.putValue("deviceType", deviceType);
553 559 }
554 560
  561 + private JsonObject getJsonObject(List<KeyValueProto> tsKv) {
  562 + JsonObject json = new JsonObject();
  563 + for (KeyValueProto kv : tsKv) {
  564 + switch (kv.getType()) {
  565 + case BOOLEAN_V:
  566 + json.addProperty(kv.getKey(), kv.getBoolV());
  567 + break;
  568 + case LONG_V:
  569 + json.addProperty(kv.getKey(), kv.getLongV());
  570 + break;
  571 + case DOUBLE_V:
  572 + json.addProperty(kv.getKey(), kv.getDoubleV());
  573 + break;
  574 + case STRING_V:
  575 + json.addProperty(kv.getKey(), kv.getStringV());
  576 + break;
  577 + }
  578 + }
  579 + return json;
  580 + }
555 581 }
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.common.msg.cluster.ServerAddress;
22 22 import org.thingsboard.server.common.msg.session.SessionMsgType;
23 23
24 24 import java.util.Optional;
  25 +import java.util.UUID;
25 26
26 27 /**
27 28 * Created by ashvayka on 17.04.18.
... ... @@ -30,7 +31,7 @@ import java.util.Optional;
30 31 @AllArgsConstructor
31 32 public final class PendingSessionMsgData {
32 33
33   - private final SessionId sessionId;
  34 + private final UUID sessionId;
34 35 private final Optional<ServerAddress> serverAddress;
35 36 private final SessionMsgType sessionMsgType;
36 37 private final int requestId;
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -16,10 +16,7 @@
16 16 package org.thingsboard.server.actors.device;
17 17
18 18 import lombok.Data;
19   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
20   -import org.thingsboard.server.common.msg.session.SessionType;
21   -
22   -import java.util.Optional;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.SessionType;
23 20
24 21 /**
25 22 * @author Andrew Shvayka
... ... @@ -27,5 +24,6 @@ import java.util.Optional;
27 24 @Data
28 25 public class SessionInfo {
29 26 private final SessionType type;
30   - private final Optional<ServerAddress> server;
  27 + private final String nodeId;
  28 +
31 29 }
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.actors.session;
17   -
18   -import akka.actor.ActorContext;
19   -import akka.event.LoggingAdapter;
20   -import org.thingsboard.server.actors.ActorSystemContext;
21   -import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
22   -import org.thingsboard.server.common.data.id.SessionId;
23   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
24   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
25   -import org.thingsboard.server.common.msg.core.AttributesSubscribeMsg;
26   -import org.thingsboard.server.common.msg.core.ResponseMsg;
27   -import org.thingsboard.server.common.msg.core.RpcSubscribeMsg;
28   -import org.thingsboard.server.common.msg.core.SessionCloseMsg;
29   -import org.thingsboard.server.common.msg.core.SessionOpenMsg;
30   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
31   -import org.thingsboard.server.common.msg.session.BasicSessionActorToAdaptorMsg;
32   -import org.thingsboard.server.common.msg.session.FromDeviceMsg;
33   -import org.thingsboard.server.common.msg.session.FromDeviceRequestMsg;
34   -import org.thingsboard.server.common.msg.session.SessionMsgType;
35   -import org.thingsboard.server.common.msg.session.SessionType;
36   -import org.thingsboard.server.common.msg.session.ToDeviceMsg;
37   -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
38   -import org.thingsboard.server.common.msg.session.ex.SessionException;
39   -
40   -import java.util.HashMap;
41   -import java.util.Map;
42   -import java.util.Optional;
43   -
44   -class ASyncMsgProcessor extends AbstractSessionActorMsgProcessor {
45   -
46   - private boolean firstMsg = true;
47   - private Map<Integer, DeviceToDeviceActorMsg> pendingMap = new HashMap<>();
48   - private Optional<ServerAddress> currentTargetServer;
49   - private boolean subscribedToAttributeUpdates;
50   - private boolean subscribedToRpcCommands;
51   -
52   - public ASyncMsgProcessor(ActorSystemContext ctx, LoggingAdapter logger, SessionId sessionId) {
53   - super(ctx, logger, sessionId);
54   - }
55   -
56   - @Override
57   - protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
58   - updateSessionCtx(msg, SessionType.ASYNC);
59   - DeviceToDeviceActorMsg pendingMsg = toDeviceMsg(msg);
60   - FromDeviceMsg fromDeviceMsg = pendingMsg.getPayload();
61   - if (firstMsg) {
62   - if (fromDeviceMsg.getMsgType() != SessionMsgType.SESSION_OPEN) {
63   - toDeviceMsg(new SessionOpenMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
64   - }
65   - firstMsg = false;
66   - }
67   - switch (fromDeviceMsg.getMsgType()) {
68   - case POST_TELEMETRY_REQUEST:
69   - case POST_ATTRIBUTES_REQUEST:
70   - FromDeviceRequestMsg requestMsg = (FromDeviceRequestMsg) fromDeviceMsg;
71   - if (requestMsg.getRequestId() >= 0) {
72   - logger.debug("[{}] Pending request {} registered", requestMsg.getRequestId(), requestMsg.getMsgType());
73   - //TODO: handle duplicates.
74   - pendingMap.put(requestMsg.getRequestId(), pendingMsg);
75   - }
76   - break;
77   - case SUBSCRIBE_ATTRIBUTES_REQUEST:
78   - subscribedToAttributeUpdates = true;
79   - break;
80   - case UNSUBSCRIBE_ATTRIBUTES_REQUEST:
81   - subscribedToAttributeUpdates = false;
82   - break;
83   - case SUBSCRIBE_RPC_COMMANDS_REQUEST:
84   - subscribedToRpcCommands = true;
85   - break;
86   - case UNSUBSCRIBE_RPC_COMMANDS_REQUEST:
87   - subscribedToRpcCommands = false;
88   - break;
89   - default:
90   - break;
91   - }
92   - currentTargetServer = forwardToAppActor(ctx, pendingMsg);
93   - }
94   -
95   - @Override
96   - public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
97   - try {
98   - if (msg.getSessionMsgType() != SessionMsgType.SESSION_CLOSE) {
99   - switch (msg.getSessionMsgType()) {
100   - case STATUS_CODE_RESPONSE:
101   - case GET_ATTRIBUTES_RESPONSE:
102   - ResponseMsg responseMsg = (ResponseMsg) msg;
103   - if (responseMsg.getRequestId() >= 0) {
104   - logger.debug("[{}] Pending request processed: {}", responseMsg.getRequestId(), responseMsg);
105   - pendingMap.remove(responseMsg.getRequestId());
106   - }
107   - break;
108   - default:
109   - break;
110   - }
111   - sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
112   - } else {
113   - sessionCtx.onMsg(org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg.onCredentialsRevoked(sessionCtx.getSessionId()));
114   - }
115   - } catch (SessionException e) {
116   - logger.warning("Failed to push session response msg", e);
117   - }
118   - }
119   -
120   - @Override
121   - public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) {
122   - // TODO Auto-generated method stub
123   - }
124   -
125   - @Override
126   - protected void cleanupSession(ActorContext ctx) {
127   - toDeviceMsg(new SessionCloseMsg()).ifPresent(m -> forwardToAppActor(ctx, m));
128   - }
129   -
130   - @Override
131   - public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
132   - if (pendingMap.size() > 0 || subscribedToAttributeUpdates || subscribedToRpcCommands) {
133   - Optional<ServerAddress> newTargetServer = systemContext.getRoutingService().resolveById(getDeviceId());
134   - if (!newTargetServer.equals(currentTargetServer)) {
135   - firstMsg = true;
136   - currentTargetServer = newTargetServer;
137   - pendingMap.values().forEach(v -> {
138   - forwardToAppActor(context, v, currentTargetServer);
139   - if (currentTargetServer.isPresent()) {
140   - logger.debug("[{}] Forwarded msg to new server: {}", sessionId, currentTargetServer.get());
141   - } else {
142   - logger.debug("[{}] Forwarded msg to local server.", sessionId);
143   - }
144   - });
145   - if (subscribedToAttributeUpdates) {
146   - toDeviceMsg(new AttributesSubscribeMsg()).ifPresent(m -> forwardToAppActor(context, m, currentTargetServer));
147   - logger.debug("[{}] Forwarded attributes subscription.", sessionId);
148   - }
149   - if (subscribedToRpcCommands) {
150   - toDeviceMsg(new RpcSubscribeMsg()).ifPresent(m -> forwardToAppActor(context, m, currentTargetServer));
151   - logger.debug("[{}] Forwarded rpc commands subscription.", sessionId);
152   - }
153   - }
154   - }
155   - }
156   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.actors.session;
17   -
18   -import akka.actor.ActorContext;
19   -import akka.actor.ActorRef;
20   -import akka.event.LoggingAdapter;
21   -import org.thingsboard.server.actors.ActorSystemContext;
22   -import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
23   -import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
24   -import org.thingsboard.server.common.data.id.DeviceId;
25   -import org.thingsboard.server.common.data.id.SessionId;
26   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
27   -import org.thingsboard.server.common.msg.cluster.SendToClusterMsg;
28   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
29   -import org.thingsboard.server.common.msg.device.BasicDeviceToDeviceActorMsg;
30   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
31   -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
32   -import org.thingsboard.server.common.msg.session.FromDeviceMsg;
33   -import org.thingsboard.server.common.msg.session.SessionContext;
34   -import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
35   -import org.thingsboard.server.common.msg.session.SessionType;
36   -import org.thingsboard.server.common.msg.session.ToDeviceMsg;
37   -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
38   -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
39   -
40   -import java.util.Optional;
41   -
42   -abstract class AbstractSessionActorMsgProcessor extends AbstractContextAwareMsgProcessor {
43   -
44   - protected final SessionId sessionId;
45   - protected SessionContext sessionCtx;
46   - protected DeviceToDeviceActorMsg deviceToDeviceActorMsgPrototype;
47   -
48   - protected AbstractSessionActorMsgProcessor(ActorSystemContext ctx, LoggingAdapter logger, SessionId sessionId) {
49   - super(ctx, logger);
50   - this.sessionId = sessionId;
51   - }
52   -
53   - protected abstract void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg);
54   -
55   - protected abstract void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg);
56   -
57   - protected abstract void processToDeviceMsg(ActorContext context, ToDeviceMsg msg);
58   -
59   - public abstract void processClusterEvent(ActorContext context, ClusterEventMsg msg);
60   -
61   - protected void processSessionCtrlMsg(ActorContext ctx, SessionCtrlMsg msg) {
62   - if (msg instanceof SessionCloseMsg) {
63   - cleanupSession(ctx);
64   - terminateSession(ctx, sessionId);
65   - }
66   - }
67   -
68   - protected void cleanupSession(ActorContext ctx) {
69   - }
70   -
71   - protected void updateSessionCtx(TransportToDeviceSessionActorMsg msg, SessionType type) {
72   - sessionCtx = msg.getSessionMsg().getSessionContext();
73   - deviceToDeviceActorMsgPrototype = new BasicDeviceToDeviceActorMsg(msg, type);
74   - }
75   -
76   - protected DeviceToDeviceActorMsg toDeviceMsg(TransportToDeviceSessionActorMsg msg) {
77   - AdaptorToSessionActorMsg adaptorMsg = msg.getSessionMsg();
78   - return new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, adaptorMsg.getMsg());
79   - }
80   -
81   - protected Optional<DeviceToDeviceActorMsg> toDeviceMsg(FromDeviceMsg msg) {
82   - if (deviceToDeviceActorMsgPrototype != null) {
83   - return Optional.of(new BasicDeviceToDeviceActorMsg(deviceToDeviceActorMsgPrototype, msg));
84   - } else {
85   - return Optional.empty();
86   - }
87   - }
88   -
89   - protected Optional<ServerAddress> forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward) {
90   - Optional<ServerAddress> address = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
91   - forwardToAppActor(ctx, toForward, address);
92   - return address;
93   - }
94   -
95   - protected Optional<ServerAddress> forwardToAppActorIfAddressChanged(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> oldAddress) {
96   -
97   - Optional<ServerAddress> newAddress = systemContext.getRoutingService().resolveById(toForward.getDeviceId());
98   - if (!newAddress.equals(oldAddress)) {
99   - getAppActor().tell(new SendToClusterMsg(toForward.getDeviceId(), toForward
100   - .toOtherAddress(systemContext.getRoutingService().getCurrentServer())), ctx.self());
101   - }
102   - return newAddress;
103   - }
104   -
105   - protected void forwardToAppActor(ActorContext ctx, DeviceToDeviceActorMsg toForward, Optional<ServerAddress> address) {
106   - if (address.isPresent()) {
107   - systemContext.getRpcService().tell(systemContext.getEncodingService().convertToProtoDataMessage(address.get(),
108   - toForward.toOtherAddress(systemContext.getRoutingService().getCurrentServer())));
109   - } else {
110   - getAppActor().tell(toForward, ctx.self());
111   - }
112   - }
113   -
114   - public static void terminateSession(ActorContext ctx, SessionId sessionId) {
115   - ctx.parent().tell(new SessionTerminationMsg(sessionId), ActorRef.noSender());
116   - ctx.stop(ctx.self());
117   - }
118   -
119   - public DeviceId getDeviceId() {
120   - return deviceToDeviceActorMsgPrototype.getDeviceId();
121   - }
122   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.actors.session;
17   -
18   -import akka.actor.OneForOneStrategy;
19   -import akka.actor.SupervisorStrategy;
20   -import akka.event.Logging;
21   -import akka.event.LoggingAdapter;
22   -import org.thingsboard.server.actors.ActorSystemContext;
23   -import org.thingsboard.server.actors.service.ContextAwareActor;
24   -import org.thingsboard.server.actors.service.ContextBasedCreator;
25   -import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
26   -import org.thingsboard.server.common.data.id.SessionId;
27   -import org.thingsboard.server.common.msg.TbActorMsg;
28   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
29   -import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
30   -import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
31   -import org.thingsboard.server.common.msg.session.SessionMsg;
32   -import org.thingsboard.server.common.msg.session.SessionType;
33   -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
34   -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
35   -import scala.concurrent.duration.Duration;
36   -
37   -public class SessionActor extends ContextAwareActor {
38   -
39   - private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this);
40   -
41   - private final SessionId sessionId;
42   - private AbstractSessionActorMsgProcessor processor;
43   -
44   - private SessionActor(ActorSystemContext systemContext, SessionId sessionId) {
45   - super(systemContext);
46   - this.sessionId = sessionId;
47   - }
48   -
49   - @Override
50   - public SupervisorStrategy supervisorStrategy() {
51   - return new OneForOneStrategy(-1, Duration.Inf(),
52   - throwable -> {
53   - logger.error(throwable, "Unknown session error");
54   - if (throwable instanceof Error) {
55   - return OneForOneStrategy.escalate();
56   - } else {
57   - return OneForOneStrategy.resume();
58   - }
59   - });
60   - }
61   -
62   - @Override
63   - protected boolean process(TbActorMsg msg) {
64   - switch (msg.getMsgType()) {
65   - case TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG:
66   - processTransportToSessionMsg((TransportToDeviceSessionActorMsg) msg);
67   - break;
68   - case ACTOR_SYSTEM_TO_DEVICE_SESSION_ACTOR_MSG:
69   - processActorsToSessionMsg((ActorSystemToDeviceSessionActorMsg) msg);
70   - break;
71   - case SESSION_TIMEOUT_MSG:
72   - processTimeoutMsg((SessionTimeoutMsg) msg);
73   - break;
74   - case SESSION_CTRL_MSG:
75   - processSessionCloseMsg((SessionCtrlMsg) msg);
76   - break;
77   - case CLUSTER_EVENT_MSG:
78   - processClusterEvent((ClusterEventMsg) msg);
79   - break;
80   - default: return false;
81   - }
82   - return true;
83   - }
84   -
85   - private void processClusterEvent(ClusterEventMsg msg) {
86   - processor.processClusterEvent(context(), msg);
87   - }
88   -
89   - private void processTransportToSessionMsg(TransportToDeviceSessionActorMsg msg) {
90   - initProcessor(msg);
91   - processor.processToDeviceActorMsg(context(), msg);
92   - }
93   -
94   - private void processActorsToSessionMsg(ActorSystemToDeviceSessionActorMsg msg) {
95   - processor.processToDeviceMsg(context(), msg.getMsg());
96   - }
97   -
98   - private void processTimeoutMsg(SessionTimeoutMsg msg) {
99   - if (processor != null) {
100   - processor.processTimeoutMsg(context(), msg);
101   - } else {
102   - logger.warning("[{}] Can't process timeout msg: {} without processor", sessionId, msg);
103   - }
104   - }
105   -
106   - private void processSessionCloseMsg(SessionCtrlMsg msg) {
107   - if (processor != null) {
108   - processor.processSessionCtrlMsg(context(), msg);
109   - } else if (msg instanceof SessionCloseMsg) {
110   - AbstractSessionActorMsgProcessor.terminateSession(context(), sessionId);
111   - } else {
112   - logger.warning("[{}] Can't process session ctrl msg: {} without processor", sessionId, msg);
113   - }
114   - }
115   -
116   - private void initProcessor(TransportToDeviceSessionActorMsg msg) {
117   - if (processor == null) {
118   - SessionMsg sessionMsg = (SessionMsg) msg.getSessionMsg();
119   - if (sessionMsg.getSessionContext().getSessionType() == SessionType.SYNC) {
120   - processor = new SyncMsgProcessor(systemContext, logger, sessionId);
121   - } else {
122   - processor = new ASyncMsgProcessor(systemContext, logger, sessionId);
123   - }
124   - }
125   - }
126   -
127   - public static class ActorCreator extends ContextBasedCreator<SessionActor> {
128   - private static final long serialVersionUID = 1L;
129   -
130   - private final SessionId sessionId;
131   -
132   - public ActorCreator(ActorSystemContext context, SessionId sessionId) {
133   - super(context);
134   - this.sessionId = sessionId;
135   - }
136   -
137   - @Override
138   - public SessionActor create() throws Exception {
139   - return new SessionActor(context, sessionId);
140   - }
141   - }
142   -
143   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.actors.session;
17   -
18   -import akka.actor.ActorInitializationException;
19   -import akka.actor.ActorRef;
20   -import akka.actor.InvalidActorNameException;
21   -import akka.actor.LocalActorRef;
22   -import akka.actor.OneForOneStrategy;
23   -import akka.actor.Props;
24   -import akka.actor.SupervisorStrategy;
25   -import akka.actor.Terminated;
26   -import akka.event.Logging;
27   -import akka.event.LoggingAdapter;
28   -import akka.japi.Function;
29   -import org.thingsboard.server.actors.ActorSystemContext;
30   -import org.thingsboard.server.actors.service.ContextAwareActor;
31   -import org.thingsboard.server.actors.service.ContextBasedCreator;
32   -import org.thingsboard.server.actors.service.DefaultActorService;
33   -import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
34   -import org.thingsboard.server.common.data.id.SessionId;
35   -import org.thingsboard.server.common.msg.TbActorMsg;
36   -import org.thingsboard.server.common.msg.aware.SessionAwareMsg;
37   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
38   -import org.thingsboard.server.common.msg.core.ActorSystemToDeviceSessionActorMsg;
39   -import org.thingsboard.server.common.msg.core.SessionCloseMsg;
40   -import org.thingsboard.server.common.msg.session.SessionCtrlMsg;
41   -import scala.concurrent.duration.Duration;
42   -
43   -import java.util.HashMap;
44   -import java.util.Map;
45   -
46   -public class SessionManagerActor extends ContextAwareActor {
47   -
48   - private static final int INITIAL_SESSION_MAP_SIZE = 1024;
49   -
50   - private final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
51   -
52   - private final Map<String, ActorRef> sessionActors;
53   -
54   - SessionManagerActor(ActorSystemContext systemContext) {
55   - super(systemContext);
56   - this.sessionActors = new HashMap<>(INITIAL_SESSION_MAP_SIZE);
57   - }
58   -
59   - @Override
60   - public SupervisorStrategy supervisorStrategy() {
61   - return strategy;
62   - }
63   -
64   - @Override
65   - protected boolean process(TbActorMsg msg) {
66   - //TODO Move everything here, to work with TbActorMsg
67   - return false;
68   - }
69   -
70   - @Override
71   - public void onReceive(Object msg) throws Exception {
72   - if (msg instanceof SessionCtrlMsg) {
73   - onSessionCtrlMsg((SessionCtrlMsg) msg);
74   - } else if (msg instanceof SessionAwareMsg) {
75   - forwardToSessionActor((SessionAwareMsg) msg);
76   - } else if (msg instanceof SessionTerminationMsg) {
77   - onSessionTermination((SessionTerminationMsg) msg);
78   - } else if (msg instanceof Terminated) {
79   - onTermination((Terminated) msg);
80   - } else if (msg instanceof SessionTimeoutMsg) {
81   - onSessionTimeout((SessionTimeoutMsg) msg);
82   - } else if (msg instanceof ClusterEventMsg) {
83   - broadcast(msg);
84   - }
85   - }
86   -
87   - private void broadcast(Object msg) {
88   - sessionActors.values().forEach(actorRef -> actorRef.tell(msg, ActorRef.noSender()));
89   - }
90   -
91   - private void onSessionTimeout(SessionTimeoutMsg msg) {
92   - String sessionIdStr = msg.getSessionId().toUidStr();
93   - ActorRef sessionActor = sessionActors.get(sessionIdStr);
94   - if (sessionActor != null) {
95   - sessionActor.tell(msg, ActorRef.noSender());
96   - }
97   - }
98   -
99   - private void onSessionCtrlMsg(SessionCtrlMsg msg) {
100   - String sessionIdStr = msg.getSessionId().toUidStr();
101   - ActorRef sessionActor = sessionActors.get(sessionIdStr);
102   - if (sessionActor != null) {
103   - sessionActor.tell(msg, ActorRef.noSender());
104   - }
105   - }
106   -
107   - private void onSessionTermination(SessionTerminationMsg msg) {
108   - String sessionIdStr = msg.getId().toUidStr();
109   - ActorRef sessionActor = sessionActors.remove(sessionIdStr);
110   - if (sessionActor != null) {
111   - log.debug("[{}] Removed session actor.", sessionIdStr);
112   - //TODO: onSubscriptionUpdate device actor about session close;
113   - } else {
114   - log.debug("[{}] Session actor was already removed.", sessionIdStr);
115   - }
116   - }
117   -
118   - private void forwardToSessionActor(SessionAwareMsg msg) {
119   - if (msg instanceof ActorSystemToDeviceSessionActorMsg || msg instanceof SessionCloseMsg) {
120   - String sessionIdStr = msg.getSessionId().toUidStr();
121   - ActorRef sessionActor = sessionActors.get(sessionIdStr);
122   - if (sessionActor != null) {
123   - sessionActor.tell(msg, ActorRef.noSender());
124   - } else {
125   - log.debug("[{}] Session actor was already removed.", sessionIdStr);
126   - }
127   - } else {
128   - try {
129   - getOrCreateSessionActor(msg.getSessionId()).tell(msg, self());
130   - } catch (InvalidActorNameException e) {
131   - log.info("Invalid msg : {}", msg);
132   - }
133   - }
134   - }
135   -
136   - private ActorRef getOrCreateSessionActor(SessionId sessionId) {
137   - String sessionIdStr = sessionId.toUidStr();
138   - ActorRef sessionActor = sessionActors.get(sessionIdStr);
139   - if (sessionActor == null) {
140   - log.debug("[{}] Creating session actor.", sessionIdStr);
141   - sessionActor = context().actorOf(
142   - Props.create(new SessionActor.ActorCreator(systemContext, sessionId)).withDispatcher(DefaultActorService.SESSION_DISPATCHER_NAME),
143   - sessionIdStr);
144   - sessionActors.put(sessionIdStr, sessionActor);
145   - log.debug("[{}] Created session actor.", sessionIdStr);
146   - }
147   - return sessionActor;
148   - }
149   -
150   - private void onTermination(Terminated message) {
151   - ActorRef terminated = message.actor();
152   - if (terminated instanceof LocalActorRef) {
153   - log.info("Removed actor: {}.", terminated);
154   - //TODO: cleanup session actors map
155   - } else {
156   - throw new IllegalStateException("Remote actors are not supported!");
157   - }
158   - }
159   -
160   - public static class ActorCreator extends ContextBasedCreator<SessionManagerActor> {
161   - private static final long serialVersionUID = 1L;
162   -
163   - public ActorCreator(ActorSystemContext context) {
164   - super(context);
165   - }
166   -
167   - @Override
168   - public SessionManagerActor create() throws Exception {
169   - return new SessionManagerActor(context);
170   - }
171   - }
172   -
173   - private final SupervisorStrategy strategy = new OneForOneStrategy(3, Duration.create("1 minute"), new Function<Throwable, SupervisorStrategy.Directive>() {
174   - @Override
175   - public SupervisorStrategy.Directive apply(Throwable t) {
176   - logger.error(t, "Unknown failure");
177   - return SupervisorStrategy.stop();
178   - }
179   - });
180   -}
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.actors.session;
17   -
18   -import akka.actor.ActorContext;
19   -import akka.event.LoggingAdapter;
20   -import org.thingsboard.server.actors.ActorSystemContext;
21   -import org.thingsboard.server.actors.shared.SessionTimeoutMsg;
22   -import org.thingsboard.server.common.data.id.SessionId;
23   -import org.thingsboard.server.common.msg.cluster.ClusterEventMsg;
24   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
25   -import org.thingsboard.server.common.msg.device.DeviceToDeviceActorMsg;
26   -import org.thingsboard.server.common.msg.session.BasicSessionActorToAdaptorMsg;
27   -import org.thingsboard.server.common.msg.session.SessionContext;
28   -import org.thingsboard.server.common.msg.session.SessionType;
29   -import org.thingsboard.server.common.msg.session.ToDeviceMsg;
30   -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
31   -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
32   -import org.thingsboard.server.common.msg.session.ex.SessionException;
33   -
34   -import java.util.Optional;
35   -
36   -class SyncMsgProcessor extends AbstractSessionActorMsgProcessor {
37   - private DeviceToDeviceActorMsg pendingMsg;
38   - private Optional<ServerAddress> currentTargetServer;
39   - private boolean pendingResponse;
40   -
41   - public SyncMsgProcessor(ActorSystemContext ctx, LoggingAdapter logger, SessionId sessionId) {
42   - super(ctx, logger, sessionId);
43   - }
44   -
45   - @Override
46   - protected void processToDeviceActorMsg(ActorContext ctx, TransportToDeviceSessionActorMsg msg) {
47   - updateSessionCtx(msg, SessionType.SYNC);
48   - pendingMsg = toDeviceMsg(msg);
49   - pendingResponse = true;
50   - currentTargetServer = forwardToAppActor(ctx, pendingMsg);
51   - scheduleMsgWithDelay(ctx, new SessionTimeoutMsg(sessionId), getTimeout(systemContext, msg.getSessionMsg().getSessionContext()), ctx.parent());
52   - }
53   -
54   - public void processTimeoutMsg(ActorContext context, SessionTimeoutMsg msg) {
55   - if (pendingResponse) {
56   - try {
57   - sessionCtx.onMsg(SessionCloseMsg.onTimeout(sessionId));
58   - } catch (SessionException e) {
59   - logger.warning("Failed to push session close msg", e);
60   - }
61   - terminateSession(context, this.sessionId);
62   - }
63   - }
64   -
65   - public void processToDeviceMsg(ActorContext context, ToDeviceMsg msg) {
66   - try {
67   - sessionCtx.onMsg(new BasicSessionActorToAdaptorMsg(this.sessionCtx, msg));
68   - pendingResponse = false;
69   - } catch (SessionException e) {
70   - logger.warning("Failed to push session response msg", e);
71   - }
72   - terminateSession(context, this.sessionId);
73   - }
74   -
75   - @Override
76   - public void processClusterEvent(ActorContext context, ClusterEventMsg msg) {
77   - if (pendingResponse) {
78   - Optional<ServerAddress> newTargetServer = forwardToAppActorIfAddressChanged(context, pendingMsg, currentTargetServer);
79   - if (logger.isDebugEnabled()) {
80   - if (!newTargetServer.equals(currentTargetServer)) {
81   - if (newTargetServer.isPresent()) {
82   - logger.debug("[{}] Forwarded msg to new server: {}", sessionId, newTargetServer.get());
83   - } else {
84   - logger.debug("[{}] Forwarded msg to local server.", sessionId);
85   - }
86   - }
87   - }
88   - currentTargetServer = newTargetServer;
89   - }
90   - }
91   -
92   - private long getTimeout(ActorSystemContext ctx, SessionContext sessionCtx) {
93   - return sessionCtx.getTimeout() > 0 ? sessionCtx.getTimeout() : ctx.getSyncSessionTimeout();
94   - }
95   -}
... ... @@ -87,7 +87,7 @@ public class TenantActor extends RuleChainManagerActor {
87 87 case DEVICE_ACTOR_TO_RULE_ENGINE_MSG:
88 88 onDeviceActorToRuleEngineMsg((DeviceActorToRuleEngineMsg) msg);
89 89 break;
90   - case DEVICE_SESSION_TO_DEVICE_ACTOR_MSG:
  90 + case TRANSPORT_TO_DEVICE_ACTOR_MSG:
91 91 case DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG:
92 92 case DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG:
93 93 case DEVICE_NAME_OR_TYPE_UPDATE_TO_DEVICE_ACTOR_MSG:
... ...
  1 +package org.thingsboard.server.service.transport;
  2 +
  3 +import akka.actor.ActorRef;
  4 +import com.fasterxml.jackson.databind.ObjectMapper;
  5 +import lombok.extern.slf4j.Slf4j;
  6 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  7 +import org.apache.kafka.clients.producer.Callback;
  8 +import org.apache.kafka.clients.producer.RecordMetadata;
  9 +import org.springframework.beans.factory.annotation.Autowired;
  10 +import org.springframework.beans.factory.annotation.Value;
  11 +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  12 +import org.springframework.stereotype.Service;
  13 +import org.thingsboard.server.actors.ActorSystemContext;
  14 +import org.thingsboard.server.actors.service.ActorService;
  15 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  16 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
  17 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
  20 +import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
  21 +import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
  22 +import org.thingsboard.server.kafka.TbKafkaSettings;
  23 +import org.thingsboard.server.service.cluster.discovery.DiscoveryService;
  24 +import org.thingsboard.server.service.cluster.routing.ClusterRoutingService;
  25 +import org.thingsboard.server.service.cluster.rpc.ClusterRpcService;
  26 +import org.thingsboard.server.service.encoding.DataDecodingEncodingService;
  27 +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper;
  28 +
  29 +import javax.annotation.PostConstruct;
  30 +import javax.annotation.PreDestroy;
  31 +import java.time.Duration;
  32 +import java.util.Optional;
  33 +import java.util.concurrent.ExecutorService;
  34 +import java.util.concurrent.Executors;
  35 +import java.util.function.Consumer;
  36 +
  37 +/**
  38 + * Created by ashvayka on 09.10.18.
  39 + */
  40 +@Slf4j
  41 +@Service
  42 +@ConditionalOnProperty(prefix = "transport.remote", value = "enabled", havingValue = "true")
  43 +public class RemoteRuleEngineTransportService implements RuleEngineTransportService {
  44 +
  45 + private static final ObjectMapper mapper = new ObjectMapper();
  46 +
  47 + @Value("${transport.remote.rule_engine.topic}")
  48 + private String ruleEngineTopic;
  49 + @Value("${transport.remote.notifications.topic}")
  50 + private String notificationsTopic;
  51 + @Value("${transport.remote.rule_engine.poll_interval}")
  52 + private int pollDuration;
  53 + @Value("${transport.remote.rule_engine.auto_commit_interval}")
  54 + private int autoCommitInterval;
  55 +
  56 + @Autowired
  57 + private TbKafkaSettings kafkaSettings;
  58 +
  59 + @Autowired
  60 + private DiscoveryService discoveryService;
  61 +
  62 + @Autowired
  63 + private ActorSystemContext actorContext;
  64 +
  65 + @Autowired
  66 + private ActorService actorService;
  67 +
  68 + //TODO: completely replace this routing with the Kafka routing by partition ids.
  69 + @Autowired
  70 + private ClusterRoutingService routingService;
  71 + @Autowired
  72 + private ClusterRpcService rpcService;
  73 + @Autowired
  74 + private DataDecodingEncodingService encodingService;
  75 +
  76 + private TBKafkaConsumerTemplate<ToRuleEngineMsg> ruleEngineConsumer;
  77 + private TBKafkaProducerTemplate<ToTransportMsg> notificationsProducer;
  78 +
  79 + private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
  80 +
  81 + private volatile boolean stopped = false;
  82 +
  83 + @PostConstruct
  84 + public void init() {
  85 + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToTransportMsg> notificationsProducerBuilder = TBKafkaProducerTemplate.builder();
  86 + notificationsProducerBuilder.settings(kafkaSettings);
  87 + notificationsProducerBuilder.defaultTopic(notificationsTopic);
  88 + notificationsProducerBuilder.encoder(new ToTransportMsgEncoder());
  89 +
  90 + notificationsProducer = notificationsProducerBuilder.build();
  91 + notificationsProducer.init();
  92 +
  93 + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToRuleEngineMsg> ruleEngineConsumerBuilder = TBKafkaConsumerTemplate.builder();
  94 + ruleEngineConsumerBuilder.settings(kafkaSettings);
  95 + ruleEngineConsumerBuilder.topic(ruleEngineTopic);
  96 + ruleEngineConsumerBuilder.clientId(discoveryService.getNodeId());
  97 + ruleEngineConsumerBuilder.groupId("tb-node");
  98 + ruleEngineConsumerBuilder.autoCommit(true);
  99 + ruleEngineConsumerBuilder.autoCommitIntervalMs(autoCommitInterval);
  100 + ruleEngineConsumerBuilder.decoder(new ToRuleEngineMsgDecoder());
  101 +
  102 + ruleEngineConsumer = ruleEngineConsumerBuilder.build();
  103 + ruleEngineConsumer.subscribe();
  104 +
  105 + mainConsumerExecutor.execute(() -> {
  106 + while (!stopped) {
  107 + try {
  108 + ConsumerRecords<String, byte[]> records = ruleEngineConsumer.poll(Duration.ofMillis(pollDuration));
  109 + records.forEach(record -> {
  110 + try {
  111 + ToRuleEngineMsg toRuleEngineMsg = ruleEngineConsumer.decode(record);
  112 + if (toRuleEngineMsg.hasToDeviceActorMsg()) {
  113 + forwardToDeviceActor(toRuleEngineMsg.getToDeviceActorMsg());
  114 + }
  115 + } catch (Throwable e) {
  116 + log.warn("Failed to process the notification.", e);
  117 + }
  118 + });
  119 + } catch (Exception e) {
  120 + log.warn("Failed to obtain messages from queue.", e);
  121 + try {
  122 + Thread.sleep(pollDuration);
  123 + } catch (InterruptedException e2) {
  124 + log.trace("Failed to wait until the server has capacity to handle new requests", e2);
  125 + }
  126 + }
  127 + }
  128 + });
  129 + }
  130 +
  131 + @Override
  132 + public void process(String nodeId, DeviceActorToTransportMsg msg) {
  133 + process(nodeId, msg, null, null);
  134 + }
  135 +
  136 + @Override
  137 + public void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure) {
  138 + notificationsProducer.send(notificationsTopic + "." + nodeId,
  139 + ToTransportMsg.newBuilder().setToDeviceSessionMsg(msg).build()
  140 + , new QueueCallbackAdaptor(onSuccess, onFailure));
  141 + }
  142 +
  143 + private void forwardToDeviceActor(TransportToDeviceActorMsg toDeviceActorMsg) {
  144 + TransportToDeviceActorMsgWrapper wrapper = new TransportToDeviceActorMsgWrapper(toDeviceActorMsg);
  145 + Optional<ServerAddress> address = routingService.resolveById(wrapper.getDeviceId());
  146 + if (address.isPresent()) {
  147 + rpcService.tell(encodingService.convertToProtoDataMessage(address.get(), wrapper));
  148 + } else {
  149 + actorContext.getAppActor().tell(wrapper, ActorRef.noSender());
  150 + }
  151 + }
  152 +
  153 + @PreDestroy
  154 + public void destroy() {
  155 + stopped = true;
  156 + if (ruleEngineConsumer != null) {
  157 + ruleEngineConsumer.unsubscribe();
  158 + }
  159 + if (mainConsumerExecutor != null) {
  160 + mainConsumerExecutor.shutdownNow();
  161 + }
  162 + }
  163 +
  164 + private static class QueueCallbackAdaptor implements Callback {
  165 + private final Runnable onSuccess;
  166 + private final Consumer<Throwable> onFailure;
  167 +
  168 + QueueCallbackAdaptor(Runnable onSuccess, Consumer<Throwable> onFailure) {
  169 + this.onSuccess = onSuccess;
  170 + this.onFailure = onFailure;
  171 + }
  172 +
  173 + @Override
  174 + public void onCompletion(RecordMetadata metadata, Exception exception) {
  175 + if (exception == null) {
  176 + if (onSuccess != null) {
  177 + onSuccess.run();
  178 + }
  179 + } else {
  180 + if (onFailure != null) {
  181 + onFailure.accept(exception);
  182 + }
  183 + }
  184 + }
  185 + }
  186 +
  187 +}
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.transport;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceActorToTransportMsg;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.;
  20 +
  21 +import java.util.function.Consumer;
  22 +
  23 +/**
  24 + * Created by ashvayka on 05.10.18.
  25 + */
  26 +public interface RuleEngineTransportService {
  27 +
  28 + void process(String nodeId, DeviceActorToTransportMsg msg);
  29 +
  30 + void process(String nodeId, DeviceActorToTransportMsg msg, Runnable onSuccess, Consumer<Throwable> onFailure);
  31 +
  32 +}
... ...
application/src/main/java/org/thingsboard/server/service/transport/ToRuleEngineMsgDecoder.java renamed from application/src/main/java/org/thingsboard/server/actors/session/SessionTerminationMsg.java
... ... @@ -13,14 +13,19 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.actors.session;
  16 +package org.thingsboard.server.service.transport;
17 17
18   -import org.thingsboard.server.actors.shared.ActorTerminationMsg;
19   -import org.thingsboard.server.common.data.id.SessionId;
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  19 +import org.thingsboard.server.kafka.TbKafkaDecoder;
20 20
21   -public class SessionTerminationMsg extends ActorTerminationMsg<SessionId> {
  21 +import java.io.IOException;
22 22
23   - public SessionTerminationMsg(SessionId id) {
24   - super(id);
  23 +/**
  24 + * Created by ashvayka on 05.10.18.
  25 + */
  26 +public class ToRuleEngineMsgDecoder implements TbKafkaDecoder<ToRuleEngineMsg> {
  27 + @Override
  28 + public ToRuleEngineMsg decode(byte[] data) throws IOException {
  29 + return ToRuleEngineMsg.parseFrom(data);
25 30 }
26 31 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.transport;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  19 +import org.thingsboard.server.kafka.TbKafkaEncoder;
  20 +
  21 +/**
  22 + * Created by ashvayka on 05.10.18.
  23 + */
  24 +public class ToTransportMsgEncoder implements TbKafkaEncoder<ToTransportMsg> {
  25 + @Override
  26 + public byte[] encode(ToTransportMsg value) {
  27 + return value.toByteArray();
  28 + }
  29 +}
... ...
  1 +package org.thingsboard.server.service.transport.msg;
  2 +
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.id.DeviceId;
  5 +import org.thingsboard.server.common.data.id.TenantId;
  6 +import org.thingsboard.server.common.msg.MsgType;
  7 +import org.thingsboard.server.common.msg.TbActorMsg;
  8 +import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
  9 +import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
  10 +import org.thingsboard.server.common.msg.cluster.ServerAddress;
  11 +import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
  12 +
  13 +import java.io.Serializable;
  14 +import java.util.UUID;
  15 +
  16 +/**
  17 + * Created by ashvayka on 09.10.18.
  18 + */
  19 +@Data
  20 +public class TransportToDeviceActorMsgWrapper implements TbActorMsg, DeviceAwareMsg, TenantAwareMsg, Serializable {
  21 +
  22 + private final TenantId tenantId;
  23 + private final DeviceId deviceId;
  24 + private final TransportToDeviceActorMsg msg;
  25 +
  26 + public TransportToDeviceActorMsgWrapper(TransportToDeviceActorMsg msg) {
  27 + this.msg = msg;
  28 + this.tenantId = new TenantId(new UUID(msg.getSessionInfo().getTenantIdMSB(), msg.getSessionInfo().getTenantIdLSB()));
  29 + this.deviceId = new DeviceId(new UUID(msg.getSessionInfo().getDeviceIdMSB(), msg.getSessionInfo().getDeviceIdLSB()));
  30 + }
  31 +
  32 + @Override
  33 + public MsgType getMsgType() {
  34 + return MsgType.TRANSPORT_TO_DEVICE_ACTOR_MSG;
  35 + }
  36 +}
... ...
... ... @@ -462,4 +462,8 @@ transport:
462 462 request_poll_interval: "${TB_TRANSPORT_RESPONSE_POLL_INTERVAL_MS:25}"
463 463 request_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:1000}"
464 464 rule_engine:
465   - topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
\ No newline at end of file
  465 + topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
  466 + poll_interval: "${TB_RULE_ENGINE_POLL_INTERVAL_MS:25}"
  467 + auto_commit_interval: "${TB_RULE_ENGINE_AUTO_COMMIT_INTERVAL_MS:100}"
  468 + notifications:
  469 + topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
\ No newline at end of file
... ...
... ... @@ -77,11 +77,6 @@ public enum MsgType {
77 77 */
78 78 RULE_TO_SELF_MSG,
79 79
80   - /**
81   - * Message that is sent by Session Actor to Device Actor. Represents messages from the device itself.
82   - */
83   - DEVICE_SESSION_TO_DEVICE_ACTOR_MSG,
84   -
85 80 DEVICE_ATTRIBUTES_UPDATE_TO_DEVICE_ACTOR_MSG,
86 81
87 82 DEVICE_CREDENTIALS_UPDATE_TO_DEVICE_ACTOR_MSG,
... ... @@ -111,6 +106,12 @@ public enum MsgType {
111 106 TRANSPORT_TO_DEVICE_SESSION_ACTOR_MSG,
112 107 SESSION_TIMEOUT_MSG,
113 108 SESSION_CTRL_MSG,
114   - STATS_PERSIST_TICK_MSG;
  109 + STATS_PERSIST_TICK_MSG,
  110 +
  111 +
  112 + /**
  113 + * Message that is sent by TransportRuleEngineService to Device Actor. Represents messages from the device itself.
  114 + */
  115 + TRANSPORT_TO_DEVICE_ACTOR_MSG;
115 116
116 117 }
... ...
1   -/**
2   - * Copyright © 2016-2018 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16   -package org.thingsboard.server.common.msg.device;
17   -
18   -import lombok.ToString;
19   -import org.thingsboard.server.common.data.id.CustomerId;
20   -import org.thingsboard.server.common.data.id.DeviceId;
21   -import org.thingsboard.server.common.data.id.SessionId;
22   -import org.thingsboard.server.common.data.id.TenantId;
23   -import org.thingsboard.server.common.msg.MsgType;
24   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
25   -import org.thingsboard.server.common.msg.session.FromDeviceMsg;
26   -import org.thingsboard.server.common.msg.session.SessionType;
27   -import org.thingsboard.server.common.msg.session.TransportToDeviceSessionActorMsg;
28   -
29   -import java.util.Optional;
30   -
31   -@ToString
32   -public class BasicDeviceToDeviceActorMsg implements DeviceToDeviceActorMsg {
33   -
34   - private static final long serialVersionUID = -1866795134993115408L;
35   -
36   - private final TenantId tenantId;
37   - private final CustomerId customerId;
38   - private final DeviceId deviceId;
39   - private final SessionId sessionId;
40   - private final SessionType sessionType;
41   - private final ServerAddress serverAddress;
42   - private final FromDeviceMsg msg;
43   -
44   - public BasicDeviceToDeviceActorMsg(DeviceToDeviceActorMsg other, FromDeviceMsg msg) {
45   - this(null, other.getTenantId(), other.getCustomerId(), other.getDeviceId(), other.getSessionId(), other.getSessionType(), msg);
46   - }
47   -
48   - public BasicDeviceToDeviceActorMsg(TransportToDeviceSessionActorMsg msg, SessionType sessionType) {
49   - this(null, msg.getTenantId(), msg.getCustomerId(), msg.getDeviceId(), msg.getSessionId(), sessionType, msg.getSessionMsg().getMsg());
50   - }
51   -
52   - private BasicDeviceToDeviceActorMsg(ServerAddress serverAddress, TenantId tenantId, CustomerId customerId, DeviceId deviceId, SessionId sessionId, SessionType sessionType,
53   - FromDeviceMsg msg) {
54   - super();
55   - this.serverAddress = serverAddress;
56   - this.tenantId = tenantId;
57   - this.customerId = customerId;
58   - this.deviceId = deviceId;
59   - this.sessionId = sessionId;
60   - this.sessionType = sessionType;
61   - this.msg = msg;
62   - }
63   -
64   - @Override
65   - public DeviceId getDeviceId() {
66   - return deviceId;
67   - }
68   -
69   - @Override
70   - public CustomerId getCustomerId() {
71   - return customerId;
72   - }
73   -
74   - public TenantId getTenantId() {
75   - return tenantId;
76   - }
77   -
78   - @Override
79   - public SessionId getSessionId() {
80   - return sessionId;
81   - }
82   -
83   - @Override
84   - public SessionType getSessionType() {
85   - return sessionType;
86   - }
87   -
88   - @Override
89   - public Optional<ServerAddress> getServerAddress() {
90   - return Optional.ofNullable(serverAddress);
91   - }
92   -
93   - @Override
94   - public FromDeviceMsg getPayload() {
95   - return msg;
96   - }
97   -
98   - @Override
99   - public DeviceToDeviceActorMsg toOtherAddress(ServerAddress otherAddress) {
100   - return new BasicDeviceToDeviceActorMsg(otherAddress, tenantId, customerId, deviceId, sessionId, sessionType, msg);
101   - }
102   -
103   - @Override
104   - public MsgType getMsgType() {
105   - return MsgType.DEVICE_SESSION_TO_DEVICE_ACTOR_MSG;
106   - }
107   -}
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -49,7 +49,9 @@ public class TBKafkaConsumerTemplate<T> {
49 49 boolean autoCommit, int autoCommitIntervalMs) {
50 50 Properties props = settings.toProps();
51 51 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
52   - props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  52 + if (groupId != null) {
  53 + props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  54 + }
53 55 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
54 56 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
55 57 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -20,6 +20,7 @@ import lombok.Getter;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.apache.kafka.clients.admin.CreateTopicsResult;
22 22 import org.apache.kafka.clients.admin.NewTopic;
  23 +import org.apache.kafka.clients.producer.Callback;
23 24 import org.apache.kafka.clients.producer.KafkaProducer;
24 25 import org.apache.kafka.clients.producer.ProducerConfig;
25 26 import org.apache.kafka.clients.producer.ProducerRecord;
... ... @@ -89,28 +90,28 @@ public class TBKafkaProducerTemplate<T> {
89 90 }
90 91 }
91 92
92   - public Future<RecordMetadata> send(String key, T value) {
93   - return send(key, value, null, null);
  93 + public Future<RecordMetadata> send(String key, T value, Callback callback) {
  94 + return send(key, value, null, callback);
94 95 }
95 96
96   - public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers) {
97   - return send(key, value, null, headers);
  97 + public Future<RecordMetadata> send(String key, T value, Iterable<Header> headers, Callback callback) {
  98 + return send(key, value, null, headers, callback);
98 99 }
99 100
100   - public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers) {
101   - return send(this.defaultTopic, key, value, timestamp, headers);
  101 + public Future<RecordMetadata> send(String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
  102 + return send(this.defaultTopic, key, value, timestamp, headers, callback);
102 103 }
103 104
104   - public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers) {
105   - return send(topic, key, value, null, headers);
  105 + public Future<RecordMetadata> send(String topic, String key, T value, Iterable<Header> headers, Callback callback) {
  106 + return send(topic, key, value, null, headers, callback);
106 107 }
107 108
108   - public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers) {
  109 + public Future<RecordMetadata> send(String topic, String key, T value, Long timestamp, Iterable<Header> headers, Callback callback) {
109 110 byte[] data = encoder.encode(value);
110 111 ProducerRecord<String, byte[]> record;
111 112 Integer partition = getPartition(topic, key, value, data);
112 113 record = new ProducerRecord<>(topic, partition, timestamp, key, data, headers);
113   - return producer.send(record);
  114 + return producer.send(record, callback);
114 115 }
115 116
116 117 private Integer getPartition(String topic, String key, T value, byte[] data) {
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -77,55 +77,64 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
77 77 requestTemplate.subscribe();
78 78 loopExecutor.submit(() -> {
79 79 while (!stopped) {
80   - while (pendingRequestCount.get() >= maxPendingRequests) {
  80 + try {
  81 + while (pendingRequestCount.get() >= maxPendingRequests) {
  82 + try {
  83 + Thread.sleep(pollInterval);
  84 + } catch (InterruptedException e) {
  85 + log.trace("Failed to wait until the server has capacity to handle new requests", e);
  86 + }
  87 + }
  88 + ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
  89 + requests.forEach(request -> {
  90 + Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
  91 + if (requestIdHeader == null) {
  92 + log.error("[{}] Missing requestId in header", request);
  93 + return;
  94 + }
  95 + UUID requestId = bytesToUuid(requestIdHeader.value());
  96 + if (requestId == null) {
  97 + log.error("[{}] Missing requestId in header and body", request);
  98 + return;
  99 + }
  100 + Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
  101 + if (responseTopicHeader == null) {
  102 + log.error("[{}] Missing response topic in header", request);
  103 + return;
  104 + }
  105 + String responseTopic = bytesToString(responseTopicHeader.value());
  106 + try {
  107 + pendingRequestCount.getAndIncrement();
  108 + Request decodedRequest = requestTemplate.decode(request);
  109 + AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
  110 + response -> {
  111 + pendingRequestCount.decrementAndGet();
  112 + reply(requestId, responseTopic, response);
  113 + },
  114 + e -> {
  115 + pendingRequestCount.decrementAndGet();
  116 + if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
  117 + log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
  118 + } else {
  119 + log.trace("[{}] Failed to process the request: {}", requestId, request, e);
  120 + }
  121 + },
  122 + requestTimeout,
  123 + timeoutExecutor,
  124 + callbackExecutor);
  125 + } catch (Throwable e) {
  126 + pendingRequestCount.decrementAndGet();
  127 + log.warn("[{}] Failed to process the request: {}", requestId, request, e);
  128 + }
  129 + });
  130 + } catch (Throwable e) {
  131 + log.warn("Failed to obtain messages from queue.", e);
81 132 try {
82 133 Thread.sleep(pollInterval);
83   - } catch (InterruptedException e) {
84   - log.trace("Failed to wait until the server has capacity to handle new requests", e);
  134 + } catch (InterruptedException e2) {
  135 + log.trace("Failed to wait until the server has capacity to handle new requests", e2);
85 136 }
86 137 }
87   - ConsumerRecords<String, byte[]> requests = requestTemplate.poll(Duration.ofMillis(pollInterval));
88   - requests.forEach(request -> {
89   - Header requestIdHeader = request.headers().lastHeader(TbKafkaSettings.REQUEST_ID_HEADER);
90   - if (requestIdHeader == null) {
91   - log.error("[{}] Missing requestId in header", request);
92   - return;
93   - }
94   - UUID requestId = bytesToUuid(requestIdHeader.value());
95   - if (requestId == null) {
96   - log.error("[{}] Missing requestId in header and body", request);
97   - return;
98   - }
99   - Header responseTopicHeader = request.headers().lastHeader(TbKafkaSettings.RESPONSE_TOPIC_HEADER);
100   - if (responseTopicHeader == null) {
101   - log.error("[{}] Missing response topic in header", request);
102   - return;
103   - }
104   - String responseTopic = bytesToString(responseTopicHeader.value());
105   - try {
106   - pendingRequestCount.getAndIncrement();
107   - Request decodedRequest = requestTemplate.decode(request);
108   - AsyncCallbackTemplate.withCallbackAndTimeout(handler.handle(decodedRequest),
109   - response -> {
110   - pendingRequestCount.decrementAndGet();
111   - reply(requestId, responseTopic, response);
112   - },
113   - e -> {
114   - pendingRequestCount.decrementAndGet();
115   - if (e.getCause() != null && e.getCause() instanceof TimeoutException) {
116   - log.warn("[{}] Timedout to process the request: {}", requestId, request, e);
117   - } else {
118   - log.trace("[{}] Failed to process the request: {}", requestId, request, e);
119   - }
120   - },
121   - requestTimeout,
122   - timeoutExecutor,
123   - callbackExecutor);
124   - } catch (Throwable e) {
125   - pendingRequestCount.decrementAndGet();
126   - log.warn("[{}] Failed to process the request: {}", requestId, request, e);
127   - }
128   - });
129 138 }
130 139 });
131 140 }
... ... @@ -141,7 +150,7 @@ public class TbKafkaResponseTemplate<Request, Response> extends AbstractTbKafkaT
141 150 }
142 151
143 152 private void reply(UUID requestId, String topic, Response response) {
144   - responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))));
  153 + responseTemplate.send(topic, requestId.toString(), response, Collections.singletonList(new RecordHeader(TbKafkaSettings.REQUEST_ID_HEADER, uuidToBytes(requestId))), null);
145 154 }
146 155
147 156 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.transport;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
  19 +
  20 +/**
  21 + * Created by ashvayka on 04.10.18.
  22 + */
  23 +public interface SessionMsgListener {
  24 +
  25 + void onGetAttributesResponse(GetAttributeResponseMsg getAttributesResponse);
  26 +}
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -15,23 +15,33 @@
15 15 */
16 16 package org.thingsboard.server.common.transport;
17 17
18   -import org.thingsboard.server.gen.transport.TransportProtos;
  18 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
  20 +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
  21 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
  23 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
  24 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
19 25
20 26 /**
21 27 * Created by ashvayka on 04.10.18.
22 28 */
23 29 public interface TransportService {
24 30
25   - void process(TransportProtos.ValidateDeviceTokenRequestMsg msg,
26   - TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
  31 + void process(ValidateDeviceTokenRequestMsg msg,
  32 + TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
27 33
28   - void process(TransportProtos.ValidateDeviceX509CertRequestMsg msg,
29   - TransportServiceCallback<TransportProtos.ValidateDeviceCredentialsResponseMsg> callback);
  34 + void process(ValidateDeviceX509CertRequestMsg msg,
  35 + TransportServiceCallback<ValidateDeviceCredentialsResponseMsg> callback);
30 36
31   - void process(TransportProtos.SessionEventMsg msg, TransportServiceCallback<Void> callback);
  37 + void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback);
32 38
33   - void process(TransportProtos.PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
  39 + void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback);
34 40
35   - void process(TransportProtos.PostAttributeMsg msg, TransportServiceCallback<Void> callback);
  41 + void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback);
  42 +
  43 + void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener);
  44 +
  45 + void deregisterSession(SessionInfoProto sessionInfo);
36 46
37 47 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -16,17 +16,12 @@
16 16 package org.thingsboard.server.common.transport.session;
17 17
18 18 import lombok.Data;
19   -import lombok.extern.slf4j.Slf4j;
20   -import org.thingsboard.server.common.data.Device;
21   -import org.thingsboard.server.common.data.security.DeviceCredentialsFilter;
22   -import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
  19 +import lombok.Getter;
  20 +import org.thingsboard.server.common.data.id.DeviceId;
23 21 import org.thingsboard.server.common.msg.session.SessionContext;
24   -import org.thingsboard.server.common.transport.SessionMsgProcessor;
25   -import org.thingsboard.server.common.transport.auth.DeviceAuthResult;
26   -import org.thingsboard.server.common.transport.auth.DeviceAuthService;
27   -import org.thingsboard.server.gen.transport.TransportProtos;
  22 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
28 23
29   -import java.util.Optional;
  24 +import java.util.UUID;
30 25
31 26 /**
32 27 * @author Andrew Shvayka
... ... @@ -34,7 +29,9 @@ import java.util.Optional;
34 29 @Data
35 30 public abstract class DeviceAwareSessionContext implements SessionContext {
36 31
37   - private volatile TransportProtos.DeviceInfoProto deviceInfo;
  32 + @Getter
  33 + private volatile DeviceId deviceId;
  34 + private volatile DeviceInfoProto deviceInfo;
38 35
39 36 public long getDeviceIdMSB() {
40 37 return deviceInfo.getDeviceIdMSB();
... ... @@ -44,6 +41,16 @@ public abstract class DeviceAwareSessionContext implements SessionContext {
44 41 return deviceInfo.getDeviceIdLSB();
45 42 }
46 43
  44 + public DeviceId getDeviceId() {
  45 + return deviceId;
  46 + }
  47 +
  48 + public void setDeviceInfo(DeviceInfoProto deviceInfo) {
  49 + this.deviceInfo = deviceInfo;
  50 + this.deviceId = new DeviceId(new UUID(deviceInfo.getDeviceIdMSB(), deviceInfo.getDeviceIdLSB()));
  51 + }
  52 +
  53 +
47 54 public boolean isConnected() {
48 55 return deviceInfo != null;
49 56 }
... ...
... ... @@ -23,9 +23,12 @@ option java_outer_classname = "TransportProtos";
23 23 * Data Structures;
24 24 */
25 25 message SessionInfoProto {
26   - string nodeId = 1;
27   - int64 sessionIdMSB = 2;
28   - int64 sessionIdLSB = 3;
  26 + int64 sessionIdMSB = 1;
  27 + int64 sessionIdLSB = 2;
  28 + int64 tenantIdMSB = 3;
  29 + int64 tenantIdLSB = 4;
  30 + int64 deviceIdMSB = 5;
  31 + int64 deviceIdLSB = 6;
29 32 }
30 33
31 34 enum SessionEvent {
... ... @@ -33,12 +36,25 @@ enum SessionEvent {
33 36 CLOSED = 1;
34 37 }
35 38
  39 +enum SessionType {
  40 + SYNC = 0;
  41 + ASYNC = 1;
  42 +}
  43 +
  44 +enum KeyValueType {
  45 + BOOLEAN_V = 0;
  46 + LONG_V = 1;
  47 + DOUBLE_V = 2;
  48 + STRING_V = 3;
  49 +}
  50 +
36 51 message KeyValueProto {
37 52 string key = 1;
38   - bool bool_v = 2;
39   - int64 long_v = 3;
40   - double double_v = 4;
41   - string string_v = 5;
  53 + KeyValueType type = 2;
  54 + bool bool_v = 3;
  55 + int64 long_v = 4;
  56 + double double_v = 5;
  57 + string string_v = 6;
42 58 }
43 59
44 60 message TsKvListProto {
... ... @@ -60,33 +76,28 @@ message DeviceInfoProto {
60 76 * Messages that use Data Structures;
61 77 */
62 78 message SessionEventMsg {
63   - SessionInfoProto sessionInfo = 1;
64   - int64 deviceIdMSB = 2;
65   - int64 deviceIdLSB = 3;
66   - SessionEvent event = 4;
  79 + string nodeId = 1;
  80 + SessionType sessionType = 2;
  81 + SessionEvent event = 3;
67 82 }
68 83
69 84 message PostTelemetryMsg {
70   - SessionInfoProto sessionInfo = 1;
71   - repeated TsKvListProto tsKvList = 2;
  85 + repeated TsKvListProto tsKvList = 1;
72 86 }
73 87
74 88 message PostAttributeMsg {
75   - SessionInfoProto sessionInfo = 1;
76   - repeated TsKvListProto tsKvList = 2;
  89 + repeated KeyValueProto kv = 1;
77 90 }
78 91
79 92 message GetAttributeRequestMsg {
80   - SessionInfoProto sessionInfo = 1;
81   - repeated string clientAttributeNames = 2;
82   - repeated string sharedAttributeNames = 3;
  93 + repeated string clientAttributeNames = 1;
  94 + repeated string sharedAttributeNames = 2;
83 95 }
84 96
85 97 message GetAttributeResponseMsg {
86   - SessionInfoProto sessionInfo = 1;
87   - repeated TsKvListProto clientAttributeList = 2;
88   - repeated TsKvListProto sharedAttributeList = 3;
89   - repeated string deletedAttributeKeys = 4;
  98 + repeated TsKvListProto clientAttributeList = 1;
  99 + repeated TsKvListProto sharedAttributeList = 2;
  100 + repeated string deletedAttributeKeys = 3;
90 101 }
91 102
92 103 message ValidateDeviceTokenRequestMsg {
... ... @@ -101,11 +112,34 @@ message ValidateDeviceCredentialsResponseMsg {
101 112 DeviceInfoProto deviceInfo = 1;
102 113 }
103 114
  115 +message SessionCloseNotificationProto {
  116 + string message = 1;
  117 +}
  118 +
  119 +message TransportToDeviceActorMsg {
  120 + SessionInfoProto sessionInfo = 1;
  121 + SessionEventMsg sessionEvent = 2;
  122 + PostTelemetryMsg postTelemetry = 3;
  123 + PostAttributeMsg postAttributes = 4;
  124 + GetAttributeRequestMsg getAttributes = 5;
  125 +}
  126 +
  127 +message DeviceActorToTransportMsg {
  128 + int64 sessionIdMSB = 1;
  129 + int64 sessionIdLSB = 2;
  130 + SessionCloseNotificationProto sessionCloseNotification = 3;
  131 + GetAttributeResponseMsg getAttributesResponse = 4;
  132 +}
  133 +
104 134 /**
105 135 * Main messages;
106 136 */
107   -message TransportToRuleEngineMsg {
  137 +message ToRuleEngineMsg {
  138 + TransportToDeviceActorMsg toDeviceActorMsg = 1;
  139 +}
108 140
  141 +message ToTransportMsg {
  142 + DeviceActorToTransportMsg toDeviceSessionMsg = 1;
109 143 }
110 144
111 145 message TransportApiRequestMsg {
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -26,32 +26,25 @@ import io.netty.handler.codec.mqtt.MqttFixedHeader;
26 26 import io.netty.handler.codec.mqtt.MqttMessage;
27 27 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
28 28 import io.netty.handler.codec.mqtt.MqttPubAckMessage;
29   -import io.netty.handler.codec.mqtt.MqttPublishMessage;
30 29 import io.netty.handler.codec.mqtt.MqttQoS;
31 30 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
32 31 import io.netty.handler.codec.mqtt.MqttSubAckPayload;
33   -import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
34   -import io.netty.handler.codec.mqtt.MqttTopicSubscription;
35   -import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
36 32 import io.netty.handler.ssl.SslHandler;
37 33 import io.netty.util.concurrent.Future;
38 34 import io.netty.util.concurrent.GenericFutureListener;
39 35 import lombok.extern.slf4j.Slf4j;
40 36 import org.springframework.util.StringUtils;
41   -import org.thingsboard.server.common.data.Device;
42   -import org.thingsboard.server.common.data.security.DeviceTokenCredentials;
43   -import org.thingsboard.server.common.data.security.DeviceX509Credentials;
44   -import org.thingsboard.server.common.msg.core.SessionOpenMsg;
45   -import org.thingsboard.server.common.msg.session.AdaptorToSessionActorMsg;
46   -import org.thingsboard.server.common.msg.session.BasicAdaptorToSessionActorMsg;
47   -import org.thingsboard.server.common.msg.session.BasicTransportToDeviceSessionActorMsg;
48   -import org.thingsboard.server.common.msg.session.ctrl.SessionCloseMsg;
49 37 import org.thingsboard.server.common.transport.TransportService;
50 38 import org.thingsboard.server.common.transport.TransportServiceCallback;
51   -import org.thingsboard.server.common.transport.adaptor.AdaptorException;
52 39 import org.thingsboard.server.common.transport.quota.QuotaService;
53 40 import org.thingsboard.server.dao.EncryptionUtil;
54   -import org.thingsboard.server.gen.transport.TransportProtos;
  41 +import org.thingsboard.server.gen.transport.TransportProtos.DeviceInfoProto;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEvent;
  43 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
  44 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  45 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
  46 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
  47 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
55 48 import org.thingsboard.server.transport.mqtt.adaptors.MqttTransportAdaptor;
56 49 import org.thingsboard.server.transport.mqtt.session.DeviceSessionCtx;
57 50 import org.thingsboard.server.transport.mqtt.session.GatewaySessionCtx;
... ... @@ -61,49 +54,19 @@ import javax.net.ssl.SSLPeerUnverifiedException;
61 54 import javax.security.cert.X509Certificate;
62 55 import java.io.IOException;
63 56 import java.net.InetSocketAddress;
64   -import java.util.ArrayList;
65 57 import java.util.List;
66 58 import java.util.UUID;
67 59 import java.util.concurrent.ConcurrentHashMap;
68 60 import java.util.concurrent.ConcurrentMap;
69 61
70   -import org.thingsboard.server.gen.transport.TransportProtos.*;
71   -
72 62 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
73 63 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
74 64 import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
75 65 import static io.netty.handler.codec.mqtt.MqttMessageType.CONNACK;
76   -import static io.netty.handler.codec.mqtt.MqttMessageType.PINGRESP;
77 66 import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK;
78 67 import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK;
79   -import static io.netty.handler.codec.mqtt.MqttMessageType.UNSUBACK;
80 68 import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
81 69 import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
82   -import static io.netty.handler.codec.mqtt.MqttQoS.FAILURE;
83   -import static org.thingsboard.server.common.msg.session.SessionMsgType.GET_ATTRIBUTES_REQUEST;
84   -import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_ATTRIBUTES_REQUEST;
85   -import static org.thingsboard.server.common.msg.session.SessionMsgType.POST_TELEMETRY_REQUEST;
86   -import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_ATTRIBUTES_REQUEST;
87   -import static org.thingsboard.server.common.msg.session.SessionMsgType.SUBSCRIBE_RPC_COMMANDS_REQUEST;
88   -import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_DEVICE_RPC_RESPONSE;
89   -import static org.thingsboard.server.common.msg.session.SessionMsgType.TO_SERVER_RPC_REQUEST;
90   -import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_ATTRIBUTES_REQUEST;
91   -import static org.thingsboard.server.common.msg.session.SessionMsgType.UNSUBSCRIBE_RPC_COMMANDS_REQUEST;
92   -import static org.thingsboard.server.transport.mqtt.MqttTopics.BASE_GATEWAY_API_TOPIC;
93   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_REQUEST_TOPIC_PREFIX;
94   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_RESPONSES_TOPIC;
95   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_ATTRIBUTES_TOPIC;
96   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_SUB_TOPIC;
97   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_REQUESTS_TOPIC;
98   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_SUB_TOPIC;
99   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_RPC_RESPONSE_TOPIC;
100   -import static org.thingsboard.server.transport.mqtt.MqttTopics.DEVICE_TELEMETRY_TOPIC;
101   -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_REQUEST_TOPIC;
102   -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_ATTRIBUTES_TOPIC;
103   -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_CONNECT_TOPIC;
104   -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_DISCONNECT_TOPIC;
105   -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_RPC_TOPIC;
106   -import static org.thingsboard.server.transport.mqtt.MqttTopics.GATEWAY_TELEMETRY_TOPIC;
107 70
108 71 /**
109 72 * @author Andrew Shvayka
... ... @@ -389,7 +352,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
389 352 } else {
390 353 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
391 354 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
392   - transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
  355 + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
393 356 checkGatewaySession();
394 357 }
395 358 }
... ... @@ -418,7 +381,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
418 381 } else {
419 382 ctx.writeAndFlush(createMqttConnAckMsg(CONNECTION_ACCEPTED));
420 383 deviceSessionCtx.setDeviceInfo(msg.getDeviceInfo());
421   - transportService.process(getSessionEventMsg(SessionEvent.OPEN), null);
  384 + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.OPEN), null);
422 385 checkGatewaySession();
423 386 }
424 387 }
... ... @@ -452,7 +415,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
452 415 private void processDisconnect(ChannelHandlerContext ctx) {
453 416 ctx.close();
454 417 if (deviceSessionCtx.isConnected()) {
455   - transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
  418 + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
456 419 if (gatewaySessionCtx != null) {
457 420 gatewaySessionCtx.onGatewayDisconnect();
458 421 }
... ... @@ -534,7 +497,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
534 497 @Override
535 498 public void operationComplete(Future<? super Void> future) throws Exception {
536 499 if (deviceSessionCtx.isConnected()) {
537   - transportService.process(getSessionEventMsg(SessionEvent.CLOSED), null);
  500 + transportService.process(deviceSessionCtx, getSessionEventMsg(SessionEvent.CLOSED), null);
538 501 }
539 502 }
540 503 }
... ...
1 1 /**
2 2 * Copyright © 2016-2018 The Thingsboard Authors
3   - *
  3 + * <p>
4 4 * Licensed under the Apache License, Version 2.0 (the "License");
5 5 * you may not use this file except in compliance with the License.
6 6 * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
10 10 * Unless required by applicable law or agreed to in writing, software
11 11 * distributed under the License is distributed on an "AS IS" BASIS,
12 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
... ... @@ -15,21 +15,41 @@
15 15 */
16 16 package org.thingsboard.server.mqtt.service;
17 17
  18 +import lombok.extern.slf4j.Slf4j;
  19 +import org.apache.kafka.clients.consumer.ConsumerRecords;
  20 +import org.apache.kafka.clients.producer.Callback;
  21 +import org.apache.kafka.clients.producer.RecordMetadata;
18 22 import org.springframework.beans.factory.annotation.Autowired;
19 23 import org.springframework.beans.factory.annotation.Value;
20 24 import org.springframework.stereotype.Service;
  25 +import org.thingsboard.server.common.transport.SessionMsgListener;
21 26 import org.thingsboard.server.common.transport.TransportService;
22 27 import org.thingsboard.server.common.transport.TransportServiceCallback;
  28 +import org.thingsboard.server.gen.transport.TransportProtos;
  29 +import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
  30 +import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
  31 +import org.thingsboard.server.gen.transport.TransportProtos.SessionEventMsg;
  32 +import org.thingsboard.server.gen.transport.TransportProtos.SessionInfoProto;
  33 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  34 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
  35 +import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
  36 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  37 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceCredentialsResponseMsg;
  38 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
  39 +import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
23 40 import org.thingsboard.server.kafka.AsyncCallbackTemplate;
24 41 import org.thingsboard.server.kafka.TBKafkaConsumerTemplate;
25 42 import org.thingsboard.server.kafka.TBKafkaProducerTemplate;
26 43 import org.thingsboard.server.kafka.TbKafkaRequestTemplate;
27   -import org.thingsboard.server.gen.transport.TransportProtos.*;
28 44 import org.thingsboard.server.kafka.TbKafkaSettings;
29 45 import org.thingsboard.server.transport.mqtt.MqttTransportContext;
30 46
31 47 import javax.annotation.PostConstruct;
32 48 import javax.annotation.PreDestroy;
  49 +import java.time.Duration;
  50 +import java.util.UUID;
  51 +import java.util.concurrent.ConcurrentHashMap;
  52 +import java.util.concurrent.ConcurrentMap;
33 53 import java.util.concurrent.ExecutorService;
34 54 import java.util.concurrent.Executors;
35 55
... ... @@ -37,10 +57,17 @@ import java.util.concurrent.Executors;
37 57 * Created by ashvayka on 05.10.18.
38 58 */
39 59 @Service
  60 +@Slf4j
40 61 public class MqttTransportService implements TransportService {
41 62
42 63 @Value("${kafka.rule_engine.topic}")
43 64 private String ruleEngineTopic;
  65 + @Value("${kafka.notifications.topic}")
  66 + private String notificationsTopic;
  67 + @Value("${kafka.notifications.poll_interval}")
  68 + private int notificationsPollDuration;
  69 + @Value("${kafka.notifications.auto_commit_interval}")
  70 + private int notificationsAutoCommitInterval;
44 71 @Value("${kafka.transport_api.requests_topic}")
45 72 private String transportApiRequestsTopic;
46 73 @Value("${kafka.transport_api.responses_topic}")
... ... @@ -54,6 +81,8 @@ public class MqttTransportService implements TransportService {
54 81 @Value("${kafka.transport_api.response_auto_commit_interval}")
55 82 private int autoCommitInterval;
56 83
  84 + private ConcurrentMap<UUID, SessionMsgListener> sessions = new ConcurrentHashMap<>();
  85 +
57 86 @Autowired
58 87 private TbKafkaSettings kafkaSettings;
59 88 //We use this to get the node id. We should replace this with a component that provides the node id.
... ... @@ -63,6 +92,12 @@ public class MqttTransportService implements TransportService {
63 92 private ExecutorService transportCallbackExecutor;
64 93
65 94 private TbKafkaRequestTemplate<TransportApiRequestMsg, TransportApiResponseMsg> transportApiTemplate;
  95 + private TBKafkaProducerTemplate<ToRuleEngineMsg> ruleEngineProducer;
  96 + private TBKafkaConsumerTemplate<ToTransportMsg> mainConsumer;
  97 +
  98 + private ExecutorService mainConsumerExecutor = Executors.newSingleThreadExecutor();
  99 +
  100 + private volatile boolean stopped = false;
66 101
67 102 @PostConstruct
68 103 public void init() {
... ... @@ -77,7 +112,7 @@ public class MqttTransportService implements TransportService {
77 112 responseBuilder.settings(kafkaSettings);
78 113 responseBuilder.topic(transportApiResponsesTopic + "." + transportContext.getNodeId());
79 114 responseBuilder.clientId(transportContext.getNodeId());
80   - responseBuilder.groupId("transport-node");
  115 + responseBuilder.groupId(null);
81 116 responseBuilder.autoCommit(true);
82 117 responseBuilder.autoCommitIntervalMs(autoCommitInterval);
83 118 responseBuilder.decoder(new TransportApiResponseDecoder());
... ... @@ -91,16 +126,79 @@ public class MqttTransportService implements TransportService {
91 126 builder.pollInterval(responsePollDuration);
92 127 transportApiTemplate = builder.build();
93 128 transportApiTemplate.init();
  129 +
  130 + TBKafkaProducerTemplate.TBKafkaProducerTemplateBuilder<ToRuleEngineMsg> ruleEngineProducerBuilder = TBKafkaProducerTemplate.builder();
  131 + ruleEngineProducerBuilder.settings(kafkaSettings);
  132 + ruleEngineProducerBuilder.defaultTopic(ruleEngineTopic);
  133 + ruleEngineProducerBuilder.encoder(new ToRuleEngineMsgEncoder());
  134 + ruleEngineProducer = ruleEngineProducerBuilder.build();
  135 + ruleEngineProducer.init();
  136 +
  137 + TBKafkaConsumerTemplate.TBKafkaConsumerTemplateBuilder<ToTransportMsg> mainConsumerBuilder = TBKafkaConsumerTemplate.builder();
  138 + mainConsumerBuilder.settings(kafkaSettings);
  139 + mainConsumerBuilder.topic(notificationsTopic + "." + transportContext.getNodeId());
  140 + mainConsumerBuilder.clientId(transportContext.getNodeId());
  141 + mainConsumerBuilder.groupId(null);
  142 + mainConsumerBuilder.autoCommit(true);
  143 + mainConsumerBuilder.autoCommitIntervalMs(notificationsAutoCommitInterval);
  144 + mainConsumerBuilder.decoder(new ToTransportMsgResponseDecoder());
  145 + mainConsumer = mainConsumerBuilder.build();
  146 + mainConsumer.subscribe();
  147 +
  148 + mainConsumerExecutor.execute(() -> {
  149 + while (!stopped) {
  150 + try {
  151 + ConsumerRecords<String, byte[]> records = mainConsumer.poll(Duration.ofMillis(notificationsPollDuration));
  152 + records.forEach(record -> {
  153 + try {
  154 + ToTransportMsg toTransportMsg = mainConsumer.decode(record);
  155 + if (toTransportMsg.hasToDeviceSessionMsg()) {
  156 + TransportProtos.DeviceActorToTransportMsg toSessionMsg = toTransportMsg.getToDeviceSessionMsg();
  157 + UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
  158 + SessionMsgListener listener = sessions.get(sessionId);
  159 + if (listener != null) {
  160 + transportCallbackExecutor.submit(() -> {
  161 + if (toSessionMsg.hasGetAttributesResponse()) {
  162 + listener.onGetAttributesResponse(toSessionMsg.getGetAttributesResponse());
  163 + }
  164 + });
  165 + } else {
  166 + //TODO: should we notify the device actor about missed session?
  167 + log.debug("[{}] Missing session.", sessionId);
  168 + }
  169 +
  170 + }
  171 + } catch (Throwable e) {
  172 + log.warn("Failed to process the notification.", e);
  173 + }
  174 + });
  175 + } catch (Exception e) {
  176 + log.warn("Failed to obtain messages from queue.", e);
  177 + try {
  178 + Thread.sleep(notificationsPollDuration);
  179 + } catch (InterruptedException e2) {
  180 + log.trace("Failed to wait until the server has capacity to handle new requests", e2);
  181 + }
  182 + }
  183 + }
  184 + });
94 185 }
95 186
96 187 @PreDestroy
97 188 public void destroy() {
  189 + stopped = true;
98 190 if (transportApiTemplate != null) {
99 191 transportApiTemplate.stop();
100 192 }
101 193 if (transportCallbackExecutor != null) {
102 194 transportCallbackExecutor.shutdownNow();
103 195 }
  196 + if (mainConsumer != null) {
  197 + mainConsumer.unsubscribe();
  198 + }
  199 + if (mainConsumerExecutor != null) {
  200 + mainConsumerExecutor.shutdownNow();
  201 + }
104 202 }
105 203
106 204 @Override
... ... @@ -118,17 +216,69 @@ public class MqttTransportService implements TransportService {
118 216 }
119 217
120 218 @Override
121   - public void process(SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  219 + public void process(SessionInfoProto sessionInfo, SessionEventMsg msg, TransportServiceCallback<Void> callback) {
  220 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  221 + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  222 + .setSessionEvent(msg).build()
  223 + ).build();
  224 + send(sessionInfo, toRuleEngineMsg, callback);
  225 + }
122 226
  227 + @Override
  228 + public void process(SessionInfoProto sessionInfo, PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  229 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  230 + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  231 + .setPostTelemetry(msg).build()
  232 + ).build();
  233 + send(sessionInfo, toRuleEngineMsg, callback);
123 234 }
124 235
125 236 @Override
126   - public void process(PostTelemetryMsg msg, TransportServiceCallback<Void> callback) {
  237 + public void process(SessionInfoProto sessionInfo, PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  238 + ToRuleEngineMsg toRuleEngineMsg = ToRuleEngineMsg.newBuilder().setToDeviceActorMsg(
  239 + TransportProtos.TransportToDeviceActorMsg.newBuilder().setSessionInfo(sessionInfo)
  240 + .setPostAttributes(msg).build()
  241 + ).build();
  242 + send(sessionInfo, toRuleEngineMsg, callback);
  243 + }
127 244
  245 + @Override
  246 + public void registerSession(SessionInfoProto sessionInfo, SessionMsgListener listener) {
  247 + sessions.putIfAbsent(toId(sessionInfo), listener);
  248 + //TODO: monitor sessions periodically: PING REQ/RESP, etc.
128 249 }
129 250
130 251 @Override
131   - public void process(PostAttributeMsg msg, TransportServiceCallback<Void> callback) {
  252 + public void deregisterSession(SessionInfoProto sessionInfo) {
  253 + sessions.remove(toId(sessionInfo));
  254 + }
  255 +
  256 + private UUID toId(SessionInfoProto sessionInfo) {
  257 + return new UUID(sessionInfo.getSessionIdMSB(), sessionInfo.getSessionIdLSB());
  258 + }
  259 +
  260 + private String getRoutingKey(SessionInfoProto sessionInfo) {
  261 + return new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()).toString();
  262 + }
  263 +
  264 + private static class TransportCallbackAdaptor implements Callback {
  265 + private final TransportServiceCallback<Void> callback;
  266 +
  267 + TransportCallbackAdaptor(TransportServiceCallback<Void> callback) {
  268 + this.callback = callback;
  269 + }
  270 +
  271 + @Override
  272 + public void onCompletion(RecordMetadata metadata, Exception exception) {
  273 + if (exception == null) {
  274 + callback.onSuccess(null);
  275 + } else {
  276 + callback.onError(exception);
  277 + }
  278 + }
  279 + }
132 280
  281 + private void send(SessionInfoProto sessionInfo, ToRuleEngineMsg toRuleEngineMsg, TransportServiceCallback<Void> callback) {
  282 + ruleEngineProducer.send(getRoutingKey(sessionInfo), toRuleEngineMsg, new TransportCallbackAdaptor(callback));
133 283 }
134 284 }
... ...
  1 +/**
  2 + * Copyright © 2016-2018 The Thingsboard Authors
  3 + * <p>
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + * <p>
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + * <p>
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.mqtt.service;
  17 +
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
  19 +import org.thingsboard.server.kafka.TbKafkaEncoder;
  20 +
  21 +/**
  22 + * Created by ashvayka on 05.10.18.
  23 + */
  24 +public class ToRuleEngineMsgEncoder implements TbKafkaEncoder<ToRuleEngineMsg> {
  25 + @Override
  26 + public byte[] encode(ToRuleEngineMsg value) {
  27 + return value.toByteArray();
  28 + }
  29 +}
... ...
transport/mqtt-transport/src/main/java/org/thingsboard/server/mqtt/service/ToTransportMsgResponseDecoder.java renamed from common/message/src/main/java/org/thingsboard/server/common/msg/device/DeviceToDeviceActorMsg.java
... ... @@ -13,29 +13,19 @@
13 13 * See the License for the specific language governing permissions and
14 14 * limitations under the License.
15 15 */
16   -package org.thingsboard.server.common.msg.device;
  16 +package org.thingsboard.server.mqtt.service;
17 17
18   -import java.io.Serializable;
19   -import java.util.Optional;
  18 +import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  19 +import org.thingsboard.server.kafka.TbKafkaDecoder;
20 20
21   -import org.thingsboard.server.common.data.id.SessionId;
22   -import org.thingsboard.server.common.msg.TbActorMsg;
23   -import org.thingsboard.server.common.msg.aware.CustomerAwareMsg;
24   -import org.thingsboard.server.common.msg.aware.DeviceAwareMsg;
25   -import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
26   -import org.thingsboard.server.common.msg.cluster.ServerAddress;
27   -import org.thingsboard.server.common.msg.session.FromDeviceMsg;
28   -import org.thingsboard.server.common.msg.session.SessionType;
  21 +import java.io.IOException;
29 22
30   -public interface DeviceToDeviceActorMsg extends TbActorMsg, DeviceAwareMsg, CustomerAwareMsg, TenantAwareMsg, Serializable {
31   -
32   - SessionId getSessionId();
33   -
34   - SessionType getSessionType();
35   -
36   - Optional<ServerAddress> getServerAddress();
37   -
38   - FromDeviceMsg getPayload();
39   -
40   - DeviceToDeviceActorMsg toOtherAddress(ServerAddress otherAddress);
  23 +/**
  24 + * Created by ashvayka on 05.10.18.
  25 + */
  26 +public class ToTransportMsgResponseDecoder implements TbKafkaDecoder<ToTransportMsg> {
  27 + @Override
  28 + public ToTransportMsg decode(byte[] data) throws IOException {
  29 + return ToTransportMsg.parseFrom(data);
  30 + }
41 31 }
... ...
... ... @@ -82,3 +82,7 @@ kafka:
82 82 response_auto_commit_interval: "${TB_TRANSPORT_RESPONSE_AUTO_COMMIT_INTERVAL_MS:100}"
83 83 rule_engine:
84 84 topic: "${TB_RULE_ENGINE_TOPIC:tb.rule-engine}"
  85 + notifications:
  86 + topic: "${TB_TRANSPORT_NOTIFICATIONS_TOPIC:tb.transport.notifications}"
  87 + poll_interval: "${TB_TRANSPORT_NOTIFICATIONS_POLL_INTERVAL_MS:25}"
  88 + auto_commit_interval: "${TB_TRANSPORT_NOTIFICATIONS_AUTO_COMMIT_INTERVAL_MS:100}"
... ...