Commit 9242811ba2b85a7668da8d160d319cff27d19f3c

Authored by xp.Huang
2 parents ae98cb4d 4b64b5f6

Merge branch 'fix/teambition/2024-10-31' into 'master_dev'

perf:优化边端边接日志打印

See merge request yunteng/thingskit!465
@@ -131,20 +131,20 @@ public final class EdgeGrpcSession implements Closeable { @@ -131,20 +131,20 @@ public final class EdgeGrpcSession implements Closeable {
131 public void onNext(RequestMsg requestMsg) { 131 public void onNext(RequestMsg requestMsg) {
132 if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) { 132 if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) {
133 ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg()); 133 ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg());
134 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.CREATED, true, null); 134 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.CREATED, true, null);
135 outputStream.onNext(ResponseMsg.newBuilder() 135 outputStream.onNext(ResponseMsg.newBuilder()
136 .setConnectResponseMsg(responseMsg) 136 .setConnectResponseMsg(responseMsg)
137 .build()); 137 .build());
138 if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) { 138 if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) {
139 outputStream.onError(new RuntimeException(responseMsg.getErrorMsg())); 139 outputStream.onError(new RuntimeException(responseMsg.getErrorMsg()));
140 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, new RuntimeException(responseMsg.getErrorMsg())); 140 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, new RuntimeException(responseMsg.getErrorMsg()));
141 } else { 141 } else {
142 if (requestMsg.getConnectRequestMsg().hasMaxInboundMessageSize()) { 142 if (requestMsg.getConnectRequestMsg().hasMaxInboundMessageSize()) {
143 log.debug("[{}][{}] Client max inbound message size: {}", tenantId, sessionId, requestMsg.getConnectRequestMsg().getMaxInboundMessageSize()); 143 log.debug("[{}][{}] Client max inbound message size: {}", tenantId, sessionId, requestMsg.getConnectRequestMsg().getMaxInboundMessageSize());
144 clientMaxInboundMessageSize = requestMsg.getConnectRequestMsg().getMaxInboundMessageSize(); 144 clientMaxInboundMessageSize = requestMsg.getConnectRequestMsg().getMaxInboundMessageSize();
145 } 145 }
146 connected = true; 146 connected = true;
147 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.STARTED, true, null); 147 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.STARTED, true, null);
148 } 148 }
149 } 149 }
150 if (connected) { 150 if (connected) {
@@ -174,7 +174,7 @@ public final class EdgeGrpcSession implements Closeable { @@ -174,7 +174,7 @@ public final class EdgeGrpcSession implements Closeable {
174 @Override 174 @Override
175 public void onError(Throwable t) { 175 public void onError(Throwable t) {
176 log.error("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t); 176 log.error("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t);
177 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, t); 177 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, t);
178 closeSession(); 178 closeSession();
179 } 179 }
180 180
@@ -189,15 +189,15 @@ public final class EdgeGrpcSession implements Closeable { @@ -189,15 +189,15 @@ public final class EdgeGrpcSession implements Closeable {
189 if (edge != null) { 189 if (edge != null) {
190 try { 190 try {
191 sessionCloseListener.accept(edge, sessionId); 191 sessionCloseListener.accept(edge, sessionId);
192 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.STOPPED, true, null); 192 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.STOPPED, true, null);
193 } catch (Exception ignored) { 193 } catch (Exception ignored) {
194 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, ignored); 194 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, ignored);
195 } 195 }
196 } 196 }
197 try { 197 try {
198 outputStream.onCompleted(); 198 outputStream.onCompleted();
199 } catch (Exception ignored) { 199 } catch (Exception ignored) {
200 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, ignored); 200 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, ignored);
201 } 201 }
202 } 202 }
203 }; 203 };
@@ -208,7 +208,7 @@ public final class EdgeGrpcSession implements Closeable { @@ -208,7 +208,7 @@ public final class EdgeGrpcSession implements Closeable {
208 syncCompleted = false; 208 syncCompleted = false;
209 interruptGeneralProcessingOnSync(); 209 interruptGeneralProcessingOnSync();
210 doSync(new EdgeSyncCursor(ctx, edge, fullSync)); 210 doSync(new EdgeSyncCursor(ctx, edge, fullSync));
211 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.ACTIVATED, true, null); 211 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.ACTIVATED, true, null);
212 } 212 }
213 213
214 private void doSync(EdgeSyncCursor cursor) { 214 private void doSync(EdgeSyncCursor cursor) {
@@ -333,7 +333,7 @@ public final class EdgeGrpcSession implements Closeable { @@ -333,7 +333,7 @@ public final class EdgeGrpcSession implements Closeable {
333 .setEdgeUpdateMsg(edgeConfig) 333 .setEdgeUpdateMsg(edgeConfig)
334 .build(); 334 .build();
335 sendDownlinkMsg(edgeConfigMsg); 335 sendDownlinkMsg(edgeConfigMsg);
336 - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.UPDATED, true, null); 336 + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.UPDATED, true, null);
337 } 337 }
338 338
339 ListenableFuture<Boolean> processEdgeEvents() throws Exception { 339 ListenableFuture<Boolean> processEdgeEvents() throws Exception {
@@ -873,6 +873,8 @@ public final class EdgeGrpcSession implements Closeable { @@ -873,6 +873,8 @@ public final class EdgeGrpcSession implements Closeable {
873 .setErrorMsg(failureMsg) 873 .setErrorMsg(failureMsg)
874 .setConfiguration(EdgeConfiguration.getDefaultInstance()).build(); 874 .setConfiguration(EdgeConfiguration.getDefaultInstance()).build();
875 } 875 }
  876 + } else {
  877 + log.warn("非法边端连接,EdgeRoutingKey [{}] 不存在",request.getEdgeRoutingKey());
876 } 878 }
877 return ConnectResponseMsg.newBuilder() 879 return ConnectResponseMsg.newBuilder()
878 .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS) 880 .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS)
@@ -918,23 +920,28 @@ public final class EdgeGrpcSession implements Closeable { @@ -918,23 +920,28 @@ public final class EdgeGrpcSession implements Closeable {
918 } 920 }
919 } 921 }
920 //thingskit 922 //thingskit
921 - private void lifecycleEvent(TenantId tenantId, EdgeId edgeId, ComponentLifecycleEvent eventType, boolean success, Throwable error) { 923 + private void lifecycleEvent(TenantId tenantId, Edge edge, ComponentLifecycleEvent eventType, boolean success, Throwable error) {
922 try { 924 try {
  925 +
  926 + if(tenantId==null || edge==null || edge.getId()==null){
  927 + return;
  928 + }
  929 +
923 ToCoreMsg msg = ToCoreMsg.newBuilder() 930 ToCoreMsg msg = ToCoreMsg.newBuilder()
924 .setLifecycleEventMsg( 931 .setLifecycleEventMsg(
925 LifecycleEventProto.newBuilder() 932 LifecycleEventProto.newBuilder()
926 .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) 933 .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
927 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) 934 .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
928 - .setEntityIdMSB(edgeId.getId().getMostSignificantBits())  
929 - .setEntityIdLSB(edgeId.getId().getLeastSignificantBits()) 935 + .setEntityIdMSB(edge.getId().getId().getMostSignificantBits())
  936 + .setEntityIdLSB(edge.getId().getId().getLeastSignificantBits())
930 .setServiceId(ctx.getServiceInfoProvider().getServiceId()) 937 .setServiceId(ctx.getServiceInfoProvider().getServiceId())
931 .setLcEventType(eventType.name()) 938 .setLcEventType(eventType.name())
932 .setSuccess(success) 939 .setSuccess(success)
933 .setError(error != null ? ExceptionUtils.getStackTrace(error) : "") 940 .setError(error != null ? ExceptionUtils.getStackTrace(error) : "")
934 ).build(); 941 ).build();
935 - ctx.getClusterService().pushMsgToCore(tenantId, (EntityId) edgeId, msg, null); 942 + ctx.getClusterService().pushMsgToCore(tenantId, (EntityId) edge.getId(), msg, null);
936 } catch (Exception e) { 943 } catch (Exception e) {
937 - log.error("[{}][{}] Failed to send lifecycle event to core", tenantId, edgeId, e); 944 + log.error("[{}][{}] Failed to send lifecycle event to core", tenantId, edge.getId(), e);
938 } 945 }
939 } 946 }
940 947