// src/orders/ingestion/ai-extractor/claude.extractor.ts
//
// AI-powered extraction of transport orders from raw spreadsheet text using
// the Anthropic Claude API, with a heuristic fallback parser when no API key
// is configured.
//
// Public API:
//   constructor() — initializes the Claude client from ANTHROPIC_API_KEY.
//   extractOrders(rawText: string, clientName?: string): Promise<ExtractionResult>
//     Extracts orders from raw spreadsheet text. For large files it splits
//     the text into chunks, processes them in parallel, and merges the
//     results. Handles 100,000+ rows.
import Anthropic from '@anthropic-ai/sdk';
import { Logger } from '@nestjs/common';
/**
 * A single transport order extracted from client spreadsheet data.
 *
 * Most fields are nullable because source spreadsheets vary wildly in which
 * columns they provide. The index signature allows client-specific extras
 * (e.g. lpo_number, qty_beer, truck_plate) that the AI prompt asks for but
 * that are not part of the core shape.
 */
export interface ExtractedOrder {
  origin_address: string | null;       // plant / origin / "from" location
  destination_address: string | null;  // distributor / destination / "to" location
  origin_contact_name: string | null;
  origin_contact_phone: string | null;
  dest_contact_name: string | null;
  dest_contact_phone: string | null;
  weight_kg: number | null;            // normalized to kilograms by the extractor
  volume_cbm: number | null;
  pieces: number | null;               // total quantity (e.g. QTY BEER + QTY UDV)
  commodity: string | null;
  vehicle_type: string | null;
  pickup_date: string | null;          // ISO 8601 where the extractor could normalize
  delivery_date: string | null;        // ISO 8601 where the extractor could normalize
  special_instructions: string | null;
  priority: string;                    // "NORMAL" or "LOW" (LOW when status is packed)
  reference_number: string | null;     // LPO / PO / order number
  [key: string]: unknown;
}
/**
 * Aggregate statistics over one extraction run (single chunk or whole file).
 * The index signature admits extra merge metadata such as chunks_processed.
 */
export interface ExtractionSummary {
  total_orders: number;
  total_weight_kg: number | null;   // null when no row carried a usable weight
  unique_origins: string[];
  unique_destinations: string[];
  warnings: string[];               // per-chunk failures and parser caveats
  [key: string]: unknown;
}
/** The full result of an extraction: all orders plus their merged summary. */
export interface ExtractionResult {
  orders: ExtractedOrder[];
  summary: ExtractionSummary;
}
export class ClaudeExtractor {
  private readonly logger = new Logger(ClaudeExtractor.name);
  /** Anthropic client; stays null when no usable API key is configured. */
  private client: Anthropic | null = null;
  /** Maximum characters per chunk sent to Claude (~6K tokens per chunk). */
  private readonly MAX_CHUNK_CHARS = 25000;
  /** Maximum number of chunk requests in flight at once. */
  private readonly MAX_CONCURRENT = 5;

  constructor() {
    const apiKey = process.env.ANTHROPIC_API_KEY;
    // Treat the placeholder value and obviously-truncated keys as "no key",
    // so we degrade to the heuristic parser instead of failing per request.
    if (apiKey && apiKey !== 'sk-ant-placeholder' && apiKey.length > 10) {
      this.client = new Anthropic({ apiKey });
      this.logger.log('Claude AI extractor initialized');
    } else {
      this.logger.warn('No Anthropic API key — AI extraction unavailable');
    }
  }

  /**
   * Extract orders from raw spreadsheet text.
   * For large files: splits into chunks, processes them in parallel batches
   * of MAX_CONCURRENT, and merges the per-chunk results. Handles 100,000+
   * rows. Falls back to the heuristic parser when Claude is unavailable or a
   * single-chunk extraction fails.
   *
   * @param rawText    CSV-like text, possibly with "=== Sheet: ..." markers.
   * @param clientName Optional client name passed into the prompt for context.
   * @returns Merged extraction result; never rejects (failures become warnings).
   */
  async extractOrders(
    rawText: string,
    clientName?: string,
  ): Promise<ExtractionResult> {
    if (!this.client) {
      this.logger.warn('Claude not available, using heuristic parser');
      return this.heuristicExtract(rawText);
    }
    // Split into chunks if the text is large
    const chunks = this.splitIntoChunks(rawText);
    this.logger.log(`Processing ${chunks.length} chunk(s) for "${clientName || 'unknown client'}"`);
    if (chunks.length === 1) {
      // Small file — single call, with heuristic fallback on failure.
      return this.extractChunk(chunks[0], clientName, '1/1').catch((err: unknown) => {
        this.logger.error(`Single chunk extraction failed: ${(err as Error).message}, using heuristic`);
        return this.heuristicExtract(chunks[0]);
      });
    }
    // Large file — parallel chunked extraction
    const allOrders: ExtractedOrder[] = [];
    const allWarnings: string[] = [];
    // Process in batches of MAX_CONCURRENT to bound concurrent API calls.
    for (let i = 0; i < chunks.length; i += this.MAX_CONCURRENT) {
      const batch = chunks.slice(i, i + this.MAX_CONCURRENT);
      const promises = batch.map((chunk, j) =>
        this.extractChunk(chunk, clientName, `${i + j + 1}/${chunks.length}`)
          .catch((err: unknown) => {
            const message = (err as Error).message;
            this.logger.error(`Chunk ${i + j + 1} failed: ${message}`);
            allWarnings.push(`Chunk ${i + j + 1} failed: ${message}`);
            // A failed chunk contributes an empty result so the merge below
            // still sees one result per chunk (declared type instead of an
            // `as` assertion so the shape is actually checked).
            const empty: ExtractionResult = {
              orders: [],
              summary: { total_orders: 0, total_weight_kg: null, unique_origins: [], unique_destinations: [], warnings: [`Failed: ${message}`] },
            };
            return empty;
          }),
      );
      const results = await Promise.all(promises);
      for (const result of results) {
        allOrders.push(...result.orders);
        if (result.summary.warnings) {
          allWarnings.push(...result.summary.warnings);
        }
      }
      this.logger.log(`Batch complete: ${allOrders.length} orders so far`);
    }
    // Merge per-chunk summaries into a single file-level summary.
    const origins = new Set<string>();
    const destinations = new Set<string>();
    let totalWeight = 0;
    let hasWeight = false;
    for (const o of allOrders) {
      if (o.origin_address) origins.add(o.origin_address);
      if (o.destination_address) destinations.add(o.destination_address);
      if (o.weight_kg) { totalWeight += o.weight_kg; hasWeight = true; }
    }
    return {
      orders: allOrders,
      summary: {
        total_orders: allOrders.length,
        total_weight_kg: hasWeight ? totalWeight : null,
        unique_origins: Array.from(origins),
        unique_destinations: Array.from(destinations),
        warnings: allWarnings,
        chunks_processed: chunks.length,
      },
    };
  }

  /**
   * Split raw text into chunks that fit within Claude's context window.
   * Keeps the current "=== Sheet: ..." header with each chunk so Claude
   * understands the format of the rows that follow.
   */
  private splitIntoChunks(rawText: string): string[] {
    if (rawText.length <= this.MAX_CHUNK_CHARS) {
      return [rawText];
    }
    const lines = rawText.split('\n');
    const chunks: string[] = [];
    let currentChunk = '';
    let currentHeader = '';
    for (const line of lines) {
      // Detect sheet headers
      if (line.startsWith('=== Sheet:')) {
        // A new sheet is starting; if the current chunk is already mostly
        // full, flush it so the new sheet begins a fresh chunk.
        if (currentChunk.length > this.MAX_CHUNK_CHARS * 0.8) {
          chunks.push(currentChunk);
          // BUGFIX: start the new chunk empty — the NEW sheet's header line
          // is appended just below. Previously this seeded the chunk with
          // the STALE previous sheet's header (`currentHeader + '\n'`),
          // giving Claude the wrong format context for the rows.
          currentChunk = '';
        }
        currentHeader = line;
      }
      currentChunk += line + '\n';
      // If chunk is getting too large, split it
      if (currentChunk.length > this.MAX_CHUNK_CHARS) {
        chunks.push(currentChunk);
        // Carry over the header context for the next chunk
        currentChunk = currentHeader ? currentHeader + '\n' : '';
      }
    }
    if (currentChunk.trim()) {
      chunks.push(currentChunk);
    }
    return chunks;
  }

  /**
   * Extract orders from a single chunk of text via the Claude API.
   * Falls back to the heuristic parser when the model's response contains no
   * parseable JSON.
   *
   * @param chunkText  One chunk of the raw spreadsheet text.
   * @param clientName Optional client name, interpolated into the prompt.
   * @param chunkLabel Human-readable position label, e.g. "2/7", for logging.
   */
  private async extractChunk(
    chunkText: string,
    clientName?: string,
    chunkLabel?: string,
  ): Promise<ExtractionResult> {
    const prompt = `You are a logistics data extraction specialist. Extract ALL transport orders from this spreadsheet data.
CRITICAL RULES:
1. This may be one chunk of a larger file. Extract EVERY order row you find.
2. SKIP rows that are clearly headers, section labels, subtotals, or notes (e.g., "SPILLS", "COAST BEER", "TOTALS", "PLAN", "POST CUT-OFF", "PACKED" as a section header).
3. A valid order has at minimum a distributor/destination name AND some identifying number (LPO, order number, delivery number).
4. Columns may be in ANY order. Common patterns:
- DATE OF ORDER CREATION, PLANT, DISTRIBUTOR, LPO NO., BEER_ORDER, UDV_ORDER, Pallets, DELNO., QTY BEER, QTY UDV, TRUCK ALLOCATION, STATUS
- Or generic: Order#, From, To, Weight, Quantity, Vehicle, Date
5. Dates in various formats (DD.MM.YYYY, DD/MM/YYYY, YYYY-MM-DD, "March 15, 2026") — normalize to ISO 8601.
6. Weight may be in kg, tons, or tonnes — convert to kg.
7. If "PACKED" appears as a STATUS value (not section header), set priority to LOW.
8. Extract EVERY single data row. Do not skip or summarize.
For EACH order extract these fields (use null if not found):
- reference_number: LPO/PO/order number
- origin_address: Plant/origin/from location
- destination_address: Distributor/destination/to location
- pieces: Quantity (QTY BEER + QTY UDV, or generic quantity)
- commodity: Type of goods
- weight_kg: Weight in kg (null if not provided)
- pickup_date: Order/pickup date in ISO 8601
- delivery_date: Circulation/delivery date in ISO 8601
- special_instructions: Truck plate, palletization notes
- priority: NORMAL or LOW (LOW if packed)
- truck_plate: Vehicle/truck allocation
- status: LOADED, PACKED, empty, etc.
- lpo_number, beer_order_number, udv_order_number, delivery_number_beer, delivery_number_udv, qty_beer, qty_udv, pallets
- plant, distributor, sheet_date
${chunkLabel ? `This is chunk ${chunkLabel} of the file.` : ''}
Client: ${clientName || 'Unknown'}
Respond ONLY with valid JSON (no markdown, no explanation):
{"orders":[{...}],"summary":{"total_orders":N,"warnings":[]}}
DATA:
${chunkText}`;
    // Caller (extractOrders) guarantees this.client is non-null on this path.
    const response = await this.client!.messages.create({
      model: process.env.ANTHROPIC_MODEL || 'claude-sonnet-4-20250514',
      max_tokens: 16384,
      messages: [{ role: 'user', content: prompt }],
    });
    const text = response.content[0].type === 'text' ? response.content[0].text : '';
    // Strip markdown code fences (```json ... ``` or ``` ... ```)
    const stripped = text
      .replace(/^```(?:json)?\s*/i, '')
      .replace(/\s*```\s*$/i, '')
      .trim();
    // Try to extract the outermost JSON object
    const jsonMatch = stripped.match(/\{[\s\S]*\}/);
    if (!jsonMatch) {
      this.logger.warn(`Chunk ${chunkLabel}: no JSON found in response, using heuristic fallback`);
      return this.heuristicExtract(chunkText);
    }
    try {
      const result = JSON.parse(jsonMatch[0]) as ExtractionResult;
      // Defend against partially-shaped responses from the model.
      if (!result.orders) result.orders = [];
      if (!result.summary) result.summary = { total_orders: result.orders.length, total_weight_kg: null, unique_origins: [], unique_destinations: [], warnings: [] };
      this.logger.log(`Chunk ${chunkLabel}: extracted ${result.orders.length} orders`);
      return result;
    } catch (err) {
      this.logger.warn(`Chunk ${chunkLabel}: JSON parse failed (${(err as Error).message}), using heuristic fallback`);
      return this.heuristicExtract(chunkText);
    }
  }

  /**
   * Heuristic fallback when Claude is not available.
   * Finds a header row by pattern-matching known column names, then maps each
   * subsequent data row to fields positionally. Less accurate than the AI
   * path — a warning to that effect is included in the returned summary.
   */
  private heuristicExtract(rawText: string): ExtractionResult {
    this.logger.log('Using heuristic parser (no AI)');
    const lines = rawText.split('\n');
    const orders: ExtractedOrder[] = [];
    let headers: string[] = [];
    let headerFound = false;
    for (const line of lines) {
      if (line.startsWith('===')) continue; // sheet markers carry no row data
      const cells = line.split(',').map(c => c.trim().replace(/^"|"$/g, ''));
      if (!headerFound) {
        // Look for header row — has multiple known column names
        const knownPatterns = [/distributor/i, /lpo/i, /order/i, /plant/i, /qty/i, /truck/i, /date/i, /origin/i, /from/i, /to/i, /weight/i];
        const matches = knownPatterns.filter(p => cells.some(c => p.test(c)));
        if (matches.length >= 3) {
          headers = cells;
          headerFound = true;
          continue;
        }
        continue;
      }
      // Data row — map to fields using header positions
      if (cells.length < 3) continue;
      // Return the first non-empty cell whose header matches a pattern.
      const getValue = (patterns: RegExp[]): string => {
        for (const p of patterns) {
          const idx = headers.findIndex(h => p.test(h));
          if (idx >= 0 && idx < cells.length && cells[idx]) return cells[idx];
        }
        return '';
      };
      // BUGFIX: use a word boundary (/\bto\b/) instead of bare /to/, which
      // also matched substrings of unrelated headers such as "TOTAL" or
      // "TONNAGE" and could misread a numeric column as the destination.
      const dest = getValue([/distributor/i, /destination/i, /\bto\b/i, /consignee/i]);
      const lpo = getValue([/lpo/i, /order.*no/i, /ref/i, /po.*no/i]);
      // Skip section headers
      if (!dest || /^(spill|plan|post|eaml|kisumu|coast|nairobi|mountain|lake|packed|western|total)/i.test(dest)) continue;
      const qtyBeer = parseFloat(getValue([/qty.*beer/i, /quantity/i, /pieces/i]).replace(/[,\s]/g, '')) || null;
      const qtyUdv = parseFloat(getValue([/qty.*udv/i]).replace(/[,\s]/g, '')) || null;
      orders.push({
        reference_number: lpo || null,
        origin_address: getValue([/plant/i, /origin/i, /from/i]) || null,
        destination_address: dest || null,
        origin_contact_name: null, origin_contact_phone: null,
        dest_contact_name: null, dest_contact_phone: null,
        weight_kg: parseFloat(getValue([/weight/i, /tonnage/i]).replace(/[,\s]/g, '')) || null,
        volume_cbm: null,
        pieces: (qtyBeer || 0) + (qtyUdv || 0) || null,
        commodity: qtyBeer ? (qtyUdv ? 'Beer & Spirits' : 'Beer') : (qtyUdv ? 'Spirits' : null),
        vehicle_type: null,
        pickup_date: getValue([/date.*order/i, /pickup/i, /created/i]) || null,
        delivery_date: getValue([/circulation/i, /delivery/i]) || null,
        special_instructions: getValue([/truck/i, /vehicle/i]) || null,
        priority: getValue([/status/i]).toLowerCase() === 'packed' ? 'LOW' : 'NORMAL',
      });
    }
    return {
      orders,
      summary: {
        total_orders: orders.length,
        total_weight_kg: null,
        unique_origins: [...new Set(orders.map(o => o.origin_address).filter(Boolean) as string[])],
        unique_destinations: [...new Set(orders.map(o => o.destination_address).filter(Boolean) as string[])],
        warnings: orders.length === 0 ? ['No orders found with heuristic parser'] : ['Using heuristic parser (no AI). Results may be less accurate.'],
      },
    };
  }
}