Commit f4baab4969623a4c290105dc118ce891e62cb1b4

Authored by Volodymyr Babak
1 parent cd2908fb

Edge Notifitcation Service

... ... @@ -2,7 +2,7 @@
2 2 "ruleChain": {
3 3 "additionalInfo": null,
4 4 "name": "Thermostat Alarms",
5   - "type": "SYSTEM",
  5 + "type": "CORE",
6 6 "firstRuleNodeId": null,
7 7 "root": false,
8 8 "debugMode": false,
... ...
... ... @@ -2,7 +2,7 @@
2 2 "ruleChain": {
3 3 "additionalInfo": null,
4 4 "name": "Root Rule Chain",
5   - "type": "SYSTEM",
  5 + "type": "CORE",
6 6 "firstRuleNodeId": null,
7 7 "root": true,
8 8 "debugMode": false,
... ...
... ... @@ -15,29 +15,25 @@
15 15 */
16 16 package org.thingsboard.server.service.edge;
17 17
  18 +import com.fasterxml.jackson.databind.JsonNode;
18 19 import com.fasterxml.jackson.databind.ObjectMapper;
19   -import com.google.common.util.concurrent.FutureCallback;
20 20 import com.google.common.util.concurrent.Futures;
21 21 import com.google.common.util.concurrent.ListenableFuture;
22   -import com.google.common.util.concurrent.MoreExecutors;
23 22 import lombok.extern.slf4j.Slf4j;
24   -import org.apache.commons.codec.binary.Base64;
25 23 import org.springframework.beans.factory.annotation.Autowired;
26 24 import org.springframework.stereotype.Service;
27   -import org.thingsboard.server.common.data.Dashboard;
28   -import org.thingsboard.server.common.data.DataConstants;
29   -import org.thingsboard.server.common.data.Device;
30 25 import org.thingsboard.server.common.data.EntityType;
31   -import org.thingsboard.server.common.data.EntityView;
32   -import org.thingsboard.server.common.data.Event;
33 26 import org.thingsboard.server.common.data.alarm.Alarm;
34   -import org.thingsboard.server.common.data.asset.Asset;
35 27 import org.thingsboard.server.common.data.audit.ActionType;
36 28 import org.thingsboard.server.common.data.edge.Edge;
37 29 import org.thingsboard.server.common.data.edge.EdgeEvent;
38 30 import org.thingsboard.server.common.data.edge.EdgeEventType;
  31 +import org.thingsboard.server.common.data.id.AlarmId;
  32 +import org.thingsboard.server.common.data.id.DashboardId;
39 33 import org.thingsboard.server.common.data.id.EdgeId;
40 34 import org.thingsboard.server.common.data.id.EntityId;
  35 +import org.thingsboard.server.common.data.id.EntityIdFactory;
  36 +import org.thingsboard.server.common.data.id.IdBased;
41 37 import org.thingsboard.server.common.data.id.RuleChainId;
42 38 import org.thingsboard.server.common.data.id.TenantId;
43 39 import org.thingsboard.server.common.data.page.TimePageData;
... ... @@ -46,29 +42,28 @@ import org.thingsboard.server.common.data.relation.EntityRelation;
46 42 import org.thingsboard.server.common.data.relation.RelationTypeGroup;
47 43 import org.thingsboard.server.common.data.rule.RuleChain;
48 44 import org.thingsboard.server.common.data.rule.RuleChainMetaData;
49   -import org.thingsboard.server.common.data.rule.RuleChainType;
50   -import org.thingsboard.server.common.msg.TbMsg;
51 45 import org.thingsboard.server.common.msg.queue.TbCallback;
52   -import org.thingsboard.server.common.msg.session.SessionMsgType;
53   -import org.thingsboard.server.dao.asset.AssetService;
54   -import org.thingsboard.server.dao.device.DeviceService;
  46 +import org.thingsboard.server.dao.alarm.AlarmService;
55 47 import org.thingsboard.server.dao.edge.EdgeEventService;
56 48 import org.thingsboard.server.dao.edge.EdgeService;
57   -import org.thingsboard.server.dao.entityview.EntityViewService;
58   -import org.thingsboard.server.dao.event.EventService;
59 49 import org.thingsboard.server.dao.relation.RelationService;
60 50 import org.thingsboard.server.dao.rule.RuleChainService;
61 51 import org.thingsboard.server.gen.transport.TransportProtos;
62 52 import org.thingsboard.server.queue.util.TbCoreComponent;
  53 +import org.thingsboard.server.service.executors.DbCallbackExecutorService;
63 54
64   -import javax.annotation.Nullable;
65 55 import javax.annotation.PostConstruct;
66 56 import javax.annotation.PreDestroy;
67 57 import java.io.IOException;
  58 +import java.util.ArrayList;
  59 +import java.util.Collections;
  60 +import java.util.HashSet;
68 61 import java.util.List;
  62 +import java.util.Set;
69 63 import java.util.UUID;
70 64 import java.util.concurrent.ExecutorService;
71 65 import java.util.concurrent.Executors;
  66 +import java.util.stream.Collectors;
72 67
73 68 @Service
74 69 @TbCoreComponent
... ... @@ -81,16 +76,10 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
81 76 private EdgeService edgeService;
82 77
83 78 @Autowired
84   - private DeviceService deviceService;
85   -
86   - @Autowired
87   - private AssetService assetService;
88   -
89   - @Autowired
90   - private EntityViewService entityViewService;
  79 + private RuleChainService ruleChainService;
91 80
92 81 @Autowired
93   - private RuleChainService ruleChainService;
  82 + private AlarmService alarmService;
94 83
95 84 @Autowired
96 85 private RelationService relationService;
... ... @@ -98,6 +87,9 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
98 87 @Autowired
99 88 private EdgeEventService edgeEventService;
100 89
  90 + @Autowired
  91 + private DbCallbackExecutorService dbCallbackExecutorService;
  92 +
101 93 private ExecutorService tsCallBackExecutor;
102 94
103 95 @PostConstruct
... ... @@ -121,315 +113,160 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
121 113 public Edge setEdgeRootRuleChain(TenantId tenantId, Edge edge, RuleChainId ruleChainId) throws IOException {
122 114 edge.setRootRuleChainId(ruleChainId);
123 115 Edge savedEdge = edgeService.saveEdge(edge);
124   - RuleChain ruleChain = ruleChainService.findRuleChainById(tenantId, ruleChainId);
125   - saveEventToEdgeQueue(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, DataConstants.ENTITY_UPDATED, mapper.writeValueAsString(ruleChain), new FutureCallback<Void>() {
126   - @Override
127   - public void onSuccess(@Nullable Void aVoid) {
128   - log.debug("Event saved successfully!");
129   - }
130   -
131   - @Override
132   - public void onFailure(Throwable t) {
133   - log.debug("Failure during event save", t);
134   - }
135   - });
  116 + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, ActionType.UPDATED, ruleChainId, null);
136 117 return savedEdge;
137 118 }
138 119
139   - private void saveEventToEdgeQueue(TenantId tenantId, EdgeId edgeId, EdgeEventType entityType, String type, String data, FutureCallback<Void> callback) throws IOException {
140   - log.debug("Pushing single event to edge queue. tenantId [{}], edgeId [{}], entityType [{}], type[{}], data [{}]", tenantId, edgeId, entityType, type, data);
141   -
142   -// EdgeEQueueEntry queueEntry = new EdgeQueueEntry();
143   -// queueEntry.setEntityType(entityType);
144   -// queueEntry.setType(type);
145   -// queueEntry.setData(data);
  120 + private void saveEdgeEvent(TenantId tenantId,
  121 + EdgeId edgeId,
  122 + EdgeEventType edgeEventType,
  123 + ActionType edgeEventAction,
  124 + EntityId entityId,
  125 + JsonNode entityBody) {
  126 + log.debug("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], edgeEventType [{}], edgeEventAction[{}], entityId [{}], entityBody [{}]",
  127 + tenantId, edgeId, edgeEventType, edgeEventAction, entityId, entityBody);
146 128
147 129 EdgeEvent edgeEvent = new EdgeEvent();
148 130 edgeEvent.setEdgeId(edgeId);
149 131 edgeEvent.setTenantId(tenantId);
150   -// event.setType(DataConstants.EDGE_QUEUE_EVENT_TYPE);
151   -// event.setBody(mapper.valueToTree(queueEntry));
152   - ListenableFuture<EdgeEvent> saveFuture = edgeEventService.saveAsync(edgeEvent);
153   -
154   - addMainCallback(saveFuture, callback);
155   - }
156   -
157   - private void addMainCallback(ListenableFuture<EdgeEvent> saveFuture, final FutureCallback<Void> callback) {
158   - Futures.addCallback(saveFuture, new FutureCallback<EdgeEvent>() {
159   - @Override
160   - public void onSuccess(@Nullable EdgeEvent result) {
161   - callback.onSuccess(null);
162   - }
163   -
164   - @Override
165   - public void onFailure(Throwable t) {
166   - callback.onFailure(t);
167   - }
168   - }, tsCallBackExecutor);
  132 + edgeEvent.setEdgeEventType(edgeEventType);
  133 + edgeEvent.setEdgeEventAction(edgeEventAction.name());
  134 + if (entityId != null) {
  135 + edgeEvent.setEntityId(entityId.getId());
  136 + }
  137 + edgeEvent.setEntityBody(entityBody);
  138 + edgeEventService.saveAsync(edgeEvent);
169 139 }
170 140
171 141 @Override
172 142 public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) {
173   -// if (tbMsg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name()) ||
174   -// tbMsg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name()) ||
175   -// tbMsg.getType().equals(DataConstants.ATTRIBUTES_UPDATED) ||
176   -// tbMsg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
177   -// processCustomTbMsg(tenantId, tbMsg, callback);
178   -// } else {
179   -// try {
180   -// switch (tbMsg.getOriginator().getEntityType()) {
181   -// case EDGE:
182   -// processEdge(tenantId, tbMsg, callback);
183   -// break;
184   -// case ASSET:
185   -// processAsset(tenantId, tbMsg, callback);
186   -// break;
187   -// case DEVICE:
188   -// processDevice(tenantId, tbMsg, callback);
189   -// break;
190   -// case DASHBOARD:
191   -// processDashboard(tenantId, tbMsg, callback);
192   -// break;
193   -// case RULE_CHAIN:
194   -// processRuleChain(tenantId, tbMsg, callback);
195   -// break;
196   -// case ENTITY_VIEW:
197   -// processEntityView(tenantId, tbMsg, callback);
198   -// break;
199   -// case ALARM:
200   -// processAlarm(tenantId, tbMsg, callback);
201   -// break;
202   -// default:
203   -// log.debug("Entity type [{}] is not designed to be pushed to edge", tbMsg.getOriginator().getEntityType());
204   -// }
205   -// } catch (IOException e) {
206   -// log.error("Can't push to edge updates, entity type [{}], data [{}]", tbMsg.getOriginator().getEntityType(), tbMsg.getData(), e);
207   -// }
208   -// }
209   - }
210   -
211   -
212   - private void processCustomTbMsg(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) {
213   - ListenableFuture<EdgeId> edgeIdFuture = getEdgeIdByOriginatorId(tenantId, tbMsg.getOriginator());
214   - Futures.transform(edgeIdFuture, edgeId -> {
215   - EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(tbMsg.getOriginator().getEntityType());
216   - if (edgeId != null && edgeEventType != null) {
217   - try {
218   - saveEventToEdgeQueue(tenantId, edgeId, edgeEventType, tbMsg.getType(), Base64.encodeBase64String(TbMsg.toByteArray(tbMsg)), callback);
219   - } catch (IOException e) {
220   - log.error("Error while saving custom tbMsg into Edge Queue", e);
221   - }
  143 + try {
  144 + EdgeEventType edgeEventType = EdgeEventType.valueOf(edgeNotificationMsg.getEdgeEventType());
  145 + ActionType edgeEventAction = ActionType.valueOf(edgeNotificationMsg.getEdgeEventAction());
  146 + TenantId tenantId = new TenantId(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB()));
  147 + switch (edgeEventType) {
  148 + // TODO: voba - handle edge updates
  149 + // case EDGE:
  150 + case ASSET:
  151 + case DEVICE:
  152 + case ENTITY_VIEW:
  153 + case DASHBOARD:
  154 + case RULE_CHAIN:
  155 + EntityId entityId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
  156 + ListenableFuture<List<EdgeId>> edgeIdsFuture = findRelatedEdgeIdsEntityId(tenantId, entityId);
  157 + Futures.transform(edgeIdsFuture, edgeIds -> {
  158 + if (edgeIds != null && !edgeIds.isEmpty()) {
  159 + for (EdgeId edgeId : edgeIds) {
  160 + try {
  161 + saveEdgeEvent(tenantId, edgeId, edgeEventType, edgeEventAction, entityId, null);
  162 + if (edgeEventType.equals(EdgeEventType.RULE_CHAIN) &&
  163 + (ActionType.UPDATED.equals(edgeEventAction) || ActionType.ADDED.equals(edgeEventAction))) {
  164 + RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, new RuleChainId(entityId.getId()));
  165 + saveEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, edgeEventAction, ruleChainMetaData.getRuleChainId(), null);
  166 + }
  167 + } catch (Exception e) {
  168 + log.error("[{}] Failed to push event to edge, edgeId [{}], edgeEventType [{}], edgeEventAction [{}], entityId [{}]",
  169 + tenantId, edgeId, edgeEventType, edgeEventAction, entityId, e);
  170 + }
  171 + }
  172 + }
  173 + return null;
  174 + }, dbCallbackExecutorService);
  175 + break;
  176 + case ALARM:
  177 + EntityId alarmId = EntityIdFactory.getByEdgeEventTypeAndUuid(edgeEventType, new UUID(edgeNotificationMsg.getEntityIdMSB(), edgeNotificationMsg.getEntityIdLSB()));
  178 + processAlarm(tenantId, edgeEventAction, alarmId);
  179 + break;
  180 + case RELATION:
  181 + EntityRelation entityRelation = mapper.convertValue(edgeNotificationMsg.getEntityBody(), EntityRelation.class);
  182 + processRelation(tenantId, edgeEventAction, entityRelation);
  183 + break;
  184 + default:
  185 + log.debug("Edge event type [{}] is not designed to be pushed to edge", edgeEventType);
222 186 }
223   - return null;
224   - }, MoreExecutors.directExecutor());
225   - }
226   -
227   - private void processDevice(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
228   - switch (tbMsg.getType()) {
229   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
230   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
231   - processAssignedEntity(tenantId, tbMsg, EdgeEventType.DEVICE, callback);
232   - break;
233   - case DataConstants.ENTITY_DELETED:
234   - case DataConstants.ENTITY_CREATED:
235   - case DataConstants.ENTITY_UPDATED:
236   - Device device = mapper.readValue(tbMsg.getData(), Device.class);
237   - pushEventToEdge(tenantId, device.getId(), EdgeEventType.DEVICE, tbMsg, callback);
238   - break;
239   - default:
240   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
241   - }
242   - }
243   -
244   - private void processEdge(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
245   - switch (tbMsg.getType()) {
246   - case DataConstants.ENTITY_DELETED:
247   - case DataConstants.ENTITY_CREATED:
248   - case DataConstants.ENTITY_UPDATED:
249   - // TODO: voba - handle properly edge creation
250   - break;
251   - default:
252   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
253   - }
254   - }
255   -
256   - private void processAsset(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
257   - switch (tbMsg.getType()) {
258   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
259   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
260   - processAssignedEntity(tenantId, tbMsg, EdgeEventType.ASSET, callback);
261   - break;
262   - case DataConstants.ENTITY_DELETED:
263   - case DataConstants.ENTITY_CREATED:
264   - case DataConstants.ENTITY_UPDATED:
265   - Asset asset = mapper.readValue(tbMsg.getData(), Asset.class);
266   - pushEventToEdge(tenantId, asset.getId(), EdgeEventType.ASSET, tbMsg, callback);
267   - break;
268   - default:
269   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
  187 + } catch (Exception e) {
  188 + callback.onFailure(e);
  189 + log.error("Can't push to edge updates, edgeNotificationMsg [{}]", edgeNotificationMsg, e);
  190 + } finally {
  191 + callback.onSuccess();
270 192 }
271 193 }
272 194
273   - private void processEntityView(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
274   - switch (tbMsg.getType()) {
275   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
276   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
277   - processAssignedEntity(tenantId, tbMsg, EdgeEventType.ENTITY_VIEW, callback);
278   - break;
279   - case DataConstants.ENTITY_DELETED:
280   - case DataConstants.ENTITY_CREATED:
281   - case DataConstants.ENTITY_UPDATED:
282   - EntityView entityView = mapper.readValue(tbMsg.getData(), EntityView.class);
283   - pushEventToEdge(tenantId, entityView.getId(), EdgeEventType.ENTITY_VIEW, tbMsg, callback);
284   - break;
285   - default:
286   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
287   - }
288   - }
289   -
290   - private void processAlarm(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
291   - switch (tbMsg.getType()) {
292   - case DataConstants.ENTITY_DELETED:
293   - case DataConstants.ENTITY_CREATED:
294   - case DataConstants.ENTITY_UPDATED:
295   - case DataConstants.ALARM_ACK:
296   - case DataConstants.ALARM_CLEAR:
297   - Alarm alarm = mapper.readValue(tbMsg.getData(), Alarm.class);
  195 + private void processAlarm(TenantId tenantId, ActionType edgeActionType, EntityId alarmId) {
  196 + ListenableFuture<Alarm> alarmFuture = alarmService.findAlarmByIdAsync(tenantId, new AlarmId(alarmId.getId()));
  197 + Futures.transform(alarmFuture, alarm -> {
  198 + if (alarm != null) {
298 199 EdgeEventType edgeEventType = getEdgeQueueTypeByEntityType(alarm.getOriginator().getEntityType());
299 200 if (edgeEventType != null) {
300   - pushEventToEdge(tenantId, alarm.getOriginator(), EdgeEventType.ALARM, tbMsg, callback);
301   - }
302   - break;
303   - default:
304   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
305   - }
306   - }
307   -
308   - private void processDashboard(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
309   - switch (tbMsg.getType()) {
310   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
311   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
312   - processAssignedEntity(tenantId, tbMsg, EdgeEventType.DASHBOARD, callback);
313   - break;
314   - case DataConstants.ENTITY_DELETED:
315   - case DataConstants.ENTITY_CREATED:
316   - case DataConstants.ENTITY_UPDATED:
317   - Dashboard dashboard = mapper.readValue(tbMsg.getData(), Dashboard.class);
318   - ListenableFuture<TimePageData<Edge>> future = edgeService.findEdgesByTenantIdAndDashboardId(tenantId, dashboard.getId(), new TimePageLink(Integer.MAX_VALUE));
319   - Futures.transform(future, edges -> {
320   - if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
321   - try {
322   - for (Edge edge : edges.getData()) {
323   - pushEventToEdge(tenantId, edge.getId(), EdgeEventType.DASHBOARD, tbMsg, callback);
324   - }
325   - } catch (IOException e) {
326   - log.error("Can't push event to edge", e);
327   - }
328   - }
329   - return null;
330   - }, MoreExecutors.directExecutor());
331   - break;
332   - default:
333   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
334   - }
335   - }
336   -
337   - private void processRuleChain(TenantId tenantId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
338   - switch (tbMsg.getType()) {
339   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
340   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
341   - processAssignedEntity(tenantId, tbMsg, EdgeEventType.RULE_CHAIN, callback);
342   - break;
343   - case DataConstants.ENTITY_DELETED:
344   - case DataConstants.ENTITY_CREATED:
345   - case DataConstants.ENTITY_UPDATED:
346   - RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
347   - if (RuleChainType.EDGE.equals(ruleChain.getType())) {
348   - ListenableFuture<TimePageData<Edge>> future = edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, ruleChain.getId(), new TimePageLink(Integer.MAX_VALUE));
349   - Futures.transform(future, edges -> {
350   - if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
351   - try {
352   - for (Edge edge : edges.getData()) {
353   - pushEventToEdge(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, tbMsg, callback);
354   - }
355   - } catch (IOException e) {
356   - log.error("Can't push event to edge", e);
  201 + ListenableFuture<List<EdgeId>> relatedEdgeIdsEntityIdFuture = findRelatedEdgeIdsEntityId(tenantId, alarm.getOriginator());
  202 + Futures.transform(relatedEdgeIdsEntityIdFuture, relatedEdgeIdsEntityId -> {
  203 + if (relatedEdgeIdsEntityId != null) {
  204 + for (EdgeId edgeId : relatedEdgeIdsEntityId) {
  205 + saveEdgeEvent(tenantId, edgeId, EdgeEventType.ALARM, edgeActionType, alarmId, null);
357 206 }
358 207 }
359 208 return null;
360   - }, MoreExecutors.directExecutor());
  209 + }, dbCallbackExecutorService);
361 210 }
362   - break;
363   - default:
364   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
365   - }
366   - }
367   -
368   -
369   - private void processAssignedEntity(TenantId tenantId, TbMsg tbMsg, EdgeEventType entityType, FutureCallback<Void> callback) throws IOException {
370   - EdgeId edgeId;
371   - switch (tbMsg.getType()) {
372   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
373   - edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("assignedEdgeId")));
374   - pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
375   - break;
376   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
377   - edgeId = new EdgeId(UUID.fromString(tbMsg.getMetaData().getValue("unassignedEdgeId")));
378   - pushEventToEdge(tenantId, edgeId, entityType, tbMsg, callback);
379   - break;
380   -
381   - }
  211 + }
  212 + return null;
  213 + }, dbCallbackExecutorService);
382 214 }
383 215
384   - private void pushEventToEdge(TenantId tenantId, EntityId originatorId, EdgeEventType edgeEventType, TbMsg tbMsg, FutureCallback<Void> callback) {
385   - ListenableFuture<EdgeId> edgeIdFuture = getEdgeIdByOriginatorId(tenantId, originatorId);
386   - Futures.transform(edgeIdFuture, edgeId -> {
387   - if (edgeId != null) {
388   - try {
389   - pushEventToEdge(tenantId, edgeId, edgeEventType, tbMsg, callback);
390   - } catch (Exception e) {
391   - log.error("Failed to push event to edge, edgeId [{}], tbMsg [{}]", edgeId, tbMsg, e);
392   - }
  216 + private void processRelation(TenantId tenantId, ActionType edgeActionType, EntityRelation entityRelation) {
  217 + List<ListenableFuture<List<EdgeId>>> futures = new ArrayList<>();
  218 + futures.add(findRelatedEdgeIdsEntityId(tenantId, entityRelation.getTo()));
  219 + futures.add(findRelatedEdgeIdsEntityId(tenantId, entityRelation.getFrom()));
  220 + ListenableFuture<List<List<EdgeId>>> combinedFuture = Futures.allAsList(futures);
  221 + Futures.transform(combinedFuture, listOfListsEdgeIds -> {
  222 + Set<EdgeId> uniqueEdgeIds = new HashSet<>();
  223 + if (listOfListsEdgeIds != null && !listOfListsEdgeIds.isEmpty()) {
  224 + for (List<EdgeId> listOfListsEdgeId : listOfListsEdgeIds) {
  225 + if (listOfListsEdgeId != null) {
  226 + uniqueEdgeIds.addAll(listOfListsEdgeId);
393 227 }
394   - return null;
395   - },
396   - MoreExecutors.directExecutor());
397   - }
398   -
399   - private ListenableFuture<EdgeId> getEdgeIdByOriginatorId(TenantId tenantId, EntityId originatorId) {
400   - List<EntityRelation> originatorEdgeRelations = relationService.findByToAndType(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
401   - if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) {
402   - return Futures.immediateFuture(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
403   - } else {
404   - return Futures.immediateFuture(null);
405   - }
406   - }
407   -
408   -
409   - private void pushEventToEdge(TenantId tenantId, EdgeId edgeId, EdgeEventType entityType, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
410   - log.debug("Pushing event(s) to edge queue. tenantId [{}], edgeId [{}], entityType [{}], tbMsg [{}]", tenantId, edgeId, entityType, tbMsg);
411   -
412   - saveEventToEdgeQueue(tenantId, edgeId, entityType, tbMsg.getType(), tbMsg.getData(), callback);
413   -
414   - if (entityType.equals(EdgeEventType.RULE_CHAIN)) {
415   - pushRuleChainMetadataToEdge(tenantId, edgeId, tbMsg, callback);
416   - }
  228 + }
  229 + }
  230 + if (!uniqueEdgeIds.isEmpty()) {
  231 + for (EdgeId edgeId : uniqueEdgeIds) {
  232 + saveEdgeEvent(tenantId, edgeId, EdgeEventType.RELATION, edgeActionType, null, mapper.valueToTree(entityRelation));
  233 + }
  234 + }
  235 + return null;
  236 + }, dbCallbackExecutorService);
417 237 }
418 238
419   - private void pushRuleChainMetadataToEdge(TenantId tenantId, EdgeId edgeId, TbMsg tbMsg, FutureCallback<Void> callback) throws IOException {
420   - RuleChain ruleChain = mapper.readValue(tbMsg.getData(), RuleChain.class);
421   - switch (tbMsg.getType()) {
422   - case DataConstants.ENTITY_ASSIGNED_TO_EDGE:
423   - case DataConstants.ENTITY_UNASSIGNED_FROM_EDGE:
424   - case DataConstants.ENTITY_UPDATED:
425   - RuleChainMetaData ruleChainMetaData = ruleChainService.loadRuleChainMetaData(tenantId, ruleChain.getId());
426   - saveEventToEdgeQueue(tenantId, edgeId, EdgeEventType.RULE_CHAIN_METADATA, tbMsg.getType(), mapper.writeValueAsString(ruleChainMetaData), callback);
427   - break;
  239 + private ListenableFuture<List<EdgeId>> findRelatedEdgeIdsEntityId(TenantId tenantId, EntityId entityId) {
  240 + switch (entityId.getEntityType()) {
  241 + case DEVICE:
  242 + case ASSET:
  243 + case ENTITY_VIEW:
  244 + ListenableFuture<List<EntityRelation>> originatorEdgeRelationsFuture = relationService.findByToAndTypeAsync(tenantId, entityId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
  245 + return Futures.transform(originatorEdgeRelationsFuture, originatorEdgeRelations -> {
  246 + if (originatorEdgeRelations != null && originatorEdgeRelations.size() > 0) {
  247 + return Collections.singletonList(new EdgeId(originatorEdgeRelations.get(0).getFrom().getId()));
  248 + } else {
  249 + return Collections.emptyList();
  250 + }
  251 + }, dbCallbackExecutorService);
  252 + case DASHBOARD:
  253 + return convertToEdgeIds(edgeService.findEdgesByTenantIdAndDashboardId(tenantId, new DashboardId(entityId.getId()), new TimePageLink(Integer.MAX_VALUE)));
  254 + case RULE_CHAIN:
  255 + return convertToEdgeIds(edgeService.findEdgesByTenantIdAndRuleChainId(tenantId, new RuleChainId(entityId.getId()), new TimePageLink(Integer.MAX_VALUE)));
428 256 default:
429   - log.warn("Unsupported msgType [{}], tbMsg [{}]", tbMsg.getType(), tbMsg);
  257 + return Futures.immediateFuture(Collections.emptyList());
430 258 }
431 259 }
432 260
  261 + private ListenableFuture<List<EdgeId>> convertToEdgeIds(ListenableFuture<TimePageData<Edge>> future) {
  262 + return Futures.transform(future, edges -> {
  263 + if (edges != null && edges.getData() != null && !edges.getData().isEmpty()) {
  264 + return edges.getData().stream().map(IdBased::getId).collect(Collectors.toList());
  265 + } else {
  266 + return Collections.emptyList();
  267 + }
  268 + }, dbCallbackExecutorService);
  269 + }
433 270
434 271 private EdgeEventType getEdgeQueueTypeByEntityType(EntityType entityType) {
435 272 switch (entityType) {
... ... @@ -446,3 +283,4 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService {
446 283 }
447 284 }
448 285
  286 +
... ...
... ... @@ -31,11 +31,14 @@ import org.thingsboard.server.dao.device.DeviceService;
31 31 import org.thingsboard.server.dao.edge.EdgeService;
32 32 import org.thingsboard.server.dao.entityview.EntityViewService;
33 33 import org.thingsboard.server.dao.relation.RelationService;
  34 +import org.thingsboard.server.dao.rule.RuleChainService;
  35 +import org.thingsboard.server.dao.user.UserService;
34 36 import org.thingsboard.server.service.edge.rpc.EdgeEventStorageSettings;
35 37 import org.thingsboard.server.service.edge.rpc.constructor.AlarmUpdateMsgConstructor;
36 38 import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor;
37 39 import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor;
38 40 import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor;
  41 +import org.thingsboard.server.service.edge.rpc.constructor.EntityDataMsgConstructor;
39 42 import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor;
40 43 import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor;
41 44 import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor;
... ... @@ -95,6 +98,14 @@ public class EdgeContextComponent {
95 98
96 99 @Lazy
97 100 @Autowired
  101 + private RuleChainService ruleChainService;
  102 +
  103 + @Lazy
  104 + @Autowired
  105 + private UserService userService;
  106 +
  107 + @Lazy
  108 + @Autowired
98 109 private ActorService actorService;
99 110
100 111 @Lazy
... ... @@ -143,6 +154,10 @@ public class EdgeContextComponent {
143 154
144 155 @Lazy
145 156 @Autowired
  157 + private EntityDataMsgConstructor entityDataMsgConstructor;
  158 +
  159 + @Lazy
  160 + @Autowired
146 161 private EdgeEventStorageSettings edgeEventStorageSettings;
147 162
148 163 @Autowired
... ...
... ... @@ -19,15 +19,17 @@ import com.datastax.driver.core.utils.UUIDs;
19 19 import com.fasterxml.jackson.core.JsonProcessingException;
20 20 import com.fasterxml.jackson.databind.ObjectMapper;
21 21 import com.fasterxml.jackson.databind.node.ObjectNode;
  22 +import com.google.common.util.concurrent.FutureCallback;
22 23 import com.google.common.util.concurrent.Futures;
23 24 import com.google.common.util.concurrent.ListenableFuture;
24 25 import com.google.common.util.concurrent.MoreExecutors;
25   -import com.google.protobuf.ByteString;
  26 +import com.google.gson.Gson;
  27 +import com.google.gson.JsonElement;
  28 +import com.google.gson.JsonObject;
26 29 import io.grpc.stub.StreamObserver;
27 30 import lombok.Data;
28 31 import lombok.extern.slf4j.Slf4j;
29   -import org.apache.commons.codec.binary.Base64;
30   -import org.thingsboard.server.common.data.Customer;
  32 +import org.checkerframework.checker.nullness.qual.Nullable;
31 33 import org.thingsboard.server.common.data.Dashboard;
32 34 import org.thingsboard.server.common.data.DataConstants;
33 35 import org.thingsboard.server.common.data.Device;
... ... @@ -41,12 +43,16 @@ import org.thingsboard.server.common.data.asset.Asset;
41 43 import org.thingsboard.server.common.data.audit.ActionType;
42 44 import org.thingsboard.server.common.data.edge.Edge;
43 45 import org.thingsboard.server.common.data.edge.EdgeEvent;
  46 +import org.thingsboard.server.common.data.id.AlarmId;
44 47 import org.thingsboard.server.common.data.id.AssetId;
  48 +import org.thingsboard.server.common.data.id.DashboardId;
45 49 import org.thingsboard.server.common.data.id.DeviceId;
46 50 import org.thingsboard.server.common.data.id.EdgeId;
47 51 import org.thingsboard.server.common.data.id.EntityId;
48 52 import org.thingsboard.server.common.data.id.EntityViewId;
  53 +import org.thingsboard.server.common.data.id.RuleChainId;
49 54 import org.thingsboard.server.common.data.id.TenantId;
  55 +import org.thingsboard.server.common.data.id.UserId;
50 56 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
51 57 import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
52 58 import org.thingsboard.server.common.data.kv.DataType;
... ... @@ -60,14 +66,13 @@ import org.thingsboard.server.common.data.rule.RuleChainMetaData;
60 66 import org.thingsboard.server.common.data.security.DeviceCredentials;
61 67 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
62 68 import org.thingsboard.server.common.msg.TbMsg;
63   -import org.thingsboard.server.common.msg.TbMsgDataType;
64 69 import org.thingsboard.server.common.msg.TbMsgMetaData;
65   -import org.thingsboard.server.common.msg.queue.TbMsgCallback;
  70 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  71 +import org.thingsboard.server.common.transport.util.JsonUtils;
66 72 import org.thingsboard.server.gen.edge.AlarmUpdateMsg;
67 73 import org.thingsboard.server.gen.edge.ConnectRequestMsg;
68 74 import org.thingsboard.server.gen.edge.ConnectResponseCode;
69 75 import org.thingsboard.server.gen.edge.ConnectResponseMsg;
70   -import org.thingsboard.server.gen.edge.CustomerUpdateMsg;
71 76 import org.thingsboard.server.gen.edge.DeviceUpdateMsg;
72 77 import org.thingsboard.server.gen.edge.DownlinkMsg;
73 78 import org.thingsboard.server.gen.edge.EdgeConfiguration;
... ... @@ -81,7 +86,7 @@ import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg;
81 86 import org.thingsboard.server.gen.edge.UpdateMsgType;
82 87 import org.thingsboard.server.gen.edge.UplinkMsg;
83 88 import org.thingsboard.server.gen.edge.UplinkResponseMsg;
84   -import org.thingsboard.server.gen.edge.UserUpdateMsg;
  89 +import org.thingsboard.server.gen.transport.TransportProtos;
85 90 import org.thingsboard.server.service.edge.EdgeContextComponent;
86 91
87 92 import java.io.Closeable;
... ... @@ -103,6 +108,8 @@ public final class EdgeGrpcSession implements Closeable {
103 108
104 109 private static final ReentrantLock deviceCreationLock = new ReentrantLock();
105 110
  111 + private final Gson gson = new Gson();
  112 +
106 113 private static final String QUEUE_START_TS_ATTR_KEY = "queueStartTs";
107 114
108 115 private final UUID sessionId;
... ... @@ -182,9 +189,9 @@ public final class EdgeGrpcSession implements Closeable {
182 189 processTelemetryMessage(edgeEvent);
183 190 } else {
184 191 processEntityCRUDMessage(edgeEvent, msgType);
185   - }
186   - if (ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
187   - pushEntityAttributesToEdge(edgeEvent);
  192 + if (ENTITY_CREATED_RPC_MESSAGE.equals(msgType)) {
  193 + pushEntityAttributesToEdge(edgeEvent);
  194 + }
188 195 }
189 196 } catch (Exception e) {
190 197 log.error("Exception during processing records from queue", e);
... ... @@ -213,63 +220,66 @@ public final class EdgeGrpcSession implements Closeable {
213 220 }
214 221 }
215 222
  223 + private ListenableFuture<Long> getQueueStartTs() {
  224 + ListenableFuture<Optional<AttributeKvEntry>> future =
  225 + ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
  226 + return Futures.transform(future, attributeKvEntryOpt -> {
  227 + if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
  228 + AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
  229 + return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
  230 + } else {
  231 + return 0L;
  232 + }
  233 + }, MoreExecutors.directExecutor());
  234 + }
  235 +
  236 + private void updateQueueStartTs(Long newStartTs) {
  237 + newStartTs = ++newStartTs; // increments ts by 1 - next edge event search starts from current offset + 1
  238 + List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
  239 + ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
  240 + }
  241 +
216 242 private void pushEntityAttributesToEdge(EdgeEvent edgeEvent) throws IOException {
217 243 EntityId entityId = null;
218   - String entityName = null;
219 244 switch (edgeEvent.getEdgeEventType()) {
220 245 case EDGE:
221   - Edge edge = objectMapper.readValue(entry.getData(), Edge.class);
222 246 entityId = edge.getId();
223   - entityName = edge.getName();
224 247 break;
225 248 case DEVICE:
226   - Device device = objectMapper.readValue(entry.getData(), Device.class);
227   - entityId = device.getId();
228   - entityName = device.getName();
  249 + entityId = new DeviceId(edgeEvent.getEntityId());
229 250 break;
230 251 case ASSET:
231   - Asset asset = objectMapper.readValue(entry.getData(), Asset.class);
232   - entityId = asset.getId();
233   - entityName = asset.getName();
  252 + entityId = new AssetId(edgeEvent.getEntityId());
234 253 break;
235 254 case ENTITY_VIEW:
236   - EntityView entityView = objectMapper.readValue(entry.getData(), EntityView.class);
237   - entityId = entityView.getId();
238   - entityName = entityView.getName();
  255 + entityId = new EntityViewId(edgeEvent.getEntityId());
239 256 break;
240 257 case DASHBOARD:
241   - Dashboard dashboard = objectMapper.readValue(entry.getData(), Dashboard.class);
242   - entityId = dashboard.getId();
243   - entityName = dashboard.getName();
  258 + entityId = new DashboardId(edgeEvent.getEntityId());
244 259 break;
245 260 }
246 261 if (entityId != null) {
247 262 final EntityId finalEntityId = entityId;
248   - final String finalEntityName = entityName;
249 263 ListenableFuture<List<AttributeKvEntry>> ssAttrFuture = ctx.getAttributesService().findAll(edge.getTenantId(), entityId, DataConstants.SERVER_SCOPE);
250 264 Futures.transform(ssAttrFuture, ssAttributes -> {
251 265 if (ssAttributes != null && !ssAttributes.isEmpty()) {
252 266 try {
253   - TbMsgMetaData metaData = new TbMsgMetaData();
254 267 ObjectNode entityNode = objectMapper.createObjectNode();
255   - metaData.putValue("scope", DataConstants.SERVER_SCOPE);
256 268 for (AttributeKvEntry attr : ssAttributes) {
257   - if (attr.getDataType() == DataType.BOOLEAN) {
  269 + if (attr.getDataType() == DataType.BOOLEAN && attr.getBooleanValue().isPresent()) {
258 270 entityNode.put(attr.getKey(), attr.getBooleanValue().get());
259   - } else if (attr.getDataType() == DataType.DOUBLE) {
  271 + } else if (attr.getDataType() == DataType.DOUBLE && attr.getDoubleValue().isPresent()) {
260 272 entityNode.put(attr.getKey(), attr.getDoubleValue().get());
261   - } else if (attr.getDataType() == DataType.LONG) {
  273 + } else if (attr.getDataType() == DataType.LONG && attr.getLongValue().isPresent()) {
262 274 entityNode.put(attr.getKey(), attr.getLongValue().get());
263 275 } else {
264 276 entityNode.put(attr.getKey(), attr.getValueAsString());
265 277 }
266 278 }
267   - TbMsg tbMsg = TbMsg.newMsg(DataConstants.ATTRIBUTES_UPDATED, finalEntityId, metaData, TbMsgDataType.JSON
268   - , objectMapper.writeValueAsString(entityNode));
269   - log.debug("Sending donwlink entity data msg, entityName [{}], tbMsg [{}]", finalEntityName, tbMsg);
  279 + log.debug("Sending attributes data msg, entityId [{}], attributes [{}]", finalEntityId, entityNode);
  280 + DownlinkMsg value = constructEntityDataProtoMsg(finalEntityId, ActionType.ATTRIBUTES_UPDATED, JsonUtils.parse(objectMapper.writeValueAsString(entityNode)));
270 281 outputStream.onNext(ResponseMsg.newBuilder()
271   - .setDownlinkMsg(constructEntityDataProtoMsg(finalEntityName, finalEntityId, tbMsg))
272   - .build());
  282 + .setDownlinkMsg(value).build());
273 283 } catch (Exception e) {
274 284 log.error("[{}] Failed to send attribute updates to the edge", edge.getName(), e);
275 285 }
... ... @@ -283,188 +293,276 @@ public final class EdgeGrpcSession implements Closeable {
283 293
284 294 private void processTelemetryMessage(EdgeEvent edgeEvent) throws IOException {
285 295 log.trace("Executing processTelemetryMessage, edgeEvent [{}]", edgeEvent);
286   - TbMsg tbMsg = TbMsg.fromBytes(Base64.decodeBase64(entry.getData()), TbMsgCallback.EMPTY);
287   - String entityName = null;
288 296 EntityId entityId = null;
289 297 switch (edgeEvent.getEdgeEventType()) {
290 298 case DEVICE:
291   - Device device = ctx.getDeviceService().findDeviceById(edge.getTenantId(), new DeviceId(tbMsg.getOriginator().getId()));
292   - entityName = device.getName();
293   - entityId = device.getId();
  299 + entityId = new DeviceId(edgeEvent.getEntityId());
294 300 break;
295 301 case ASSET:
296   - Asset asset = ctx.getAssetService().findAssetById(edge.getTenantId(), new AssetId(tbMsg.getOriginator().getId()));
297   - entityName = asset.getName();
298   - entityId = asset.getId();
  302 + entityId = new AssetId(edgeEvent.getEntityId());
299 303 break;
300 304 case ENTITY_VIEW:
301   - EntityView entityView = ctx.getEntityViewService().findEntityViewById(edge.getTenantId(), new EntityViewId(tbMsg.getOriginator().getId()));
302   - entityName = entityView.getName();
303   - entityId = entityView.getId();
  305 + entityId = new EntityViewId(edgeEvent.getEntityId());
304 306 break;
305 307
306 308 }
307   - if (entityName != null && entityId != null) {
308   - log.debug("Sending downlink entity data msg, entityName [{}], tbMsg [{}]", entityName, tbMsg);
309   - outputStream.onNext(ResponseMsg.newBuilder()
310   - .setDownlinkMsg(constructEntityDataProtoMsg(entityName, entityId, tbMsg))
311   - .build());
  309 + if (entityId != null) {
  310 + log.debug("Sending telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getEntityBody());
  311 + DownlinkMsg downlinkMsg;
  312 + try {
  313 + ActionType actionType = ActionType.valueOf(edgeEvent.getEdgeEventAction());
  314 + downlinkMsg = constructEntityDataProtoMsg(entityId, actionType, JsonUtils.parse(objectMapper.writeValueAsString(edgeEvent.getEntityBody())));
  315 + outputStream.onNext(ResponseMsg.newBuilder()
  316 + .setDownlinkMsg(downlinkMsg)
  317 + .build());
  318 + } catch (Exception e) {
  319 + log.warn("Can't send telemetry data msg, entityId [{}], body [{}]", edgeEvent.getEntityId(), edgeEvent.getEntityBody(), e);
  320 + }
  321 +
312 322 }
313 323 }
314 324
315   - private void processEntityCRUDMessage(EdgeEvent edgeEvent, UpdateMsgType msgType) throws java.io.IOException {
  325 + private void processEntityCRUDMessage(EdgeEvent edgeEvent, UpdateMsgType msgType) {
316 326 log.trace("Executing processEntityCRUDMessage, edgeEvent [{}], msgType [{}]", edgeEvent, msgType);
317 327 switch (edgeEvent.getEdgeEventType()) {
318 328 case EDGE:
319   - Edge edge = objectMapper.readValue(entry.getData(), Edge.class);
320   - onEdgeUpdated(msgType, edge);
  329 + // TODO: voba - add edge update logic
321 330 break;
322 331 case DEVICE:
323   - Device device = objectMapper.readValue(entry.getData(), Device.class);
324   - onDeviceUpdated(msgType, device);
  332 + processDeviceCRUD(edgeEvent, msgType);
325 333 break;
326 334 case ASSET:
327   - Asset asset = objectMapper.readValue(entry.getData(), Asset.class);
328   - onAssetUpdated(msgType, asset);
  335 + processAssetCRUD(edgeEvent, msgType);
329 336 break;
330 337 case ENTITY_VIEW:
331   - EntityView entityView = objectMapper.readValue(entry.getData(), EntityView.class);
332   - onEntityViewUpdated(msgType, entityView);
  338 + processEntityViewCRUD(edgeEvent, msgType);
333 339 break;
334 340 case DASHBOARD:
335   - Dashboard dashboard = objectMapper.readValue(entry.getData(), Dashboard.class);
336   - onDashboardUpdated(msgType, dashboard);
  341 + processDashboardCRUD(edgeEvent, msgType);
337 342 break;
338 343 case RULE_CHAIN:
339   - RuleChain ruleChain = objectMapper.readValue(entry.getData(), RuleChain.class);
340   - onRuleChainUpdated(msgType, ruleChain);
  344 + processRuleChainCRUD(edgeEvent, msgType);
341 345 break;
342 346 case RULE_CHAIN_METADATA:
343   - RuleChainMetaData ruleChainMetaData = objectMapper.readValue(entry.getData(), RuleChainMetaData.class);
344   - onRuleChainMetadataUpdated(msgType, ruleChainMetaData);
  347 + processRuleChainMetadataCRUD(edgeEvent, msgType);
345 348 break;
346 349 case ALARM:
347   - Alarm alarm = objectMapper.readValue(entry.getData(), Alarm.class);
348   - onAlarmUpdated(msgType, alarm);
  350 + processAlarmCRUD(edgeEvent, msgType);
349 351 break;
350 352 case USER:
351   - User user = objectMapper.readValue(entry.getData(), User.class);
352   - onUserUpdated(msgType, user);
  353 + processUserCRUD(edgeEvent, msgType);
353 354 break;
354 355 case RELATION:
355   - EntityRelation entityRelation = objectMapper.readValue(entry.getData(), EntityRelation.class);
356   - onEntityRelationUpdated(msgType, entityRelation);
  356 + processRelationCRUD(edgeEvent, msgType);
357 357 break;
358 358 }
359 359 }
360 360
361   - private void updateQueueStartTs(Long newStartTs) {
362   - newStartTs = ++newStartTs; // increments ts by 1 - next edge event search starts from current offset + 1
363   - List<AttributeKvEntry> attributes = Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(QUEUE_START_TS_ATTR_KEY, newStartTs), System.currentTimeMillis()));
364   - ctx.getAttributesService().save(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, attributes);
365   - }
366   -
367   - private ListenableFuture<Long> getQueueStartTs() {
368   - ListenableFuture<Optional<AttributeKvEntry>> future =
369   - ctx.getAttributesService().find(edge.getTenantId(), edge.getId(), DataConstants.SERVER_SCOPE, QUEUE_START_TS_ATTR_KEY);
370   - return Futures.transform(future, attributeKvEntryOpt -> {
371   - if (attributeKvEntryOpt != null && attributeKvEntryOpt.isPresent()) {
372   - AttributeKvEntry attributeKvEntry = attributeKvEntryOpt.get();
373   - return attributeKvEntry.getLongValue().isPresent() ? attributeKvEntry.getLongValue().get() : 0L;
374   - } else {
375   - return 0L;
376   - }
377   - }, MoreExecutors.directExecutor());
378   - }
  361 + private void processDeviceCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  362 + DeviceId deviceId = new DeviceId(edgeEvent.getEntityId());
  363 + ListenableFuture<Device> deviceFuture = ctx.getDeviceService().findDeviceByIdAsync(edgeEvent.getTenantId(), deviceId);
  364 + Futures.addCallback(deviceFuture,
  365 + new FutureCallback<Device>() {
  366 + @Override
  367 + public void onSuccess(@Nullable Device device) {
  368 + if (device != null) {
  369 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  370 + .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(msgType, device))
  371 + .build();
  372 + outputStream.onNext(ResponseMsg.newBuilder()
  373 + .setEntityUpdateMsg(entityUpdateMsg)
  374 + .build());
  375 + }
  376 + }
379 377
380   - private void onEdgeUpdated(UpdateMsgType msgType, Edge edge) {
381   - // TODO: voba add configuration update to edge
382   - this.edge = edge;
383   - }
  378 + @Override
  379 + public void onFailure(Throwable t) {
  380 + log.warn("Can't processDeviceCRUD, edgeEvent [{}]", edgeEvent, t);
  381 + }
  382 + }, ctx.getDbCallbackExecutor());
  383 + }
  384 +
  385 + private void processAssetCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  386 + AssetId assetId = new AssetId(edgeEvent.getEntityId());
  387 + ListenableFuture<Asset> assetFuture = ctx.getAssetService().findAssetByIdAsync(edgeEvent.getTenantId(), assetId);
  388 + Futures.addCallback(assetFuture,
  389 + new FutureCallback<Asset>() {
  390 + @Override
  391 + public void onSuccess(@Nullable Asset asset) {
  392 + if (asset != null) {
  393 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  394 + .setAssetUpdateMsg(ctx.getAssetUpdateMsgConstructor().constructAssetUpdatedMsg(msgType, asset))
  395 + .build();
  396 + outputStream.onNext(ResponseMsg.newBuilder()
  397 + .setEntityUpdateMsg(entityUpdateMsg)
  398 + .build());
  399 + }
  400 + }
384 401
385   - private void onDeviceUpdated(UpdateMsgType msgType, Device device) {
386   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
387   - .setDeviceUpdateMsg(ctx.getDeviceUpdateMsgConstructor().constructDeviceUpdatedMsg(msgType, device))
388   - .build();
389   - outputStream.onNext(ResponseMsg.newBuilder()
390   - .setEntityUpdateMsg(entityUpdateMsg)
391   - .build());
392   - }
  402 + @Override
  403 + public void onFailure(Throwable t) {
  404 + log.warn("Can't processAssetCRUD, edgeEvent [{}]", edgeEvent, t);
  405 + }
  406 + }, ctx.getDbCallbackExecutor());
  407 + }
  408 +
  409 + private void processEntityViewCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  410 + EntityViewId entityViewId = new EntityViewId(edgeEvent.getEntityId());
  411 + ListenableFuture<EntityView> entityViewFuture = ctx.getEntityViewService().findEntityViewByIdAsync(edgeEvent.getTenantId(), entityViewId);
  412 + Futures.addCallback(entityViewFuture,
  413 + new FutureCallback<EntityView>() {
  414 + @Override
  415 + public void onSuccess(@Nullable EntityView entityView) {
  416 + if (entityView != null) {
  417 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  418 + .setEntityViewUpdateMsg(ctx.getEntityViewUpdateMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView))
  419 + .build();
  420 + outputStream.onNext(ResponseMsg.newBuilder()
  421 + .setEntityUpdateMsg(entityUpdateMsg)
  422 + .build());
  423 + }
  424 + }
393 425
394   - private void onAssetUpdated(UpdateMsgType msgType, Asset asset) {
395   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
396   - .setAssetUpdateMsg(ctx.getAssetUpdateMsgConstructor().constructAssetUpdatedMsg(msgType, asset))
397   - .build();
398   - outputStream.onNext(ResponseMsg.newBuilder()
399   - .setEntityUpdateMsg(entityUpdateMsg)
400   - .build());
401   - }
  426 + @Override
  427 + public void onFailure(Throwable t) {
  428 + log.warn("Can't processEntityViewCRUD, edgeEvent [{}]", edgeEvent, t);
  429 + }
  430 + }, ctx.getDbCallbackExecutor());
  431 + }
  432 +
  433 + private void processDashboardCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  434 + DashboardId dashboardId = new DashboardId(edgeEvent.getEntityId());
  435 + ListenableFuture<Dashboard> dashboardFuture = ctx.getDashboardService().findDashboardByIdAsync(edgeEvent.getTenantId(), dashboardId);
  436 + Futures.addCallback(dashboardFuture,
  437 + new FutureCallback<Dashboard>() {
  438 + @Override
  439 + public void onSuccess(@Nullable Dashboard dashboard) {
  440 + if (dashboard != null) {
  441 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  442 + .setDashboardUpdateMsg(ctx.getDashboardUpdateMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard))
  443 + .build();
  444 + outputStream.onNext(ResponseMsg.newBuilder()
  445 + .setEntityUpdateMsg(entityUpdateMsg)
  446 + .build());
  447 + }
  448 + }
402 449
403   - private void onEntityViewUpdated(UpdateMsgType msgType, EntityView entityView) {
404   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
405   - .setEntityViewUpdateMsg(ctx.getEntityViewUpdateMsgConstructor().constructEntityViewUpdatedMsg(msgType, entityView))
406   - .build();
407   - outputStream.onNext(ResponseMsg.newBuilder()
408   - .setEntityUpdateMsg(entityUpdateMsg)
409   - .build());
410   - }
  450 + @Override
  451 + public void onFailure(Throwable t) {
  452 + log.warn("Can't processDashboardCRUD, edgeEvent [{}]", edgeEvent, t);
  453 + }
  454 + }, ctx.getDbCallbackExecutor());
  455 + }
  456 +
  457 + private void processRuleChainCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  458 + RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
  459 + ListenableFuture<RuleChain> ruleChainFuture = ctx.getRuleChainService().findRuleChainByIdAsync(edgeEvent.getTenantId(), ruleChainId);
  460 + Futures.addCallback(ruleChainFuture,
  461 + new FutureCallback<RuleChain>() {
  462 + @Override
  463 + public void onSuccess(@Nullable RuleChain ruleChain) {
  464 + if (ruleChain != null) {
  465 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  466 + .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain))
  467 + .build();
  468 + outputStream.onNext(ResponseMsg.newBuilder()
  469 + .setEntityUpdateMsg(entityUpdateMsg)
  470 + .build());
  471 + }
  472 + }
411 473
412   - private void onRuleChainUpdated(UpdateMsgType msgType, RuleChain ruleChain) {
413   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
414   - .setRuleChainUpdateMsg(ctx.getRuleChainUpdateMsgConstructor().constructRuleChainUpdatedMsg(edge.getRootRuleChainId(), msgType, ruleChain))
415   - .build();
416   - outputStream.onNext(ResponseMsg.newBuilder()
417   - .setEntityUpdateMsg(entityUpdateMsg)
418   - .build());
419   - }
  474 + @Override
  475 + public void onFailure(Throwable t) {
  476 + log.warn("Can't processRuleChainCRUD, edgeEvent [{}]", edgeEvent, t);
  477 + }
  478 + }, ctx.getDbCallbackExecutor());
  479 + }
  480 +
  481 + private void processRuleChainMetadataCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  482 + RuleChainId ruleChainId = new RuleChainId(edgeEvent.getEntityId());
  483 + ListenableFuture<RuleChain> ruleChainFuture = ctx.getRuleChainService().findRuleChainByIdAsync(edgeEvent.getTenantId(), ruleChainId);
  484 + Futures.addCallback(ruleChainFuture,
  485 + new FutureCallback<RuleChain>() {
  486 + @Override
  487 + public void onSuccess(@Nullable RuleChain ruleChain) {
  488 + if (ruleChain != null) {
  489 + RuleChainMetaData ruleChainMetaData = ctx.getRuleChainService().loadRuleChainMetaData(edgeEvent.getTenantId(), ruleChainId);
  490 + RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
  491 + ctx.getRuleChainUpdateMsgConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
  492 + if (ruleChainMetadataUpdateMsg != null) {
  493 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  494 + .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
  495 + .build();
  496 + outputStream.onNext(ResponseMsg.newBuilder()
  497 + .setEntityUpdateMsg(entityUpdateMsg)
  498 + .build());
  499 + }
  500 + }
  501 + }
420 502
421   - private void onRuleChainMetadataUpdated(UpdateMsgType msgType, RuleChainMetaData ruleChainMetaData) {
422   - RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg =
423   - ctx.getRuleChainUpdateMsgConstructor().constructRuleChainMetadataUpdatedMsg(msgType, ruleChainMetaData);
424   - if (ruleChainMetadataUpdateMsg != null) {
425   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
426   - .setRuleChainMetadataUpdateMsg(ruleChainMetadataUpdateMsg)
427   - .build();
428   - outputStream.onNext(ResponseMsg.newBuilder()
429   - .setEntityUpdateMsg(entityUpdateMsg)
430   - .build());
431   - }
432   - }
  503 + @Override
  504 + public void onFailure(Throwable t) {
  505 + log.warn("Can't processRuleChainMetadataCRUD, edgeEvent [{}]", edgeEvent, t);
  506 + }
  507 + }, ctx.getDbCallbackExecutor());
  508 + }
  509 +
  510 + private void processUserCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  511 + UserId userId = new UserId(edgeEvent.getEntityId());
  512 + ListenableFuture<User> userFuture = ctx.getUserService().findUserByIdAsync(edgeEvent.getTenantId(), userId);
  513 + Futures.addCallback(userFuture,
  514 + new FutureCallback<User>() {
  515 + @Override
  516 + public void onSuccess(@Nullable User user) {
  517 + if (user != null) {
  518 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  519 + .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserUpdatedMsg(msgType, user))
  520 + .build();
  521 + outputStream.onNext(ResponseMsg.newBuilder()
  522 + .setEntityUpdateMsg(entityUpdateMsg)
  523 + .build());
  524 + }
  525 + }
433 526
434   - private void onDashboardUpdated(UpdateMsgType msgType, Dashboard dashboard) {
435   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
436   - .setDashboardUpdateMsg(ctx.getDashboardUpdateMsgConstructor().constructDashboardUpdatedMsg(msgType, dashboard))
437   - .build();
438   - outputStream.onNext(ResponseMsg.newBuilder()
439   - .setEntityUpdateMsg(entityUpdateMsg)
440   - .build());
  527 + @Override
  528 + public void onFailure(Throwable t) {
  529 + log.warn("Can't processUserCRUD, edgeEvent [{}]", edgeEvent, t);
  530 + }
  531 + }, ctx.getDbCallbackExecutor());
441 532 }
442 533
443   - private void onAlarmUpdated(UpdateMsgType msgType, Alarm alarm) {
  534 + private void processRelationCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  535 + EntityRelation entityRelation = objectMapper.convertValue(edgeEvent.getEntityBody(), EntityRelation.class);
444 536 EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
445   - .setAlarmUpdateMsg(ctx.getAlarmUpdateMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
  537 + .setRelationUpdateMsg(ctx.getRelationUpdateMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation))
446 538 .build();
447 539 outputStream.onNext(ResponseMsg.newBuilder()
448 540 .setEntityUpdateMsg(entityUpdateMsg)
449 541 .build());
450 542 }
451 543
452   - private void onUserUpdated(UpdateMsgType msgType, User user) {
453   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
454   - .setUserUpdateMsg(ctx.getUserUpdateMsgConstructor().constructUserUpdatedMsg(msgType, user))
455   - .build();
456   - outputStream.onNext(ResponseMsg.newBuilder()
457   - .setEntityUpdateMsg(entityUpdateMsg)
458   - .build());
459   - }
  544 + private void processAlarmCRUD(EdgeEvent edgeEvent, UpdateMsgType msgType) {
  545 + AlarmId alarmId = new AlarmId(edgeEvent.getEntityId());
  546 + ListenableFuture<Alarm> alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(edgeEvent.getTenantId(), alarmId);
  547 + Futures.addCallback(alarmFuture,
  548 + new FutureCallback<Alarm>() {
  549 + @Override
  550 + public void onSuccess(@Nullable Alarm alarm) {
  551 + if (alarm != null) {
  552 + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
  553 + .setAlarmUpdateMsg(ctx.getAlarmUpdateMsgConstructor().constructAlarmUpdatedMsg(edge.getTenantId(), msgType, alarm))
  554 + .build();
  555 + outputStream.onNext(ResponseMsg.newBuilder()
  556 + .setEntityUpdateMsg(entityUpdateMsg)
  557 + .build());
  558 + }
  559 + }
460 560
461   - private void onEntityRelationUpdated(UpdateMsgType msgType, EntityRelation entityRelation) {
462   - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder()
463   - .setRelationUpdateMsg(ctx.getRelationUpdateMsgConstructor().constructRelationUpdatedMsg(msgType, entityRelation))
464   - .build();
465   - outputStream.onNext(ResponseMsg.newBuilder()
466   - .setEntityUpdateMsg(entityUpdateMsg)
467   - .build());
  561 + @Override
  562 + public void onFailure(Throwable t) {
  563 + log.warn("Can't processAlarmCRUD, edgeEvent [{}]", edgeEvent, t);
  564 + }
  565 + }, ctx.getDbCallbackExecutor());
468 566 }
469 567
470 568 private UpdateMsgType getResponseMsgType(ActionType actionType) {
... ... @@ -490,29 +588,10 @@ public final class EdgeGrpcSession implements Closeable {
490 588 }
491 589 }
492 590
493   - private DownlinkMsg constructEntityDataProtoMsg(String entityName, EntityId entityId, TbMsg tbMsg) {
494   - EntityDataProto entityData = EntityDataProto.newBuilder()
495   - .setEntityName(entityName)
496   - .setTbMsg(ByteString.copyFrom(TbMsg.toByteArray(tbMsg)))
497   - .setEntityIdMSB(entityId.getId().getMostSignificantBits())
498   - .setEntityIdLSB(entityId.getId().getLeastSignificantBits())
499   - .build();
500   -
  591 + private DownlinkMsg constructEntityDataProtoMsg(EntityId entityId, ActionType actionType, JsonElement entityData) {
  592 + EntityDataProto entityDataProto = ctx.getEntityDataMsgConstructor().constructEntityDataMsg(entityId, actionType, entityData);
501 593 DownlinkMsg.Builder builder = DownlinkMsg.newBuilder()
502   - .addAllEntityData(Collections.singletonList(entityData));
503   -
504   - return builder.build();
505   - }
506   -
507   - private CustomerUpdateMsg constructCustomerUpdatedMsg(UpdateMsgType msgType, Customer customer) {
508   - CustomerUpdateMsg.Builder builder = CustomerUpdateMsg.newBuilder()
509   - .setMsgType(msgType);
510   - return builder.build();
511   - }
512   -
513   - private UserUpdateMsg constructUserUpdatedMsg(UpdateMsgType msgType, User user) {
514   - UserUpdateMsg.Builder builder = UserUpdateMsg.newBuilder()
515   - .setMsgType(msgType);
  594 + .addAllEntityData(Collections.singletonList(entityDataProto));
516 595 return builder.build();
517 596 }
518 597
... ... @@ -520,20 +599,21 @@ public final class EdgeGrpcSession implements Closeable {
520 599 try {
521 600 if (uplinkMsg.getEntityDataList() != null && !uplinkMsg.getEntityDataList().isEmpty()) {
522 601 for (EntityDataProto entityData : uplinkMsg.getEntityDataList()) {
523   - TbMsg tbMsg = null;
524   - TbMsg originalTbMsg = TbMsg.fromBytes(entityData.getTbMsg().toByteArray(), TbMsgCallback.EMPTY);
525   - if (originalTbMsg.getOriginator().getEntityType() == EntityType.DEVICE) {
526   - String deviceName = entityData.getEntityName();
527   - Device device = ctx.getDeviceService().findDeviceByTenantIdAndName(edge.getTenantId(), deviceName);
528   - if (device != null) {
529   - tbMsg = TbMsg.newMsg(originalTbMsg.getType(), device.getId(), originalTbMsg.getMetaData().copy(),
530   - originalTbMsg.getDataType(), originalTbMsg.getData());
531   - }
532   - } else {
533   - tbMsg = originalTbMsg;
534   - }
535   - if (tbMsg != null) {
536   - ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null);
  602 + EntityId entityId = constructEntityId(entityData);
  603 + if ((entityData.hasPostAttributesMsg() || entityData.hasPostTelemetryMsg()) && entityId != null) {
  604 + ListenableFuture<TbMsgMetaData> metaDataFuture = constructBaseMsgMetadata(entityId);
  605 + Futures.transform(metaDataFuture, metaData -> {
  606 + if (metaData != null) {
  607 + metaData.putValue(DataConstants.MSG_SOURCE_KEY, DataConstants.EDGE_MSG_SOURCE);
  608 + if (entityData.hasPostAttributesMsg()) {
  609 + processPostAttributes(entityId, entityData.getPostAttributesMsg(), metaData);
  610 + }
  611 + if (entityData.hasPostTelemetryMsg()) {
  612 + processPostTelemetry(entityId, entityData.getPostTelemetryMsg(), metaData);
  613 + }
  614 + }
  615 + return null;
  616 + }, ctx.getDbCallbackExecutor());
537 617 }
538 618 }
539 619 }
... ... @@ -559,6 +639,78 @@ public final class EdgeGrpcSession implements Closeable {
559 639 return UplinkResponseMsg.newBuilder().setSuccess(true).build();
560 640 }
561 641
  642 + private ListenableFuture<TbMsgMetaData> constructBaseMsgMetadata(EntityId entityId) {
  643 + switch (entityId.getEntityType()) {
  644 + case DEVICE:
  645 + ListenableFuture<Device> deviceFuture = ctx.getDeviceService().findDeviceByIdAsync(edge.getTenantId(), new DeviceId(entityId.getId()));
  646 + return Futures.transform(deviceFuture, device -> {
  647 + TbMsgMetaData metaData = new TbMsgMetaData();
  648 + if (device != null) {
  649 + metaData.putValue("deviceName", device.getName());
  650 + metaData.putValue("deviceType", device.getType());
  651 + }
  652 + return metaData;
  653 + }, ctx.getDbCallbackExecutor());
  654 + case ASSET:
  655 + ListenableFuture<Asset> assetFuture = ctx.getAssetService().findAssetByIdAsync(edge.getTenantId(), new AssetId(entityId.getId()));
  656 + return Futures.transform(assetFuture, asset -> {
  657 + TbMsgMetaData metaData = new TbMsgMetaData();
  658 + if (asset != null) {
  659 + metaData.putValue("assetName", asset.getName());
  660 + metaData.putValue("assetType", asset.getType());
  661 + }
  662 + return metaData;
  663 + }, ctx.getDbCallbackExecutor());
  664 + case ENTITY_VIEW:
  665 + ListenableFuture<EntityView> entityViewFuture = ctx.getEntityViewService().findEntityViewByIdAsync(edge.getTenantId(), new EntityViewId(entityId.getId()));
  666 + return Futures.transform(entityViewFuture, entityView -> {
  667 + TbMsgMetaData metaData = new TbMsgMetaData();
  668 + if (entityView != null) {
  669 + metaData.putValue("entityViewName", entityView.getName());
  670 + metaData.putValue("entityViewType", entityView.getType());
  671 + }
  672 + return metaData;
  673 + }, ctx.getDbCallbackExecutor());
  674 + default:
  675 + log.debug("Constructing empty metadata for entityId [{}]", entityId);
  676 + return Futures.immediateFuture(new TbMsgMetaData());
  677 + }
  678 + }
  679 +
  680 + private EntityId constructEntityId(EntityDataProto entityData) {
  681 + EntityType entityType = EntityType.valueOf(entityData.getEntityType());
  682 + switch (entityType) {
  683 + case DEVICE:
  684 + return new DeviceId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
  685 + case ASSET:
  686 + return new AssetId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
  687 + case ENTITY_VIEW:
  688 + return new EntityViewId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
  689 + case DASHBOARD:
  690 + return new DashboardId(new UUID(entityData.getEntityIdMSB(), entityData.getEntityIdLSB()));
  691 + default:
  692 + log.warn("Unsupported entity type [{}] during construct of entity id. EntityDataProto [{}]", entityData.getEntityType(), entityData);
  693 + return null;
  694 + }
  695 + }
  696 +
  697 + private void processPostTelemetry(EntityId entityId, TransportProtos.PostTelemetryMsg msg, TbMsgMetaData metaData) {
  698 + for (TransportProtos.TsKvListProto tsKv : msg.getTsKvListList()) {
  699 + JsonObject json = JsonUtils.getJsonObject(tsKv.getKvList());
  700 + metaData.putValue("ts", tsKv.getTs() + "");
  701 + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_TELEMETRY_REQUEST.name(), entityId, metaData, gson.toJson(json));
  702 + // TODO: voba - verify that null callback is OK
  703 + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null);
  704 + }
  705 + }
  706 +
  707 + private void processPostAttributes(EntityId entityId, TransportProtos.PostAttributeMsg msg, TbMsgMetaData metaData) {
  708 + JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
  709 + TbMsg tbMsg = TbMsg.newMsg(SessionMsgType.POST_ATTRIBUTES_REQUEST.name(), entityId, metaData, gson.toJson(json));
  710 + // TODO: voba - verify that null callback is OK
  711 + ctx.getTbClusterService().pushMsgToRuleEngine(edge.getTenantId(), tbMsg.getOriginator(), tbMsg, null);
  712 + }
  713 +
562 714 private void onDeviceUpdate(DeviceUpdateMsg deviceUpdateMsg) {
563 715 log.info("onDeviceUpdate {}", deviceUpdateMsg);
564 716 DeviceId edgeDeviceId = new DeviceId(new UUID(deviceUpdateMsg.getIdMSB(), deviceUpdateMsg.getIdLSB()));
... ... @@ -759,7 +911,7 @@ public final class EdgeGrpcSession implements Closeable {
759 911 .setTenantIdLSB(edge.getTenantId().getId().getLeastSignificantBits())
760 912 .setName(edge.getName())
761 913 .setRoutingKey(edge.getRoutingKey())
762   - .setType(edge.getType().toString())
  914 + .setType(edge.getType())
763 915 .build();
764 916 }
765 917
... ...
  1 +/**
  2 + * Copyright © 2016-2020 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.service.edge.rpc.constructor;
  17 +
  18 +import com.google.gson.JsonElement;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.springframework.stereotype.Component;
  21 +import org.thingsboard.server.common.data.audit.ActionType;
  22 +import org.thingsboard.server.common.data.id.EntityId;
  23 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  24 +import org.thingsboard.server.gen.edge.EntityDataProto;
  25 +
  26 +@Component
  27 +@Slf4j
  28 +public class EntityDataMsgConstructor {
  29 +
  30 + public EntityDataProto constructEntityDataMsg(EntityId entityId, ActionType actionType, JsonElement entityData) {
  31 + EntityDataProto.Builder builder = EntityDataProto.newBuilder()
  32 + .setEntityIdMSB(entityId.getId().getMostSignificantBits())
  33 + .setEntityIdLSB(entityId.getId().getLeastSignificantBits())
  34 + .setEntityType(entityId.getEntityType().name());
  35 + switch (actionType) {
  36 + case TIMESERIES_UPDATED:
  37 + try {
  38 + builder.setPostTelemetryMsg(JsonConverter.convertToTelemetryProto(entityData));
  39 + } catch (Exception e) {
  40 + log.warn("Can't convert to telemetry proto, entityData [{}]", entityData, e);
  41 + }
  42 + break;
  43 + case ATTRIBUTES_UPDATED:
  44 + try {
  45 + builder.setPostAttributesMsg(JsonConverter.convertToAttributesProto(entityData));
  46 + } catch (Exception e) {
  47 + log.warn("Can't convert to attributes proto, entityData [{}]", entityData, e);
  48 + }
  49 + break;
  50 + // TODO: voba - add support for attribute delete
  51 + // case ATTRIBUTES_DELETED:
  52 + }
  53 + return builder.build();
  54 + }
  55 +
  56 +}
... ...
... ... @@ -65,4 +65,7 @@ public class DataConstants {
65 65 public static final String DEFAULT_SECRET_KEY = "";
66 66 public static final String SECRET_KEY_FIELD_NAME = "secretKey";
67 67 public static final String DURATION_MS_FIELD_NAME = "durationMs";
  68 +
  69 + public static final String EDGE_MSG_SOURCE = "edge";
  70 + public static final String MSG_SOURCE_KEY = "source";
68 71 }
... ...
... ... @@ -16,6 +16,7 @@
16 16 package org.thingsboard.server.common.data.id;
17 17
18 18 import org.thingsboard.server.common.data.EntityType;
  19 +import org.thingsboard.server.common.data.edge.EdgeEventType;
19 20
20 21 import java.util.UUID;
21 22
... ... @@ -68,4 +69,28 @@ public class EntityIdFactory {
68 69 throw new IllegalArgumentException("EntityType " + type + " is not supported!");
69 70 }
70 71
  72 + public static EntityId getByEdgeEventTypeAndUuid(EdgeEventType edgeEventType, UUID uuid) {
  73 + switch (edgeEventType) {
  74 + case CUSTOMER:
  75 + return new CustomerId(uuid);
  76 + case USER:
  77 + return new UserId(uuid);
  78 + case DASHBOARD:
  79 + return new DashboardId(uuid);
  80 + case DEVICE:
  81 + return new DeviceId(uuid);
  82 + case ASSET:
  83 + return new AssetId(uuid);
  84 + case ALARM:
  85 + return new AlarmId(uuid);
  86 + case RULE_CHAIN:
  87 + return new RuleChainId(uuid);
  88 + case ENTITY_VIEW:
  89 + return new EntityViewId(uuid);
  90 + case EDGE:
  91 + return new EdgeId(uuid);
  92 + }
  93 + throw new IllegalArgumentException("EdgeEventType " + edgeEventType + " is not supported!");
  94 + }
  95 +
71 96 }
... ...
... ... @@ -100,9 +100,9 @@ enum UpdateMsgType {
100 100 }
101 101
102 102 message EntityDataProto {
103   - string entityName = 1;
104   - int64 entityIdMSB = 2;
105   - int64 entityIdLSB = 3;
  103 + int64 entityIdMSB = 1;
  104 + int64 entityIdLSB = 2;
  105 + string entityType = 3;
106 106 transport.PostTelemetryMsg postTelemetryMsg = 4;
107 107 transport.PostAttributeMsg postAttributesMsg = 5;
108 108 // transport.ToDeviceRpcRequestMsg ???
... ...
... ... @@ -52,6 +52,8 @@ public class BaseEdgeEventService implements EdgeEventService {
52 52 return EdgeEventType.DASHBOARD;
53 53 case USER:
54 54 return EdgeEventType.USER;
  55 + case ALARM:
  56 + return EdgeEventType.ALARM;
55 57 default:
56 58 log.warn("Failed to push notification to edge service. Unsupported entity type [{}]", entityType);
57 59 return null;
... ...
... ... @@ -70,34 +70,23 @@ public class TbMsgPushToEdgeNode implements TbNode {
70 70
71 71 @Override
72 72 public void onMsg(TbContext ctx, TbMsg msg) {
73   - if (EntityType.DEVICE.equals(msg.getOriginator().getEntityType()) ||
74   - EntityType.DEVICE.equals(msg.getOriginator().getEntityType()) ||
75   - EntityType.DEVICE.equals(msg.getOriginator().getEntityType()) ||
76   - EntityType.DEVICE.equals(msg.getOriginator().getEntityType())) {
77   - if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msg.getType()) ||
78   - SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType()) ||
79   - DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType()) ||
80   - DataConstants.ATTRIBUTES_DELETED.equals(msg.getType())) {
  73 + if (DataConstants.EDGE_MSG_SOURCE.equalsIgnoreCase(msg.getMetaData().getValue(DataConstants.MSG_SOURCE_KEY))) {
  74 + log.debug("Ignoring msg from the cloud, msg [{}]", msg);
  75 + return;
  76 + }
  77 + if (isSupportedOriginator(msg.getOriginator().getEntityType())) {
  78 + if (isSupportedMsgType(msg.getType())) {
81 79 ListenableFuture<EdgeId> getEdgeIdFuture = getEdgeIdByOriginatorId(ctx, ctx.getTenantId(), msg.getOriginator());
82 80 Futures.transform(getEdgeIdFuture, edgeId -> {
83 81 EdgeEventType edgeEventTypeByEntityType = ctx.getEdgeEventService().getEdgeEventTypeByEntityType(msg.getOriginator().getEntityType());
84 82 if (edgeEventTypeByEntityType == null) {
85 83 log.debug("Edge event type is null. Entity Type {}", msg.getOriginator().getEntityType());
86   - ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '"+ msg.getOriginator().getEntityType() + "'"));
87   - }
88   - ActionType actionType;
89   - if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msg.getType())) {
90   - actionType = ActionType.TIMESERIES_UPDATED;
91   - } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msg.getType()) ||
92   - DataConstants.ATTRIBUTES_UPDATED.equals(msg.getType())) {
93   - actionType = ActionType.ATTRIBUTES_UPDATED;
94   - } else {
95   - actionType = ActionType.ATTRIBUTES_DELETED;
  84 + ctx.tellFailure(msg, new RuntimeException("Edge event type is null. Entity Type '" + msg.getOriginator().getEntityType() + "'"));
96 85 }
97 86 EdgeEvent edgeEvent = new EdgeEvent();
98 87 edgeEvent.setTenantId(ctx.getTenantId());
99 88 edgeEvent.setEdgeId(edgeId);
100   - edgeEvent.setEdgeEventAction(actionType.name());
  89 + edgeEvent.setEdgeEventAction(getActionTypeByMsgType(msg.getType()).name());
101 90 edgeEvent.setEntityId(msg.getOriginator().getId());
102 91 edgeEvent.setEdgeEventType(edgeEventTypeByEntityType);
103 92 edgeEvent.setEntityBody(json.valueToTree(msg.getData()));
... ... @@ -126,6 +115,42 @@ public class TbMsgPushToEdgeNode implements TbNode {
126 115 }
127 116 }
128 117
  118 + private ActionType getActionTypeByMsgType(String msgType) {
  119 + ActionType actionType;
  120 + if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)) {
  121 + actionType = ActionType.TIMESERIES_UPDATED;
  122 + } else if (SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)
  123 + || DataConstants.ATTRIBUTES_UPDATED.equals(msgType)) {
  124 + actionType = ActionType.ATTRIBUTES_UPDATED;
  125 + } else {
  126 + actionType = ActionType.ATTRIBUTES_DELETED;
  127 + }
  128 + return actionType;
  129 + }
  130 +
  131 + private boolean isSupportedOriginator(EntityType entityType) {
  132 + switch (entityType) {
  133 + case DEVICE:
  134 + case ASSET:
  135 + case ENTITY_VIEW:
  136 + case DASHBOARD:
  137 + return true;
  138 + default:
  139 + return false;
  140 + }
  141 + }
  142 +
  143 + private boolean isSupportedMsgType(String msgType) {
  144 + if (SessionMsgType.POST_TELEMETRY_REQUEST.name().equals(msgType)
  145 + || SessionMsgType.POST_ATTRIBUTES_REQUEST.name().equals(msgType)
  146 + || DataConstants.ATTRIBUTES_UPDATED.equals(msgType)
  147 + || DataConstants.ATTRIBUTES_DELETED.equals(msgType)) {
  148 + return true;
  149 + } else {
  150 + return false;
  151 + }
  152 + }
  153 +
129 154 private ListenableFuture<EdgeId> getEdgeIdByOriginatorId(TbContext ctx, TenantId tenantId, EntityId originatorId) {
130 155 ListenableFuture<List<EntityRelation>> future = ctx.getRelationService().findByToAndTypeAsync(tenantId, originatorId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE);
131 156 return Futures.transform(future, relations -> {
... ...
... ... @@ -37,7 +37,7 @@ export default function RuleChainRoutes($stateProvider, NodeTemplatePathProvider
37 37 }
38 38 })
39 39 .state('home.ruleChains.core', {
40   - url: '/ruleChains/system',
  40 + url: '/ruleChains/core',
41 41 params: {'topIndex': 0},
42 42 module: 'private',
43 43 auth: ['SYS_ADMIN', 'TENANT_ADMIN'],
... ...