File

src/tracking/live-telematics-worker.service.ts

Description

Live Telematics Worker.

The companion to GpsSimulatorService. Whereas the simulator fakes positions for orgs in demo mode, this worker fetches REAL positions from the configured telematics provider for orgs in live mode.

Architecture:

  • One global setInterval (30 seconds — most providers throttle below this anyway and we don't want to hammer Mix's API).
  • Each tick, load all orgs and pick the ones with:
    • tracking.mode === 'live'
    • telematics.provider set
    • telematics.lastTestStatus === 'ok' (so we never page Mix with credentials we already know are bad)
  • For each qualifying org, instantiate the right provider adapter, fetch current positions, and reconcile against the org's vehicles by vehicles.telematics_id.
  • Write into the same vehicles.current_lat/lng/last_gps_at columns the simulator uses, so the rest of the platform (tracking page, geofences, dispatch ETA, anomaly detector) doesn't care which side fed it.

Why a separate service from the simulator: clean separation of concerns + the simulator and live worker can fail / restart independently. We never want a Mix outage to break the demo, and we never want a simulator bug to overwrite real GPS data.

Provider coverage today: Mix Telematics (full). Other providers fall through with a single warn-once log so the admin sees it in the dashboard but doesn't get spammed every 30s.

Index

Methods

Constructor

constructor(prisma: PrismaService, trackingGateway: TrackingGateway)
Parameters:
Name Type Optional
prisma PrismaService No
trackingGateway TrackingGateway No

Methods

onModuleDestroy
onModuleDestroy()
Returns: void
onModuleInit
onModuleInit()
Returns: void
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaService } from '../prisma/prisma.service';
import { TrackingGateway } from './tracking.gateway';
import {
  MixTelematicsAdapter,
  NormalizedGpsEvent,
} from './telematics/mix-telematics.adapter';

/**
 * Live Telematics Worker.
 *
 * The companion to GpsSimulatorService. Whereas the simulator fakes
 * positions for orgs in `demo` mode, this worker fetches REAL positions
 * from the configured telematics provider for orgs in `live` mode.
 *
 * Architecture:
 *   - One global setInterval (30 seconds — most providers throttle below
 *     this anyway and we don't want to hammer Mix's API).
 *   - Each tick, load all orgs and pick the ones with:
 *       - tracking.mode === 'live'
 *       - telematics.provider set
 *       - telematics.lastTestStatus === 'ok' (so we never page Mix
 *         with credentials we already know are bad)
 *   - For each qualifying org, instantiate the right provider adapter,
 *     fetch current positions, and reconcile against the org's vehicles
 *     by `vehicles.telematics_id`.
 *   - Write into the same `vehicles.current_lat/lng/last_gps_at` columns
 *     the simulator uses, so the rest of the platform (tracking page,
 *     geofences, dispatch ETA, anomaly detector) doesn't care which
 *     side fed it.
 *
 * Why a separate service from the simulator: clean separation of
 * concerns + the simulator and live worker can fail / restart
 * independently. We never want a Mix outage to break the demo, and
 * we never want a simulator bug to overwrite real GPS data.
 *
 * Provider coverage today: Mix Telematics (full). Other providers fall
 * through with a single warn-once log so the admin sees it in the
 * dashboard but doesn't get spammed every 30s.
 */
@Injectable()
export class LiveTelematicsWorkerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(LiveTelematicsWorkerService.name);
  private intervalId: NodeJS.Timeout | null = null;
  // Poll cadence — 30s; most providers rate-limit below this anyway.
  private readonly TICK_INTERVAL_MS = 30_000;
  // Dedupes the "connector not yet implemented" warning per provider.
  private warnedProviders = new Set<string>();
  // Reentrancy guard — prevents overlapping ticks against the same set
  // of vehicles. With a slow Mix Telematics response, one tick can
  // outlast the 30-second interval; without this guard the next tick
  // would race the first and Postgres would deadlock the row updates.
  private running = false;

  constructor(
    private readonly prisma: PrismaService,
    private readonly trackingGateway: TrackingGateway,
  ) {}

  /**
   * Start the polling loop unless hard-disabled via env.
   * The first tick fires after one full interval, not immediately, so
   * the rest of the boot sequence (Prisma, gateway) has time to settle.
   */
  onModuleInit(): void {
    if (process.env.ENABLE_LIVE_TELEMATICS === 'false') {
      this.logger.log(
        'Live telematics worker hard-disabled via ENABLE_LIVE_TELEMATICS=false',
      );
      return;
    }
    this.logger.log(
      `Live telematics worker starting (tick=${this.TICK_INTERVAL_MS}ms)`,
    );
    this.intervalId = setInterval(() => this.tick(), this.TICK_INTERVAL_MS);
  }

  /** Stop the polling loop. Idempotent — safe to call when never started. */
  onModuleDestroy(): void {
    if (this.intervalId) {
      clearInterval(this.intervalId);
      this.intervalId = null;
      this.logger.log('Live telematics worker stopped');
    }
  }

  /**
   * Decide whether an org should be polled for live data.
   * Mirrors the demo-mode check in GpsSimulatorService — exactly one
   * of them should return true for any given org, never both.
   *
   * Requires: tracking.mode === 'live', a provider, at least one
   * credential field, and a prior successful connection test (so we
   * never page a provider with credentials we already know are bad).
   */
  private shouldGoLive(settings: any): boolean {
    if ((settings?.tracking?.mode || 'demo') !== 'live') return false;
    const t = settings?.telematics;
    if (!t?.provider) return false;
    if (!(t.apiKey || t.clientSecret || t.password)) return false;
    return t.lastTestStatus === 'ok';
  }

  /**
   * One polling cycle: load all orgs, select the live-mode ones, and
   * process each in sequence. `running` guards against overlap when a
   * tick outlasts the interval.
   */
  private async tick(): Promise<void> {
    if (this.running) return;
    this.running = true;
    try {
      const orgs = await this.prisma.organization.findMany({
        select: { id: true, name: true, settings: true },
      });
      const liveOrgs = orgs.filter(o => this.shouldGoLive(o.settings));
      for (const org of liveOrgs) {
        // processOrg never throws — one bad org must not block the next.
        await this.processOrg(org.id, org.name, org.settings as Record<string, any>);
      }
    } catch (err: any) {
      this.logger.error(`Live worker tick failed: ${err?.message || err}`);
    } finally {
      this.running = false;
    }
  }

  /**
   * Best-effort write of fetch telemetry (lastFetchAt / lastFetchStatus /
   * lastFetchMessage / …) into org settings for the Settings UI.
   * The organizations table is RLS-excluded, so this works on the base
   * client outside any transaction. Failures are logged at debug and
   * never propagated — bookkeeping must not break the polling loop.
   */
  private async persistFetchState(
    orgId: string,
    settings: Record<string, any>,
    patch: Record<string, any>,
  ): Promise<void> {
    try {
      const telematics = {
        ...(settings.telematics as Record<string, any>),
        lastFetchAt: new Date().toISOString(),
        ...patch,
      };
      await this.prisma.organization.update({
        where: { id: orgId },
        data: { settings: { ...settings, telematics } },
      });
    } catch (err: any) {
      this.logger.debug(
        `Failed to persist fetch state for org ${orgId}: ${err?.message || err}`,
      );
    }
  }

  /**
   * Pull positions for one org and reconcile them against this org's
   * vehicles (matched by `vehicles.telematics_id`). Errors are caught
   * and logged but never thrown — one bad org should never block the
   * next.
   *
   * Tenant context: like the simulator, this worker runs outside any
   * HTTP request, so we wrap the per-vehicle reads/writes in a Prisma
   * transaction with the RLS GUC set explicitly. organization writes
   * (lastFetchAt etc) are NOT inside the tx — organizations is excluded
   * from RLS so they work on the base client.
   */
  private async processOrg(orgId: string, orgName: string, settings: Record<string, any>) {
    const t = settings.telematics as Record<string, any>;
    const provider = t?.provider as string;

    let positions: NormalizedGpsEvent[] = [];

    try {
      switch (provider) {
        case 'mix_telematics': {
          const adapter = MixTelematicsAdapter.fromOrgSettings(t);
          if (!adapter) {
            this.logger.warn(
              `[${orgName}] Mix Telematics config incomplete — skipping`,
            );
            return;
          }
          positions = await adapter.fetchCurrentPositions();
          break;
        }
        // Other providers are NOT yet wired but the storage + UI work
        // for them, so the admin can save credentials and we'll start
        // pulling as soon as the connector ships.
        case 'geotab':
        case 'samsara':
        case 'webfleet':
        case 'verizon_connect':
        case 'cartrack':
        case 'teltonika':
        case 'other': {
          if (!this.warnedProviders.has(provider)) {
            this.logger.warn(
              `[${orgName}] Provider "${provider}" connector not yet implemented — credentials are saved but no live data pulled.`,
            );
            this.warnedProviders.add(provider);
          }
          return;
        }
        default:
          return;
      }
    } catch (err: any) {
      this.logger.error(
        `[${orgName}] Failed to fetch from ${provider}: ${err?.message || err}`,
      );
      await this.persistFetchState(orgId, settings, {
        lastFetchStatus: 'failed',
        lastFetchMessage: String(err?.message || err).slice(0, 500),
      });
      return;
    }

    if (positions.length === 0) {
      this.logger.debug(`[${orgName}] No positions returned by ${provider}`);
      return;
    }

    let matched = 0;
    let unmatched = 0;
    let txFailed = false;

    try {
      await (this.prisma as any).$transaction(
        async (tx: any) => {
          // Set the RLS context for vehicle/gpsEvent writes.
          // Use $queryRaw — set_config is a SELECT.
          await tx.$queryRaw`SELECT set_config('app.current_org_id', ${orgId}, true)`;

          const externalIds = positions.map(p => p.vehicleExternalId);
          const vehicles = await tx.vehicle.findMany({
            where: { telematicsId: { in: externalIds } },
            select: { id: true, telematicsId: true, unitNumber: true },
          });
          const byExternalId = new Map(
            vehicles.map((v: any) => [v.telematicsId!, v]),
          );

          for (const pos of positions) {
            const v = byExternalId.get(pos.vehicleExternalId) as any;
            if (!v) {
              unmatched++;
              continue;
            }
            try {
              await tx.vehicle.update({
                where: { id: v.id },
                data: {
                  currentLat: pos.lat,
                  currentLng: pos.lng,
                  currentSpeed: pos.speed,
                  lastGpsAt: pos.timestamp,
                },
              });
              await tx.gpsEvent.create({
                data: {
                  vehicleId: v.id,
                  lat: pos.lat,
                  lng: pos.lng,
                  speed: pos.speed,
                  heading: pos.heading,
                  timestamp: pos.timestamp,
                  source: provider,
                },
              });
              this.trackingGateway.broadcastPosition(v.id, {
                lat: pos.lat,
                lng: pos.lng,
                speed: pos.speed,
                heading: pos.heading,
                timestamp: pos.timestamp,
                unitNumber: v.unitNumber,
              });
              matched++;
            } catch (err: any) {
              this.logger.error(
                `[${orgName}] Failed to update vehicle ${v.unitNumber}: ${err?.message || err}`,
              );
            }
          }
        },
        // Generous timeout for the reconcile loop — see simulator notes.
        { timeout: 60_000, maxWait: 30_000 },
      );
    } catch (err: any) {
      txFailed = true;
      this.logger.error(`[${orgName}] Reconcile tx failed: ${err?.message || err}`);
    }

    // Fix: previously we recorded lastFetchStatus 'ok' (and logged a
    // success line) even when the reconcile transaction threw, so the
    // Settings UI showed a healthy fetch that never landed. Record the
    // failure explicitly instead.
    if (txFailed) {
      await this.persistFetchState(orgId, settings, {
        lastFetchStatus: 'failed',
        lastFetchMessage: `Reconcile transaction failed after fetching ${positions.length} positions`,
      });
      return;
    }

    this.logger.log(
      `[${orgName}] live ${provider}: ${matched} matched, ${unmatched} unmatched`,
    );

    // Persist successful fetch state for the Settings UI. organization
    // table is RLS-excluded so this runs outside the tx.
    await this.persistFetchState(orgId, settings, {
      lastFetchStatus: 'ok',
      lastFetchMessage: `${matched} positions reconciled, ${unmatched} unmatched`,
      lastFetchMatched: matched,
      lastFetchUnmatched: unmatched,
    });
  }
}

results matching ""

    No results matching ""