Showing
1 changed file
with
21 additions
and
14 deletions
... | ... | @@ -131,20 +131,20 @@ public final class EdgeGrpcSession implements Closeable { |
131 | 131 | public void onNext(RequestMsg requestMsg) { |
132 | 132 | if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) { |
133 | 133 | ConnectResponseMsg responseMsg = processConnect(requestMsg.getConnectRequestMsg()); |
134 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.CREATED, true, null); | |
134 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.CREATED, true, null); | |
135 | 135 | outputStream.onNext(ResponseMsg.newBuilder() |
136 | 136 | .setConnectResponseMsg(responseMsg) |
137 | 137 | .build()); |
138 | 138 | if (ConnectResponseCode.ACCEPTED != responseMsg.getResponseCode()) { |
139 | 139 | outputStream.onError(new RuntimeException(responseMsg.getErrorMsg())); |
140 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, new RuntimeException(responseMsg.getErrorMsg())); | |
140 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, new RuntimeException(responseMsg.getErrorMsg())); | |
141 | 141 | } else { |
142 | 142 | if (requestMsg.getConnectRequestMsg().hasMaxInboundMessageSize()) { |
143 | 143 | log.debug("[{}][{}] Client max inbound message size: {}", tenantId, sessionId, requestMsg.getConnectRequestMsg().getMaxInboundMessageSize()); |
144 | 144 | clientMaxInboundMessageSize = requestMsg.getConnectRequestMsg().getMaxInboundMessageSize(); |
145 | 145 | } |
146 | 146 | connected = true; |
147 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.STARTED, true, null); | |
147 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.STARTED, true, null); | |
148 | 148 | } |
149 | 149 | } |
150 | 150 | if (connected) { |
... | ... | @@ -174,7 +174,7 @@ public final class EdgeGrpcSession implements Closeable { |
174 | 174 | @Override |
175 | 175 | public void onError(Throwable t) { |
176 | 176 | log.error("[{}][{}] Stream was terminated due to error:", tenantId, sessionId, t); |
177 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, t); | |
177 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, t); | |
178 | 178 | closeSession(); |
179 | 179 | } |
180 | 180 | |
... | ... | @@ -189,15 +189,15 @@ public final class EdgeGrpcSession implements Closeable { |
189 | 189 | if (edge != null) { |
190 | 190 | try { |
191 | 191 | sessionCloseListener.accept(edge, sessionId); |
192 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.STOPPED, true, null); | |
192 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.STOPPED, true, null); | |
193 | 193 | } catch (Exception ignored) { |
194 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, ignored); | |
194 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, ignored); | |
195 | 195 | } |
196 | 196 | } |
197 | 197 | try { |
198 | 198 | outputStream.onCompleted(); |
199 | 199 | } catch (Exception ignored) { |
200 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.FAILED, false, ignored); | |
200 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.FAILED, false, ignored); | |
201 | 201 | } |
202 | 202 | } |
203 | 203 | }; |
... | ... | @@ -208,7 +208,7 @@ public final class EdgeGrpcSession implements Closeable { |
208 | 208 | syncCompleted = false; |
209 | 209 | interruptGeneralProcessingOnSync(); |
210 | 210 | doSync(new EdgeSyncCursor(ctx, edge, fullSync)); |
211 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.ACTIVATED, true, null); | |
211 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.ACTIVATED, true, null); | |
212 | 212 | } |
213 | 213 | |
214 | 214 | private void doSync(EdgeSyncCursor cursor) { |
... | ... | @@ -333,7 +333,7 @@ public final class EdgeGrpcSession implements Closeable { |
333 | 333 | .setEdgeUpdateMsg(edgeConfig) |
334 | 334 | .build(); |
335 | 335 | sendDownlinkMsg(edgeConfigMsg); |
336 | - lifecycleEvent(getTenantId(), getEdge().getId(), ComponentLifecycleEvent.UPDATED, true, null); | |
336 | + lifecycleEvent(getTenantId(), getEdge(), ComponentLifecycleEvent.UPDATED, true, null); | |
337 | 337 | } |
338 | 338 | |
339 | 339 | ListenableFuture<Boolean> processEdgeEvents() throws Exception { |
... | ... | @@ -873,6 +873,8 @@ public final class EdgeGrpcSession implements Closeable { |
873 | 873 | .setErrorMsg(failureMsg) |
874 | 874 | .setConfiguration(EdgeConfiguration.getDefaultInstance()).build(); |
875 | 875 | } |
876 | + } else { | |
877 | + log.warn("非法边端连接,EdgeRoutingKey [{}] 不存在",request.getEdgeRoutingKey()); | |
876 | 878 | } |
877 | 879 | return ConnectResponseMsg.newBuilder() |
878 | 880 | .setResponseCode(ConnectResponseCode.BAD_CREDENTIALS) |
... | ... | @@ -918,23 +920,28 @@ public final class EdgeGrpcSession implements Closeable { |
918 | 920 | } |
919 | 921 | } |
920 | 922 | //thingskit |
921 | - private void lifecycleEvent(TenantId tenantId, EdgeId edgeId, ComponentLifecycleEvent eventType, boolean success, Throwable error) { | |
923 | + private void lifecycleEvent(TenantId tenantId, Edge edge, ComponentLifecycleEvent eventType, boolean success, Throwable error) { | |
922 | 924 | try { |
925 | + | |
926 | + if(tenantId==null || edge==null || edge.getId()==null){ | |
927 | + return; | |
928 | + } | |
929 | + | |
923 | 930 | ToCoreMsg msg = ToCoreMsg.newBuilder() |
924 | 931 | .setLifecycleEventMsg( |
925 | 932 | LifecycleEventProto.newBuilder() |
926 | 933 | .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) |
927 | 934 | .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) |
928 | - .setEntityIdMSB(edgeId.getId().getMostSignificantBits()) | |
929 | - .setEntityIdLSB(edgeId.getId().getLeastSignificantBits()) | |
935 | + .setEntityIdMSB(edge.getId().getId().getMostSignificantBits()) | |
936 | + .setEntityIdLSB(edge.getId().getId().getLeastSignificantBits()) | |
930 | 937 | .setServiceId(ctx.getServiceInfoProvider().getServiceId()) |
931 | 938 | .setLcEventType(eventType.name()) |
932 | 939 | .setSuccess(success) |
933 | 940 | .setError(error != null ? ExceptionUtils.getStackTrace(error) : "") |
934 | 941 | ).build(); |
935 | - ctx.getClusterService().pushMsgToCore(tenantId, (EntityId) edgeId, msg, null); | |
942 | + ctx.getClusterService().pushMsgToCore(tenantId, (EntityId) edge.getId(), msg, null); | |
936 | 943 | } catch (Exception e) { |
937 | - log.error("[{}][{}] Failed to send lifecycle event to core", tenantId, edgeId, e); | |
944 | + log.error("[{}][{}] Failed to send lifecycle event to core", tenantId, edge.getId(), e); | |
938 | 945 | } |
939 | 946 | } |
940 | 947 | ... | ... |