# createDrain()
Create a Drain instance for incremental or streaming log processing. Use this for advanced scenarios where you need fine-grained control over the compression pipeline.
## Signature

```typescript
function createDrain(options?: DrainOptions): Drain
```
### options

- Type: `DrainOptions`
- Required: No

Configuration options for the Drain algorithm.
#### options.depth

- Type: `number`
- Default: `4`

Parse tree depth. Higher values create more specific templates.
#### options.simThreshold

- Type: `number`
- Default: `0.4`
- Range: `0.0` to `1.0`

Similarity threshold for template matching.
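To build intuition for what this threshold compares, here is a minimal sketch of the token-level similarity typically used by Drain-style parsers. This is illustrative only; logpare's internal scoring may differ in detail.

```typescript
// Illustrative Drain-style similarity: the fraction of token positions
// that match, where a wildcard in the template matches any token.
// A line joins a template when this score meets the simThreshold.
const WILDCARD = '<*>';

function tokenSimilarity(template: string[], tokens: string[]): number {
  // Different lengths never match in Drain-style parsers
  if (template.length !== tokens.length) return 0;
  let matches = 0;
  for (let i = 0; i < template.length; i++) {
    if (template[i] === WILDCARD || template[i] === tokens[i]) matches++;
  }
  return matches / template.length;
}

const template = ['ERROR', 'Connection', 'to', WILDCARD, 'failed'];
const line = 'ERROR Connection to 192.168.1.1 failed'.split(/\s+/);

// All 5 positions match (the wildcard absorbs the IP), so the score is 1,
// comfortably above the default simThreshold of 0.4.
console.log(tokenSimilarity(template, line));
```

A higher threshold therefore yields more, narrower templates; a lower one merges more lines into fewer, more general templates.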
#### options.maxChildren

- Type: `number`
- Default: `100`

Maximum children per parse tree node.
#### options.maxClusters

- Type: `number`
- Default: `1000`

Maximum total templates allowed.
#### options.maxSamples

- Type: `number`
- Default: `3`

Maximum sample variables per template.
#### options.preprocessing

- Type: `ParsingStrategy`
- Default: Built-in strategy

Custom preprocessing strategy.
#### options.onProgress

- Type: `ProgressCallback`
- Default: `undefined`

Progress callback for monitoring long-running operations.
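Gathering the numeric options above into one place, a fully spelled-out configuration using the documented defaults might look like this (`preprocessing` and `onProgress` are omitted since they default to the built-in strategy and `undefined`):

```typescript
// All numeric DrainOptions with their documented defaults, collected
// into a plain object you could pass to createDrain(options).
const defaultOptions = {
  depth: 4,          // parse tree depth
  simThreshold: 0.4, // template-matching threshold, range 0.0 to 1.0
  maxChildren: 100,  // maximum children per parse tree node
  maxClusters: 1000, // maximum total templates
  maxSamples: 3,     // maximum sample variables kept per template
};

// const drain = createDrain(defaultOptions);
console.log(defaultOptions);
```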
## Return Value

Returns a Drain instance with the following methods:

### `process(line: string): void`

Process a single log line and add it to the appropriate cluster.

```typescript
drain.process('ERROR Connection failed');
```

### `addLogLines(lines: string[]): void`

Process multiple log lines at once.

```typescript
drain.addLogLines([
  'ERROR Connection failed',
  'INFO Request completed',
]);
```

### `getClusters(): LogCluster[]`

Get all discovered clusters (templates).

```typescript
const clusters = drain.getClusters();
clusters.forEach(cluster => {
  console.log(cluster.getPattern());
});
```

### `getResult(format?: OutputFormat): CompressionResult`

Get compression results in the specified format.

```typescript
const result = drain.getResult('detailed');
console.log(result.formatted);
```

## Examples
### Incremental Processing

```typescript
import { createDrain } from 'logpare';

const drain = createDrain({
  depth: 4,
  simThreshold: 0.4,
});

// Process logs one at a time
drain.process('ERROR Connection to 192.168.1.1 failed');
drain.process('ERROR Connection to 192.168.1.2 failed');
drain.process('INFO Request abc123 completed');

// Get results
const result = drain.getResult('summary');
console.log(result.formatted);
```

### Streaming Processing
```typescript
import { createDrain } from 'logpare';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';

const drain = createDrain({
  depth: 5,
  simThreshold: 0.4,
  maxClusters: 500,
});

const rl = createInterface({
  input: createReadStream('/var/log/syslog'),
  crlfDelay: Infinity,
});

let lineCount = 0;
rl.on('line', (line) => {
  drain.process(line);
  lineCount++;
  if (lineCount % 1000 === 0) {
    console.log(`Processed ${lineCount} lines...`);
  }
});

rl.on('close', () => {
  const result = drain.getResult('detailed');
  console.log(result.formatted);
});
```

### Batch Processing with Progress
```typescript
import { createDrain } from 'logpare';

const drain = createDrain({
  onProgress: (event) => {
    console.log(`${event.currentPhase}: ${event.processedLines} lines`);
    if (event.percentComplete !== undefined) {
      console.log(`Progress: ${event.percentComplete.toFixed(1)}%`);
    }
  },
});

// Process in batches (`logs` is your string[] of raw log lines)
const batchSize = 1000;
for (let i = 0; i < logs.length; i += batchSize) {
  const batch = logs.slice(i, i + batchSize);
  drain.addLogLines(batch);
}

const result = drain.getResult();
```

### Custom Preprocessing
```typescript
import { createDrain, defineStrategy, WILDCARD } from 'logpare';

const customStrategy = defineStrategy({
  preprocess(line: string): string {
    // Custom masking for domain-specific patterns
    return line
      .replace(/order-[A-Z0-9]{8}/g, WILDCARD)
      .replace(/user_\d+/g, WILDCARD);
  },
  tokenize(line: string): string[] {
    // Custom tokenization
    return line.split(/\s+/).filter(Boolean);
  },
  getSimThreshold(depth: number): number {
    // Depth-dependent threshold
    return depth <= 2 ? 0.3 : 0.4;
  },
});

const drain = createDrain({
  preprocessing: customStrategy,
  depth: 5,
});

drain.addLogLines(logs);
const result = drain.getResult();
```

### Accessing Clusters Directly
```typescript
import { createDrain } from 'logpare';

const drain = createDrain();
drain.addLogLines(logs);

const clusters = drain.getClusters();

// Sort by occurrence
const sorted = clusters.sort((a, b) => b.size() - a.size());

// Get top 10 patterns
const top10 = sorted.slice(0, 10);
top10.forEach((cluster, i) => {
  console.log(`${i + 1}. [${cluster.size()}x] ${cluster.getPattern()}`);

  // Access metadata
  const metadata = cluster.getMetadata();
  console.log(`   Severity: ${metadata.severity}`);
  console.log(`   URLs: ${metadata.urlSamples.join(', ')}`);
});
```

### Real-time Monitoring
```typescript
import { createDrain } from 'logpare';
import { spawn } from 'child_process';

const drain = createDrain({
  depth: 4,
  simThreshold: 0.3,
});

// Monitor live logs
const tail = spawn('tail', ['-f', '/var/log/app.log']);
tail.stdout.on('data', (data) => {
  const lines = data.toString().split('\n').filter(Boolean);
  drain.addLogLines(lines);
});

// Report every 10 seconds
setInterval(() => {
  const clusters = drain.getClusters();
  console.log(`\n=== Current State: ${clusters.length} templates ===`);

  const errors = clusters.filter(c =>
    c.getMetadata().severity === 'error'
  );
  if (errors.length > 0) {
    console.log('Error templates:');
    errors.slice(0, 5).forEach(c => {
      console.log(`  [${c.size()}x] ${c.getPattern()}`);
    });
  }
}, 10000);
```

## When to Use
Use `createDrain()` when:

- Processing very large log files that don't fit in memory
- Implementing streaming log analysis
- Building real-time log monitoring systems
- You need to process logs incrementally
- You want direct access to internal clusters

Use `compress()` or `compressText()` when:

- Processing complete log files that fit in memory
- You want a simple, one-step compression
- You don't need incremental updates
## See Also

- `compress()` - Simple compression API
- `compressText()` - Text compression
- Custom Preprocessing - Define custom strategies
- Types Reference - TypeScript interfaces