Commit f83d3016d63cc973e2bfce8dd1547b9dd40a7cfb

Authored by vparomskiy
1 parent 90e4ff61

node debug messages saved in async way

@@ -21,6 +21,9 @@ import akka.actor.Scheduler; @@ -21,6 +21,9 @@ import akka.actor.Scheduler;
21 import com.fasterxml.jackson.databind.JsonNode; 21 import com.fasterxml.jackson.databind.JsonNode;
22 import com.fasterxml.jackson.databind.ObjectMapper; 22 import com.fasterxml.jackson.databind.ObjectMapper;
23 import com.fasterxml.jackson.databind.node.ObjectNode; 23 import com.fasterxml.jackson.databind.node.ObjectNode;
  24 +import com.google.common.util.concurrent.FutureCallback;
  25 +import com.google.common.util.concurrent.Futures;
  26 +import com.google.common.util.concurrent.ListenableFuture;
24 import com.typesafe.config.Config; 27 import com.typesafe.config.Config;
25 import com.typesafe.config.ConfigFactory; 28 import com.typesafe.config.ConfigFactory;
26 import lombok.Getter; 29 import lombok.Getter;
@@ -66,6 +69,7 @@ import org.thingsboard.server.service.script.JsSandboxService; @@ -66,6 +69,7 @@ import org.thingsboard.server.service.script.JsSandboxService;
66 import org.thingsboard.server.service.state.DeviceStateService; 69 import org.thingsboard.server.service.state.DeviceStateService;
67 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; 70 import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
68 71
  72 +import javax.annotation.Nullable;
69 import java.io.IOException; 73 import java.io.IOException;
70 import java.io.PrintWriter; 74 import java.io.PrintWriter;
71 import java.io.StringWriter; 75 import java.io.StringWriter;
@@ -314,22 +318,22 @@ public class ActorSystemContext { @@ -314,22 +318,22 @@ public class ActorSystemContext {
314 } 318 }
315 319
316 public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) { 320 public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
317 - persistDebug(tenantId, entityId, "IN", tbMsg, relationType, null); 321 + persistDebugAsync(tenantId, entityId, "IN", tbMsg, relationType, null);
318 } 322 }
319 323
320 public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) { 324 public void persistDebugInput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
321 - persistDebug(tenantId, entityId, "IN", tbMsg, relationType, error); 325 + persistDebugAsync(tenantId, entityId, "IN", tbMsg, relationType, error);
322 } 326 }
323 327
324 public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) { 328 public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType, Throwable error) {
325 - persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, error); 329 + persistDebugAsync(tenantId, entityId, "OUT", tbMsg, relationType, error);
326 } 330 }
327 331
328 public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) { 332 public void persistDebugOutput(TenantId tenantId, EntityId entityId, TbMsg tbMsg, String relationType) {
329 - persistDebug(tenantId, entityId, "OUT", tbMsg, relationType, null); 333 + persistDebugAsync(tenantId, entityId, "OUT", tbMsg, relationType, null);
330 } 334 }
331 335
332 - private void persistDebug(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) { 336 + private void persistDebugAsync(TenantId tenantId, EntityId entityId, String type, TbMsg tbMsg, String relationType, Throwable error) {
333 try { 337 try {
334 Event event = new Event(); 338 Event event = new Event();
335 event.setTenantId(tenantId); 339 event.setTenantId(tenantId);
@@ -355,7 +359,18 @@ public class ActorSystemContext { @@ -355,7 +359,18 @@ public class ActorSystemContext {
355 } 359 }
356 360
357 event.setBody(node); 361 event.setBody(node);
358 - eventService.save(event); 362 + ListenableFuture<Event> future = eventService.saveAsync(event);
  363 + Futures.addCallback(future, new FutureCallback<Event>() {
  364 + @Override
  365 + public void onSuccess(@Nullable Event event) {
  366 +
  367 + }
  368 +
  369 + @Override
  370 + public void onFailure(Throwable th) {
  371 + log.error("Could not save debug Event for Node", th);
  372 + }
  373 + });
359 } catch (IOException ex) { 374 } catch (IOException ex) {
360 log.warn("Failed to persist rule node debug message", ex); 375 log.warn("Failed to persist rule node debug message", ex);
361 } 376 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.dao.event; 16 package org.thingsboard.server.dao.event;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 import lombok.extern.slf4j.Slf4j; 19 import lombok.extern.slf4j.Slf4j;
19 import org.apache.commons.lang3.StringUtils; 20 import org.apache.commons.lang3.StringUtils;
20 import org.springframework.beans.factory.annotation.Autowired; 21 import org.springframework.beans.factory.annotation.Autowired;
@@ -44,6 +45,12 @@ public class BaseEventService implements EventService { @@ -44,6 +45,12 @@ public class BaseEventService implements EventService {
44 } 45 }
45 46
46 @Override 47 @Override
  48 + public ListenableFuture<Event> saveAsync(Event event) {
  49 + eventValidator.validate(event);
  50 + return eventDao.saveAsync(event);
  51 + }
  52 +
  53 + @Override
47 public Optional<Event> saveIfNotExists(Event event) { 54 public Optional<Event> saveIfNotExists(Event event) {
48 eventValidator.validate(event); 55 eventValidator.validate(event);
49 if (StringUtils.isEmpty(event.getUid())) { 56 if (StringUtils.isEmpty(event.getUid())) {
@@ -15,11 +15,13 @@ @@ -15,11 +15,13 @@
15 */ 15 */
16 package org.thingsboard.server.dao.event; 16 package org.thingsboard.server.dao.event;
17 17
18 -import com.datastax.driver.core.ResultSet; 18 +import com.datastax.driver.core.ResultSetFuture;
19 import com.datastax.driver.core.querybuilder.Insert; 19 import com.datastax.driver.core.querybuilder.Insert;
20 import com.datastax.driver.core.querybuilder.QueryBuilder; 20 import com.datastax.driver.core.querybuilder.QueryBuilder;
21 import com.datastax.driver.core.querybuilder.Select; 21 import com.datastax.driver.core.querybuilder.Select;
22 import com.datastax.driver.core.utils.UUIDs; 22 import com.datastax.driver.core.utils.UUIDs;
  23 +import com.google.common.util.concurrent.Futures;
  24 +import com.google.common.util.concurrent.ListenableFuture;
23 import lombok.extern.slf4j.Slf4j; 25 import lombok.extern.slf4j.Slf4j;
24 import org.apache.commons.lang3.StringUtils; 26 import org.apache.commons.lang3.StringUtils;
25 import org.springframework.stereotype.Component; 27 import org.springframework.stereotype.Component;
@@ -38,13 +40,11 @@ import java.util.Arrays; @@ -38,13 +40,11 @@ import java.util.Arrays;
38 import java.util.List; 40 import java.util.List;
39 import java.util.Optional; 41 import java.util.Optional;
40 import java.util.UUID; 42 import java.util.UUID;
  43 +import java.util.concurrent.ExecutionException;
41 44
42 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq; 45 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
43 import static com.datastax.driver.core.querybuilder.QueryBuilder.select; 46 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
44 -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_ID_VIEW_NAME;  
45 -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_BY_TYPE_AND_ID_VIEW_NAME;  
46 -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_COLUMN_FAMILY_NAME;  
47 -import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; 47 +import static org.thingsboard.server.dao.model.ModelConstants.*;
48 48
49 @Component 49 @Component
50 @Slf4j 50 @Slf4j
@@ -65,6 +65,15 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE @@ -65,6 +65,15 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
65 65
66 @Override 66 @Override
67 public Event save(Event event) { 67 public Event save(Event event) {
  68 + try {
  69 + return saveAsync(event).get();
  70 + } catch (InterruptedException | ExecutionException e) {
  71 + throw new IllegalStateException("Could not save EventEntity", e);
  72 + }
  73 + }
  74 +
  75 + @Override
  76 + public ListenableFuture<Event> saveAsync(Event event) {
68 log.debug("Save event [{}] ", event); 77 log.debug("Save event [{}] ", event);
69 if (event.getTenantId() == null) { 78 if (event.getTenantId() == null) {
70 log.trace("Save system event with predefined id {}", systemTenantId); 79 log.trace("Save system event with predefined id {}", systemTenantId);
@@ -76,7 +85,8 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE @@ -76,7 +85,8 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
76 if (StringUtils.isEmpty(event.getUid())) { 85 if (StringUtils.isEmpty(event.getUid())) {
77 event.setUid(event.getId().toString()); 86 event.setUid(event.getId().toString());
78 } 87 }
79 - return save(new EventEntity(event), false).orElse(null); 88 + ListenableFuture<Optional<Event>> optionalSave = saveAsync(new EventEntity(event), false);
  89 + return Futures.transform(optionalSave, opt -> opt.orElse(null));
80 } 90 }
81 91
82 @Override 92 @Override
@@ -153,6 +163,14 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE @@ -153,6 +163,14 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
153 } 163 }
154 164
155 private Optional<Event> save(EventEntity entity, boolean ifNotExists) { 165 private Optional<Event> save(EventEntity entity, boolean ifNotExists) {
  166 + try {
  167 + return saveAsync(entity, ifNotExists).get();
  168 + } catch (InterruptedException | ExecutionException e) {
  169 + throw new IllegalStateException("Could not save EventEntity", e);
  170 + }
  171 + }
  172 +
  173 + private ListenableFuture<Optional<Event>> saveAsync(EventEntity entity, boolean ifNotExists) {
156 if (entity.getId() == null) { 174 if (entity.getId() == null) {
157 entity.setId(UUIDs.timeBased()); 175 entity.setId(UUIDs.timeBased());
158 } 176 }
@@ -167,11 +185,13 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE @@ -167,11 +185,13 @@ public class CassandraBaseEventDao extends CassandraAbstractSearchTimeDao<EventE
167 if (ifNotExists) { 185 if (ifNotExists) {
168 insert = insert.ifNotExists(); 186 insert = insert.ifNotExists();
169 } 187 }
170 - ResultSet rs = executeWrite(insert);  
171 - if (rs.wasApplied()) {  
172 - return Optional.of(DaoUtil.getData(entity));  
173 - } else {  
174 - return Optional.empty();  
175 - } 188 + ResultSetFuture resultSetFuture = executeAsyncWrite(insert);
  189 + return Futures.transform(resultSetFuture, rs -> {
  190 + if (rs.wasApplied()) {
  191 + return Optional.of(DaoUtil.getData(entity));
  192 + } else {
  193 + return Optional.empty();
  194 + }
  195 + });
176 } 196 }
177 } 197 }
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.dao.event; 16 package org.thingsboard.server.dao.event;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 import org.thingsboard.server.common.data.Event; 19 import org.thingsboard.server.common.data.Event;
19 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
20 import org.thingsboard.server.common.data.page.TimePageLink; 21 import org.thingsboard.server.common.data.page.TimePageLink;
@@ -38,6 +39,14 @@ public interface EventDao extends Dao<Event> { @@ -38,6 +39,14 @@ public interface EventDao extends Dao<Event> {
38 Event save(Event event); 39 Event save(Event event);
39 40
40 /** 41 /**
  42 + * Save or update event object async
  43 + *
  44 + * @param event the event object
  45 + * @return saved event object future
  46 + */
  47 + ListenableFuture<Event> saveAsync(Event event);
  48 +
  49 + /**
41 * Save event object if it is not yet saved 50 * Save event object if it is not yet saved
42 * 51 *
43 * @param event the event object 52 * @param event the event object
@@ -15,6 +15,7 @@ @@ -15,6 +15,7 @@
15 */ 15 */
16 package org.thingsboard.server.dao.event; 16 package org.thingsboard.server.dao.event;
17 17
  18 +import com.google.common.util.concurrent.ListenableFuture;
18 import org.thingsboard.server.common.data.Event; 19 import org.thingsboard.server.common.data.Event;
19 import org.thingsboard.server.common.data.id.EntityId; 20 import org.thingsboard.server.common.data.id.EntityId;
20 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
@@ -28,6 +29,8 @@ public interface EventService { @@ -28,6 +29,8 @@ public interface EventService {
28 29
29 Event save(Event event); 30 Event save(Event event);
30 31
  32 + ListenableFuture<Event> saveAsync(Event event);
  33 +
31 Optional<Event> saveIfNotExists(Event event); 34 Optional<Event> saveIfNotExists(Event event);
32 35
33 Optional<Event> findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid); 36 Optional<Event> findEvent(TenantId tenantId, EntityId entityId, String eventType, String eventUid);
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.dao.sql.event; 16 package org.thingsboard.server.dao.sql.event;
17 17
18 import com.datastax.driver.core.utils.UUIDs; 18 import com.datastax.driver.core.utils.UUIDs;
  19 +import com.google.common.util.concurrent.ListenableFuture;
19 import lombok.extern.slf4j.Slf4j; 20 import lombok.extern.slf4j.Slf4j;
20 import org.apache.commons.lang3.StringUtils; 21 import org.apache.commons.lang3.StringUtils;
21 import org.springframework.beans.factory.annotation.Autowired; 22 import org.springframework.beans.factory.annotation.Autowired;
@@ -82,6 +83,18 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event @@ -82,6 +83,18 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event
82 } 83 }
83 84
84 @Override 85 @Override
  86 + public ListenableFuture<Event> saveAsync(Event event) {
  87 + log.debug("Save event [{}] ", event);
  88 + if (event.getId() == null) {
  89 + event.setId(new EventId(UUIDs.timeBased()));
  90 + }
  91 + if (StringUtils.isEmpty(event.getUid())) {
  92 + event.setUid(event.getId().toString());
  93 + }
  94 + return service.submit(() -> save(new EventEntity(event), false).orElse(null));
  95 + }
  96 +
  97 + @Override
85 public Optional<Event> saveIfNotExists(Event event) { 98 public Optional<Event> saveIfNotExists(Event event) {
86 return save(new EventEntity(event), true); 99 return save(new EventEntity(event), true);
87 } 100 }
@@ -89,7 +102,7 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event @@ -89,7 +102,7 @@ public class JpaBaseEventDao extends JpaAbstractSearchTimeDao<EventEntity, Event
89 @Override 102 @Override
90 public Event findEvent(UUID tenantId, EntityId entityId, String eventType, String eventUid) { 103 public Event findEvent(UUID tenantId, EntityId entityId, String eventType, String eventUid) {
91 return DaoUtil.getData(eventRepository.findByTenantIdAndEntityTypeAndEntityIdAndEventTypeAndEventUid( 104 return DaoUtil.getData(eventRepository.findByTenantIdAndEntityTypeAndEntityIdAndEventTypeAndEventUid(
92 - UUIDConverter.fromTimeUUID(tenantId), entityId.getEntityType(), UUIDConverter.fromTimeUUID(entityId.getId()), eventType, eventUid)); 105 + UUIDConverter.fromTimeUUID(tenantId), entityId.getEntityType(), UUIDConverter.fromTimeUUID(entityId.getId()), eventType, eventUid));
93 } 106 }
94 107
95 @Override 108 @Override