createDrain()

Create a Drain instance for incremental or streaming log processing. Use this for advanced scenarios where you need fine-grained control over the compression pipeline.

Signature

function createDrain(options?: DrainOptions): Drain

Parameters

options

  • Type: DrainOptions
  • Required: No

Configuration options for the Drain algorithm. A sketch of the combined options shape follows the individual options below.

options.depth

  • Type: number
  • Default: 4

Parse tree depth. Higher values create more specific templates.

options.simThreshold

  • Type: number
  • Default: 0.4
  • Range: 0.0 to 1.0

Similarity threshold for template matching. A line is merged into an existing template when its token similarity meets or exceeds this value; lower values merge more aggressively and produce fewer, broader templates.

options.maxChildren

  • Type: number
  • Default: 100

Maximum children per parse tree node.

options.maxClusters

  • Type: number
  • Default: 1000

Maximum total templates allowed.

options.maxSamples

  • Type: number
  • Default: 3

Maximum sample variables per template.

options.preprocessing

  • Type: ParsingStrategy
  • Default: Built-in strategy

Custom preprocessing strategy, typically created with defineStrategy() (see the Custom Preprocessing example below).

options.onProgress

  • Type: ProgressCallback
  • Default: undefined

Progress callback for monitoring long-running operations.
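
Taken together, the options above correspond roughly to the shape sketched below. This is assembled from the descriptions on this page rather than the library's exported type definitions, and the progress event fields are inferred from the Batch Processing with Progress example further down, so verify the names against the actual types.

// Sketch of DrainOptions, assembled from the parameter descriptions above.
interface DrainOptions {
  depth?: number;                  // parse tree depth (default: 4)
  simThreshold?: number;           // similarity threshold, 0.0 to 1.0 (default: 0.4)
  maxChildren?: number;            // maximum children per parse tree node (default: 100)
  maxClusters?: number;            // maximum total templates (default: 1000)
  maxSamples?: number;             // maximum sample variables per template (default: 3)
  preprocessing?: ParsingStrategy; // custom strategy, e.g. from defineStrategy()
  onProgress?: ProgressCallback;   // progress reporting for long-running operations
}

// Progress event fields inferred from the "Batch Processing with Progress" example.
type ProgressCallback = (event: {
  currentPhase: string;
  processedLines: number;
  percentComplete?: number;
}) => void;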

Return Value

Returns a Drain instance with the following methods:

process(line: string): void

Process a single log line and add it to the appropriate cluster.

drain.process('ERROR Connection failed');

addLogLines(lines: string[]): void

Process multiple log lines at once.

drain.addLogLines([
  'ERROR Connection failed',
  'INFO Request completed',
]);

getClusters(): LogCluster[]

Get all discovered clusters (templates).

const clusters = drain.getClusters();
clusters.forEach(cluster => {
  console.log(cluster.getPattern());
});
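
The methods called on LogCluster elsewhere on this page suggest at least the surface sketched below. This is inferred from the examples here, not the full interface; see the LogCluster reference for the complete shape.

// Partial LogCluster surface, inferred from the examples on this page.
interface LogCluster {
  getPattern(): string;    // the mined template, with wildcard placeholders
  size(): number;          // number of log lines matched to this template
  getMetadata(): {
    severity?: string;     // e.g. 'error' (used in the Real-time Monitoring example)
    urlSamples?: string[]; // sampled URL values seen in matching lines
    // ...additional fields; see the LogCluster reference
  };
}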

getResult(format?: OutputFormat): CompressionResult

Get compression results in the specified format.

const result = drain.getResult('detailed');
console.log(result.formatted);

Examples

Incremental Processing

import { createDrain } from 'logpare';
 
const drain = createDrain({
  depth: 4,
  simThreshold: 0.4,
});
 
// Process logs one at a time
drain.process('ERROR Connection to 192.168.1.1 failed');
drain.process('ERROR Connection to 192.168.1.2 failed');
drain.process('INFO Request abc123 completed');
 
// Get results
const result = drain.getResult('summary');
console.log(result.formatted);

Streaming Processing

import { createDrain } from 'logpare';
import { createReadStream } from 'fs';
import { createInterface } from 'readline';
 
const drain = createDrain({
  depth: 5,
  simThreshold: 0.4,
  maxClusters: 500,
});
 
const rl = createInterface({
  input: createReadStream('/var/log/syslog'),
  crlfDelay: Infinity,
});
 
let lineCount = 0;
 
rl.on('line', (line) => {
  drain.process(line);
  lineCount++;
 
  if (lineCount % 1000 === 0) {
    console.log(`Processed ${lineCount} lines...`);
  }
});
 
rl.on('close', () => {
  const result = drain.getResult('detailed');
  console.log(result.formatted);
});

Batch Processing with Progress

import { createDrain } from 'logpare';
 
const drain = createDrain({
  onProgress: (event) => {
    console.log(`${event.currentPhase}: ${event.processedLines} lines`);
    if (event.percentComplete !== undefined) {
      console.log(`Progress: ${event.percentComplete.toFixed(1)}%`);
    }
  }
});
 
// Process in batches
const batchSize = 1000;
for (let i = 0; i < logs.length; i += batchSize) {
  const batch = logs.slice(i, i + batchSize);
  drain.addLogLines(batch);
}
 
const result = drain.getResult();

Custom Preprocessing

import { createDrain, defineStrategy, WILDCARD } from 'logpare';
 
const customStrategy = defineStrategy({
  preprocess(line: string): string {
    // Custom masking for domain-specific patterns
    return line
      .replace(/order-[A-Z0-9]{8}/g, WILDCARD)
      .replace(/user_\d+/g, WILDCARD);
  },
 
  tokenize(line: string): string[] {
    // Custom tokenization
    return line.split(/\s+/).filter(Boolean);
  },
 
  getSimThreshold(depth: number): number {
    // Depth-dependent threshold
    return depth <= 2 ? 0.3 : 0.4;
  }
});
 
const drain = createDrain({
  preprocessing: customStrategy,
  depth: 5,
});
 
drain.addLogLines(logs);
const result = drain.getResult();

Accessing Clusters Directly

import { createDrain } from 'logpare';
 
const drain = createDrain();
drain.addLogLines(logs);
 
const clusters = drain.getClusters();
 
// Sort by occurrence
const sorted = clusters.sort((a, b) => b.size() - a.size());
 
// Get top 10 patterns
const top10 = sorted.slice(0, 10);
 
top10.forEach((cluster, i) => {
  console.log(`${i + 1}. [${cluster.size()}x] ${cluster.getPattern()}`);
 
  // Access metadata
  const metadata = cluster.getMetadata();
  console.log(`   Severity: ${metadata.severity}`);
  console.log(`   URLs: ${metadata.urlSamples.join(', ')}`);
});

Real-time Monitoring

import { createDrain } from 'logpare';
import { spawn } from 'child_process';
import { createInterface } from 'readline';
 
const drain = createDrain({
  depth: 4,
  simThreshold: 0.3,
});
 
// Monitor live logs
const tail = spawn('tail', ['-f', '/var/log/app.log']);
 
// Feed complete lines to the drain; readline buffers partial chunks
const rl = createInterface({
  input: tail.stdout,
  crlfDelay: Infinity,
});
rl.on('line', (line) => drain.process(line));
 
// Report every 10 seconds
setInterval(() => {
  const clusters = drain.getClusters();
  console.log(`\n=== Current State: ${clusters.length} templates ===`);
 
  const errors = clusters.filter(c =>
    c.getMetadata().severity === 'error'
  );
 
  if (errors.length > 0) {
    console.log('Error templates:');
    errors.slice(0, 5).forEach(c => {
      console.log(`  [${c.size()}x] ${c.getPattern()}`);
    });
  }
}, 10000);

When to Use

Use createDrain() when:

  • Processing very large log files that don't fit in memory
  • Implementing streaming log analysis
  • Building real-time log monitoring systems
  • You need to process logs incrementally
  • You want direct access to internal clusters

Use compress() or compressText() when (see the sketch after this list):

  • Processing complete log files that fit in memory
  • You want a simple, one-step compression
  • You don't need incremental updates
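
As a rough illustration of that split, the one-shot path looks something like the snippet below. The exact compress()/compressText() signatures are documented on their own pages; the call here assumes compressText accepts the raw log text and returns a CompressionResult, so treat it as an illustration rather than a verbatim snippet.

import { compressText, createDrain } from 'logpare';
import { readFileSync } from 'fs';

// Fits in memory: one-step compression (assumed signature; see the compressText reference)
const result = compressText(readFileSync('/var/log/app.log', 'utf8'));
console.log(result.formatted);

// Too large or arriving continuously: fall back to the incremental Drain API
const drain = createDrain();
// ...feed lines with drain.process() / drain.addLogLines(), then:
console.log(drain.getResult('summary').formatted);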

See Also