Custom Transports

Implement custom transports to send logs anywhere

Custom transports allow you to send logs to any destination: databases, external APIs, cloud services, or custom storage solutions. Implementing a transport is straightforward with the Transport interface.

Quick Start

Basic custom transport:

import { createLogger, type Transport, type LogRecord } from "cenglu";

const myTransport: Transport = {
  write(record: LogRecord, formatted: string, isError: boolean): void {
    // Send log somewhere
    console.log("Custom:", formatted);
  },

  async flush(): Promise<void> {
    // Flush any buffered logs
  },

  async close(): Promise<void> {
    // Cleanup resources
  },
};

const logger = createLogger({
  transports: [myTransport],
});

logger.info("Hello from custom transport!");

Transport Interface

The Transport interface defines three methods:

interface Transport {
  /**
   * Write a log record
   * @param record - The log record object
   * @param formatted - Pre-formatted log string (JSON, ECS, etc.)
   * @param isError - Whether this is an error-level log
   */
  write(record: LogRecord, formatted: string, isError: boolean): void;

  /**
   * Flush any buffered logs
   * Called before close() and on logger.flush()
   */
  flush(): Promise<void>;

  /**
   * Close the transport and cleanup resources
   * Called on logger.close() and process exit
   */
  close(): Promise<void>;
}

LogRecord Type

interface LogRecord {
  level: string;           // Log level (info, error, etc.)
  msg: string;             // Log message
  time: number;            // Timestamp (milliseconds since epoch)
  context: Record<string, unknown>;  // Additional context
  service?: string;        // Service name
  env?: string;            // Environment
  version?: string;        // Version
  hostname?: string;       // Hostname
  pid?: number;            // Process ID
  error?: {                // Error details (if present)
    message: string;
    stack?: string;
    type?: string;
    code?: string;
  };
}
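As a small illustration of consuming LogRecord fields, here is a sketch of a transport that keeps only error entries, e.g. for an alerting sink. The types are redeclared locally so the snippet stands alone (in a real project, import them from cenglu), and `ErrorOnlyTransport` is an invented name, not part of the library:

```typescript
// Minimal local copies of the cenglu shapes, so this sketch is self-contained.
interface LogRecord {
  level: string;
  msg: string;
  time: number;
  context: Record<string, unknown>;
  error?: { message: string; stack?: string };
}

interface Transport {
  write(record: LogRecord, formatted: string, isError: boolean): void;
  flush(): Promise<void>;
  close(): Promise<void>;
}

// Keep only error-level records, using both the isError flag and record.error.
class ErrorOnlyTransport implements Transport {
  public captured: LogRecord[] = [];

  write(record: LogRecord, _formatted: string, isError: boolean): void {
    if (isError || record.error) {
      this.captured.push(record);
    }
  }

  async flush(): Promise<void> {}
  async close(): Promise<void> {}
}

const t = new ErrorOnlyTransport();
t.write({ level: "info", msg: "ok", time: Date.now(), context: {} }, "{}", false);
t.write({ level: "error", msg: "boom", time: Date.now(), context: {} }, "{}", true);
// t.captured now holds only the "boom" record
```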

HTTP Transport Example

Send logs to an HTTP endpoint:

import type { Transport, LogRecord } from "cenglu";

class HttpTransport implements Transport {
  private readonly url: string;
  private readonly batchSize: number;
  private readonly flushInterval: number;
  private buffer: string[] = [];
  private timer: NodeJS.Timeout | null = null;

  constructor(options: {
    url: string;
    batchSize?: number;
    flushInterval?: number;
  }) {
    this.url = options.url;
    this.batchSize = options.batchSize ?? 100;
    this.flushInterval = options.flushInterval ?? 5000;
    this.startTimer();
  }

  write(_record: LogRecord, formatted: string, _isError: boolean): void {
    this.buffer.push(formatted);

    if (this.buffer.length >= this.batchSize) {
      this.flush().catch(console.error);
    }
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) {
      return;
    }

    const batch = this.buffer.splice(0);

    try {
      await fetch(this.url, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ logs: batch }),
      });
    } catch (error) {
      console.error("Failed to send logs:", error);
      // Optionally: re-queue failed logs
    }
  }

  async close(): Promise<void> {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    await this.flush();
  }

  private startTimer(): void {
    this.timer = setInterval(() => {
      this.flush().catch(console.error);
    }, this.flushInterval);

    // Don't keep process alive
    this.timer.unref();
  }
}

// Usage
const logger = createLogger({
  transports: [
    new HttpTransport({
      url: "https://logs.example.com/ingest",
      batchSize: 100,
      flushInterval: 5000,
    }),
  ],
});
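The `// Optionally: re-queue failed logs` note above can be sketched as a bounded re-queue: on failure, the batch goes back to the front of the buffer, and the oldest entries are dropped past a cap so memory stays bounded. `RequeuingBuffer` and `maxBuffered` are invented names for illustration, not cenglu APIs:

```typescript
// Bounded re-queue sketch: failed batches are put back, oldest entries dropped.
class RequeuingBuffer {
  private buffer: string[] = [];

  constructor(
    private readonly sender: (batch: string[]) => Promise<void>,
    private readonly maxBuffered = 1000
  ) {}

  push(line: string): void {
    this.buffer.push(line);
  }

  get size(): number {
    return this.buffer.length;
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) return;
    const batch = this.buffer.splice(0);
    try {
      await this.sender(batch);
    } catch {
      // Re-queue at the front, then trim the oldest entries past the cap.
      this.buffer.unshift(...batch);
      if (this.buffer.length > this.maxBuffered) {
        this.buffer.splice(0, this.buffer.length - this.maxBuffered);
      }
    }
  }
}

// Demo: a sender that always fails, with a cap of 3 buffered lines.
const q = new RequeuingBuffer(async () => { throw new Error("endpoint down"); }, 3);
["a", "b", "c", "d"].forEach((line) => q.push(line));
```

After a failed flush, the four lines are re-queued and the oldest (`"a"`) is dropped to respect the cap.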

Database Transport Example

Write logs to a database:

import type { Transport, LogRecord } from "cenglu";
// "generic-database" is a placeholder; substitute your actual driver
// (pg, mysql2, better-sqlite3, etc.) and its pool/query API.
import { createPool, type Pool } from "generic-database";

class DatabaseTransport implements Transport {
  private readonly pool: Pool;
  private readonly table: string;

  constructor(options: { pool: Pool; table?: string }) {
    this.pool = options.pool;
    this.table = options.table ?? "logs";
  }

  write(record: LogRecord, _formatted: string, _isError: boolean): void {
    // Use the record object directly for structured data.
    // Note: the table name is interpolated into the SQL string, so it must
    // come from trusted configuration, never from user input.
    this.pool
      .query(
        `INSERT INTO ${this.table} (level, message, time, context, service) VALUES (?, ?, ?, ?, ?)`,
        [
          record.level,
          record.msg,
          new Date(record.time),
          JSON.stringify(record.context),
          record.service,
        ]
      )
      .catch((error) => {
        console.error("Failed to write log to database:", error);
      });
  }

  async flush(): Promise<void> {
    // Each write issues its own query, so there is nothing buffered to flush.
    // Batching inserts here would reduce round trips under high volume.
  }

  async close(): Promise<void> {
    await this.pool.end();
  }
}

// Usage
const pool = createPool({
  host: "localhost",
  database: "logs",
});

const logger = createLogger({
  transports: [
    new DatabaseTransport({ pool }),
  ],
});

Cloud Service Examples

AWS CloudWatch

import { CloudWatchLogsClient, PutLogEventsCommand } from "@aws-sdk/client-cloudwatch-logs";
import type { Transport, LogRecord } from "cenglu";

class CloudWatchTransport implements Transport {
  private readonly client: CloudWatchLogsClient;
  private readonly logGroupName: string;
  private readonly logStreamName: string;
  private buffer: Array<{ message: string; timestamp: number }> = [];
  private sequenceToken?: string;

  constructor(options: {
    region: string;
    logGroupName: string;
    logStreamName: string;
  }) {
    this.client = new CloudWatchLogsClient({ region: options.region });
    this.logGroupName = options.logGroupName;
    this.logStreamName = options.logStreamName;
  }

  write(record: LogRecord, formatted: string, _isError: boolean): void {
    this.buffer.push({
      message: formatted,
      timestamp: record.time, // use the record's own timestamp, not the enqueue time
    });

    if (this.buffer.length >= 100) {
      this.flush().catch(console.error);
    }
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) {
      return;
    }

    const logEvents = this.buffer.splice(0);

    try {
      const command = new PutLogEventsCommand({
        logGroupName: this.logGroupName,
        logStreamName: this.logStreamName,
        logEvents,
        // Sequence tokens are no longer required by CloudWatch (the API has
        // ignored them since early 2023), but passing one is harmless.
        sequenceToken: this.sequenceToken,
      });

      const response = await this.client.send(command);
      this.sequenceToken = response.nextSequenceToken;
    } catch (error) {
      console.error("Failed to send logs to CloudWatch:", error);
    }
  }

  async close(): Promise<void> {
    await this.flush();
    this.client.destroy();
  }
}

Google Cloud Logging

import { Logging, type Log, type Entry } from "@google-cloud/logging";
import type { Transport, LogRecord } from "cenglu";

class GoogleCloudTransport implements Transport {
  private readonly log: Log;
  private buffer: Entry[] = [];

  constructor(options: { projectId: string; logName: string }) {
    const logging = new Logging({ projectId: options.projectId });
    this.log = logging.log(options.logName);
  }

  write(record: LogRecord, _formatted: string, _isError: boolean): void {
    const entry = this.log.entry(
      {
        severity: record.level.toUpperCase(),
        timestamp: new Date(record.time),
      },
      {
        message: record.msg,
        ...record.context,
        service: record.service,
      }
    );

    this.buffer.push(entry);

    if (this.buffer.length >= 50) {
      this.flush().catch(console.error);
    }
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) {
      return;
    }

    const entries = this.buffer.splice(0);

    try {
      await this.log.write(entries);
    } catch (error) {
      console.error("Failed to write to Google Cloud Logging:", error);
    }
  }

  async close(): Promise<void> {
    await this.flush();
  }
}

Datadog

import type { Transport, LogRecord } from "cenglu";

class DatadogTransport implements Transport {
  private readonly apiKey: string;
  private readonly service: string;
  private readonly hostname: string;
  private buffer: Record<string, unknown>[] = [];

  constructor(options: { apiKey: string; service: string; hostname: string }) {
    this.apiKey = options.apiKey;
    this.service = options.service;
    this.hostname = options.hostname;
  }

  write(record: LogRecord, _formatted: string, _isError: boolean): void {
    this.buffer.push({
      ddsource: "nodejs",
      ddtags: `env:${record.env ?? "unknown"},service:${this.service}`,
      hostname: this.hostname,
      message: record.msg,
      status: record.level,
      timestamp: record.time,
      ...record.context,
    });

    if (this.buffer.length >= 100) {
      this.flush().catch(console.error);
    }
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) {
      return;
    }

    const batch = this.buffer.splice(0);

    try {
      // Note: this is the legacy v1 intake path; newer accounts should use
      // https://http-intake.logs.datadoghq.com/api/v2/logs
      await fetch("https://http-intake.logs.datadoghq.com/v1/input", {
        method: "POST",
        headers: {
          "Content-Type": "application/json",
          "DD-API-KEY": this.apiKey,
        },
        body: JSON.stringify(batch),
      });
    } catch (error) {
      console.error("Failed to send logs to Datadog:", error);
    }
  }

  async close(): Promise<void> {
    await this.flush();
  }
}

Batching Pattern

Batching improves performance for remote transports:

import type { Transport, LogRecord } from "cenglu";

class BatchedTransport implements Transport {
  private buffer: string[] = [];
  private timer: NodeJS.Timeout | null = null;
  private readonly batchSize: number;
  private readonly flushInterval: number;
  private readonly sender: (batch: string[]) => Promise<void>;

  constructor(options: {
    batchSize: number;
    flushInterval: number;
    sender: (batch: string[]) => Promise<void>;
  }) {
    this.batchSize = options.batchSize;
    this.flushInterval = options.flushInterval;
    this.sender = options.sender;
    this.startTimer();
  }

  write(_record: LogRecord, formatted: string, _isError: boolean): void {
    this.buffer.push(formatted);

    // Flush when buffer is full
    if (this.buffer.length >= this.batchSize) {
      this.flush().catch(console.error);
    }
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) {
      return;
    }

    const batch = this.buffer.splice(0);

    try {
      await this.sender(batch);
    } catch (error) {
      console.error("Failed to send batch:", error);
      // Optionally: implement retry logic
    }
  }

  async close(): Promise<void> {
    if (this.timer) {
      clearInterval(this.timer);
      this.timer = null;
    }
    await this.flush();
  }

  private startTimer(): void {
    this.timer = setInterval(() => {
      this.flush().catch(console.error);
    }, this.flushInterval);

    this.timer.unref();
  }
}

// Usage
const transport = new BatchedTransport({
  batchSize: 100,
  flushInterval: 5000,
  sender: async (batch) => {
    await fetch("https://logs.example.com/ingest", {
      method: "POST",
      body: JSON.stringify({ logs: batch }),
    });
  },
});

Error Handling

Handle errors gracefully in transports:

import type { Transport, LogRecord } from "cenglu";

class ResilientTransport implements Transport {
  private readonly fallback: Transport;
  private readonly primary: Transport;

  constructor(primary: Transport, fallback: Transport) {
    this.primary = primary;
    this.fallback = fallback;
  }

  write(record: LogRecord, formatted: string, isError: boolean): void {
    try {
      this.primary.write(record, formatted, isError);
    } catch (error) {
      console.error("Primary transport failed:", error);
      // Fallback to secondary transport
      try {
        this.fallback.write(record, formatted, isError);
      } catch (fallbackError) {
        console.error("Fallback transport also failed:", fallbackError);
      }
    }
  }

  async flush(): Promise<void> {
    await Promise.allSettled([
      this.primary.flush(),
      this.fallback.flush(),
    ]);
  }

  async close(): Promise<void> {
    await Promise.allSettled([
      this.primary.close(),
      this.fallback.close(),
    ]);
  }
}

Retry Logic

Implement automatic retries:

import type { Transport, LogRecord } from "cenglu";

class RetryTransport implements Transport {
  private readonly inner: Transport;
  private readonly maxRetries: number;
  private readonly retryDelay: number;

  constructor(
    inner: Transport,
    options: { maxRetries?: number; retryDelay?: number } = {}
  ) {
    this.inner = inner;
    this.maxRetries = options.maxRetries ?? 3;
    this.retryDelay = options.retryDelay ?? 1000;
  }

  write(record: LogRecord, formatted: string, isError: boolean): void {
    this.inner.write(record, formatted, isError);
  }

  async flush(): Promise<void> {
    await this.retry(() => this.inner.flush());
  }

  async close(): Promise<void> {
    await this.retry(() => this.inner.close());
  }

  private async retry<T>(fn: () => Promise<T>): Promise<T> {
    let lastError: unknown;

    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      try {
        return await fn();
      } catch (error) {
        lastError = error;

        if (attempt < this.maxRetries) {
          await this.sleep(this.retryDelay * Math.pow(2, attempt));
        }
      }
    }

    throw lastError;
  }

  private sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}
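The backoff schedule used by RetryTransport (delay doubled on each attempt) can be exercised in isolation. The helper below mirrors the private retry method above as a standalone function, with a deliberately flaky operation that succeeds on its third call:

```typescript
// Standalone version of the retry-with-exponential-backoff helper above.
async function retry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  retryDelay = 10
): Promise<T> {
  let lastError: unknown;

  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt < maxRetries) {
        // Exponential backoff: delay, 2*delay, 4*delay, ...
        await new Promise((resolve) => setTimeout(resolve, retryDelay * 2 ** attempt));
      }
    }
  }

  throw lastError;
}

// A flaky operation that fails twice, then succeeds on the third attempt.
let attempts = 0;
const flaky = async (): Promise<string> => {
  attempts++;
  if (attempts < 3) throw new Error("transient failure");
  return "sent";
};
```

`await retry(flaky)` resolves to `"sent"` after three attempts, having slept roughly 10 ms and then 20 ms between tries.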

Testing Transports

Mock Transport

import type { Transport, LogRecord } from "cenglu";

export class MockTransport implements Transport {
  public logs: LogRecord[] = [];
  public formatted: string[] = [];

  write(record: LogRecord, formatted: string, _isError: boolean): void {
    this.logs.push(record);
    this.formatted.push(formatted);
  }

  async flush(): Promise<void> {
    // No-op
  }

  async close(): Promise<void> {
    // No-op
  }

  reset(): void {
    this.logs = [];
    this.formatted = [];
  }
}

// Usage in tests
import { createLogger } from "cenglu";
import { test, expect } from "vitest";

test("logs are sent to transport", async () => {
  const mock = new MockTransport();
  const logger = createLogger({ transports: [mock] });

  logger.info("Test message", { userId: 123 });
  await logger.flush();

  expect(mock.logs).toHaveLength(1);
  expect(mock.logs[0].msg).toBe("Test message");
  expect(mock.logs[0].context.userId).toBe(123);
});

Test Custom Transport

import { test, expect, vi } from "vitest";
import type { LogRecord } from "cenglu";
// HttpTransport is the class from the HTTP Transport Example above

test("http transport sends batched logs", async () => {
  const fetchMock = vi.fn().mockResolvedValue({ ok: true });
  vi.stubGlobal("fetch", fetchMock);

  const transport = new HttpTransport({
    url: "https://logs.example.com",
    batchSize: 2,
  });

  const record = {
    level: "info",
    msg: "test",
    time: Date.now(),
    context: {},
  } as LogRecord;

  // Write two logs (triggers batch)
  transport.write(record, '{"msg":"test"}', false);
  transport.write(record, '{"msg":"test"}', false);

  await transport.flush();

  expect(fetchMock).toHaveBeenCalledWith(
    "https://logs.example.com",
    expect.objectContaining({
      method: "POST",
      body: expect.stringContaining("test"),
    })
  );

  await transport.close();
});

Best Practices

1. Non-Blocking Writes

Never block the main thread in write():

// Bad: Blocking write
write(record: LogRecord, formatted: string): void {
  fs.writeFileSync(this.path, formatted); // ❌ Blocks the event loop until the write completes
}

// Good: Async write
write(record: LogRecord, formatted: string): void {
  this.buffer.push(formatted);
  this.flushAsync(); // ✅ Fire and forget
}

private flushAsync(): void {
  this.flush().catch(console.error);
}

2. Buffer Logs

Buffer logs for better performance:

private buffer: string[] = [];

write(record: LogRecord, formatted: string): void {
  this.buffer.push(formatted);

  if (this.buffer.length >= 100) {
    this.flushAsync();
  }
}

3. Handle Errors Gracefully

Don't crash the application:

async flush(): Promise<void> {
  try {
    await this.sendLogs();
  } catch (error) {
    console.error("Transport error:", error);
    // Don't throw - logging shouldn't crash the app
  }
}

4. Cleanup Resources

Always cleanup in close():

async close(): Promise<void> {
  // Stop timers
  if (this.timer) {
    clearInterval(this.timer);
  }

  // Flush pending logs
  await this.flush();

  // Close connections
  await this.connection?.close();
}

5. Use Timers Wisely

Don't keep process alive:

this.timer = setInterval(() => {
  this.flush().catch(console.error);
}, 5000);

// Allow process to exit
this.timer.unref();

6. Choose Record vs Formatted

Use record for structured data, formatted for strings:

write(record: LogRecord, formatted: string, isError: boolean): void {
  // Structured transport (database): use record
  database.insert({
    level: record.level,
    message: record.msg,
    context: record.context,
  });

  // String transport (file, HTTP): use formatted
  file.write(formatted + "\n");
}

Performance Considerations

  1. Batching: Batch multiple logs to reduce network/disk overhead
  2. Async operations: Use async for I/O operations, but don't block write()
  3. Buffering: Buffer logs in memory before sending
  4. Backpressure: Handle slow destinations gracefully
  5. Resource pooling: Reuse connections when possible
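The backpressure point above can be sketched as a bounded buffer that drops the oldest entries (and counts the drops) instead of growing without limit when the destination is slow. `BoundedBuffer`, `capacity`, and `dropped` are invented names for illustration:

```typescript
// Bounded buffer with a drop-oldest policy and a counter for observability.
class BoundedBuffer {
  private items: string[] = [];
  public dropped = 0;

  constructor(private readonly capacity: number) {}

  push(item: string): void {
    if (this.items.length >= this.capacity) {
      this.items.shift(); // drop the oldest entry to make room
      this.dropped++;
    }
    this.items.push(item);
  }

  drain(): string[] {
    return this.items.splice(0);
  }
}

const buf = new BoundedBuffer(2);
buf.push("1");
buf.push("2");
buf.push("3"); // capacity reached: "1" is dropped
```

Reporting `dropped` (e.g. via a metric) makes silent log loss visible, which is usually preferable to unbounded memory growth.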

Common Patterns

Conditional Transport

Enable transport based on conditions:

class ConditionalTransport implements Transport {
  constructor(
    private readonly inner: Transport,
    private readonly condition: () => boolean
  ) {}

  write(record: LogRecord, formatted: string, isError: boolean): void {
    if (this.condition()) {
      this.inner.write(record, formatted, isError);
    }
  }

  async flush(): Promise<void> {
    await this.inner.flush();
  }

  async close(): Promise<void> {
    await this.inner.close();
  }
}

// Usage
const transport = new ConditionalTransport(
  new HttpTransport({ url: "..." }),
  () => process.env.NODE_ENV === "production"
);

Transform Transport

Transform logs before sending:

class TransformTransport implements Transport {
  constructor(
    private readonly inner: Transport,
    private readonly transform: (record: LogRecord) => LogRecord
  ) {}

  write(record: LogRecord, formatted: string, isError: boolean): void {
    const transformed = this.transform(record);
    // Re-format with transformed record if needed
    this.inner.write(transformed, formatted, isError);
  }

  async flush(): Promise<void> {
    await this.inner.flush();
  }

  async close(): Promise<void> {
    await this.inner.close();
  }
}
