src/orders/email-listener/email-listener.service.ts
Methods |
|
constructor(prisma: PrismaService, ingestionService: IngestionService)
|
|||||||||
|
Parameters :
|
| onModuleDestroy |
onModuleDestroy()
|
|
Returns :
void
|
| Async onModuleInit |
onModuleInit()
|
|
Returns :
any
|
import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
import { PrismaService } from '../../prisma/prisma.service';
import { IngestionService } from '../ingestion/ingestion.service';
@Injectable()
export class EmailListenerService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(EmailListenerService.name);
private pollInterval: NodeJS.Timeout | null = null;
private imapClient: any = null;
constructor(
private prisma: PrismaService,
private ingestionService: IngestionService,
) {}
async onModuleInit() {
if (!process.env.IMAP_HOST || !process.env.IMAP_USER) {
this.logger.warn('IMAP not configured — email ingestion disabled. Set IMAP_HOST, IMAP_USER, IMAP_PASS to enable.');
return;
}
this.logger.log(`Email listener starting for ${process.env.IMAP_USER}...`);
await this.startPolling();
}
onModuleDestroy() {
if (this.pollInterval) clearInterval(this.pollInterval);
this.disconnect();
}
private async startPolling() {
// Poll every 60 seconds
const intervalMs = parseInt(process.env.IMAP_POLL_INTERVAL || '60000');
// Do an initial check
await this.checkForNewEmails().catch(err => this.logger.error(`Initial email check failed: ${err.message}`));
this.pollInterval = setInterval(async () => {
await this.checkForNewEmails().catch(err => this.logger.error(`Email check failed: ${err.message}`));
}, intervalMs);
this.logger.log(`Email polling started (every ${intervalMs / 1000}s)`);
}
private async connect() {
if (this.imapClient) return this.imapClient;
const { ImapFlow } = await import('imapflow');
this.imapClient = new ImapFlow({
host: process.env.IMAP_HOST!,
port: parseInt(process.env.IMAP_PORT || '993'),
secure: true,
auth: {
user: process.env.IMAP_USER!,
pass: process.env.IMAP_PASS!,
},
logger: false,
});
await this.imapClient.connect();
this.logger.log('IMAP connected');
return this.imapClient;
}
private async disconnect() {
if (this.imapClient) {
try { await this.imapClient.logout(); } catch {}
this.imapClient = null;
}
}
private async checkForNewEmails() {
const client = await this.connect();
const lock = await client.getMailboxLock('INBOX');
try {
// Fetch unseen messages
const messages = client.fetch({ seen: false }, {
envelope: true,
bodyStructure: true,
source: true,
});
for await (const msg of messages) {
try {
await this.processMessage(msg, client);
// Mark as seen
await client.messageFlagsAdd(msg.uid, ['\\Seen'], { uid: true });
} catch (err: any) {
this.logger.error(`Failed to process message ${msg.uid}: ${err.message}`);
}
}
} finally {
lock.release();
}
}
private async processMessage(msg: any, client: any) {
const from = msg.envelope?.from?.[0]?.address || 'unknown';
const subject = msg.envelope?.subject || 'No subject';
this.logger.log(`Processing email from ${from}: "${subject}"`);
// Extract attachments from body structure
const attachments = this.findAttachments(msg.bodyStructure);
if (attachments.length === 0) {
this.logger.log(`No attachments found in email from ${from}`);
return;
}
for (const attachment of attachments) {
// Only process Excel/CSV files
const ext = attachment.filename?.toLowerCase() || '';
if (!ext.endsWith('.xlsx') && !ext.endsWith('.xls') && !ext.endsWith('.csv')) {
continue;
}
this.logger.log(`Found attachment: ${attachment.filename} (${attachment.size} bytes)`);
// Download the attachment
const { content } = await client.download(msg.uid, attachment.part, { uid: true });
const chunks: Buffer[] = [];
for await (const chunk of content) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
// Try to match sender to a client
const matchedClient = await this.prisma.client.findFirst({
where: { contactEmail: from },
});
// Get default org
const org = await this.prisma.organization.findFirst();
if (!org) {
this.logger.error('No organization found — cannot process email');
return;
}
// Create a mock file object for the ingestion service
const mockFile = {
originalname: attachment.filename || 'email-attachment.xlsx',
buffer,
mimetype: attachment.contentType || 'application/octet-stream',
size: buffer.length,
} as any;
// Process through the ingestion pipeline
const ingestion = await this.ingestionService.processUpload(
mockFile,
matchedClient?.id || null,
org.id,
'email-listener', // system user
);
this.logger.log(`Email ingestion created: ${ingestion.ingestion.id} (${ingestion.ingestion.status}) from ${from}`);
}
}
private findAttachments(structure: any, path: string = ''): Array<{ part: string; filename: string; contentType: string; size: number }> {
const attachments: Array<{ part: string; filename: string; contentType: string; size: number }> = [];
if (!structure) return attachments;
if (structure.disposition === 'attachment' || structure.disposition === 'inline') {
const filename = structure.dispositionParameters?.filename || structure.parameters?.name || '';
if (filename) {
attachments.push({
part: path || '1',
filename,
contentType: `${structure.type}/${structure.subtype}`,
size: structure.size || 0,
});
}
}
if (structure.childNodes) {
for (let i = 0; i < structure.childNodes.length; i++) {
const childPath = path ? `${path}.${i + 1}` : `${i + 1}`;
attachments.push(...this.findAttachments(structure.childNodes[i], childPath));
}
}
return attachments;
}
}