Commit 0c3d1556daaba73d0c64fdf30c085fe465df499b

Authored by Volodymyr Babak
Committed by Andrew Shvayka
1 parent dc87f1d5

Introduced edge session state object

@@ -100,10 +100,7 @@ public final class EdgeGrpcSession implements Closeable { @@ -100,10 +100,7 @@ public final class EdgeGrpcSession implements Closeable {
100 private final Consumer<EdgeId> sessionCloseListener; 100 private final Consumer<EdgeId> sessionCloseListener;
101 private final ObjectMapper mapper; 101 private final ObjectMapper mapper;
102 102
103 - private final Map<Integer, DownlinkMsg> pendingMsgsMap = new LinkedHashMap<>();  
104 - // TODO: voba - global future - possible refactoring  
105 - private SettableFuture<Void> sendDownlinkMsgsFuture;  
106 - private ScheduledFuture<?> scheduledSendDownlinkTask; 103 + private final EdgeSessionState sessionState = new EdgeSessionState();
107 104
108 private EdgeContextComponent ctx; 105 private EdgeContextComponent ctx;
109 private Edge edge; 106 private Edge edge;
@@ -256,17 +253,17 @@ public final class EdgeGrpcSession implements Closeable { @@ -256,17 +253,17 @@ public final class EdgeGrpcSession implements Closeable {
256 private void onDownlinkResponse(DownlinkResponseMsg msg) { 253 private void onDownlinkResponse(DownlinkResponseMsg msg) {
257 try { 254 try {
258 if (msg.getSuccess()) { 255 if (msg.getSuccess()) {
259 - pendingMsgsMap.remove(msg.getDownlinkMsgId()); 256 + sessionState.getPendingMsgsMap().remove(msg.getDownlinkMsgId());
260 log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg); 257 log.debug("[{}] Msg has been processed successfully! {}", edge.getRoutingKey(), msg);
261 } else { 258 } else {
262 log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg()); 259 log.error("[{}] Msg processing failed! Error msg: {}", edge.getRoutingKey(), msg.getErrorMsg());
263 } 260 }
264 - if (pendingMsgsMap.size() == 0) { 261 + if (sessionState.getPendingMsgsMap().isEmpty()) {
265 log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg); 262 log.debug("[{}] Pending msgs map is empty. Stopping current iteration {}", edge.getRoutingKey(), msg);
266 - if (scheduledSendDownlinkTask != null) {  
267 - scheduledSendDownlinkTask.cancel(false); 263 + if (sessionState.getScheduledSendDownlinkTask() != null) {
  264 + sessionState.getScheduledSendDownlinkTask().cancel(false);
268 } 265 }
269 - sendDownlinkMsgsFuture.set(null); 266 + sessionState.getSendDownlinkMsgsFuture().set(null);
270 } 267 }
271 } catch (Exception e) { 268 } catch (Exception e) {
272 log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e); 269 log.error("[{}] Can't process downlink response message [{}]", this.sessionId, msg, e);
@@ -389,27 +386,27 @@ public final class EdgeGrpcSession implements Closeable { @@ -389,27 +386,27 @@ public final class EdgeGrpcSession implements Closeable {
389 } 386 }
390 387
391 private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) { 388 private ListenableFuture<Void> sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) {
392 - if (sendDownlinkMsgsFuture != null && !sendDownlinkMsgsFuture.isDone()) { 389 + if (sessionState.getSendDownlinkMsgsFuture() != null && !sessionState.getSendDownlinkMsgsFuture().isDone()) {
393 String erroMsg = "[" + this.sessionId + "] Previous send downdlink future was not properly completed, stopping it now"; 390 String erroMsg = "[" + this.sessionId + "] Previous send downdlink future was not properly completed, stopping it now";
394 log.error(erroMsg); 391 log.error(erroMsg);
395 - sendDownlinkMsgsFuture.setException(new RuntimeException(erroMsg)); 392 + sessionState.getSendDownlinkMsgsFuture().setException(new RuntimeException(erroMsg));
396 } 393 }
397 - sendDownlinkMsgsFuture = SettableFuture.create();  
398 - pendingMsgsMap.clear();  
399 - downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg)); 394 + sessionState.setSendDownlinkMsgsFuture(SettableFuture.create());
  395 + sessionState.getPendingMsgsMap().clear();
  396 + downlinkMsgsPack.forEach(msg -> sessionState.getPendingMsgsMap().put(msg.getDownlinkMsgId(), msg));
400 scheduleDownlinkMsgsPackSend(true); 397 scheduleDownlinkMsgsPackSend(true);
401 - return sendDownlinkMsgsFuture; 398 + return sessionState.getSendDownlinkMsgsFuture();
402 } 399 }
403 400
404 private void scheduleDownlinkMsgsPackSend(boolean firstRun) { 401 private void scheduleDownlinkMsgsPackSend(boolean firstRun) {
405 Runnable sendDownlinkMsgsTask = () -> { 402 Runnable sendDownlinkMsgsTask = () -> {
406 try { 403 try {
407 - if (isConnected() && pendingMsgsMap.values().size() > 0) { 404 + if (isConnected() && sessionState.getPendingMsgsMap().values().size() > 0) {
408 if (!firstRun) { 405 if (!firstRun) {
409 - log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values()); 406 + log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, sessionState.getPendingMsgsMap().values());
410 } 407 }
411 - log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size());  
412 - List<DownlinkMsg> copy = new ArrayList<>(pendingMsgsMap.values()); 408 + log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, sessionState.getPendingMsgsMap().values().size());
  409 + List<DownlinkMsg> copy = new ArrayList<>(sessionState.getPendingMsgsMap().values());
413 for (DownlinkMsg downlinkMsg : copy) { 410 for (DownlinkMsg downlinkMsg : copy) {
414 sendDownlinkMsg(ResponseMsg.newBuilder() 411 sendDownlinkMsg(ResponseMsg.newBuilder()
415 .setDownlinkMsg(downlinkMsg) 412 .setDownlinkMsg(downlinkMsg)
@@ -417,17 +414,22 @@ public final class EdgeGrpcSession implements Closeable { @@ -417,17 +414,22 @@ public final class EdgeGrpcSession implements Closeable {
417 } 414 }
418 scheduleDownlinkMsgsPackSend(false); 415 scheduleDownlinkMsgsPackSend(false);
419 } else { 416 } else {
420 - sendDownlinkMsgsFuture.set(null); 417 + sessionState.getSendDownlinkMsgsFuture().set(null);
421 } 418 }
422 } catch (Exception e) { 419 } catch (Exception e) {
423 - sendDownlinkMsgsFuture.setException(e); 420 + sessionState.getSendDownlinkMsgsFuture().setException(e);
424 } 421 }
425 }; 422 };
426 423
427 if (firstRun) { 424 if (firstRun) {
428 sendDownlinkExecutorService.submit(sendDownlinkMsgsTask); 425 sendDownlinkExecutorService.submit(sendDownlinkMsgsTask);
429 } else { 426 } else {
430 - scheduledSendDownlinkTask = sendDownlinkExecutorService.schedule(sendDownlinkMsgsTask, ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(), TimeUnit.MILLISECONDS); 427 + sessionState.setScheduledSendDownlinkTask(
  428 + sendDownlinkExecutorService.schedule(
  429 + sendDownlinkMsgsTask,
  430 + ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches(),
  431 + TimeUnit.MILLISECONDS)
  432 + );
431 } 433 }
432 434
433 } 435 }
  1 +/**
  2 + * Copyright © 2016-2021 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc;
  17 +
  18 +import com.google.common.util.concurrent.SettableFuture;
  19 +import lombok.Data;
  20 +import org.thingsboard.server.gen.edge.v1.DownlinkMsg;
  21 +
  22 +import java.util.LinkedHashMap;
  23 +import java.util.Map;
  24 +import java.util.concurrent.ScheduledFuture;
  25 +
  26 +@Data
  27 +public class EdgeSessionState {
  28 +
  29 + private final Map<Integer, DownlinkMsg> pendingMsgsMap = new LinkedHashMap<>();
  30 + private SettableFuture<Void> sendDownlinkMsgsFuture;
  31 + private ScheduledFuture<?> scheduledSendDownlinkTask;
  32 +}