|
@@ -83,8 +83,10 @@ import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEve |
|
@@ -83,8 +83,10 @@ import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEve |
83
|
|
83
|
|
84
|
import java.io.Closeable;
|
84
|
import java.io.Closeable;
|
85
|
import java.util.ArrayList;
|
85
|
import java.util.ArrayList;
|
|
|
86
|
+import java.util.Collection;
|
86
|
import java.util.Collections;
|
87
|
import java.util.Collections;
|
87
|
import java.util.HashMap;
|
88
|
import java.util.HashMap;
|
|
|
89
|
+import java.util.Iterator;
|
88
|
import java.util.List;
|
90
|
import java.util.List;
|
89
|
import java.util.Map;
|
91
|
import java.util.Map;
|
90
|
import java.util.Objects;
|
92
|
import java.util.Objects;
|
|
@@ -103,6 +105,7 @@ import java.util.stream.Collectors; |
|
@@ -103,6 +105,7 @@ import java.util.stream.Collectors; |
103
|
public final class EdgeGrpcSession implements Closeable {
|
105
|
public final class EdgeGrpcSession implements Closeable {
|
104
|
|
106
|
|
105
|
private static final ReentrantLock downlinkMsgLock = new ReentrantLock();
|
107
|
private static final ReentrantLock downlinkMsgLock = new ReentrantLock();
|
|
|
108
|
+ private static final ReentrantLock downlinkMsgsPackLock = new ReentrantLock();
|
106
|
|
109
|
|
107
|
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
110
|
private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
|
108
|
|
111
|
|
|
@@ -111,7 +114,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
@@ -111,7 +114,7 @@ public final class EdgeGrpcSession implements Closeable { |
111
|
private final Consumer<EdgeId> sessionCloseListener;
|
114
|
private final Consumer<EdgeId> sessionCloseListener;
|
112
|
private final ObjectMapper mapper;
|
115
|
private final ObjectMapper mapper;
|
113
|
|
116
|
|
114
|
- private final Map<Integer, DownlinkMsg> pendingMsgsMap;
|
117
|
+ private final Map<Integer, DownlinkMsg> pendingMsgsMap = new HashMap<>();
|
115
|
|
118
|
|
116
|
private EdgeContextComponent ctx;
|
119
|
private EdgeContextComponent ctx;
|
117
|
private Edge edge;
|
120
|
private Edge edge;
|
|
@@ -133,7 +136,6 @@ public final class EdgeGrpcSession implements Closeable { |
|
@@ -133,7 +136,6 @@ public final class EdgeGrpcSession implements Closeable { |
133
|
this.sessionCloseListener = sessionCloseListener;
|
136
|
this.sessionCloseListener = sessionCloseListener;
|
134
|
this.mapper = mapper;
|
137
|
this.mapper = mapper;
|
135
|
this.syncExecutorService = syncExecutorService;
|
138
|
this.syncExecutorService = syncExecutorService;
|
136
|
- this.pendingMsgsMap = new HashMap<>();
|
|
|
137
|
initInputStream();
|
139
|
initInputStream();
|
138
|
}
|
140
|
}
|
139
|
|
141
|
|
|
@@ -197,6 +199,7 @@ public final class EdgeGrpcSession implements Closeable { |
|
@@ -197,6 +199,7 @@ public final class EdgeGrpcSession implements Closeable { |
197
|
|
199
|
|
198
|
public void startSyncProcess(TenantId tenantId, EdgeId edgeId) {
|
200
|
public void startSyncProcess(TenantId tenantId, EdgeId edgeId) {
|
199
|
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
|
201
|
log.trace("[{}][{}] Staring edge sync process", tenantId, edgeId);
|
|
|
202
|
+ syncCompleted = false;
|
200
|
syncExecutorService.submit(() -> {
|
203
|
syncExecutorService.submit(() -> {
|
201
|
try {
|
204
|
try {
|
202
|
startProcessingEdgeEvents(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
|
205
|
startProcessingEdgeEvents(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService()));
|
|
@@ -336,30 +339,36 @@ public final class EdgeGrpcSession implements Closeable { |
|
@@ -336,30 +339,36 @@ public final class EdgeGrpcSession implements Closeable { |
336
|
}
|
339
|
}
|
337
|
|
340
|
|
338
|
private boolean sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) throws InterruptedException {
|
341
|
private boolean sendDownlinkMsgsPack(List<DownlinkMsg> downlinkMsgsPack) throws InterruptedException {
|
339
|
- boolean success;
|
|
|
340
|
- pendingMsgsMap.clear();
|
|
|
341
|
- downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg));
|
|
|
342
|
- do {
|
|
|
343
|
- log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size());
|
|
|
344
|
- latch = new CountDownLatch(pendingMsgsMap.values().size());
|
|
|
345
|
- for (DownlinkMsg downlinkMsg : pendingMsgsMap.values()) {
|
|
|
346
|
- sendDownlinkMsg(ResponseMsg.newBuilder()
|
|
|
347
|
- .setDownlinkMsg(downlinkMsg)
|
|
|
348
|
- .build());
|
|
|
349
|
- }
|
|
|
350
|
- success = latch.await(10, TimeUnit.SECONDS);
|
|
|
351
|
- if (!success) {
|
|
|
352
|
- log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values());
|
|
|
353
|
- }
|
|
|
354
|
- if (isConnected() && !success) {
|
|
|
355
|
- try {
|
|
|
356
|
- Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
|
|
|
357
|
- } catch (InterruptedException e) {
|
|
|
358
|
- log.error("[{}] Error during sleep between next send of single downlink msg", this.sessionId, e);
|
342
|
+ try {
|
|
|
343
|
+ downlinkMsgsPackLock.lock();
|
|
|
344
|
+ boolean success;
|
|
|
345
|
+ pendingMsgsMap.clear();
|
|
|
346
|
+ downlinkMsgsPack.forEach(msg -> pendingMsgsMap.put(msg.getDownlinkMsgId(), msg));
|
|
|
347
|
+ do {
|
|
|
348
|
+ log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, pendingMsgsMap.values().size());
|
|
|
349
|
+ latch = new CountDownLatch(pendingMsgsMap.values().size());
|
|
|
350
|
+ Collection<DownlinkMsg> copy = new ArrayList<>(pendingMsgsMap.values());
|
|
|
351
|
+ for (DownlinkMsg downlinkMsg : copy) {
|
|
|
352
|
+ sendDownlinkMsg(ResponseMsg.newBuilder()
|
|
|
353
|
+ .setDownlinkMsg(downlinkMsg)
|
|
|
354
|
+ .build());
|
359
|
}
|
355
|
}
|
360
|
- }
|
|
|
361
|
- } while (isConnected() && !success);
|
|
|
362
|
- return success;
|
356
|
+ success = latch.await(10, TimeUnit.SECONDS);
|
|
|
357
|
+ if (!success || pendingMsgsMap.values().size() > 0) {
|
|
|
358
|
+ log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, pendingMsgsMap.values());
|
|
|
359
|
+ }
|
|
|
360
|
+ if (isConnected() && (!success || pendingMsgsMap.values().size() > 0)) {
|
|
|
361
|
+ try {
|
|
|
362
|
+ Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches());
|
|
|
363
|
+ } catch (InterruptedException e) {
|
|
|
364
|
+ log.error("[{}] Error during sleep between batches", this.sessionId, e);
|
|
|
365
|
+ }
|
|
|
366
|
+ }
|
|
|
367
|
+ } while (isConnected() && (!success || pendingMsgsMap.values().size() > 0));
|
|
|
368
|
+ return success;
|
|
|
369
|
+ } finally {
|
|
|
370
|
+ downlinkMsgsPackLock.unlock();
|
|
|
371
|
+ }
|
363
|
}
|
372
|
}
|
364
|
|
373
|
|
365
|
private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {
|
374
|
private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) {
|