Data Flow

How log messages flow through Cenglu from call to output

Understanding how a log message flows through Cenglu helps you optimize performance, debug issues, and extend functionality. This guide walks you through each stage of the logging pipeline.

Quick Overview: Logger call → Build record → Plugin processing → Format → Write to transports

The 5 Stages

Stage 1: Logger Call

When you call a logging method, the logger parses your arguments:

// Different call patterns
logger.info("User logged in", { userId: 123 });
logger.error("Failed to connect", error);
logger.warn("Slow query", { duration: 1500, query: "SELECT *" });

// All become: logger.log(level, msg, context?, error?)

What happens:

  • Check if level is enabled (fast path; a disabled level returns before any other work)
  • Extract message string
  • Extract context object (optional)
  • Extract error object (optional)

// Simplified internal flow
public info(msg: string, contextOrError?: Bindings | Error): void {
  if (!this.isLevelEnabled("info")) return; // Fast exit
  this.log("info", msg, contextOrError);
}
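
The argument handling described above can be sketched as follows. This is a minimal illustration and not Cenglu's actual source: the `parseArgs` helper, the `ParsedCall` type, and the numeric level ranking are assumptions made for the example.

```typescript
// Hypothetical sketch of Stage 1; helper names and level ranking are assumptions.
type Level = "debug" | "info" | "warn" | "error";
type Bindings = Record<string, unknown>;

// Higher number = more severe (assumed ordering).
const LEVEL_RANK: Record<Level, number> = { debug: 10, info: 20, warn: 30, error: 40 };

interface ParsedCall {
  msg: string;
  context?: Bindings;
  error?: Error;
}

// A level is enabled when it is at least as severe as the configured threshold.
function isLevelEnabled(level: Level, threshold: Level): boolean {
  return LEVEL_RANK[level] >= LEVEL_RANK[threshold];
}

// Split the optional second argument into either an error or a context object.
function parseArgs(msg: string, contextOrError?: Bindings | Error): ParsedCall {
  if (contextOrError instanceof Error) {
    return { msg, error: contextOrError };
  }
  return contextOrError === undefined ? { msg } : { msg, context: contextOrError };
}

const parsed = parseArgs("Failed to connect", new Error("ECONNREFUSED"));
const withContext = parseArgs("User logged in", { userId: 123 });
```

Here `parsed` carries only an `error`, while `withContext` carries only a `context`, which mirrors the two call patterns shown at the top of this stage.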

Stage 2: Build LogRecord

The logger constructs a canonical LogRecord object containing all log data:

// LogRecord structure
{
  time: 1700000000000,           // Timestamp (ms)
  level: "info",                 // Log level
  msg: "User logged in",         // Message (redacted if needed)
  context: {                     // Merged context
    userId: 123,
    service: "auth-service",
    env: "production",
    requestId: "abc-123"
  },
  err: null,                     // Error info (if present)
  service: "auth-service",       // Service metadata
  env: "production",
  version: "1.2.3",
  traceId: "abc-123",            // Distributed tracing ID
  spanId: "xyz-789"
}

Context Merging Order (later sources override earlier):

  1. Global bindings (from createLogger({ bindings: {...} }))
  2. Child logger bindings (from logger.child({ ... }))
  3. Async context (from LoggerContext.run() or middleware)
  4. Bound context (from logger.with({ ... }))
  5. Call-site context (from logger.info("msg", { ... }))

// Example context merging
const logger = createLogger({
  bindings: { service: "api", env: "prod" }  // 1. Global
});

const childLogger = logger.child({
  module: "users" // 2. Child bindings
});

// In middleware (sets async context)
LoggerContext.run({ requestId: "abc-123" }, () => {  // 3. Async context

  childLogger
    .with({ operation: "create" })  // 4. Bound context
    .info("User created", { userId: 456 });  // 5. Call-site

  // Final context: { service, env, module, requestId, operation, userId }
});
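
The merge order can be illustrated with a small stand-alone function. This is a sketch under assumptions: `mergeContext` is a hypothetical helper, and it performs a shallow merge, since the text above does not say whether Cenglu merges nested objects deeply.

```typescript
// Hypothetical helper; shallow merge is an assumption, not confirmed Cenglu behavior.
type Bindings = Record<string, unknown>;

// Merge sources left to right; later sources override earlier ones.
function mergeContext(...sources: Array<Bindings | undefined>): Bindings {
  const merged: Bindings = {};
  for (const source of sources) {
    if (source) Object.assign(merged, source);
  }
  return merged;
}

const finalContext = mergeContext(
  { service: "api", env: "prod" },  // 1. Global bindings
  { module: "users" },              // 2. Child bindings
  { requestId: "abc-123" },         // 3. Async context
  { operation: "create" },          // 4. Bound context
  { userId: 456, env: "staging" }   // 5. Call-site context (overrides env)
);
```

Because the call-site object comes last, its `env` value wins over the global binding, while every other key is carried through unchanged.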

Redaction Applied:

  • Message string is redacted
  • Context object is redacted
  • Error details are redacted

Performance Tip: Context merging happens on every log call, so keep bindings small and avoid expensive computations when building them.

Stage 3: Plugin Processing (onRecord)

Plugins can transform or filter the log record:

// Plugin hook signature
onRecord?(record: LogRecord): LogRecord | null;

Plugin execution order (by order property, ascending):

const logger = createLogger({
  plugins: [
    samplingPlugin({ order: 10 }),    // 1. Drop 90% of debug logs
    redactionPlugin({ order: 20 }),   // 2. Redact sensitive data
    enrichPlugin({ order: 30 }),      // 3. Add hostname, PID
    metricsPlugin({ order: 40 }),     // 4. Count logs by level
  ],
});

Example: Sampling plugin drops logs

// Sampling plugin
{
  name: "sampling",
  order: 10,
  onRecord(record: LogRecord): LogRecord | null {
    if (record.level === "debug" && Math.random() > 0.1) {
      return null; // Drop 90% of debug logs
    }
    return record; // Keep the log
  }
}
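
A per-level variant of the sampling idea might look like the sketch below. The factory name `makeSamplingPlugin` and the injectable `random` option are hypothetical and exist only to make the example testable; the `defaultRate` and `rates` option names follow the configuration shown later on this page, but the real plugin's behavior may differ.

```typescript
// Hypothetical sampling factory; not Cenglu's built-in samplingPlugin.
interface LogRecord {
  level: string;
  msg: string;
}

interface SamplingOptions {
  defaultRate: number;             // Fraction of logs to keep when no per-level rate is set
  rates?: Record<string, number>;  // Per-level keep rates, e.g. { debug: 0.1 }
  order?: number;
  random?: () => number;           // Injectable randomness (assumption, for tests)
}

function makeSamplingPlugin(options: SamplingOptions) {
  const random = options.random ?? Math.random;
  return {
    name: "sampling",
    order: options.order ?? 10,
    onRecord(record: LogRecord): LogRecord | null {
      const rate = options.rates?.[record.level] ?? options.defaultRate;
      // Keep the record when the random draw falls below the keep rate.
      return random() < rate ? record : null;
    },
  };
}

// With a fixed draw of 0.5, debug (rate 0.1) is dropped and info (rate 1.0) is kept.
const sampler = makeSamplingPlugin({ defaultRate: 1.0, rates: { debug: 0.1 }, random: () => 0.5 });
const debugResult = sampler.onRecord({ level: "debug", msg: "cache miss" });
const infoResult = sampler.onRecord({ level: "info", msg: "User logged in" });
```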

Example: Enrichment plugin adds fields

// Enrichment plugin
{
  name: "enrich",
  order: 30,
  onRecord(record: LogRecord): LogRecord {
    return {
      ...record,
      context: {
        ...record.context,
        hostname: os.hostname(),
        pid: process.pid,
      }
    };
  }
}

Important: Returning null from onRecord drops the log entirely. No formatting or transport writing occurs.
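
The ordering and drop behavior can be sketched as a small runner. This is an illustration only: `runOnRecord` is a hypothetical function, and treating a missing `order` as 0 is an assumption.

```typescript
// Hypothetical runner for onRecord hooks; not Cenglu's internal code.
interface LogRecord {
  level: string;
  msg: string;
  context: Record<string, unknown>;
}

interface Plugin {
  name: string;
  order?: number;
  onRecord?(record: LogRecord): LogRecord | null;
}

function runOnRecord(plugins: Plugin[], record: LogRecord): LogRecord | null {
  // Sort a copy so the caller's array is not reordered (missing order treated as 0).
  const sorted = [...plugins].sort((a, b) => (a.order ?? 0) - (b.order ?? 0));
  let current: LogRecord = record;
  for (const plugin of sorted) {
    if (!plugin.onRecord) continue;
    const next = plugin.onRecord(current);
    if (next === null) return null; // Dropped: later plugins never run
    current = next;
  }
  return current;
}

const calls: string[] = [];
const plugins: Plugin[] = [
  { name: "enrich", order: 30, onRecord: (r) => { calls.push("enrich"); return { ...r, context: { ...r.context, pid: 1 } }; } },
  { name: "drop-debug", order: 10, onRecord: (r) => { calls.push("drop-debug"); return r.level === "debug" ? null : r; } },
];

const kept = runOnRecord(plugins, { level: "info", msg: "hello", context: {} });
const dropped = runOnRecord(plugins, { level: "debug", msg: "noise", context: {} });
```

Even though `enrich` is listed first, `drop-debug` runs first because of its lower `order`, and the enrichment step is skipped entirely for the dropped debug record.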

Stage 4: Formatting

The logger converts the LogRecord into a formatted string:

// Format selection
if (config.pretty.enabled) {
  // Development: colored, human-readable
  formatted = formatPretty(record, theme);
} else {
  // Production: structured format
  switch (config.structured.type) {
    case "json":     formatted = formatJson(record);    break;
    case "ecs":      formatted = formatEcs(record);     break;
    case "datadog":  formatted = formatDatadog(record); break;
    case "splunk":   formatted = formatSplunk(record);  break;
    case "logfmt":   formatted = formatLogfmt(record);  break;
  }
}

Example Outputs:

Pretty Format (development):

[2024-01-15 10:30:45] INFO  User logged in
  userId: 123
  requestId: abc-123
  duration: 42ms

JSON Format (production):

{"time":1700000000000,"level":"info","msg":"User logged in","context":{"userId":123,"requestId":"abc-123"},"service":"api","env":"production"}

ECS Format (Elasticsearch):

{"@timestamp":"2024-01-15T10:30:45.000Z","log.level":"info","message":"User logged in","user.id":123,"trace.id":"abc-123","service.name":"api"}

Plugin onFormat Hooks:

Plugins can modify the formatted string:

{
  name: "prefix",
  onFormat(record: LogRecord, formatted: string): string {
    return `[MY-APP] ${formatted}`;
  }
}
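
How formatting and onFormat hooks might compose is sketched below. The `formatWithHooks` function and the plain `JSON.stringify` formatter are stand-ins chosen for the example; Cenglu's real formatters and hook ordering may differ.

```typescript
// Hypothetical composition of a formatter with onFormat hooks.
interface LogRecord {
  time: number;
  level: string;
  msg: string;
}

interface FormatPlugin {
  name: string;
  onFormat?(record: LogRecord, formatted: string): string;
}

// Stand-in for a structured formatter.
function formatJson(record: LogRecord): string {
  return JSON.stringify(record);
}

// Format once, then let each plugin rewrite the string in turn.
function formatWithHooks(record: LogRecord, plugins: FormatPlugin[]): string {
  let formatted = formatJson(record);
  for (const plugin of plugins) {
    if (plugin.onFormat) formatted = plugin.onFormat(record, formatted);
  }
  return formatted;
}

const prefixPlugin: FormatPlugin = {
  name: "prefix",
  onFormat: (_record, formatted) => `[MY-APP] ${formatted}`,
};

const line = formatWithHooks({ time: 1700000000000, level: "info", msg: "User logged in" }, [prefixPlugin]);
// line === '[MY-APP] {"time":1700000000000,"level":"info","msg":"User logged in"}'
```

Note that a prefix like this makes the line invalid JSON, so a hook of this kind suits pretty output better than structured output.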

Stage 5: Output (Transports & Adapters)

The formatted log is written to all configured destinations:

// 1. Write to transports (synchronous)
for (const transport of transports) {
  transport.write(record, formatted, isError);
}

// 2. Notify plugins (onWrite hook)
for (const plugin of plugins) {
  plugin.onWrite?.(record, formatted);
}

// 3. Send to adapters (async, non-blocking)
for (const adapter of adapters) {
  adapter.handle(record); // Fire-and-forget
}

Transport vs Adapter:

Feature  | Transport                    | Adapter
---------|------------------------------|-----------------------------
Purpose  | Write formatted logs locally | Forward to external services
Blocking | ✅ Yes (synchronous)         | ❌ No (async, background)
Format   | Uses formatted string        | Uses raw LogRecord
Errors   | Propagate to caller          | Caught and logged
Examples | Console, File                | Datadog, Splunk, Metrics

Example: Console Transport

const consoleTransport: Transport = {
  write(record, formatted, isError) {
    const stream = isError ? process.stderr : process.stdout;
    stream.write(formatted + "\n");
  }
};

Example: Metrics Adapter

const metricsAdapter: ProviderAdapter = {
  name: "metrics",
  async handle(record) {
    await metrics.increment("logs.total", 1, {
      level: record.level,
      service: record.service,
    });
  }
};
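
The different error handling for transports and adapters can be sketched as follows. This is an assumption-laden illustration: the `emit` function, the `onAdapterError` callback, and deriving `isError` from the level are all hypothetical.

```typescript
// Hypothetical emit step; shows sync transports and isolated, fire-and-forget adapters.
interface LogRecord {
  level: string;
  msg: string;
}

interface Transport {
  write(record: LogRecord, formatted: string, isError: boolean): void;
}

interface ProviderAdapter {
  name: string;
  handle(record: LogRecord): void | Promise<void>;
}

function emit(
  record: LogRecord,
  formatted: string,
  transports: Transport[],
  adapters: ProviderAdapter[],
  onAdapterError: (adapterName: string, err: unknown) => void
): void {
  const isError = record.level === "error"; // Assumed rule for choosing stderr
  for (const transport of transports) {
    transport.write(record, formatted, isError); // Synchronous; errors propagate to the caller
  }
  for (const adapter of adapters) {
    // Starting from a resolved promise turns both synchronous throws and
    // rejected promises into a rejection that is caught below.
    Promise.resolve()
      .then(() => adapter.handle(record))
      .catch((err: unknown) => onAdapterError(adapter.name, err));
  }
}

const lines: string[] = [];
const memoryTransport: Transport = { write: (_record, formatted) => { lines.push(formatted); } };
const failingAdapter: ProviderAdapter = { name: "failing", handle: () => { throw new Error("backend down"); } };

emit({ level: "info", msg: "hello" }, "hello-line", [memoryTransport], [failingAdapter], () => {});
```

The failing adapter never interrupts the caller: the transport has already written its line by the time the adapter runs in a later microtask.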

Complete Example: Following a Log

Let's trace a single log call through the entire pipeline:

// 1. Setup
import * as os from "node:os"; // Needed for os.hostname() below
const logger = createLogger({
  service: "user-service",
  env: "production",
  level: "info",
  bindings: { version: "1.2.3" },
  plugins: [
    samplingPlugin({ defaultRate: 1.0, rates: { debug: 0.1 } }),
    enrichPlugin({ fields: { hostname: os.hostname() } }),
  ],
  redaction: { enabled: true },
});

// 2. Log call
logger.info("User created", {
  userId: 123,
  email: "user@example.com",
  password: "secret123"  // Will be redacted!
});

Step-by-step execution:

Step 1: Parse Arguments

level = "info"
msg = "User created"
context = { userId: 123, email: "user@example.com", password: "secret123" }

Step 2: Build LogRecord

{
  time: 1700000000000,
  level: "info",
  msg: "User created",
  context: {
    userId: 123,
    email: "user@example.com",
    password: "[REDACTED]",  // ← Redacted!
    version: "1.2.3"         // ← From global bindings
  },
  service: "user-service",
  env: "production"
}

Step 3: Plugin onRecord

// Sampling plugin: level="info", rate=1.0 → Keep
// Enrichment plugin: adds hostname: "server-01" to context
// Result: record continues to formatting with the extra hostname field

Step 4: Format

// JSON format (production)
formatted = '{"time":1700000000000,"level":"info","msg":"User created","context":{"userId":123,"email":"user@example.com","password":"[REDACTED]","version":"1.2.3","hostname":"server-01"},"service":"user-service","env":"production"}'

Step 5: Output

// Console transport writes to stdout (info is not an error level)
process.stdout.write(formatted + "\n");

// Metrics adapter, if one is configured, increments a counter
metrics.increment("logs.total", 1, { level: "info", service: "user-service" });

Performance Optimization Tips

1. Early Exit on Level Check

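// ✅ Fast - the level check happens before any record is built
if (!this.isLevelEnabled("debug")) return;

// The logger never builds a record if the level is disabled.
// Note: JavaScript still evaluates the argument before debug() is called,
// so expensiveComputation() runs either way (see Lazy Evaluation below).
logger.debug("Expensive operation", expensiveComputation());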

2. Lazy Evaluation

// ❌ Slow - always computes
logger.debug("Stats", { stats: computeExpensiveStats() });

// ✅ Fast - only computes if debug is enabled
if (logger.isLevelEnabled("debug")) {
  logger.debug("Stats", { stats: computeExpensiveStats() });
}
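
The guard can also be wrapped in a small helper that takes a function instead of a value. `debugLazy` and the `LevelAware` interface are hypothetical names for this sketch; only `isLevelEnabled` and `debug` come from the examples above.

```typescript
// Hypothetical wrapper: the context is built only when debug is enabled.
interface LevelAware {
  isLevelEnabled(level: string): boolean;
  debug(msg: string, context?: Record<string, unknown>): void;
}

function debugLazy(
  logger: LevelAware,
  msg: string,
  buildContext: () => Record<string, unknown>
): void {
  if (!logger.isLevelEnabled("debug")) return; // Skip the expensive callback entirely
  logger.debug(msg, buildContext());
}

// Stand-in logger with debug disabled, used to show that the callback is skipped.
let computeCalls = 0;
const computeExpensiveStats = () => {
  computeCalls += 1;
  return { total: 42 };
};
const infoOnlyLogger: LevelAware = {
  isLevelEnabled: (level) => level !== "debug",
  debug: () => {},
};

debugLazy(infoOnlyLogger, "Stats", () => ({ stats: computeExpensiveStats() }));
// computeCalls is still 0 here because debug is disabled.
```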

3. Plugin Ordering Matters

// ✅ Good - sampling drops logs early
plugins: [
  samplingPlugin({ order: 10 }),     // Drop 90% first
  expensiveEnrichPlugin({ order: 20 }) // Only runs on 10%
]

// ❌ Bad - enrichment runs on all logs
plugins: [
  expensiveEnrichPlugin({ order: 10 }), // Runs on 100%
  samplingPlugin({ order: 20 })         // Then drops 90%
]

4. Keep onRecord Lightweight

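// ❌ Bad - onRecord is a synchronous hook, so it cannot wait for I/O
async onRecord(record) {
  const response = await fetch("/api/enrich"); // Async I/O on the hot path!
  const extra = await response.json();
  return { ...record, context: { ...record.context, ...extra } }; // Returns a Promise, not a LogRecord
}

// ✅ Good - do I/O in onWrite
onRecord(record) {
  return record; // Fast, synchronous
},
onWrite(record, formatted) {
  // Fire-and-forget: POST the line and swallow failures so logging never throws
  fetch("/api/log", { method: "POST", body: formatted }).catch(() => {});
}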

Lifecycle Operations

Flush

Ensures all buffered logs are written:

await logger.flush();

// Internally calls:
// 1. plugin.onFlush() for each plugin
// 2. transport.flush() for each transport

When to flush:

  • Before process exit
  • After critical operations
  • When using batching plugins
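
A batching plugin shows why flushing matters: lines sit in memory until the batch fills or a flush is requested. The sketch below is hypothetical (`makeBatchingPlugin` is not a Cenglu export), and it keeps `onFlush` synchronous for simplicity, whereas a real hook may return a promise.

```typescript
// Hypothetical batching plugin; hook names follow this page, signatures are assumed.
interface LogRecord {
  level: string;
  msg: string;
}

function makeBatchingPlugin(send: (batch: string[]) => void, maxBatch = 100) {
  const buffer: string[] = [];

  // Hand the buffered lines to the sink and empty the buffer.
  const drain = (): void => {
    if (buffer.length === 0) return;
    send(buffer.splice(0, buffer.length));
  };

  return {
    name: "batching",
    onWrite(_record: LogRecord, formatted: string): void {
      buffer.push(formatted);
      if (buffer.length >= maxBatch) drain(); // Batch is full: send now
    },
    onFlush(): void {
      drain(); // Send whatever is left, even a partial batch
    },
  };
}

const sent: string[][] = [];
const batching = makeBatchingPlugin((batch) => { sent.push(batch); }, 3);
batching.onWrite({ level: "info", msg: "a" }, "line-1");
batching.onWrite({ level: "info", msg: "b" }, "line-2");
const sentBeforeFlush = sent.length; // 0: the batch of 3 is not full yet
batching.onFlush();
```

Without the final flush, the two buffered lines would be lost if the process exited at that point.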

Close

Cleanup and shutdown:

await logger.close();

// Internally calls:
// 1. logger.flush() (flush first)
// 2. plugin.onClose() for each plugin
// 3. transport.close() for each transport

Example: Graceful shutdown

process.on("SIGTERM", async () => {
  logger.info("Shutting down...");
  await logger.flush();  // Ensure last logs are written (optional: close() also flushes first)
  await logger.close();  // Clean up resources
  process.exit(0);
});

Debugging Tips

Missing Logs?

Check 1: Level filtering

// Is the level enabled?
console.log(logger.state.level); // "info"
logger.debug("This won't show"); // debug < info

Check 2: Plugin dropping logs

// Did a plugin return null?
samplingPlugin({ rates: { debug: 0.01 } }) // Drops 99% of debug

Check 3: Async context

// Are you inside a context?
LoggerContext.run({ requestId: "abc" }, () => {
  logger.info("Inside context"); // ✅ Has requestId
});
logger.info("Outside context"); // ❌ No requestId
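
Why the request ID disappears outside the callback can be seen with Node's `AsyncLocalStorage`. This is a sketch assuming `LoggerContext` is built on a similar mechanism, which this page does not confirm; `runWithContext` and `currentContext` are hypothetical names.

```typescript
// Sketch using Node's AsyncLocalStorage; LoggerContext's real internals are not shown on this page.
import { AsyncLocalStorage } from "node:async_hooks";

type Bindings = Record<string, unknown>;

const storage = new AsyncLocalStorage<Bindings>();

// Run fn with the given context visible to everything it calls, including async continuations.
function runWithContext<T>(context: Bindings, fn: () => T): T {
  return storage.run(context, fn);
}

// Read the active context, or an empty object when called outside any run().
function currentContext(): Bindings {
  return storage.getStore() ?? {};
}

const inside = runWithContext({ requestId: "abc" }, () => currentContext());
const outside = currentContext();
```

`inside` holds the request ID, while `outside` is empty because no store is active at that point.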

Sensitive Data Leaking?

Check 1: Redaction enabled

const logger = createLogger({
  redaction: { enabled: true, useDefaults: true }
});

Check 2: Custom patterns

redaction: {
  patterns: [
    { pattern: /api[_-]?key/gi, replacement: "[API_KEY]" }
  ]
}

Check 3: Object paths

redaction: {
  paths: ["user.password", "credentials.token"]
}
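
Path-based redaction can be pictured as the sketch below. `redactPaths` is a hypothetical function, not Cenglu's implementation, and it assumes dot-separated paths and JSON-serializable context values.

```typescript
// Hypothetical path-based redaction; assumes dot-separated paths and JSON-safe values.
type Bindings = Record<string, unknown>;

function isPlainObject(value: unknown): value is Bindings {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

function redactPaths(input: Bindings, paths: string[], replacement = "[REDACTED]"): Bindings {
  // A JSON round-trip gives a deep copy, so the caller's object is never mutated.
  const copy = JSON.parse(JSON.stringify(input)) as Bindings;
  for (const path of paths) {
    const keys = path.split(".");
    const last = keys.pop();
    if (last === undefined) continue;
    // Walk down to the parent of the final key; stop at anything that is not an object.
    let node: unknown = copy;
    for (const key of keys) {
      node = isPlainObject(node) ? node[key] : undefined;
    }
    if (isPlainObject(node) && last in node) {
      node[last] = replacement;
    }
  }
  return copy;
}

const original = { user: { name: "Ada", password: "secret123" }, credentials: { token: "t-1" } };
const redacted = redactPaths(original, ["user.password", "credentials.token", "missing.path"]);
```

Paths that do not exist in the object, such as `missing.path` here, are ignored, which is one reason a misspelled path can silently leave a field unredacted.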

Performance Issues?

Check 1: Plugin order

// Place expensive plugins late (high order value)
plugins: [
  samplingPlugin({ order: 10 }),
  expensivePlugin({ order: 100 })
]

Check 2: Heavy context

// ❌ Bad - large context on every log
logger.info("msg", { items }); // items holds 10,000 entries

// ✅ Good - summarize
logger.info("msg", { count: items.length });
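
When the call site cannot easily know which fields are large, a helper can summarize them before logging. `summarizeContext`, its `maxItems` threshold, and the three-element sample are hypothetical choices for this sketch.

```typescript
// Hypothetical helper: replaces large arrays with a count and a short sample.
type Bindings = Record<string, unknown>;

function summarizeContext(context: Bindings, maxItems = 20): Bindings {
  const summarized: Bindings = {};
  for (const key of Object.keys(context)) {
    const value = context[key];
    summarized[key] =
      Array.isArray(value) && value.length > maxItems
        ? { count: value.length, sample: value.slice(0, 3) }
        : value;
  }
  return summarized;
}

const items = Array.from({ length: 10000 }, (_, i) => i);
const summarized = summarizeContext({ items, name: "x" });
// summarized is { items: { count: 10000, sample: [0, 1, 2] }, name: "x" }
```

Only the top-level keys are inspected, so deeply nested arrays would still need to be summarized by the caller.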

Check 3: Async I/O in plugins

// ❌ Bad - awaits I/O in onRecord, which must return synchronously
async onRecord(record) { await fetch(...); }

// ✅ Good - background in onWrite
onWrite(record) { fetch(...); }

Source Code References

  • Logger Core: src/logger.ts
  • Record Building: buildRecord() method (~line 699)
  • Formatting: format() method (~line 800)
  • Emission: emit() method (~line 848)
  • Context Utilities: src/context.ts
  • Redaction: src/redaction.ts
