Commit 214cc7a9069491170e2e0573c66987f673334cbf

Authored by Volodymyr Babak
2 parents d1c1b444 07556195

Merge branch 'feature/edge' of github.com:volodymyr-babak/thingsboard into feature/edge

@@ -20,7 +20,6 @@ import lombok.Data; @@ -20,7 +20,6 @@ import lombok.Data;
20 import org.thingsboard.server.common.data.BaseData; 20 import org.thingsboard.server.common.data.BaseData;
21 import org.thingsboard.server.common.data.id.EdgeEventId; 21 import org.thingsboard.server.common.data.id.EdgeEventId;
22 import org.thingsboard.server.common.data.id.EdgeId; 22 import org.thingsboard.server.common.data.id.EdgeId;
23 -import org.thingsboard.server.common.data.id.EntityId;  
24 import org.thingsboard.server.common.data.id.TenantId; 23 import org.thingsboard.server.common.data.id.TenantId;
25 24
26 import java.util.UUID; 25 import java.util.UUID;
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.rule.engine.edge; 16 package org.thingsboard.rule.engine.edge;
17 17
  18 +import com.fasterxml.jackson.core.JsonProcessingException;
18 import com.fasterxml.jackson.databind.ObjectMapper; 19 import com.fasterxml.jackson.databind.ObjectMapper;
19 import com.google.common.util.concurrent.FutureCallback; 20 import com.google.common.util.concurrent.FutureCallback;
20 import com.google.common.util.concurrent.Futures; 21 import com.google.common.util.concurrent.Futures;
@@ -77,33 +78,38 @@ public class TbMsgPushToEdgeNode implements TbNode { @@ -77,33 +78,38 @@ public class TbMsgPushToEdgeNode implements TbNode {
77 if (isSupportedOriginator(msg.getOriginator().getEntityType())) { 78 if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
78 if (isSupportedMsgType(msg.getType())) { 79 if (isSupportedMsgType(msg.getType())) {
79 ListenableFuture<EdgeId> getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator()); 80 ListenableFuture<EdgeId> getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
80 - Futures.transform(getEdgeIdFuture, edgeId -> {  
81 - EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());  
82 - if (edgeEventTypeByEntityType == null) {  
83 - log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());  
84 - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));  
85 - }  
86 - EdgeEvent edgeEvent = new EdgeEvent();  
87 - edgeEvent.setTenantId(ctx.getTenantId());  
88 - edgeEvent.setEdgeId(edgeId);  
89 - edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name());  
90 - edgeEvent.setEntityId(msg.getOriginator().getId());  
91 - edgeEvent.setEdgeEventType(edgeEventTypeByEntityType);  
92 - edgeEvent.setEntityBody(json.valueToTree(msg.getData()));  
93 - ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);  
94 - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {  
95 - @Override  
96 - public void onSuccess(@Nullable EdgeEvent event) {  
97 - ctx.tellNext(msg, SUCCESS); 81 + Futures.addCallback(getEdgeIdFuture, new FutureCallback<EdgeId>() {
  82 + @Override
  83 + public void onSuccess(@Nullable EdgeId edgeId) {
  84 + EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
  85 + if (edgeEventTypeByEntityType == null) {
  86 + log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
  87 + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
98 } 88 }
99 -  
100 - @Override  
101 - public void onFailure(Throwable th) {  
102 - log.error("Could not save edge event", th);  
103 - ctx.tellFailure(msg, th); 89 + EdgeEvent edgeEvent = null;
  90 + try {
  91 + edgeEvent = buildEdgeEvent(ctx, msg, edgeId, edgeEventTypeByEntityType);
  92 + } catch (JsonProcessingException e) {
  93 + log.error("Failed to build edge event", e);
104 } 94 }
105 - }, ctx.getDbCallbackExecutor());  
106 - return null; 95 + ListenableFuture<EdgeEvent> saveFuture = ctx.getEdgeEventService().saveAsync(edgeEvent);
  96 + Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
  97 + @Override
  98 + public void onSuccess(@Nullable EdgeEvent event) {
  99 + ctx.tellNext(msg, SUCCESS);
  100 + }
  101 + @Override
  102 + public void onFailure(Throwable th) {
  103 + log.error("Could not save edge event", th);
  104 + ctx.tellFailure(msg, th);
  105 + }
  106 + }, ctx.getDbCallbackExecutor());
  107 + }
  108 + @Override
  109 + public void onFailure(Throwable t) {
  110 + ctx.tellFailure(msg, t);
  111 + }
  112 +
107 }, ctx.getDbCallbackExecutor()); 113 }, ctx.getDbCallbackExecutor());
108 } else { 114 } else {
109 log.debug("Unsupported msg type {}", msg.getType()); 115 log.debug("Unsupported msg type {}", msg.getType());
@@ -115,6 +121,17 @@ public class TbMsgPushToEdgeNode implements TbNode { @@ -115,6 +121,17 @@ public class TbMsgPushToEdgeNode implements TbNode {
115 } 121 }
116 } 122 }
117 123
  124 + private EdgeEvent buildEdgeEvent(TbContext ctx, TbMsg msg, EdgeId edgeId, EdgeEventType edgeEventTypeByEntityType) throws JsonProcessingException {
  125 + EdgeEvent edgeEvent = new EdgeEvent();
  126 + edgeEvent.setTenantId(ctx.getTenantId());
  127 + edgeEvent.setEdgeId(edgeId);
  128 + edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name());
  129 + edgeEvent.setEntityId(msg.getOriginator().getId());
  130 + edgeEvent.setEdgeEventType(edgeEventTypeByEntityType);
  131 + edgeEvent.setEntityBody(json.readTree(msg.getData()));
  132 + return edgeEvent;
  133 + }
  134 +
118 private ActionType getActionTypeByMsgType(String msgType) { 135 private ActionType getActionTypeByMsgType(String msgType) {
119 ActionType actionType; 136 ActionType actionType;
120 if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) { 137 if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {