Commit 69994adb18df9d831fc6a6e4e4d6d6114922c7ef

Authored by Volodymyr Babak
1 parent ba14bc11

Code review fixes

... ... @@ -118,8 +118,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
118 118 log.error("Failed to start Edge RPC server!", e);
119 119 throw new RuntimeException("Failed to start Edge RPC server!");
120 120 }
121   - log.info("Edge RPC service initialized!");
122 121 this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler"));
  122 + log.info("Edge RPC service initialized!");
123 123 }
124 124
125 125 @PreDestroy
... ...
... ... @@ -16,7 +16,6 @@
16 16 package org.thingsboard.server.service.edge.rpc;
17 17
18 18 import com.datastax.driver.core.utils.UUIDs;
19   -import com.fasterxml.jackson.core.JsonProcessingException;
20 19 import com.fasterxml.jackson.databind.ObjectMapper;
21 20 import com.google.common.util.concurrent.FutureCallback;
22 21 import com.google.common.util.concurrent.Futures;
... ... @@ -120,7 +119,7 @@ import java.util.function.Consumer;
120 119 @Data
121 120 public final class EdgeGrpcSession implements Closeable {
122 121
123   - private static final ReentrantLock responseMsgLock = new ReentrantLock();
  122 + private static final ReentrantLock downlinkMsgLock = new ReentrantLock();
124 123
125 124 private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
126 125
... ... @@ -199,7 +198,7 @@ public final class EdgeGrpcSession implements Closeable {
199 198 @Override
200 199 public void onSuccess(@Nullable List<Void> result) {
201 200 UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(true).build();
202   - sendResponseMsg(ResponseMsg.newBuilder()
  201 + sendDownlinkMsg(ResponseMsg.newBuilder()
203 202 .setUplinkResponseMsg(uplinkResponseMsg)
204 203 .build());
205 204 }
... ... @@ -207,7 +206,7 @@ public final class EdgeGrpcSession implements Closeable {
207 206 @Override
208 207 public void onFailure(Throwable t) {
209 208 UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(false).setErrorMsg(t.getMessage()).build();
210   - sendResponseMsg(ResponseMsg.newBuilder()
  209 + sendDownlinkMsg(ResponseMsg.newBuilder()
211 210 .setUplinkResponseMsg(uplinkResponseMsg)
212 211 .build());
213 212 }
... ... @@ -227,35 +226,32 @@ public final class EdgeGrpcSession implements Closeable {
227 226 }
228 227 }
229 228
230   - private void sendResponseMsg(ResponseMsg responseMsg) {
231   - log.trace("[{}] Sending response msg [{}]", this.sessionId, responseMsg);
  229 + private void sendDownlinkMsg(ResponseMsg downlinkMsg) {
  230 + log.trace("[{}] Sending downlink msg [{}]", this.sessionId, downlinkMsg);
232 231 if (isConnected()) {
233 232 try {
234   - responseMsgLock.lock();
235   - outputStream.onNext(responseMsg);
  233 + downlinkMsgLock.lock();
  234 + outputStream.onNext(downlinkMsg);
236 235 } catch (Exception e) {
237   - log.error("[{}] Failed to send response message [{}]", this.sessionId, responseMsg, e);
  236 + log.error("[{}] Failed to send downlink message [{}]", this.sessionId, downlinkMsg, e);
238 237 connected = false;
239 238 sessionCloseListener.accept(edge.getId());
240 239 } finally {
241   - responseMsgLock.unlock();
  240 + downlinkMsgLock.unlock();
242 241 }
243   - log.trace("[{}] Response msg successfully sent [{}]", this.sessionId, responseMsg);
  242 + log.trace("[{}] Response msg successfully sent [{}]", this.sessionId, downlinkMsg);
244 243 }
245 244 }
246 245
247 246 void onConfigurationUpdate(Edge edge) {
248 247 log.debug("[{}] onConfigurationUpdate [{}]", this.sessionId, edge);
249   - try {
250   - this.edge = edge;
251   - EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder()
252   - .setConfiguration(constructEdgeConfigProto(edge)).build();
253   - outputStream.onNext(ResponseMsg.newBuilder()
254   - .setEdgeUpdateMsg(edgeConfig)
255   - .build());
256   - } catch (Exception e) {
257   - log.error("[{}] Failed to construct proto objects!", this.sessionId, e);
258   - }
  248 + this.edge = edge;
  249 + EdgeUpdateMsg edgeConfig = EdgeUpdateMsg.newBuilder()
  250 + .setConfiguration(constructEdgeConfigProto(edge)).build();
  251 + ResponseMsg edgeConfigMsg = ResponseMsg.newBuilder()
  252 + .setEdgeUpdateMsg(edgeConfig)
  253 + .build();
  254 + sendDownlinkMsg(edgeConfigMsg);
259 255 }
260 256
261 257 void processEdgeEvents() throws ExecutionException, InterruptedException {
... ... @@ -276,7 +272,7 @@ public final class EdgeGrpcSession implements Closeable {
276 272
277 273 latch = new CountDownLatch(downlinkMsgsPack.size());
278 274 for (DownlinkMsg downlinkMsg : downlinkMsgsPack) {
279   - sendResponseMsg(ResponseMsg.newBuilder()
  275 + sendDownlinkMsg(ResponseMsg.newBuilder()
280 276 .setDownlinkMsg(downlinkMsg)
281 277 .build());
282 278 }
... ...
... ... @@ -151,7 +151,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
151 151 syncWidgetsBundleAndWidgetTypes(edge);
152 152 syncAdminSettings(edge);
153 153 syncRuleChains(edge, new TimePageLink(DEFAULT_LIMIT));
154   - syncUsers(edge);
  154 + syncUsers(edge, new TextPageLink(DEFAULT_LIMIT));
155 155 syncDevices(edge, new TimePageLink(DEFAULT_LIMIT));
156 156 syncAssets(edge, new TimePageLink(DEFAULT_LIMIT));
157 157 syncEntityViews(edge, new TimePageLink(DEFAULT_LIMIT));
... ... @@ -303,10 +303,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService {
303 303 }
304 304 }
305 305
306   - private void syncUsers(Edge edge) {
  306 + private void syncUsers(Edge edge, TextPageLink pageLink) {
307 307 log.trace("[{}] syncUsers [{}]", edge.getTenantId(), edge.getName());
308 308 try {
309   - TextPageLink pageLink = new TextPageLink(DEFAULT_LIMIT);
310 309 TextPageData<User> pageData;
311 310 do {
312 311 pageData = userService.findTenantAdmins(edge.getTenantId(), pageLink);
... ...