Commit 93a8c90882ea9c844d7d9698ff5d4ff03a81508d

Authored by Andrew Shvayka
1 parent 866d7c4a

Improvement to device state service

... ... @@ -30,13 +30,6 @@ public interface ClusterRoutingService extends DiscoveryServiceListener {
30 30
31 31 ServerAddress getCurrentServer();
32 32
33   - Optional<ServerAddress> resolveByUuid(UUID uuid);
34   -
35 33 Optional<ServerAddress> resolveById(EntityId entityId);
36 34
37   - Optional<ServerAddress> resolveByUuid(ServerType server, UUID uuid);
38   -
39   - Optional<ServerAddress> resolveById(ServerType server, EntityId entityId);
40   -
41   -
42 35 }
... ...
... ... @@ -88,21 +88,6 @@ public class ConsistentClusterRoutingService implements ClusterRoutingService {
88 88 return resolveByUuid(rootCircle, entityId.getId());
89 89 }
90 90
91   - @Override
92   - public Optional<ServerAddress> resolveByUuid(UUID uuid) {
93   - return resolveByUuid(rootCircle, uuid);
94   - }
95   -
96   - @Override
97   - public Optional<ServerAddress> resolveByUuid(ServerType server, UUID uuid) {
98   - return resolveByUuid(circles[server.ordinal()], uuid);
99   - }
100   -
101   - @Override
102   - public Optional<ServerAddress> resolveById(ServerType server, EntityId entityId) {
103   - return resolveByUuid(circles[server.ordinal()], entityId.getId());
104   - }
105   -
106 91 private Optional<ServerAddress> resolveByUuid(ConsistentHashCircle circle, UUID uuid) {
107 92 Assert.notNull(uuid);
108 93 if (circle.isEmpty()) {
... ...
... ... @@ -296,15 +296,21 @@ public class DefaultDeviceStateService implements DeviceStateService {
296 296 private void updateState() {
297 297 long ts = System.currentTimeMillis();
298 298 Set<DeviceId> deviceIds = new HashSet<>(deviceStates.keySet());
  299 + log.info("Calculating state updates for {} devices", deviceStates.size());
299 300 for (DeviceId deviceId : deviceIds) {
300   - DeviceStateData stateData = deviceStates.get(deviceId);
301   - DeviceState state = stateData.getState();
302   - state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
303   - if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
304   - state.setLastInactivityAlarmTime(ts);
305   - pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
306   - save(deviceId, INACTIVITY_ALARM_TIME, ts);
307   - save(deviceId, ACTIVITY_STATE, state.isActive());
  301 + DeviceStateData stateData = getOrFetchDeviceStateData(deviceId);
  302 + if (stateData != null) {
  303 + DeviceState state = stateData.getState();
  304 + state.setActive(ts < state.getLastActivityTime() + state.getInactivityTimeout());
  305 + if (!state.isActive() && (state.getLastInactivityAlarmTime() == 0L || state.getLastInactivityAlarmTime() < state.getLastActivityTime())) {
  306 + state.setLastInactivityAlarmTime(ts);
  307 + pushRuleEngineMessage(stateData, INACTIVITY_EVENT);
  308 + save(deviceId, INACTIVITY_ALARM_TIME, ts);
  309 + save(deviceId, ACTIVITY_STATE, state.isActive());
  310 + }
  311 + } else {
  312 + log.debug("[{}] Device that belongs to other server is detected and removed.", deviceId);
  313 + deviceStates.remove(deviceId);
308 314 }
309 315 }
310 316 }
... ... @@ -353,6 +359,7 @@ public class DefaultDeviceStateService implements DeviceStateService {
353 359 if (device != null) {
354 360 try {
355 361 deviceStateData = fetchDeviceState(device).get();
  362 + deviceStates.putIfAbsent(deviceId, deviceStateData);
356 363 } catch (InterruptedException | ExecutionException e) {
357 364 log.debug("[{}] Failed to fetch device state!", deviceId, e);
358 365 }
... ...