File

src/orders/email-listener/email-listener.service.ts

Index

Methods

Constructor

constructor(prisma: PrismaService, ingestionService: IngestionService)
Parameters :
Name Type Optional
prisma PrismaService No
ingestionService IngestionService No

Methods

onModuleDestroy
onModuleDestroy()
Returns : void
Async onModuleInit
onModuleInit()
Returns : any
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaService } from '../../prisma/prisma.service';
import { IngestionService } from '../ingestion/ingestion.service';

@Injectable()
export class EmailListenerService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(EmailListenerService.name);
  private pollInterval: NodeJS.Timeout | null = null;
  private imapClient: any = null;

  constructor(
    private prisma: PrismaService,
    private ingestionService: IngestionService,
  ) {}

  async onModuleInit() {
    if (!process.env.IMAP_HOST || !process.env.IMAP_USER) {
      this.logger.warn('IMAP not configured — email ingestion disabled. Set IMAP_HOST, IMAP_USER, IMAP_PASS to enable.');
      return;
    }

    this.logger.log(`Email listener starting for ${process.env.IMAP_USER}...`);
    await this.startPolling();
  }

  onModuleDestroy() {
    if (this.pollInterval) clearInterval(this.pollInterval);
    this.disconnect();
  }

  private async startPolling() {
    // Poll every 60 seconds
    const intervalMs = parseInt(process.env.IMAP_POLL_INTERVAL || '60000');

    // Do an initial check
    await this.checkForNewEmails().catch(err => this.logger.error(`Initial email check failed: ${err.message}`));

    this.pollInterval = setInterval(async () => {
      await this.checkForNewEmails().catch(err => this.logger.error(`Email check failed: ${err.message}`));
    }, intervalMs);

    this.logger.log(`Email polling started (every ${intervalMs / 1000}s)`);
  }

  private async connect() {
    if (this.imapClient) return this.imapClient;

    const { ImapFlow } = await import('imapflow');
    this.imapClient = new ImapFlow({
      host: process.env.IMAP_HOST!,
      port: parseInt(process.env.IMAP_PORT || '993'),
      secure: true,
      auth: {
        user: process.env.IMAP_USER!,
        pass: process.env.IMAP_PASS!,
      },
      logger: false,
    });

    await this.imapClient.connect();
    this.logger.log('IMAP connected');
    return this.imapClient;
  }

  private async disconnect() {
    if (this.imapClient) {
      try { await this.imapClient.logout(); } catch {}
      this.imapClient = null;
    }
  }

  private async checkForNewEmails() {
    const client = await this.connect();
    const lock = await client.getMailboxLock('INBOX');

    try {
      // Fetch unseen messages
      const messages = client.fetch({ seen: false }, {
        envelope: true,
        bodyStructure: true,
        source: true,
      });

      for await (const msg of messages) {
        try {
          await this.processMessage(msg, client);
          // Mark as seen
          await client.messageFlagsAdd(msg.uid, ['\\Seen'], { uid: true });
        } catch (err: any) {
          this.logger.error(`Failed to process message ${msg.uid}: ${err.message}`);
        }
      }
    } finally {
      lock.release();
    }
  }

  private async processMessage(msg: any, client: any) {
    const from = msg.envelope?.from?.[0]?.address || 'unknown';
    const subject = msg.envelope?.subject || 'No subject';

    this.logger.log(`Processing email from ${from}: "${subject}"`);

    // Extract attachments from body structure
    const attachments = this.findAttachments(msg.bodyStructure);

    if (attachments.length === 0) {
      this.logger.log(`No attachments found in email from ${from}`);
      return;
    }

    for (const attachment of attachments) {
      // Only process Excel/CSV files
      const ext = attachment.filename?.toLowerCase() || '';
      if (!ext.endsWith('.xlsx') && !ext.endsWith('.xls') && !ext.endsWith('.csv')) {
        continue;
      }

      this.logger.log(`Found attachment: ${attachment.filename} (${attachment.size} bytes)`);

      // Download the attachment
      const { content } = await client.download(msg.uid, attachment.part, { uid: true });
      const chunks: Buffer[] = [];
      for await (const chunk of content) {
        chunks.push(chunk);
      }
      const buffer = Buffer.concat(chunks);

      // Try to match sender to a client
      const matchedClient = await this.prisma.client.findFirst({
        where: { contactEmail: from },
      });

      // Get default org
      const org = await this.prisma.organization.findFirst();
      if (!org) {
        this.logger.error('No organization found — cannot process email');
        return;
      }

      // Create a mock file object for the ingestion service
      const mockFile = {
        originalname: attachment.filename || 'email-attachment.xlsx',
        buffer,
        mimetype: attachment.contentType || 'application/octet-stream',
        size: buffer.length,
      } as any;

      // Process through the ingestion pipeline
      const ingestion = await this.ingestionService.processUpload(
        mockFile,
        matchedClient?.id || null,
        org.id,
        'email-listener', // system user
      );

      this.logger.log(`Email ingestion created: ${ingestion.ingestion.id} (${ingestion.ingestion.status}) from ${from}`);
    }
  }

  private findAttachments(structure: any, path: string = ''): Array<{ part: string; filename: string; contentType: string; size: number }> {
    const attachments: Array<{ part: string; filename: string; contentType: string; size: number }> = [];

    if (!structure) return attachments;

    if (structure.disposition === 'attachment' || structure.disposition === 'inline') {
      const filename = structure.dispositionParameters?.filename || structure.parameters?.name || '';
      if (filename) {
        attachments.push({
          part: path || '1',
          filename,
          contentType: `${structure.type}/${structure.subtype}`,
          size: structure.size || 0,
        });
      }
    }

    if (structure.childNodes) {
      for (let i = 0; i < structure.childNodes.length; i++) {
        const childPath = path ? `${path}.${i + 1}` : `${i + 1}`;
        attachments.push(...this.findAttachments(structure.childNodes[i], childPath));
      }
    }

    return attachments;
  }
}

results matching ""

    No results matching ""