Commit a9e83b484fce64b6a168428c534daa0417f2d9c2

Authored by Andrii Shvaika
2 parents 53bf6af2 23063b16

Merge branch 'feature/firmware' of https://github.com/YevhenBondarenko/thingsboa…

…rd into YevhenBondarenko-feature/firmware
Showing 41 changed files with 569 additions and 71 deletions
@@ -34,16 +34,23 @@ import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -34,16 +34,23 @@ import org.thingsboard.server.common.data.kv.StringDataEntry;
34 import org.thingsboard.server.common.data.kv.TsKvEntry; 34 import org.thingsboard.server.common.data.kv.TsKvEntry;
35 import org.thingsboard.server.common.data.page.PageData; 35 import org.thingsboard.server.common.data.page.PageData;
36 import org.thingsboard.server.common.data.page.PageLink; 36 import org.thingsboard.server.common.data.page.PageLink;
  37 +import org.thingsboard.server.common.msg.queue.TbCallback;
  38 +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
37 import org.thingsboard.server.dao.device.DeviceProfileService; 39 import org.thingsboard.server.dao.device.DeviceProfileService;
38 import org.thingsboard.server.dao.device.DeviceService; 40 import org.thingsboard.server.dao.device.DeviceService;
39 import org.thingsboard.server.dao.firmware.FirmwareService; 41 import org.thingsboard.server.dao.firmware.FirmwareService;
  42 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
  43 +import org.thingsboard.server.queue.TbQueueProducer;
  44 +import org.thingsboard.server.queue.common.TbProtoQueueMsg;
  45 +import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
40 import org.thingsboard.server.queue.util.TbCoreComponent; 46 import org.thingsboard.server.queue.util.TbCoreComponent;
41 47
42 import javax.annotation.Nullable; 48 import javax.annotation.Nullable;
43 import java.util.ArrayList; 49 import java.util.ArrayList;
44 import java.util.Arrays; 50 import java.util.Arrays;
  51 +import java.util.Collections;
45 import java.util.List; 52 import java.util.List;
46 -import java.util.Objects; 53 +import java.util.UUID;
47 import java.util.function.Consumer; 54 import java.util.function.Consumer;
48 55
49 import static org.thingsboard.server.common.data.DataConstants.FIRMWARE_CHECKSUM; 56 import static org.thingsboard.server.common.data.DataConstants.FIRMWARE_CHECKSUM;
@@ -61,12 +68,18 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -61,12 +68,18 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
61 private final DeviceService deviceService; 68 private final DeviceService deviceService;
62 private final DeviceProfileService deviceProfileService; 69 private final DeviceProfileService deviceProfileService;
63 private final RuleEngineTelemetryService telemetryService; 70 private final RuleEngineTelemetryService telemetryService;
  71 + private final TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> fwStateMsgProducer;
64 72
65 - public DefaultFirmwareStateService(FirmwareService firmwareService, DeviceService deviceService, DeviceProfileService deviceProfileService, RuleEngineTelemetryService telemetryService) { 73 + public DefaultFirmwareStateService(FirmwareService firmwareService,
  74 + DeviceService deviceService,
  75 + DeviceProfileService deviceProfileService,
  76 + RuleEngineTelemetryService telemetryService,
  77 + TbCoreQueueFactory coreQueueFactory) {
66 this.firmwareService = firmwareService; 78 this.firmwareService = firmwareService;
67 this.deviceService = deviceService; 79 this.deviceService = deviceService;
68 this.deviceProfileService = deviceProfileService; 80 this.deviceProfileService = deviceProfileService;
69 this.telemetryService = telemetryService; 81 this.telemetryService = telemetryService;
  82 + this.fwStateMsgProducer = coreQueueFactory.createToFirmwareStateServiceMsgProducer();
70 } 83 }
71 84
72 @Override 85 @Override
@@ -85,7 +98,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -85,7 +98,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
85 } 98 }
86 if (!newFirmwareId.equals(oldFirmwareId)) { 99 if (!newFirmwareId.equals(oldFirmwareId)) {
87 // Device was updated and new firmware is different from previous firmware. 100 // Device was updated and new firmware is different from previous firmware.
88 - update(device, firmwareService.findFirmwareById(device.getTenantId(), newFirmwareId), System.currentTimeMillis()); 101 + send(device.getTenantId(), device.getId(), newFirmwareId, System.currentTimeMillis());
89 } 102 }
90 } else { 103 } else {
91 // Device was updated and new firmware is not set. 104 // Device was updated and new firmware is not set.
@@ -93,7 +106,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -93,7 +106,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
93 } 106 }
94 } else if (newFirmwareId != null) { 107 } else if (newFirmwareId != null) {
95 // Device was created and firmware is defined. 108 // Device was created and firmware is defined.
96 - update(device, firmwareService.findFirmwareById(device.getTenantId(), newFirmwareId), System.currentTimeMillis()); 109 + send(device.getTenantId(), device.getId(), newFirmwareId, System.currentTimeMillis());
97 } 110 }
98 } 111 }
99 112
@@ -103,9 +116,8 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -103,9 +116,8 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
103 116
104 Consumer<Device> updateConsumer; 117 Consumer<Device> updateConsumer;
105 if (deviceProfile.getFirmwareId() != null) { 118 if (deviceProfile.getFirmwareId() != null) {
106 - Firmware firmware = firmwareService.findFirmwareById(tenantId, deviceProfile.getFirmwareId());  
107 long ts = System.currentTimeMillis(); 119 long ts = System.currentTimeMillis();
108 - updateConsumer = d -> update(d, firmware, ts); 120 + updateConsumer = d -> send(d.getTenantId(), d.getId(), deviceProfile.getFirmwareId(), ts);
109 } else { 121 } else {
110 updateConsumer = this::remove; 122 updateConsumer = this::remove;
111 } 123 }
@@ -113,10 +125,9 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -113,10 +125,9 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
113 PageLink pageLink = new PageLink(100); 125 PageLink pageLink = new PageLink(100);
114 PageData<Device> pageData; 126 PageData<Device> pageData;
115 do { 127 do {
116 - //TODO: create a query which will return devices without firmware  
117 - pageData = deviceService.findDevicesByTenantIdAndType(tenantId, deviceProfile.getName(), pageLink); 128 + pageData = deviceService.findDevicesByTenantIdAndTypeAndEmptyFirmware(tenantId, deviceProfile.getName(), pageLink);
118 129
119 - pageData.getData().stream().filter(d -> d.getFirmwareId() == null).forEach(updateConsumer); 130 + pageData.getData().forEach(updateConsumer);
120 131
121 if (pageData.hasNext()) { 132 if (pageData.hasNext()) {
122 pageLink = pageLink.nextPageLink(); 133 pageLink = pageLink.nextPageLink();
@@ -124,6 +135,64 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -124,6 +135,64 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
124 } while (pageData.hasNext()); 135 } while (pageData.hasNext());
125 } 136 }
126 137
  138 + @Override
  139 + public boolean process(ToFirmwareStateServiceMsg msg) {
  140 + boolean isSuccess = false;
  141 + FirmwareId targetFirmwareId = new FirmwareId(new UUID(msg.getFirmwareIdMSB(), msg.getFirmwareIdLSB()));
  142 + DeviceId deviceId = new DeviceId(new UUID(msg.getDeviceIdMSB(), msg.getDeviceIdLSB()));
  143 + TenantId tenantId = new TenantId(new UUID(msg.getTenantIdMSB(), msg.getTenantIdLSB()));
  144 + long ts = msg.getTs();
  145 +
  146 + Device device = deviceService.findDeviceById(tenantId, deviceId);
  147 + if (device == null) {
  148 + log.warn("[{}] [{}] Device was removed during firmware update msg was queued!", tenantId, deviceId);
  149 + } else {
  150 + FirmwareId currentFirmwareId = device.getFirmwareId();
  151 +
  152 + if (currentFirmwareId == null) {
  153 + currentFirmwareId = deviceProfileService.findDeviceProfileById(tenantId, device.getDeviceProfileId()).getFirmwareId();
  154 + }
  155 +
  156 + if (targetFirmwareId.equals(currentFirmwareId)) {
  157 + update(device, firmwareService.findFirmwareById(device.getTenantId(), targetFirmwareId), ts);
  158 + isSuccess = true;
  159 + } else {
  160 + log.warn("[{}] [{}] Can`t update firmware for the device, target firmwareId: [{}], current firmwareId: [{}]!", tenantId, deviceId, targetFirmwareId, currentFirmwareId);
  161 + }
  162 + }
  163 + return isSuccess;
  164 + }
  165 +
  166 + private void send(TenantId tenantId, DeviceId deviceId, FirmwareId firmwareId, long ts) {
  167 + ToFirmwareStateServiceMsg msg = ToFirmwareStateServiceMsg.newBuilder()
  168 + .setTenantIdMSB(tenantId.getId().getMostSignificantBits())
  169 + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
  170 + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
  171 + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
  172 + .setFirmwareIdMSB(firmwareId.getId().getMostSignificantBits())
  173 + .setFirmwareIdLSB(firmwareId.getId().getLeastSignificantBits())
  174 + .setTs(ts)
  175 + .build();
  176 +
  177 + TopicPartitionInfo tpi = new TopicPartitionInfo(fwStateMsgProducer.getDefaultTopic(), null, null, false);
  178 + fwStateMsgProducer.send(tpi, new TbProtoQueueMsg<>(UUID.randomUUID(), msg), null);
  179 +
  180 + BasicTsKvEntry status = new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.FIRMWARE_STATE, FirmwareUpdateStatus.QUEUED.name()));
  181 +
  182 + telemetryService.saveAndNotify(tenantId, deviceId, Collections.singletonList(status), new FutureCallback<>() {
  183 + @Override
  184 + public void onSuccess(@Nullable Void tmp) {
  185 + log.trace("[{}] Success save firmware status!", deviceId);
  186 + }
  187 +
  188 + @Override
  189 + public void onFailure(Throwable t) {
  190 + log.error("[{}] Failed to save firmware status!", deviceId, t);
  191 + }
  192 + });
  193 + }
  194 +
  195 +
127 private void update(Device device, Firmware firmware, long ts) { 196 private void update(Device device, Firmware firmware, long ts) {
128 TenantId tenantId = device.getTenantId(); 197 TenantId tenantId = device.getTenantId();
129 DeviceId deviceId = device.getId(); 198 DeviceId deviceId = device.getId();
@@ -131,6 +200,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService { @@ -131,6 +200,7 @@ public class DefaultFirmwareStateService implements FirmwareStateService {
131 List<TsKvEntry> telemetry = new ArrayList<>(); 200 List<TsKvEntry> telemetry = new ArrayList<>();
132 telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_TITLE, firmware.getTitle()))); 201 telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_TITLE, firmware.getTitle())));
133 telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_VERSION, firmware.getVersion()))); 202 telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.TARGET_FIRMWARE_VERSION, firmware.getVersion())));
  203 + telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(DataConstants.FIRMWARE_STATE, FirmwareUpdateStatus.INITIATED.name())));
134 204
135 telemetryService.saveAndNotify(tenantId, deviceId, telemetry, new FutureCallback<>() { 205 telemetryService.saveAndNotify(tenantId, deviceId, telemetry, new FutureCallback<>() {
136 @Override 206 @Override
@@ -17,6 +17,7 @@ package org.thingsboard.server.service.firmware; @@ -17,6 +17,7 @@ package org.thingsboard.server.service.firmware;
17 17
18 import org.thingsboard.server.common.data.Device; 18 import org.thingsboard.server.common.data.Device;
19 import org.thingsboard.server.common.data.DeviceProfile; 19 import org.thingsboard.server.common.data.DeviceProfile;
  20 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
20 21
21 public interface FirmwareStateService { 22 public interface FirmwareStateService {
22 23
@@ -24,4 +25,6 @@ public interface FirmwareStateService { @@ -24,4 +25,6 @@ public interface FirmwareStateService {
24 25
25 void update(DeviceProfile deviceProfile); 26 void update(DeviceProfile deviceProfile);
26 27
  28 + boolean process(ToFirmwareStateServiceMsg msg);
  29 +
27 } 30 }
  1 +package org.thingsboard.server.service.firmware;
  2 +
  3 +public enum FirmwareUpdateStatus {
  4 + QUEUED, INITIATED, DOWNLOADING, DOWNLOADED, VERIFIED, UPDATING, UPDATED, FAILED
  5 +}
@@ -24,6 +24,7 @@ import org.springframework.context.event.EventListener; @@ -24,6 +24,7 @@ import org.springframework.context.event.EventListener;
24 import org.springframework.core.annotation.Order; 24 import org.springframework.core.annotation.Order;
25 import org.springframework.scheduling.annotation.Scheduled; 25 import org.springframework.scheduling.annotation.Scheduled;
26 import org.springframework.stereotype.Service; 26 import org.springframework.stereotype.Service;
  27 +import org.thingsboard.common.util.JacksonUtil;
27 import org.thingsboard.common.util.ThingsBoardThreadFactory; 28 import org.thingsboard.common.util.ThingsBoardThreadFactory;
28 import org.thingsboard.rule.engine.api.RpcError; 29 import org.thingsboard.rule.engine.api.RpcError;
29 import org.thingsboard.server.actors.ActorSystemContext; 30 import org.thingsboard.server.actors.ActorSystemContext;
@@ -35,7 +36,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -35,7 +36,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
35 import org.thingsboard.server.common.msg.queue.TbCallback; 36 import org.thingsboard.server.common.msg.queue.TbCallback;
36 import org.thingsboard.server.common.stats.StatsFactory; 37 import org.thingsboard.server.common.stats.StatsFactory;
37 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; 38 import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
38 -import org.thingsboard.common.util.JacksonUtil; 39 +import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
39 import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto; 40 import org.thingsboard.server.gen.transport.TransportProtos.DeviceStateServiceMsgProto;
40 import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto; 41 import org.thingsboard.server.gen.transport.TransportProtos.EdgeNotificationMsgProto;
41 import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto; 42 import org.thingsboard.server.gen.transport.TransportProtos.FromDeviceRPCResponseProto;
@@ -49,6 +50,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseP @@ -49,6 +50,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseP
49 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; 50 import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
50 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 51 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
51 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 52 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  53 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
52 import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg; 54 import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
53 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg; 55 import org.thingsboard.server.gen.transport.TransportProtos.TransportToDeviceActorMsg;
54 import org.thingsboard.server.queue.TbQueueConsumer; 56 import org.thingsboard.server.queue.TbQueueConsumer;
@@ -58,8 +60,8 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; @@ -58,8 +60,8 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory;
58 import org.thingsboard.server.queue.util.TbCoreComponent; 60 import org.thingsboard.server.queue.util.TbCoreComponent;
59 import org.thingsboard.server.service.apiusage.TbApiUsageStateService; 61 import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
60 import org.thingsboard.server.service.edge.EdgeNotificationService; 62 import org.thingsboard.server.service.edge.EdgeNotificationService;
  63 +import org.thingsboard.server.service.firmware.FirmwareStateService;
61 import org.thingsboard.server.service.profile.TbDeviceProfileCache; 64 import org.thingsboard.server.service.profile.TbDeviceProfileCache;
62 -import org.thingsboard.server.dao.tenant.TbTenantProfileCache;  
63 import org.thingsboard.server.service.queue.processing.AbstractConsumerService; 65 import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
64 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse; 66 import org.thingsboard.server.service.rpc.FromDeviceRpcResponse;
65 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; 67 import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
@@ -97,6 +99,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -97,6 +99,11 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
97 @Value("${queue.core.stats.enabled:false}") 99 @Value("${queue.core.stats.enabled:false}")
98 private boolean statsEnabled; 100 private boolean statsEnabled;
99 101
  102 + @Value("${queue.core.firmware.pack-interval-ms:60000}")
  103 + private long firmwarePackInterval;
  104 + @Value("${queue.core.firmware.pack-size:100}")
  105 + private int firmwarePackSize;
  106 +
100 private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> mainConsumer; 107 private final TbQueueConsumer<TbProtoQueueMsg<ToCoreMsg>> mainConsumer;
101 private final DeviceStateService stateService; 108 private final DeviceStateService stateService;
102 private final TbApiUsageStateService statsService; 109 private final TbApiUsageStateService statsService;
@@ -104,11 +111,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -104,11 +111,15 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
104 private final SubscriptionManagerService subscriptionManagerService; 111 private final SubscriptionManagerService subscriptionManagerService;
105 private final TbCoreDeviceRpcService tbCoreDeviceRpcService; 112 private final TbCoreDeviceRpcService tbCoreDeviceRpcService;
106 private final EdgeNotificationService edgeNotificationService; 113 private final EdgeNotificationService edgeNotificationService;
  114 + private final FirmwareStateService firmwareStateService;
107 private final TbCoreConsumerStats stats; 115 private final TbCoreConsumerStats stats;
108 protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer; 116 protected final TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> usageStatsConsumer;
  117 + private final TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> firmwareStatesConsumer;
109 118
110 protected volatile ExecutorService usageStatsExecutor; 119 protected volatile ExecutorService usageStatsExecutor;
111 120
  121 + private volatile ExecutorService firmwareStatesExecutor;
  122 +
112 public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory, 123 public DefaultTbCoreConsumerService(TbCoreQueueFactory tbCoreQueueFactory,
113 ActorSystemContext actorContext, 124 ActorSystemContext actorContext,
114 DeviceStateService stateService, 125 DeviceStateService stateService,
@@ -121,10 +132,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -121,10 +132,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
121 TbApiUsageStateService statsService, 132 TbApiUsageStateService statsService,
122 TbTenantProfileCache tenantProfileCache, 133 TbTenantProfileCache tenantProfileCache,
123 TbApiUsageStateService apiUsageStateService, 134 TbApiUsageStateService apiUsageStateService,
124 - EdgeNotificationService edgeNotificationService) { 135 + EdgeNotificationService edgeNotificationService,
  136 + FirmwareStateService firmwareStateService) {
125 super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer()); 137 super(actorContext, encodingService, tenantProfileCache, deviceProfileCache, apiUsageStateService, tbCoreQueueFactory.createToCoreNotificationsMsgConsumer());
126 this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer(); 138 this.mainConsumer = tbCoreQueueFactory.createToCoreMsgConsumer();
127 this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer(); 139 this.usageStatsConsumer = tbCoreQueueFactory.createToUsageStatsServiceMsgConsumer();
  140 + this.firmwareStatesConsumer = tbCoreQueueFactory.createToFirmwareStateServiceMsgConsumer();
128 this.stateService = stateService; 141 this.stateService = stateService;
129 this.localSubscriptionService = localSubscriptionService; 142 this.localSubscriptionService = localSubscriptionService;
130 this.subscriptionManagerService = subscriptionManagerService; 143 this.subscriptionManagerService = subscriptionManagerService;
@@ -132,12 +145,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -132,12 +145,14 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
132 this.edgeNotificationService = edgeNotificationService; 145 this.edgeNotificationService = edgeNotificationService;
133 this.stats = new TbCoreConsumerStats(statsFactory); 146 this.stats = new TbCoreConsumerStats(statsFactory);
134 this.statsService = statsService; 147 this.statsService = statsService;
  148 + this.firmwareStateService = firmwareStateService;
135 } 149 }
136 150
137 @PostConstruct 151 @PostConstruct
138 public void init() { 152 public void init() {
139 super.init("tb-core-consumer", "tb-core-notifications-consumer"); 153 super.init("tb-core-consumer", "tb-core-notifications-consumer");
140 this.usageStatsExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer")); 154 this.usageStatsExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-usage-stats-consumer"));
  155 + this.firmwareStatesExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-core-firmware-notifications-consumer"));
141 } 156 }
142 157
143 @PreDestroy 158 @PreDestroy
@@ -146,6 +161,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -146,6 +161,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
146 if (usageStatsExecutor != null) { 161 if (usageStatsExecutor != null) {
147 usageStatsExecutor.shutdownNow(); 162 usageStatsExecutor.shutdownNow();
148 } 163 }
  164 + if (firmwareStatesExecutor != null) {
  165 + firmwareStatesExecutor.shutdownNow();
  166 + }
149 } 167 }
150 168
151 @EventListener(ApplicationReadyEvent.class) 169 @EventListener(ApplicationReadyEvent.class)
@@ -153,6 +171,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -153,6 +171,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
153 public void onApplicationEvent(ApplicationReadyEvent event) { 171 public void onApplicationEvent(ApplicationReadyEvent event) {
154 super.onApplicationEvent(event); 172 super.onApplicationEvent(event);
155 launchUsageStatsConsumer(); 173 launchUsageStatsConsumer();
  174 + launchFirmwareUpdateNotificationConsumer();
156 } 175 }
157 176
158 @Override 177 @Override
@@ -167,6 +186,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -167,6 +186,7 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
167 .map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic())) 186 .map(tpi -> tpi.newByTopic(usageStatsConsumer.getTopic()))
168 .collect(Collectors.toSet())); 187 .collect(Collectors.toSet()));
169 } 188 }
  189 + this.firmwareStatesConsumer.subscribe();
170 } 190 }
171 191
172 @Override 192 @Override
@@ -336,10 +356,57 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -336,10 +356,57 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
336 }); 356 });
337 } 357 }
338 358
  359 + private void launchFirmwareUpdateNotificationConsumer() {
  360 + long maxProcessingTimeoutPerRecord = firmwarePackInterval / firmwarePackSize;
  361 + firmwareStatesExecutor.submit(() -> {
  362 + while (!stopped) {
  363 + try {
  364 + List<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> msgs = firmwareStatesConsumer.poll(getNotificationPollDuration());
  365 + if (msgs.isEmpty()) {
  366 + Thread.sleep(maxProcessingTimeoutPerRecord);
  367 + continue;
  368 + }
  369 + long timeToSleep = maxProcessingTimeoutPerRecord;
  370 + for (TbProtoQueueMsg<ToFirmwareStateServiceMsg> msg : msgs) {
  371 + try {
  372 + long startTime = System.currentTimeMillis();
  373 + boolean isSuccessUpdate = handleFirmwareUpdates(msg);
  374 + long endTime = System.currentTimeMillis();
  375 + long spentTime = endTime - startTime;
  376 + timeToSleep = timeToSleep - spentTime;
  377 + if (isSuccessUpdate && timeToSleep > 0) {
  378 + log.debug("Spent time per record is: [{}]!", spentTime);
  379 + Thread.sleep(timeToSleep);
  380 + timeToSleep = maxProcessingTimeoutPerRecord;
  381 + }
  382 + } catch (Throwable e) {
  383 + log.warn("Failed to process firmware update msg: {}", msg, e);
  384 + }
  385 + }
  386 + firmwareStatesConsumer.commit();
  387 + } catch (Exception e) {
  388 + if (!stopped) {
  389 + log.warn("Failed to obtain usage stats from queue.", e);
  390 + try {
  391 + Thread.sleep(getNotificationPollDuration());
  392 + } catch (InterruptedException e2) {
  393 + log.trace("Failed to wait until the server has capacity to handle new firmware updates", e2);
  394 + }
  395 + }
  396 + }
  397 + }
  398 + log.info("TB Firmware States Consumer stopped.");
  399 + });
  400 + }
  401 +
339 private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) { 402 private void handleUsageStats(TbProtoQueueMsg<ToUsageStatsServiceMsg> msg, TbCallback callback) {
340 statsService.process(msg, callback); 403 statsService.process(msg, callback);
341 } 404 }
342 405
  406 + private boolean handleFirmwareUpdates(TbProtoQueueMsg<ToFirmwareStateServiceMsg> msg) {
  407 + return firmwareStateService.process(msg.getValue());
  408 + }
  409 +
343 private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) { 410 private void forwardToCoreRpcService(FromDeviceRPCResponseProto proto, TbCallback callback) {
344 RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null; 411 RpcError error = proto.getError() > 0 ? RpcError.values()[proto.getError()] : null;
345 FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB()) 412 FromDeviceRpcResponse response = new FromDeviceRpcResponse(new UUID(proto.getRequestIdMSB(), proto.getRequestIdLSB())
@@ -448,6 +515,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore @@ -448,6 +515,9 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
448 if (usageStatsConsumer != null) { 515 if (usageStatsConsumer != null) {
449 usageStatsConsumer.unsubscribe(); 516 usageStatsConsumer.unsubscribe();
450 } 517 }
  518 + if (firmwareStatesConsumer != null) {
  519 + firmwareStatesConsumer.unsubscribe();
  520 + }
451 } 521 }
452 522
453 } 523 }
@@ -481,6 +481,8 @@ public class DefaultTransportApiService implements TransportApiService { @@ -481,6 +481,8 @@ public class DefaultTransportApiService implements TransportApiService {
481 builder.setResponseStatus(TransportProtos.ResponseStatus.SUCCESS); 481 builder.setResponseStatus(TransportProtos.ResponseStatus.SUCCESS);
482 builder.setFirmwareIdMSB(firmwareId.getId().getMostSignificantBits()); 482 builder.setFirmwareIdMSB(firmwareId.getId().getMostSignificantBits());
483 builder.setFirmwareIdLSB(firmwareId.getId().getLeastSignificantBits()); 483 builder.setFirmwareIdLSB(firmwareId.getId().getLeastSignificantBits());
  484 + builder.setTitle(firmware.getTitle());
  485 + builder.setVersion(firmware.getVersion());
484 builder.setFileName(firmware.getFileName()); 486 builder.setFileName(firmware.getFileName());
485 builder.setContentType(firmware.getContentType()); 487 builder.setContentType(firmware.getContentType());
486 firmwareCacheWriter.put(firmwareId.toString(), firmware.getData().array()); 488 firmwareCacheWriter.put(firmwareId.toString(), firmware.getData().array());
@@ -744,6 +744,10 @@ queue: @@ -744,6 +744,10 @@ queue:
744 sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}" 744 sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
745 sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}" 745 sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
746 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}" 746 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
  747 + consumer-properties-per-topic:
  748 + tb_firmware:
  749 + - key: max.poll.records
  750 + value: 10
747 other: 751 other:
748 topic-properties: 752 topic-properties:
749 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" 753 rule-engine: "${TB_QUEUE_KAFKA_RE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
@@ -751,6 +755,7 @@ queue: @@ -751,6 +755,7 @@ queue:
751 transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" 755 transport-api: "${TB_QUEUE_KAFKA_TA_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
752 notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}" 756 notifications: "${TB_QUEUE_KAFKA_NOTIFICATIONS_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:1;min.insync.replicas:1}"
753 js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}" 757 js-executor: "${TB_QUEUE_KAFKA_JE_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:104857600;partitions:100;min.insync.replicas:1}"
  758 + fw-updates: "${TB_QUEUE_KAFKA_FW_TOPIC_PROPERTIES:retention.ms:604800000;segment.bytes:26214400;retention.bytes:1048576000;partitions:10;min.insync.replicas:1}"
754 consumer-stats: 759 consumer-stats:
755 enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}" 760 enabled: "${TB_QUEUE_KAFKA_CONSUMER_STATS_ENABLED:true}"
756 print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}" 761 print-interval-ms: "${TB_QUEUE_KAFKA_CONSUMER_STATS_MIN_PRINT_INTERVAL_MS:60000}"
@@ -821,6 +826,10 @@ queue: @@ -821,6 +826,10 @@ queue:
821 poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}" 826 poll-interval: "${TB_QUEUE_CORE_POLL_INTERVAL_MS:25}"
822 partitions: "${TB_QUEUE_CORE_PARTITIONS:10}" 827 partitions: "${TB_QUEUE_CORE_PARTITIONS:10}"
823 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}" 828 pack-processing-timeout: "${TB_QUEUE_CORE_PACK_PROCESSING_TIMEOUT_MS:2000}"
  829 + firmware:
  830 + topic: "${TB_QUEUE_CORE_FW_TOPIC:tb_firmware}"
  831 + pack-interval-ms: "${TB_QUEUE_CORE_FW_PACK_INTERVAL_MS:60000}"
  832 + pack-size: "${TB_QUEUE_CORE_FW_PACK_SIZE:100}"
824 usage-stats-topic: "${TB_QUEUE_US_TOPIC:tb_usage_stats}" 833 usage-stats-topic: "${TB_QUEUE_US_TOPIC:tb_usage_stats}"
825 stats: 834 stats:
826 enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}" 835 enabled: "${TB_QUEUE_CORE_STATS_ENABLED:true}"
@@ -22,7 +22,7 @@ import org.springframework.stereotype.Service; @@ -22,7 +22,7 @@ import org.springframework.stereotype.Service;
22 import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE; 22 import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE;
23 23
24 @Service 24 @Service
25 -@ConditionalOnExpression("(('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport') && ('${cache.type:null}'=='caffeine' || '${cache.type:null}'=='caffeine')") 25 +@ConditionalOnExpression("(('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='tb-transport') && ('${cache.type:null}'=='caffeine' || '${cache.type:null}'=='null')")
26 public class CaffeineFirmwareCacheReader implements FirmwareCacheReader { 26 public class CaffeineFirmwareCacheReader implements FirmwareCacheReader {
27 27
28 private final CacheManager cacheManager; 28 private final CacheManager cacheManager;
@@ -22,7 +22,7 @@ import org.springframework.stereotype.Service; @@ -22,7 +22,7 @@ import org.springframework.stereotype.Service;
22 import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE; 22 import static org.thingsboard.server.common.data.CacheConstants.FIRMWARE_CACHE;
23 23
24 @Service 24 @Service
25 -@ConditionalOnExpression("(('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='core') && ('${cache.type:null}'=='caffeine' || '${cache.type:null}'=='caffeine')") 25 +@ConditionalOnExpression("('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && ('${cache.type:null}'=='caffeine' || '${cache.type:null}'=='null')")
26 public class CaffeineFirmwareCacheWriter implements FirmwareCacheWriter { 26 public class CaffeineFirmwareCacheWriter implements FirmwareCacheWriter {
27 27
28 private final CacheManager cacheManager; 28 private final CacheManager cacheManager;
@@ -21,7 +21,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -21,7 +21,7 @@ import org.springframework.data.redis.connection.RedisConnectionFactory;
21 import org.springframework.stereotype.Service; 21 import org.springframework.stereotype.Service;
22 22
23 @Service 23 @Service
24 -@ConditionalOnExpression("(('${service.type:null}'=='monolith' && '${transport.api_enabled:true}'=='true') || '${service.type:null}'=='core') && '${cache.type:null}'=='redis'") 24 +@ConditionalOnExpression("('${service.type:null}'=='monolith' || '${service.type:null}'=='tb-core') && '${cache.type:null}'=='redis'")
25 public class RedisFirmwareCacheWriter extends AbstractRedisFirmwareCache implements FirmwareCacheWriter { 25 public class RedisFirmwareCacheWriter extends AbstractRedisFirmwareCache implements FirmwareCacheWriter {
26 26
27 public RedisFirmwareCacheWriter(RedisConnectionFactory redisConnectionFactory) { 27 public RedisFirmwareCacheWriter(RedisConnectionFactory redisConnectionFactory) {
@@ -61,6 +61,8 @@ public interface DeviceService { @@ -61,6 +61,8 @@ public interface DeviceService {
61 61
62 PageData<Device> findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); 62 PageData<Device> findDevicesByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink);
63 63
  64 + PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(TenantId tenantId, String type, PageLink pageLink);
  65 +
64 PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink); 66 PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink);
65 67
66 PageData<DeviceInfo> findDeviceInfosByTenantIdAndDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId, PageLink pageLink); 68 PageData<DeviceInfo> findDeviceInfosByTenantIdAndDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId, PageLink pageLink);
@@ -99,7 +99,7 @@ public class DataConstants { @@ -99,7 +99,7 @@ public class DataConstants {
99 public static final String CURRENT_FIRMWARE_VERSION = "cur_fw_version"; 99 public static final String CURRENT_FIRMWARE_VERSION = "cur_fw_version";
100 public static final String TARGET_FIRMWARE_TITLE = "target_fw_title"; 100 public static final String TARGET_FIRMWARE_TITLE = "target_fw_title";
101 public static final String TARGET_FIRMWARE_VERSION = "target_fw_version"; 101 public static final String TARGET_FIRMWARE_VERSION = "target_fw_version";
102 - public static final String CURRENT_FIRMWARE_STATE = "cur_fw_state"; 102 + public static final String FIRMWARE_STATE = "fw_state";
103 103
104 //attributes 104 //attributes
105 //telemetry 105 //telemetry
@@ -69,7 +69,7 @@ public class TbKafkaConsumerStatsService { @@ -69,7 +69,7 @@ public class TbKafkaConsumerStatsService {
69 this.adminClient = AdminClient.create(kafkaSettings.toAdminProps()); 69 this.adminClient = AdminClient.create(kafkaSettings.toAdminProps());
70 this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats")); 70 this.statsPrintScheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("kafka-consumer-stats"));
71 71
72 - Properties consumerProps = kafkaSettings.toConsumerProps(); 72 + Properties consumerProps = kafkaSettings.toConsumerProps(null);
73 consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client"); 73 consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-stats-loader-client");
74 consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group"); 74 consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-stats-loader-client-group");
75 this.consumer = new KafkaConsumer<>(consumerProps); 75 this.consumer = new KafkaConsumer<>(consumerProps);
@@ -50,7 +50,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue @@ -50,7 +50,7 @@ public class TbKafkaConsumerTemplate<T extends TbQueueMsg> extends AbstractTbQue
50 String clientId, String groupId, String topic, 50 String clientId, String groupId, String topic,
51 TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) { 51 TbQueueAdmin admin, TbKafkaConsumerStatsService statsService) {
52 super(topic); 52 super(topic);
53 - Properties props = settings.toConsumerProps(); 53 + Properties props = settings.toConsumerProps(topic);
54 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId); 54 props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
55 if (groupId != null) { 55 if (groupId != null) {
56 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); 56 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
@@ -31,7 +31,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -31,7 +31,9 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
31 import org.springframework.boot.context.properties.ConfigurationProperties; 31 import org.springframework.boot.context.properties.ConfigurationProperties;
32 import org.springframework.stereotype.Component; 32 import org.springframework.stereotype.Component;
33 33
  34 +import java.util.Collections;
34 import java.util.List; 35 import java.util.List;
  36 +import java.util.Map;
35 import java.util.Properties; 37 import java.util.Properties;
36 38
37 /** 39 /**
@@ -95,6 +97,9 @@ public class TbKafkaSettings { @@ -95,6 +97,9 @@ public class TbKafkaSettings {
95 @Setter 97 @Setter
96 private List<TbKafkaProperty> other; 98 private List<TbKafkaProperty> other;
97 99
  100 + @Setter
  101 + private Map<String, List<TbKafkaProperty>> consumerPropertiesPerTopic = Collections.emptyMap();
  102 +
98 public Properties toAdminProps() { 103 public Properties toAdminProps() {
99 Properties props = toProps(); 104 Properties props = toProps();
100 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 105 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
@@ -103,7 +108,7 @@ public class TbKafkaSettings { @@ -103,7 +108,7 @@ public class TbKafkaSettings {
103 return props; 108 return props;
104 } 109 }
105 110
106 - public Properties toConsumerProps() { 111 + public Properties toConsumerProps(String topic) {
107 Properties props = toProps(); 112 Properties props = toProps();
108 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 113 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
109 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); 114 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
@@ -113,6 +118,10 @@ public class TbKafkaSettings { @@ -113,6 +118,10 @@ public class TbKafkaSettings {
113 118
114 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 119 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
115 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); 120 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
  121 +
  122 + consumerPropertiesPerTopic
  123 + .getOrDefault(topic, Collections.emptyList())
  124 + .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
116 return props; 125 return props;
117 } 126 }
118 127
@@ -19,6 +19,7 @@ import lombok.Getter; @@ -19,6 +19,7 @@ import lombok.Getter;
19 import org.springframework.beans.factory.annotation.Value; 19 import org.springframework.beans.factory.annotation.Value;
20 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; 20 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
21 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
  22 +import org.thingsboard.server.common.data.StringUtils;
22 23
23 import javax.annotation.PostConstruct; 24 import javax.annotation.PostConstruct;
24 import java.util.HashMap; 25 import java.util.HashMap;
@@ -37,6 +38,8 @@ public class TbKafkaTopicConfigs { @@ -37,6 +38,8 @@ public class TbKafkaTopicConfigs {
37 private String notificationsProperties; 38 private String notificationsProperties;
38 @Value("${queue.kafka.topic-properties.js-executor}") 39 @Value("${queue.kafka.topic-properties.js-executor}")
39 private String jsExecutorProperties; 40 private String jsExecutorProperties;
  41 + @Value("${queue.kafka.topic-properties.fw-updates:}")
  42 + private String fwUpdatesProperties;
40 43
41 @Getter 44 @Getter
42 private Map<String, String> coreConfigs; 45 private Map<String, String> coreConfigs;
@@ -48,6 +51,8 @@ public class TbKafkaTopicConfigs { @@ -48,6 +51,8 @@ public class TbKafkaTopicConfigs {
48 private Map<String, String> notificationsConfigs; 51 private Map<String, String> notificationsConfigs;
49 @Getter 52 @Getter
50 private Map<String, String> jsExecutorConfigs; 53 private Map<String, String> jsExecutorConfigs;
  54 + @Getter
  55 + private Map<String, String> fwUpdatesConfigs;
51 56
52 @PostConstruct 57 @PostConstruct
53 private void init() { 58 private void init() {
@@ -56,15 +61,18 @@ public class TbKafkaTopicConfigs { @@ -56,15 +61,18 @@ public class TbKafkaTopicConfigs {
56 transportApiConfigs = getConfigs(transportApiProperties); 61 transportApiConfigs = getConfigs(transportApiProperties);
57 notificationsConfigs = getConfigs(notificationsProperties); 62 notificationsConfigs = getConfigs(notificationsProperties);
58 jsExecutorConfigs = getConfigs(jsExecutorProperties); 63 jsExecutorConfigs = getConfigs(jsExecutorProperties);
  64 + fwUpdatesConfigs = getConfigs(fwUpdatesProperties);
59 } 65 }
60 66
61 private Map<String, String> getConfigs(String properties) { 67 private Map<String, String> getConfigs(String properties) {
62 Map<String, String> configs = new HashMap<>(); 68 Map<String, String> configs = new HashMap<>();
63 - for (String property : properties.split(";")) {  
64 - int delimiterPosition = property.indexOf(":");  
65 - String key = property.substring(0, delimiterPosition);  
66 - String value = property.substring(delimiterPosition + 1);  
67 - configs.put(key, value); 69 + if (StringUtils.isNotEmpty(properties)) {
  70 + for (String property : properties.split(";")) {
  71 + int delimiterPosition = property.indexOf(":");
  72 + String key = property.substring(0, delimiterPosition);
  73 + String value = property.substring(delimiterPosition + 1);
  74 + configs.put(key, value);
  75 + }
68 } 76 }
69 return configs; 77 return configs;
70 } 78 }
@@ -186,6 +186,17 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @@ -186,6 +186,17 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
186 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); 186 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
187 } 187 }
188 188
  189 + @Override
  190 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  191 + return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getFirmwareTopic(),
  192 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  193 + }
  194 +
  195 + @Override
  196 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  197 + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getFirmwareTopic());
  198 + }
  199 +
189 @PreDestroy 200 @PreDestroy
190 private void destroy() { 201 private void destroy() {
191 if (coreAdmin != null) { 202 if (coreAdmin != null) {
@@ -21,12 +21,13 @@ import org.springframework.context.annotation.Bean; @@ -21,12 +21,13 @@ import org.springframework.context.annotation.Bean;
21 import org.springframework.stereotype.Component; 21 import org.springframework.stereotype.Component;
22 import org.thingsboard.server.common.msg.queue.ServiceType; 22 import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 -import org.thingsboard.server.gen.transport.TransportProtos;  
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
  30 +import org.thingsboard.server.gen.transport.TransportProtos.ToUsageStatsServiceMsg;
30 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg; 31 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiRequestMsg;
31 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg; 32 import org.thingsboard.server.gen.transport.TransportProtos.TransportApiResponseMsg;
32 import org.thingsboard.server.queue.TbQueueAdmin; 33 import org.thingsboard.server.queue.TbQueueAdmin;
@@ -165,14 +166,25 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory { @@ -165,14 +166,25 @@ public class AwsSqsTbCoreQueueFactory implements TbCoreQueueFactory {
165 } 166 }
166 167
167 @Override 168 @Override
168 - public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 169 + public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
169 return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic()); 170 return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getUsageStatsTopic());
170 } 171 }
171 172
172 @Override 173 @Override
173 - public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() { 174 + public TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer() {
174 return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(), 175 return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getUsageStatsTopic(),
175 - msg -> new TbProtoQueueMsg<>(msg.getKey(), TransportProtos.ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders())); 176 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToUsageStatsServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  177 + }
  178 +
  179 + @Override
  180 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  181 + return new TbAwsSqsConsumerTemplate<>(transportApiAdmin, sqsSettings, coreSettings.getFirmwareTopic(),
  182 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  183 + }
  184 +
  185 + @Override
  186 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  187 + return new TbAwsSqsProducerTemplate<>(coreAdmin, sqsSettings, coreSettings.getFirmwareTopic());
176 } 188 }
177 189
178 @PreDestroy 190 @PreDestroy
@@ -131,6 +131,16 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -131,6 +131,16 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
131 } 131 }
132 132
133 @Override 133 @Override
  134 + public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  135 + return new InMemoryTbQueueConsumer<>(coreSettings.getFirmwareTopic());
  136 + }
  137 +
  138 + @Override
  139 + public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  140 + return new InMemoryTbQueueProducer<>(coreSettings.getFirmwareTopic());
  141 + }
  142 +
  143 + @Override
134 public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 144 public TbQueueProducer<TbProtoQueueMsg<TransportProtos.ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
135 return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic()); 145 return new InMemoryTbQueueProducer<>(coreSettings.getUsageStatsTopic());
136 } 146 }
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -73,6 +74,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -73,6 +74,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
73 private final TbQueueAdmin jsExecutorAdmin; 74 private final TbQueueAdmin jsExecutorAdmin;
74 private final TbQueueAdmin transportApiAdmin; 75 private final TbQueueAdmin transportApiAdmin;
75 private final TbQueueAdmin notificationAdmin; 76 private final TbQueueAdmin notificationAdmin;
  77 + private final TbQueueAdmin fwUpdatesAdmin;
76 78
77 public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings, 79 public KafkaMonolithQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
78 TbServiceInfoProvider serviceInfoProvider, 80 TbServiceInfoProvider serviceInfoProvider,
@@ -98,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -98,6 +100,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
98 this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); 100 this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
99 this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); 101 this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
100 this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); 102 this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
  103 + this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
101 } 104 }
102 105
103 @Override 106 @Override
@@ -274,6 +277,29 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi @@ -274,6 +277,29 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
274 } 277 }
275 278
276 @Override 279 @Override
  280 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  281 + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
  282 + consumerBuilder.settings(kafkaSettings);
  283 + consumerBuilder.topic(coreSettings.getFirmwareTopic());
  284 + consumerBuilder.clientId("monolith-fw-consumer-" + serviceInfoProvider.getServiceId());
  285 + consumerBuilder.groupId("monolith-fw-consumer");
  286 + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  287 + consumerBuilder.admin(fwUpdatesAdmin);
  288 + consumerBuilder.statsService(consumerStatsService);
  289 + return consumerBuilder.build();
  290 + }
  291 +
  292 + @Override
  293 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  294 + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
  295 + requestBuilder.settings(kafkaSettings);
  296 + requestBuilder.clientId("monolith-fw-producer-" + serviceInfoProvider.getServiceId());
  297 + requestBuilder.defaultTopic(coreSettings.getFirmwareTopic());
  298 + requestBuilder.admin(fwUpdatesAdmin);
  299 + return requestBuilder.build();
  300 + }
  301 +
  302 + @Override
277 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 303 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
278 TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); 304 TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
279 requestBuilder.settings(kafkaSettings); 305 requestBuilder.settings(kafkaSettings);
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -70,6 +71,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { @@ -70,6 +71,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
70 private final TbQueueAdmin jsExecutorAdmin; 71 private final TbQueueAdmin jsExecutorAdmin;
71 private final TbQueueAdmin transportApiAdmin; 72 private final TbQueueAdmin transportApiAdmin;
72 private final TbQueueAdmin notificationAdmin; 73 private final TbQueueAdmin notificationAdmin;
  74 + private final TbQueueAdmin fwUpdatesAdmin;
73 75
74 public KafkaTbCoreQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings, 76 public KafkaTbCoreQueueFactory(PartitionService partitionService, TbKafkaSettings kafkaSettings,
75 TbServiceInfoProvider serviceInfoProvider, 77 TbServiceInfoProvider serviceInfoProvider,
@@ -93,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { @@ -93,6 +95,7 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
93 this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs()); 95 this.jsExecutorAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getJsExecutorConfigs());
94 this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs()); 96 this.transportApiAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getTransportApiConfigs());
95 this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs()); 97 this.notificationAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getNotificationsConfigs());
  98 + this.fwUpdatesAdmin = new TbKafkaAdmin(kafkaSettings, kafkaTopicConfigs.getFwUpdatesConfigs());
96 } 99 }
97 100
98 @Override 101 @Override
@@ -242,6 +245,29 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory { @@ -242,6 +245,29 @@ public class KafkaTbCoreQueueFactory implements TbCoreQueueFactory {
242 } 245 }
243 246
244 @Override 247 @Override
  248 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  249 + TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
  250 + consumerBuilder.settings(kafkaSettings);
  251 + consumerBuilder.topic(coreSettings.getFirmwareTopic());
  252 + consumerBuilder.clientId("tb-core-fw-consumer-" + serviceInfoProvider.getServiceId());
  253 + consumerBuilder.groupId("tb-core-fw-consumer");
  254 + consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  255 + consumerBuilder.admin(fwUpdatesAdmin);
  256 + consumerBuilder.statsService(consumerStatsService);
  257 + return consumerBuilder.build();
  258 + }
  259 +
  260 + @Override
  261 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  262 + TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
  263 + requestBuilder.settings(kafkaSettings);
  264 + requestBuilder.clientId("tb-core-fw-producer-" + serviceInfoProvider.getServiceId());
  265 + requestBuilder.defaultTopic(coreSettings.getFirmwareTopic());
  266 + requestBuilder.admin(fwUpdatesAdmin);
  267 + return requestBuilder.build();
  268 + }
  269 +
  270 + @Override
245 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 271 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
246 TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder(); 272 TbKafkaProducerTemplate.TbKafkaProducerTemplateBuilder<TbProtoQueueMsg<ToUsageStatsServiceMsg>> requestBuilder = TbKafkaProducerTemplate.builder();
247 requestBuilder.settings(kafkaSettings); 273 requestBuilder.settings(kafkaSettings);
@@ -22,6 +22,7 @@ import org.springframework.stereotype.Component; @@ -22,6 +22,7 @@ import org.springframework.stereotype.Component;
22 import org.thingsboard.server.common.msg.queue.ServiceType; 22 import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; 23 import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
24 import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; 24 import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.*;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 26 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@@ -191,6 +192,17 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng @@ -191,6 +192,17 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
191 } 192 }
192 193
193 @Override 194 @Override
  195 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  196 + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic(),
  197 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  198 + }
  199 +
  200 + @Override
  201 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  202 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic());
  203 + }
  204 +
  205 + @Override
194 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 206 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
195 return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic()); 207 return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
196 } 208 }
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -165,6 +166,17 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory { @@ -165,6 +166,17 @@ public class PubSubTbCoreQueueFactory implements TbCoreQueueFactory {
165 } 166 }
166 167
167 @Override 168 @Override
  169 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  170 + return new TbPubSubConsumerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic(),
  171 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  172 + }
  173 +
  174 + @Override
  175 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  176 + return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getFirmwareTopic());
  177 + }
  178 +
  179 + @Override
168 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 180 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
169 return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic()); 181 return new TbPubSubProducerTemplate<>(coreAdmin, pubSubSettings, coreSettings.getUsageStatsTopic());
170 } 182 }
@@ -24,6 +24,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest; @@ -24,6 +24,7 @@ import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsRequest;
24 import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse; 24 import org.thingsboard.server.gen.js.JsInvokeProtos.RemoteJsResponse;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 26 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  27 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 30 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -189,6 +190,17 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE @@ -189,6 +190,17 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
189 } 190 }
190 191
191 @Override 192 @Override
  193 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  194 + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic(),
  195 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  196 + }
  197 +
  198 + @Override
  199 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  200 + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic());
  201 + }
  202 +
  203 + @Override
192 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 204 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
193 return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic()); 205 return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic());
194 } 206 }
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -171,6 +172,17 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory { @@ -171,6 +172,17 @@ public class RabbitMqTbCoreQueueFactory implements TbCoreQueueFactory {
171 } 172 }
172 173
173 @Override 174 @Override
  175 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  176 + return new TbRabbitMqConsumerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic(),
  177 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  178 + }
  179 +
  180 + @Override
  181 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  182 + return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getFirmwareTopic());
  183 + }
  184 +
  185 + @Override
174 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 186 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
175 return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic()); 187 return new TbRabbitMqProducerTemplate<>(coreAdmin, rabbitMqSettings, coreSettings.getUsageStatsTopic());
176 } 188 }
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -188,6 +189,17 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul @@ -188,6 +189,17 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
188 } 189 }
189 190
190 @Override 191 @Override
  192 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  193 + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic(),
  194 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  195 + }
  196 +
  197 + @Override
  198 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  199 + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic());
  200 + }
  201 +
  202 + @Override
191 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 203 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
192 return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic()); 204 return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic());
193 } 205 }
@@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; @@ -23,6 +23,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType;
23 import org.thingsboard.server.gen.js.JsInvokeProtos; 23 import org.thingsboard.server.gen.js.JsInvokeProtos;
24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 24 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 25 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
  26 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
26 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
27 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg; 28 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineNotificationMsg;
28 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg; 29 import org.thingsboard.server.gen.transport.TransportProtos.ToTransportMsg;
@@ -171,6 +172,17 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory { @@ -171,6 +172,17 @@ public class ServiceBusTbCoreQueueFactory implements TbCoreQueueFactory {
171 } 172 }
172 173
173 @Override 174 @Override
  175 + public TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer() {
  176 + return new TbServiceBusConsumerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic(),
  177 + msg -> new TbProtoQueueMsg<>(msg.getKey(), ToFirmwareStateServiceMsg.parseFrom(msg.getData()), msg.getHeaders()));
  178 + }
  179 +
  180 + @Override
  181 + public TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer() {
  182 + return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getFirmwareTopic());
  183 + }
  184 +
  185 + @Override
174 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() { 186 public TbQueueProducer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgProducer() {
175 return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic()); 187 return new TbServiceBusProducerTemplate<>(coreAdmin, serviceBusSettings, coreSettings.getUsageStatsTopic());
176 } 188 }
@@ -16,6 +16,7 @@ @@ -16,6 +16,7 @@
16 package org.thingsboard.server.queue.provider; 16 package org.thingsboard.server.queue.provider;
17 17
18 import org.thingsboard.server.gen.js.JsInvokeProtos; 18 import org.thingsboard.server.gen.js.JsInvokeProtos;
  19 +import org.thingsboard.server.gen.transport.TransportProtos.ToFirmwareStateServiceMsg;
19 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; 20 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
20 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; 21 import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
21 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg; 22 import org.thingsboard.server.gen.transport.TransportProtos.ToRuleEngineMsg;
@@ -86,6 +87,20 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory { @@ -86,6 +87,20 @@ public interface TbCoreQueueFactory extends TbUsageStatsClientQueueFactory {
86 TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer(); 87 TbQueueConsumer<TbProtoQueueMsg<ToUsageStatsServiceMsg>> createToUsageStatsServiceMsgConsumer();
87 88
88 /** 89 /**
  90 + * Used to consume messages about firmware update notifications by TB Core Service
  91 + *
  92 + * @return
  93 + */
  94 + TbQueueConsumer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgConsumer();
  95 +
  96 + /**
  97 + * Used to consume messages about firmware update notifications by TB Core Service
  98 + *
  99 + * @return
  100 + */
  101 + TbQueueProducer<TbProtoQueueMsg<ToFirmwareStateServiceMsg>> createToFirmwareStateServiceMsgProducer();
  102 +
  103 + /**
89 * Used to consume high priority messages by TB Core Service 104 * Used to consume high priority messages by TB Core Service
90 * 105 *
91 * @return 106 * @return
@@ -26,6 +26,9 @@ public class TbQueueCoreSettings { @@ -26,6 +26,9 @@ public class TbQueueCoreSettings {
26 @Value("${queue.core.topic}") 26 @Value("${queue.core.topic}")
27 private String topic; 27 private String topic;
28 28
  29 + @Value("${queue.core.firmware.topic:tb_firmware}")
  30 + private String firmwareTopic;
  31 +
29 @Value("${queue.core.usage-stats-topic:tb_usage_stats}") 32 @Value("${queue.core.usage-stats-topic:tb_usage_stats}")
30 private String usageStatsTopic; 33 private String usageStatsTopic;
31 34
@@ -363,8 +363,10 @@ message GetFirmwareResponseMsg { @@ -363,8 +363,10 @@ message GetFirmwareResponseMsg {
363 ResponseStatus responseStatus = 1; 363 ResponseStatus responseStatus = 1;
364 int64 firmwareIdMSB = 2; 364 int64 firmwareIdMSB = 2;
365 int64 firmwareIdLSB = 3; 365 int64 firmwareIdLSB = 3;
366 - string contentType = 4;  
367 - string fileName = 5; 366 + string title = 4;
  367 + string version = 5;
  368 + string contentType = 6;
  369 + string fileName = 7;
368 } 370 }
369 371
370 //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. 372 //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.
@@ -663,3 +665,13 @@ message ToUsageStatsServiceMsg { @@ -663,3 +665,13 @@ message ToUsageStatsServiceMsg {
663 int64 customerIdMSB = 6; 665 int64 customerIdMSB = 6;
664 int64 customerIdLSB = 7; 666 int64 customerIdLSB = 7;
665 } 667 }
  668 +
  669 +message ToFirmwareStateServiceMsg {
  670 + int64 ts = 1;
  671 + int64 tenantIdMSB = 2;
  672 + int64 tenantIdLSB = 3;
  673 + int64 deviceIdMSB = 4;
  674 + int64 deviceIdLSB = 5;
  675 + int64 firmwareIdMSB = 6;
  676 + int64 firmwareIdLSB = 7;
  677 +}
@@ -207,6 +207,8 @@ public class DeviceApiController { @@ -207,6 +207,8 @@ public class DeviceApiController {
207 207
208 @RequestMapping(value = "/{deviceToken}/firmware", method = RequestMethod.GET) 208 @RequestMapping(value = "/{deviceToken}/firmware", method = RequestMethod.GET)
209 public DeferredResult<ResponseEntity> getFirmware(@PathVariable("deviceToken") String deviceToken, 209 public DeferredResult<ResponseEntity> getFirmware(@PathVariable("deviceToken") String deviceToken,
  210 + @RequestParam(value = "title") String title,
  211 + @RequestParam(value = "version") String version,
210 @RequestParam(value = "chunkSize", required = false, defaultValue = "0") int chunkSize, 212 @RequestParam(value = "chunkSize", required = false, defaultValue = "0") int chunkSize,
211 @RequestParam(value = "chunk", required = false, defaultValue = "0") int chunk) { 213 @RequestParam(value = "chunk", required = false, defaultValue = "0") int chunk) {
212 DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>(); 214 DeferredResult<ResponseEntity> responseWriter = new DeferredResult<>();
@@ -217,7 +219,7 @@ public class DeviceApiController { @@ -217,7 +219,7 @@ public class DeviceApiController {
217 .setTenantIdLSB(sessionInfo.getTenantIdLSB()) 219 .setTenantIdLSB(sessionInfo.getTenantIdLSB())
218 .setDeviceIdMSB(sessionInfo.getDeviceIdMSB()) 220 .setDeviceIdMSB(sessionInfo.getDeviceIdMSB())
219 .setDeviceIdLSB(sessionInfo.getDeviceIdLSB()).build(); 221 .setDeviceIdLSB(sessionInfo.getDeviceIdLSB()).build();
220 - transportContext.getTransportService().process(sessionInfo, requestMsg, new GetFirmwareCallback(responseWriter, chunkSize, chunk)); 222 + transportContext.getTransportService().process(sessionInfo, requestMsg, new GetFirmwareCallback(responseWriter, title, version, chunkSize, chunk));
221 })); 223 }));
222 return responseWriter; 224 return responseWriter;
223 } 225 }
@@ -278,11 +280,15 @@ public class DeviceApiController { @@ -278,11 +280,15 @@ public class DeviceApiController {
278 280
279 private class GetFirmwareCallback implements TransportServiceCallback<TransportProtos.GetFirmwareResponseMsg> { 281 private class GetFirmwareCallback implements TransportServiceCallback<TransportProtos.GetFirmwareResponseMsg> {
280 private final DeferredResult<ResponseEntity> responseWriter; 282 private final DeferredResult<ResponseEntity> responseWriter;
  283 + private final String title;
  284 + private final String version;
281 private final int chuckSize; 285 private final int chuckSize;
282 private final int chuck; 286 private final int chuck;
283 287
284 - GetFirmwareCallback(DeferredResult<ResponseEntity> responseWriter, int chuckSize, int chuck) { 288 + GetFirmwareCallback(DeferredResult<ResponseEntity> responseWriter, String title, String version, int chuckSize, int chuck) {
285 this.responseWriter = responseWriter; 289 this.responseWriter = responseWriter;
  290 + this.title = title;
  291 + this.version = version;
286 this.chuckSize = chuckSize; 292 this.chuckSize = chuckSize;
287 this.chuck = chuck; 293 this.chuck = chuck;
288 } 294 }
@@ -291,7 +297,7 @@ public class DeviceApiController { @@ -291,7 +297,7 @@ public class DeviceApiController {
291 public void onSuccess(TransportProtos.GetFirmwareResponseMsg firmwareResponseMsg) { 297 public void onSuccess(TransportProtos.GetFirmwareResponseMsg firmwareResponseMsg) {
292 if (!TransportProtos.ResponseStatus.SUCCESS.equals(firmwareResponseMsg.getResponseStatus())) { 298 if (!TransportProtos.ResponseStatus.SUCCESS.equals(firmwareResponseMsg.getResponseStatus())) {
293 responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND)); 299 responseWriter.setResult(new ResponseEntity<>(HttpStatus.NOT_FOUND));
294 - } else { 300 + } else if (title.equals(firmwareResponseMsg.getTitle()) && version.equals(firmwareResponseMsg.getVersion())) {
295 String firmwareId = new UUID(firmwareResponseMsg.getFirmwareIdMSB(), firmwareResponseMsg.getFirmwareIdLSB()).toString(); 301 String firmwareId = new UUID(firmwareResponseMsg.getFirmwareIdMSB(), firmwareResponseMsg.getFirmwareIdLSB()).toString();
296 ByteArrayResource resource = new ByteArrayResource(transportContext.getFirmwareCacheReader().get(firmwareId, chuckSize, chuck)); 302 ByteArrayResource resource = new ByteArrayResource(transportContext.getFirmwareCacheReader().get(firmwareId, chuckSize, chuck));
297 ResponseEntity<ByteArrayResource> response = ResponseEntity.ok() 303 ResponseEntity<ByteArrayResource> response = ResponseEntity.ok()
@@ -301,6 +307,8 @@ public class DeviceApiController { @@ -301,6 +307,8 @@ public class DeviceApiController {
301 .contentType(parseMediaType(firmwareResponseMsg.getContentType())) 307 .contentType(parseMediaType(firmwareResponseMsg.getContentType()))
302 .body(resource); 308 .body(resource);
303 responseWriter.setResult(response); 309 responseWriter.setResult(response);
  310 + } else {
  311 + responseWriter.setResult(new ResponseEntity<>(HttpStatus.BAD_REQUEST));
304 } 312 }
305 } 313 }
306 314
@@ -81,6 +81,8 @@ public interface DeviceDao extends Dao<Device>, TenantEntityDao { @@ -81,6 +81,8 @@ public interface DeviceDao extends Dao<Device>, TenantEntityDao {
81 */ 81 */
82 PageData<Device> findDevicesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink); 82 PageData<Device> findDevicesByTenantIdAndType(UUID tenantId, String type, PageLink pageLink);
83 83
  84 + PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(UUID tenantId, String type, PageLink pageLink);
  85 +
84 /** 86 /**
85 * Find device infos by tenantId, type and page link. 87 * Find device infos by tenantId, type and page link.
86 * 88 *
@@ -355,6 +355,15 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe @@ -355,6 +355,15 @@ public class DeviceServiceImpl extends AbstractEntityService implements DeviceSe
355 } 355 }
356 356
357 @Override 357 @Override
  358 + public PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(TenantId tenantId, String type, PageLink pageLink) {
  359 + log.trace("Executing findDevicesByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink);
  360 + validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
  361 + validateString(type, "Incorrect type " + type);
  362 + validatePageLink(pageLink);
  363 + return deviceDao.findDevicesByTenantIdAndTypeAndEmptyFirmware(tenantId.getId(), type, pageLink);
  364 + }
  365 +
  366 + @Override
358 public PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) { 367 public PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(TenantId tenantId, String type, PageLink pageLink) {
359 log.trace("Executing findDeviceInfosByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink); 368 log.trace("Executing findDeviceInfosByTenantIdAndType, tenantId [{}], type [{}], pageLink [{}]", tenantId, type, pageLink);
360 validateId(tenantId, INCORRECT_TENANT_ID + tenantId); 369 validateId(tenantId, INCORRECT_TENANT_ID + tenantId);
@@ -94,6 +94,15 @@ public interface DeviceRepository extends PagingAndSortingRepository<DeviceEntit @@ -94,6 +94,15 @@ public interface DeviceRepository extends PagingAndSortingRepository<DeviceEntit
94 @Param("textSearch") String textSearch, 94 @Param("textSearch") String textSearch,
95 Pageable pageable); 95 Pageable pageable);
96 96
  97 + @Query("SELECT d FROM DeviceEntity d WHERE d.tenantId = :tenantId " +
  98 + "AND d.type = :type " +
  99 + "AND d.firmwareId = null " +
  100 + "AND LOWER(d.searchText) LIKE LOWER(CONCAT(:textSearch, '%'))")
  101 + Page<DeviceEntity> findByTenantIdAndTypeAndFirmwareIdIsNull(@Param("tenantId") UUID tenantId,
  102 + @Param("type") String type,
  103 + @Param("textSearch") String textSearch,
  104 + Pageable pageable);
  105 +
97 @Query("SELECT new org.thingsboard.server.dao.model.sql.DeviceInfoEntity(d, c.title, c.additionalInfo, p.name) " + 106 @Query("SELECT new org.thingsboard.server.dao.model.sql.DeviceInfoEntity(d, c.title, c.additionalInfo, p.name) " +
98 "FROM DeviceEntity d " + 107 "FROM DeviceEntity d " +
99 "LEFT JOIN CustomerEntity c on c.id = d.customerId " + 108 "LEFT JOIN CustomerEntity c on c.id = d.customerId " +
@@ -150,6 +150,16 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device> @@ -150,6 +150,16 @@ public class JpaDeviceDao extends JpaAbstractSearchTextDao<DeviceEntity, Device>
150 } 150 }
151 151
152 @Override 152 @Override
  153 + public PageData<Device> findDevicesByTenantIdAndTypeAndEmptyFirmware(UUID tenantId, String type, PageLink pageLink) {
  154 + return DaoUtil.toPageData(
  155 + deviceRepository.findByTenantIdAndTypeAndFirmwareIdIsNull(
  156 + tenantId,
  157 + type,
  158 + Objects.toString(pageLink.getTextSearch(), ""),
  159 + DaoUtil.toPageable(pageLink)));
  160 + }
  161 +
  162 + @Override
153 public PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(UUID tenantId, String type, PageLink pageLink) { 163 public PageData<DeviceInfo> findDeviceInfosByTenantIdAndType(UUID tenantId, String type, PageLink pageLink) {
154 return DaoUtil.toPageData( 164 return DaoUtil.toPageData(
155 deviceRepository.findDeviceInfosByTenantIdAndType( 165 deviceRepository.findDeviceInfosByTenantIdAndType(
@@ -26,7 +26,7 @@ import java.util.Arrays; @@ -26,7 +26,7 @@ import java.util.Arrays;
26 @SpringBootConfiguration 26 @SpringBootConfiguration
27 @EnableAsync 27 @EnableAsync
28 @EnableScheduling 28 @EnableScheduling
29 -@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.coapserver", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue"}) 29 +@ComponentScan({"org.thingsboard.server.coap", "org.thingsboard.server.common", "org.thingsboard.server.coapserver", "org.thingsboard.server.transport.coap", "org.thingsboard.server.queue", "org.thingsboard.server.cache"})
30 public class ThingsboardCoapTransportApplication { 30 public class ThingsboardCoapTransportApplication {
31 31
32 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name"; 32 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
@@ -40,6 +40,49 @@ zk: @@ -40,6 +40,49 @@ zk:
40 # Name of the directory in zookeeper 'filesystem' 40 # Name of the directory in zookeeper 'filesystem'
41 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" 41 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
42 42
  43 +cache:
  44 + type: "${CACHE_TYPE:redis}"
  45 +
  46 +redis:
  47 + # standalone or cluster
  48 + connection:
  49 + type: "${REDIS_CONNECTION_TYPE:standalone}"
  50 + standalone:
  51 + host: "${REDIS_HOST:localhost}"
  52 + port: "${REDIS_PORT:6379}"
  53 + useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}"
  54 + # this value may be used only if you used not default ClientConfig
  55 + clientName: "${REDIS_CLIENT_NAME:standalone}"
  56 + # this value may be used only if you used not default ClientConfig
  57 + connectTimeout: "${REDIS_CLIENT_CONNECT_TIMEOUT:30000}"
  58 + # this value may be used only if you used not default ClientConfig
  59 + readTimeout: "${REDIS_CLIENT_READ_TIMEOUT:60000}"
  60 + # this value may be used only if you used not default ClientConfig
  61 + usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:false}"
  62 + cluster:
  63 + # Comma-separated list of "host:port" pairs to bootstrap from.
  64 + nodes: "${REDIS_NODES:}"
  65 + # Maximum number of redirects to follow when executing commands across the cluster.
  66 + max-redirects: "${REDIS_MAX_REDIRECTS:12}"
  67 + useDefaultPoolConfig: "${REDIS_USE_DEFAULT_POOL_CONFIG:true}"
  68 + # db index
  69 + db: "${REDIS_DB:0}"
  70 + # db password
  71 + password: "${REDIS_PASSWORD:}"
  72 + # pool config
  73 + pool_config:
  74 + maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:128}"
  75 + maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:128}"
  76 + minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
  77 + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
  78 + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
  79 + testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
  80 + minEvictableMs: "${REDIS_POOL_CONFIG_MIN_EVICTABLE_MS:60000}"
  81 + evictionRunsMs: "${REDIS_POOL_CONFIG_EVICTION_RUNS_MS:30000}"
  82 + maxWaitMills: "${REDIS_POOL_CONFIG_MAX_WAIT_MS:60000}"
  83 + numberTestsPerEvictionRun: "${REDIS_POOL_CONFIG_NUMBER_TESTS_PER_EVICTION_RUN:3}"
  84 + blockWhenExhausted: "${REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED:true}"
  85 +
43 # COAP server parameters 86 # COAP server parameters
44 transport: 87 transport:
45 coap: 88 coap:
@@ -24,7 +24,7 @@ import java.util.Arrays; @@ -24,7 +24,7 @@ import java.util.Arrays;
24 24
25 @SpringBootApplication 25 @SpringBootApplication
26 @EnableAsync 26 @EnableAsync
27 -@ComponentScan({"org.thingsboard.server.http", "org.thingsboard.server.common", "org.thingsboard.server.transport.http", "org.thingsboard.server.queue"}) 27 +@ComponentScan({"org.thingsboard.server.http", "org.thingsboard.server.common", "org.thingsboard.server.transport.http", "org.thingsboard.server.queue", "org.thingsboard.server.cache"})
28 public class ThingsboardHttpTransportApplication { 28 public class ThingsboardHttpTransportApplication {
29 29
30 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name"; 30 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
@@ -35,6 +35,49 @@ zk: @@ -35,6 +35,49 @@ zk:
35 # Name of the directory in zookeeper 'filesystem' 35 # Name of the directory in zookeeper 'filesystem'
36 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" 36 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
37 37
  38 +cache:
  39 + type: "${CACHE_TYPE:redis}"
  40 +
  41 +redis:
  42 + # standalone or cluster
  43 + connection:
  44 + type: "${REDIS_CONNECTION_TYPE:standalone}"
  45 + standalone:
  46 + host: "${REDIS_HOST:localhost}"
  47 + port: "${REDIS_PORT:6379}"
  48 + useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}"
  49 + # this value may be used only if you used not default ClientConfig
  50 + clientName: "${REDIS_CLIENT_NAME:standalone}"
  51 + # this value may be used only if you used not default ClientConfig
  52 + connectTimeout: "${REDIS_CLIENT_CONNECT_TIMEOUT:30000}"
  53 + # this value may be used only if you used not default ClientConfig
  54 + readTimeout: "${REDIS_CLIENT_READ_TIMEOUT:60000}"
  55 + # this value may be used only if you used not default ClientConfig
  56 + usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:false}"
  57 + cluster:
  58 + # Comma-separated list of "host:port" pairs to bootstrap from.
  59 + nodes: "${REDIS_NODES:}"
  60 + # Maximum number of redirects to follow when executing commands across the cluster.
  61 + max-redirects: "${REDIS_MAX_REDIRECTS:12}"
  62 + useDefaultPoolConfig: "${REDIS_USE_DEFAULT_POOL_CONFIG:true}"
  63 + # db index
  64 + db: "${REDIS_DB:0}"
  65 + # db password
  66 + password: "${REDIS_PASSWORD:}"
  67 + # pool config
  68 + pool_config:
  69 + maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:128}"
  70 + maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:128}"
  71 + minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
  72 + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
  73 + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
  74 + testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
  75 + minEvictableMs: "${REDIS_POOL_CONFIG_MIN_EVICTABLE_MS:60000}"
  76 + evictionRunsMs: "${REDIS_POOL_CONFIG_EVICTION_RUNS_MS:30000}"
  77 + maxWaitMills: "${REDIS_POOL_CONFIG_MAX_WAIT_MS:60000}"
  78 + numberTestsPerEvictionRun: "${REDIS_POOL_CONFIG_NUMBER_TESTS_PER_EVICTION_RUN:3}"
  79 + blockWhenExhausted: "${REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED:true}"
  80 +
38 # HTTP server parameters 81 # HTTP server parameters
39 transport: 82 transport:
40 http: 83 http:
@@ -41,41 +41,7 @@ zk: @@ -41,41 +41,7 @@ zk:
41 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" 41 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
42 42
43 cache: 43 cache:
44 - # caffeine or redis  
45 - type: "${CACHE_TYPE:caffeine}"  
46 -  
47 -caffeine:  
48 - specs:  
49 - relations:  
50 - timeToLiveInMinutes: 1440  
51 - maxSize: 0  
52 - deviceCredentials:  
53 - timeToLiveInMinutes: 1440  
54 - maxSize: 0  
55 - devices:  
56 - timeToLiveInMinutes: 1440  
57 - maxSize: 0  
58 - sessions:  
59 - timeToLiveInMinutes: 1440  
60 - maxSize: 0  
61 - assets:  
62 - timeToLiveInMinutes: 1440  
63 - maxSize: 0  
64 - entityViews:  
65 - timeToLiveInMinutes: 1440  
66 - maxSize: 0  
67 - claimDevices:  
68 - timeToLiveInMinutes: 1  
69 - maxSize: 0  
70 - securitySettings:  
71 - timeToLiveInMinutes: 1440  
72 - maxSize: 0  
73 - tenantProfiles:  
74 - timeToLiveInMinutes: 1440  
75 - maxSize: 0  
76 - deviceProfiles:  
77 - timeToLiveInMinutes: 1440  
78 - maxSize: 0 44 + type: "${CACHE_TYPE:redis}"
79 45
80 redis: 46 redis:
81 # standalone or cluster 47 # standalone or cluster
@@ -26,7 +26,7 @@ import java.util.Arrays; @@ -26,7 +26,7 @@ import java.util.Arrays;
26 @SpringBootConfiguration 26 @SpringBootConfiguration
27 @EnableAsync 27 @EnableAsync
28 @EnableScheduling 28 @EnableScheduling
29 -@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue"}) 29 +@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.queue", "org.thingsboard.server.cache"})
30 public class ThingsboardMqttTransportApplication { 30 public class ThingsboardMqttTransportApplication {
31 31
32 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name"; 32 private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
@@ -40,6 +40,49 @@ zk: @@ -40,6 +40,49 @@ zk:
40 # Name of the directory in zookeeper 'filesystem' 40 # Name of the directory in zookeeper 'filesystem'
41 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}" 41 zk_dir: "${ZOOKEEPER_NODES_DIR:/thingsboard}"
42 42
  43 +cache:
  44 + type: "${CACHE_TYPE:redis}"
  45 +
  46 +redis:
  47 + # standalone or cluster
  48 + connection:
  49 + type: "${REDIS_CONNECTION_TYPE:standalone}"
  50 + standalone:
  51 + host: "${REDIS_HOST:localhost}"
  52 + port: "${REDIS_PORT:6379}"
  53 + useDefaultClientConfig: "${REDIS_USE_DEFAULT_CLIENT_CONFIG:true}"
  54 + # this value may be used only if you used not default ClientConfig
  55 + clientName: "${REDIS_CLIENT_NAME:standalone}"
  56 + # this value may be used only if you used not default ClientConfig
  57 + connectTimeout: "${REDIS_CLIENT_CONNECT_TIMEOUT:30000}"
  58 + # this value may be used only if you used not default ClientConfig
  59 + readTimeout: "${REDIS_CLIENT_READ_TIMEOUT:60000}"
  60 + # this value may be used only if you used not default ClientConfig
  61 + usePoolConfig: "${REDIS_CLIENT_USE_POOL_CONFIG:false}"
  62 + cluster:
  63 + # Comma-separated list of "host:port" pairs to bootstrap from.
  64 + nodes: "${REDIS_NODES:}"
  65 + # Maximum number of redirects to follow when executing commands across the cluster.
  66 + max-redirects: "${REDIS_MAX_REDIRECTS:12}"
  67 + useDefaultPoolConfig: "${REDIS_USE_DEFAULT_POOL_CONFIG:true}"
  68 + # db index
  69 + db: "${REDIS_DB:0}"
  70 + # db password
  71 + password: "${REDIS_PASSWORD:}"
  72 + # pool config
  73 + pool_config:
  74 + maxTotal: "${REDIS_POOL_CONFIG_MAX_TOTAL:128}"
  75 + maxIdle: "${REDIS_POOL_CONFIG_MAX_IDLE:128}"
  76 + minIdle: "${REDIS_POOL_CONFIG_MIN_IDLE:16}"
  77 + testOnBorrow: "${REDIS_POOL_CONFIG_TEST_ON_BORROW:true}"
  78 + testOnReturn: "${REDIS_POOL_CONFIG_TEST_ON_RETURN:true}"
  79 + testWhileIdle: "${REDIS_POOL_CONFIG_TEST_WHILE_IDLE:true}"
  80 + minEvictableMs: "${REDIS_POOL_CONFIG_MIN_EVICTABLE_MS:60000}"
  81 + evictionRunsMs: "${REDIS_POOL_CONFIG_EVICTION_RUNS_MS:30000}"
  82 + maxWaitMills: "${REDIS_POOL_CONFIG_MAX_WAIT_MS:60000}"
  83 + numberTestsPerEvictionRun: "${REDIS_POOL_CONFIG_NUMBER_TESTS_PER_EVICTION_RUN:3}"
  84 + blockWhenExhausted: "${REDIS_POOL_CONFIG_BLOCK_WHEN_EXHAUSTED:true}"
  85 +
43 # MQTT server parameters 86 # MQTT server parameters
44 transport: 87 transport:
45 mqtt: 88 mqtt: