index.ts 7.18 KB
import { unref } from 'vue'
import { isString } from '@wry-smile/utils-is'
import type { AttributeData, EntityId, SubscriptionData, SubscriptionUpdateMsg, WebsocketDataMsg } from '../type/message'
import type { WebsocketCmd } from '../type/service'
import { WsSubscriber } from '../type/service'
import type { EntityType } from '../enum'
import { LatestTelemetry, TelemetryFeature } from '../enum'
import type { AggEntityHistoryCmd, AggTimeSeriesCmd, EntityHistoryCmd, LatestValueCmd, TelemetryPluginCmd, TimeSeriesCmd } from '../type/command'
import { GetHistoryCmd, SubscriptionCmd } from '../type/command'
import type { EntityDataQuery } from '../type/query'
import type { CmdWrapper } from '..'
import { WebsocketService } from '..'
import type { CommandSource } from '../processor'

/**
 * A {@link SubscriptionCmd} whose {@link getType} reports
 * `TelemetryFeature.TIMESERIES`.
 */
export class TimeseriesSubscriptionCmd extends SubscriptionCmd {
  /** @returns `TelemetryFeature.TIMESERIES` */
  public getType() {
    return TelemetryFeature.TIMESERIES
  }
}

/**
 * A {@link GetHistoryCmd} whose {@link getType} reports
 * `TelemetryFeature.TIMESERIES`.
 */
export class HistorySubscriptionCmd extends GetHistoryCmd {
  /** @returns `TelemetryFeature.TIMESERIES` */
  public getType() {
    return TelemetryFeature.TIMESERIES
  }
}

/**
 * Websocket command that can bundle an entity query together with optional
 * history / latest-value / time-series sub-commands (plain and aggregated).
 *
 * `cmdId` starts out as `null` (typed as `number`) until a caller assigns it.
 */
export class EntityDataCmd implements WebsocketCmd {
  cmdId: number = null as unknown as number
  query?: EntityDataQuery
  historyCmd?: EntityHistoryCmd
  latestCmd?: LatestValueCmd
  tsCmd?: TimeSeriesCmd
  aggHistoryCmd?: AggEntityHistoryCmd
  aggTsCmd?: AggTimeSeriesCmd

  /**
   * @returns `true` when neither the query nor any of the sub-commands is set
   * (`cmdId` is not taken into account).
   */
  public isEmpty(): boolean {
    const parts = [
      this.query,
      this.historyCmd,
      this.latestCmd,
      this.tsCmd,
      this.aggTsCmd,
      this.aggHistoryCmd,
    ]
    return parts.every(part => !part)
  }
}

/**
 * Mutable copy of a {@link SubscriptionUpdateMsg} with helpers for
 * normalising its payload and merging it into an attribute list.
 */
export class SubscriptionUpdate implements SubscriptionUpdateMsg {
  subscriptionId: number
  errorCode: number
  errorMsg: string
  data: SubscriptionData

  /**
   * Copies the four message fields as-is; `data` is taken by reference and
   * may be missing if the incoming message did not carry one.
   *
   * @param msg - message to copy from
   */
  constructor(msg: SubscriptionUpdateMsg) {
    this.subscriptionId = msg.subscriptionId
    this.errorCode = msg.errorCode
    this.errorMsg = msg.errorMsg
    this.data = msg.data
  }

  /**
   * Ensures `data` exists and that every requested key has an entry.
   * Keys without a (truthy) entry receive an empty array; existing entries
   * are left untouched.
   *
   * @param keys - keys that must be present in `data`; a falsy value only
   * triggers the `data` initialisation
   */
  public prepareData(keys: string[]) {
    if (!this.data) this.data = {}

    if (!keys) return

    for (const key of keys) {
      if (!this.data[key]) this.data[key] = []
    }
  }

  /**
   * Merges the first sample of every non-empty key in `data` into
   * `origData`, in place. An entry with a matching `key` gets its
   * `lastUpdateTs` / `value` overwritten; otherwise a new entry is appended.
   *
   * @param origData - attribute list to update (mutated)
   * @returns the same `origData` array
   */
  public updateAttributeData(origData: AttributeData[]) {
    // `data` can be absent when the message had no payload and prepareData()
    // was not called; there is nothing to merge then, so avoid Object.keys()
    // throwing on a nullish value.
    if (!this.data) return origData

    for (const key of Object.keys(this.data)) {
      const keyData = this.data[key]

      // Skip keys with no samples (including a nullish entry).
      if (!keyData || !keyData.length) continue

      const existing = origData.find(item => item.key === key)
      const [firstSample] = keyData
      const [lastUpdateTs, value] = firstSample || []

      if (existing) {
        existing.lastUpdateTs = lastUpdateTs
        existing.value = value
      }
      else {
        origData.push({ key, lastUpdateTs, value })
      }
    }

    return origData
  }
}

/**
 * Subscriber bound to a {@link TelemetryWebsockerService} that carries a
 * single websocket command plus the `CommandSource` it was created with.
 */
export class TelemetrySubscriber extends WsSubscriber {
  /** Command associated with this subscriber; the constructor seeds it with a placeholder whose `cmdId` is `null`. */
  subscriptionCommand: WebsocketCmd

  /** The `commandSource` constructor argument, stored unchanged. */
  commandSource: CommandSource

  /**
   * Builds a subscriber whose command is a {@link TimeseriesSubscriptionCmd}
   * for the given entity, scoped to `LatestTelemetry.LATEST_TELEMETRY`.
   *
   * @param telemetryService - service handed to the subscriber constructor
   * @param entityId - supplies the command's `entityId` and `entityType`
   * @param keys - one key or a list of keys; sent as a comma-joined string
   * @param commandSource - stored on the created subscriber
   * @returns the new subscriber
   */
  public static createTimeseriesSubscription(telemetryService: TelemetryWebsockerService, entityId: EntityId, keys: string | string[], commandSource: CommandSource) {
    const keyList = isString(keys) ? [keys] : keys

    const command = new TimeseriesSubscriptionCmd()
    command.entityId = entityId.id
    command.entityType = entityId.entityType as EntityType
    command.scope = LatestTelemetry.LATEST_TELEMETRY
    command.keys = keyList.join(',')

    const created = new TelemetrySubscriber(telemetryService, commandSource)
    created.subscriptionCommand = command
    return created
  }

  /**
   * Builds a subscriber whose command is a {@link HistorySubscriptionCmd}
   * for the given entity. The command's `keys` are only set when `keys`
   * (after wrapping a single string into an array) is truthy.
   *
   * @param telemetryService - service handed to the subscriber constructor
   * @param entityId - supplies the command's `entityId` and `entityType`
   * @param keys - one key or a list of keys; sent as a comma-joined string
   * @param commandSource - stored on the created subscriber
   * @returns the new subscriber
   */
  public static createHistorySubscription(telemetryService: TelemetryWebsockerService, entityId: EntityId, keys: string[] | string, commandSource: CommandSource) {
    const keyList = isString(keys) ? [keys] : keys

    const command = new HistorySubscriptionCmd()
    command.entityId = entityId.id
    command.entityType = entityId.entityType as EntityType

    if (keyList) {
      command.keys = keyList.join(',')
    }

    const created = new TelemetrySubscriber(telemetryService, commandSource)
    created.subscriptionCommand = command
    return created
  }

  /**
   * @param telemetryService - forwarded to the `WsSubscriber` constructor
   * @param commandSource - stored on the instance
   */
  constructor(telemetryService: TelemetryWebsockerService, commandSource: CommandSource) {
    super(telemetryService)
    this.commandSource = commandSource
    this.subscriptionCommand = { cmdId: null as unknown as number }
  }

  /**
   * Normalises an incoming update: splits this subscriber's comma-joined
   * command keys (if any) and passes them to `message.prepareData`.
   * An empty list is passed when the command has no keys.
   *
   * @param message - update to prepare (mutated by `prepareData`)
   */
  public onData(message: SubscriptionUpdate) {
    const pluginCmd = this.subscriptionCommand as TelemetryPluginCmd

    const keys: string[] = pluginCmd && pluginCmd.keys && pluginCmd.keys.length
      ? pluginCmd.keys.split(',')
      : []

    message.prepareData(keys)
  }
}

/**
 * Holds the queued time-series subscription commands and history commands
 * and hands them out in bounded batches.
 */
export class TelemetryPluginCmdsWrapper implements CmdWrapper {
  /** Queued time-series subscription commands. */
  tsSubCmds: TimeseriesSubscriptionCmd[]

  /** Queued history commands. */
  historyCmds: GetHistoryCmd[]

  /**
   * Removes up to `leftCount` items from the front of `cmds` (in place).
   *
   * @returns the removed items, or an empty array when nothing is taken
   */
  private static popCmds<T>(cmds: T[], leftCount: number) {
    const takeCount = Math.min(cmds.length, leftCount)
    return takeCount ? cmds.splice(0, takeCount) : []
  }

  /** Starts with both queues empty. */
  constructor() {
    this.tsSubCmds = []
    this.historyCmds = []
  }

  /** @returns `true` when at least one command is queued in either list. */
  public hasCommands(): boolean {
    return this.tsSubCmds.length > 0 || this.historyCmds.length > 0
  }

  /** Empties both queues in place (the array instances are kept). */
  public clear(): void {
    this.tsSubCmds.splice(0, this.tsSubCmds.length)
    this.historyCmds.splice(0, this.historyCmds.length)
  }

  /**
   * Moves at most `maxCommands` queued commands into a fresh wrapper.
   * Time-series subscription commands are taken first; history commands
   * fill whatever budget remains.
   *
   * @param maxCommands - upper bound on the total number of commands moved
   * @returns a new wrapper holding the commands removed from this one
   */
  public preparePublishCommands(maxCommands: number): CmdWrapper {
    const batch = new TelemetryPluginCmdsWrapper()

    batch.tsSubCmds = TelemetryPluginCmdsWrapper.popCmds(this.tsSubCmds, maxCommands)

    const remaining = maxCommands - batch.tsSubCmds.length
    batch.historyCmds = TelemetryPluginCmdsWrapper.popCmds(this.historyCmds, remaining)

    return batch
  }
}

/**
 * {@link WebsocketService} specialisation for {@link TelemetrySubscriber}s,
 * backed by a {@link TelemetryPluginCmdsWrapper} command queue.
 */
export class TelemetryWebsockerService extends WebsocketService<TelemetrySubscriber> {
  /** Creates the service with a fresh, empty command wrapper. */
  constructor() {
    super(new TelemetryPluginCmdsWrapper())
  }

  /**
   * Registers `subscriber` under a newly allocated command id, queues its
   * command, bumps the subscriber count and triggers `publishCommands()`.
   *
   * A `SubscriptionCmd` is queued in `tsSubCmds` only when its type is
   * `TelemetryFeature.TIMESERIES`; otherwise a `GetHistoryCmd` is queued in
   * `historyCmds`. Commands of any other kind are registered but not queued.
   *
   * @param subscriber - subscriber whose `subscriptionCommand` gets the new `cmdId`
   */
  public subscribe(subscriber: TelemetrySubscriber): void {
    const command = subscriber.subscriptionCommand
    const newId = this.nextCmdId()

    this.subscribersMap.set(newId, subscriber)
    command.cmdId = newId

    if (command instanceof SubscriptionCmd) {
      if (command.getType() === TelemetryFeature.TIMESERIES) {
        this.cmdWrapper.tsSubCmds.push(command)
      }
    }
    else if (command instanceof GetHistoryCmd) {
      this.cmdWrapper.historyCmds.push(command)
    }

    this.subscribersCount += 1
    this.publishCommands()
  }

  /**
   * Unregisters `subscriber`. Does nothing unless the unwrapped `status` is
   * `'OPEN'`.
   *
   * A `SubscriptionCmd` is flagged with `unsubscribe = true` and, when its
   * type is `TelemetryFeature.TIMESERIES`, queued again so the flag gets
   * published. The subscriber is removed from the map when its command has
   * a truthy `cmdId`; the count is decremented and `publishCommands()` runs.
   *
   * @param subscriber - subscriber to remove
   */
  public unsubscribe(subscriber: TelemetrySubscriber): void {
    if (unref(this.status) !== 'OPEN') return

    const command = subscriber.subscriptionCommand

    if (command instanceof SubscriptionCmd) {
      command.unsubscribe = true
      if (command.getType() === TelemetryFeature.TIMESERIES) {
        this.cmdWrapper.tsSubCmds.push(command)
      }
    }

    const registeredId = command.cmdId
    if (registeredId) {
      this.subscribersMap.delete(registeredId)
    }

    this.subscribersCount -= 1
    this.publishCommands()
  }

  /** No-op in this implementation. */
  public update() {}

  /**
   * Routes an incoming message to the subscriber registered under its
   * `subscriptionId`, wrapped in a {@link SubscriptionUpdate}. Messages with
   * a falsy `subscriptionId` or no matching subscriber are ignored.
   *
   * @param message - raw websocket data message
   */
  processOnMessage(message: WebsocketDataMsg) {
    const updateMsg = message as SubscriptionUpdateMsg
    if (!updateMsg.subscriptionId) return

    const target: TelemetrySubscriber | undefined = this.subscribersMap.get(updateMsg.subscriptionId)
    if (target) {
      target.onData(new SubscriptionUpdate(updateMsg))
    }
  }
}