Custom Transports
Implement custom transports to send logs anywhere
Custom transports allow you to send logs to any destination: databases, external APIs, cloud services, or custom storage solutions. Implementing a transport is straightforward with the Transport interface.
Quick Start
Basic custom transport:
import { createLogger, type Transport, type LogRecord } from "cenglu";
const myTransport: Transport = {
write(record: LogRecord, formatted: string, isError: boolean): void {
// Send log somewhere
console.log("Custom:", formatted);
},
async flush(): Promise<void> {
// Flush any buffered logs
},
async close(): Promise<void> {
// Cleanup resources
},
};
const logger = createLogger({
transports: [myTransport],
});
logger.info("Hello from custom transport!");
Transport Interface
The Transport interface defines three methods:
interface Transport {
/**
* Write a log record
* @param record - The log record object
* @param formatted - Pre-formatted log string (JSON, ECS, etc.)
* @param isError - Whether this is an error-level log
*/
write(record: LogRecord, formatted: string, isError: boolean): void;
/**
* Flush any buffered logs
* Called before close() and on logger.flush()
*/
flush(): Promise<void>;
/**
* Close the transport and cleanup resources
* Called on logger.close() and process exit
*/
close(): Promise<void>;
}
LogRecord Type
interface LogRecord {
level: string; // Log level (info, error, etc.)
msg: string; // Log message
time: number; // Timestamp (milliseconds since epoch)
context: Record<string, unknown>; // Additional context
service?: string; // Service name
env?: string; // Environment
version?: string; // Version
hostname?: string; // Hostname
pid?: number; // Process ID
error?: { // Error details (if present)
message: string;
stack?: string;
type?: string;
code?: string;
};
}
HTTP Transport Example
Send logs to an HTTP endpoint:
import { createLogger, type Transport, type LogRecord } from "cenglu";
class HttpTransport implements Transport {
private readonly url: string;
private readonly batchSize: number;
private readonly flushInterval: number;
private buffer: string[] = [];
private timer: NodeJS.Timeout | null = null;
constructor(options: {
url: string;
batchSize?: number;
flushInterval?: number;
}) {
this.url = options.url;
this.batchSize = options.batchSize ?? 100;
this.flushInterval = options.flushInterval ?? 5000;
this.startTimer();
}
write(_record: LogRecord, formatted: string, _isError: boolean): void {
this.buffer.push(formatted);
if (this.buffer.length >= this.batchSize) {
this.flush().catch(console.error);
}
}
async flush(): Promise<void> {
if (this.buffer.length === 0) {
return;
}
const batch = this.buffer.splice(0);
try {
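const response = await fetch(this.url, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ logs: batch }),
});
// fetch only rejects on network failure, so check the HTTP status explicitly
if (!response.ok) {
throw new Error(`Log endpoint responded with status ${response.status}`);
}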
} catch (error) {
console.error("Failed to send logs:", error);
// Optionally: re-queue failed logs
}
}
async close(): Promise<void> {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
await this.flush();
}
private startTimer(): void {
this.timer = setInterval(() => {
this.flush().catch(console.error);
}, this.flushInterval);
// Don't keep process alive
this.timer.unref();
}
}
// Usage
const logger = createLogger({
transports: [
new HttpTransport({
url: "https://logs.example.com/ingest",
batchSize: 100,
flushInterval: 5000,
}),
],
});
Database Transport Example
Write logs to a database:
import { createLogger, type Transport, type LogRecord } from "cenglu";
import { createPool, type Pool } from "generic-database";
class DatabaseTransport implements Transport {
private readonly pool: Pool;
private readonly table: string;
constructor(options: { pool: Pool; table?: string }) {
this.pool = options.pool;
this.table = options.table ?? "logs";
}
write(record: LogRecord, _formatted: string, _isError: boolean): void {
// Use record object directly for structured data
this.pool
.query(
`INSERT INTO ${this.table} (level, message, time, context, service) VALUES (?, ?, ?, ?, ?)`,
[
record.level,
record.msg,
new Date(record.time),
JSON.stringify(record.context),
record.service,
]
)
.catch((error) => {
console.error("Failed to write log to database:", error);
});
}
async flush(): Promise<void> {
// Database writes are already flushed
// Could add batching here if needed
}
async close(): Promise<void> {
await this.pool.end();
}
}
// Usage
const pool = createPool({
host: "localhost",
database: "logs",
});
const logger = createLogger({
transports: [
new DatabaseTransport({ pool }),
],
});
Cloud Service Examples
AWS CloudWatch
import { CloudWatchLogsClient, PutLogEventsCommand } from "@aws-sdk/client-cloudwatch-logs";
import type { Transport, LogRecord } from "cenglu";
class CloudWatchTransport implements Transport {
private readonly client: CloudWatchLogsClient;
private readonly logGroupName: string;
private readonly logStreamName: string;
private buffer: Array<{ message: string; timestamp: number }> = [];
private sequenceToken?: string;
constructor(options: {
region: string;
logGroupName: string;
logStreamName: string;
}) {
this.client = new CloudWatchLogsClient({ region: options.region });
this.logGroupName = options.logGroupName;
this.logStreamName = options.logStreamName;
}
write(_record: LogRecord, formatted: string, _isError: boolean): void {
this.buffer.push({
message: formatted,
timestamp: Date.now(),
});
if (this.buffer.length >= 100) {
this.flush().catch(console.error);
}
}
async flush(): Promise<void> {
if (this.buffer.length === 0) {
return;
}
const logEvents = this.buffer.splice(0);
try {
const command = new PutLogEventsCommand({
logGroupName: this.logGroupName,
logStreamName: this.logStreamName,
logEvents,
sequenceToken: this.sequenceToken,
});
const response = await this.client.send(command);
this.sequenceToken = response.nextSequenceToken;
} catch (error) {
console.error("Failed to send logs to CloudWatch:", error);
}
}
async close(): Promise<void> {
await this.flush();
this.client.destroy();
}
}
Google Cloud Logging
import { Logging } from "@google-cloud/logging";
import type { Transport, LogRecord } from "cenglu";
class GoogleCloudTransport implements Transport {
private readonly log: any;
private buffer: any[] = [];
constructor(options: { projectId: string; logName: string }) {
const logging = new Logging({ projectId: options.projectId });
this.log = logging.log(options.logName);
}
write(record: LogRecord, _formatted: string, _isError: boolean): void {
const entry = this.log.entry(
{
severity: record.level.toUpperCase(),
timestamp: new Date(record.time),
},
{
message: record.msg,
...record.context,
service: record.service,
}
);
this.buffer.push(entry);
if (this.buffer.length >= 50) {
this.flush().catch(console.error);
}
}
async flush(): Promise<void> {
if (this.buffer.length === 0) {
return;
}
const entries = this.buffer.splice(0);
try {
await this.log.write(entries);
} catch (error) {
console.error("Failed to write to Google Cloud Logging:", error);
}
}
async close(): Promise<void> {
await this.flush();
}
}
Datadog
import type { Transport, LogRecord } from "cenglu";
class DatadogTransport implements Transport {
private readonly apiKey: string;
private readonly service: string;
private readonly hostname: string;
private buffer: any[] = [];
constructor(options: { apiKey: string; service: string; hostname: string }) {
this.apiKey = options.apiKey;
this.service = options.service;
this.hostname = options.hostname;
}
write(record: LogRecord, _formatted: string, _isError: boolean): void {
this.buffer.push({
ddsource: "nodejs",
ddtags: `env:${record.env},service:${this.service}`,
hostname: this.hostname,
message: record.msg,
status: record.level,
timestamp: record.time,
...record.context,
});
if (this.buffer.length >= 100) {
this.flush().catch(console.error);
}
}
async flush(): Promise<void> {
if (this.buffer.length === 0) {
return;
}
const batch = this.buffer.splice(0);
try {
await fetch("https://http-intake.logs.datadoghq.com/v1/input", {
method: "POST",
headers: {
"Content-Type": "application/json",
"DD-API-KEY": this.apiKey,
},
body: JSON.stringify(batch),
});
} catch (error) {
console.error("Failed to send logs to Datadog:", error);
}
}
async close(): Promise<void> {
await this.flush();
}
}
Batching Pattern
Batching improves performance for remote transports:
import type { Transport, LogRecord } from "cenglu";
class BatchedTransport implements Transport {
private buffer: string[] = [];
private timer: NodeJS.Timeout | null = null;
private readonly batchSize: number;
private readonly flushInterval: number;
private readonly sender: (batch: string[]) => Promise<void>;
constructor(options: {
batchSize: number;
flushInterval: number;
sender: (batch: string[]) => Promise<void>;
}) {
this.batchSize = options.batchSize;
this.flushInterval = options.flushInterval;
this.sender = options.sender;
this.startTimer();
}
write(_record: LogRecord, formatted: string, _isError: boolean): void {
this.buffer.push(formatted);
// Flush when buffer is full
if (this.buffer.length >= this.batchSize) {
this.flush().catch(console.error);
}
}
async flush(): Promise<void> {
if (this.buffer.length === 0) {
return;
}
const batch = this.buffer.splice(0);
try {
await this.sender(batch);
} catch (error) {
console.error("Failed to send batch:", error);
// Optionally: implement retry logic
}
}
async close(): Promise<void> {
if (this.timer) {
clearInterval(this.timer);
this.timer = null;
}
await this.flush();
}
private startTimer(): void {
this.timer = setInterval(() => {
this.flush().catch(console.error);
}, this.flushInterval);
this.timer.unref();
}
}
// Usage
const transport = new BatchedTransport({
batchSize: 100,
flushInterval: 5000,
sender: async (batch) => {
await fetch("https://logs.example.com/ingest", {
method: "POST",
body: JSON.stringify({ logs: batch }),
});
},
});
Error Handling
Handle errors gracefully in transports:
import type { Transport, LogRecord } from "cenglu";
class ResilientTransport implements Transport {
private readonly fallback: Transport;
private readonly primary: Transport;
constructor(primary: Transport, fallback: Transport) {
this.primary = primary;
this.fallback = fallback;
}
write(record: LogRecord, formatted: string, isError: boolean): void {
try {
this.primary.write(record, formatted, isError);
} catch (error) {
console.error("Primary transport failed:", error);
// Fallback to secondary transport
try {
this.fallback.write(record, formatted, isError);
} catch (fallbackError) {
console.error("Fallback transport also failed:", fallbackError);
}
}
}
async flush(): Promise<void> {
await Promise.allSettled([
this.primary.flush(),
this.fallback.flush(),
]);
}
async close(): Promise<void> {
await Promise.allSettled([
this.primary.close(),
this.fallback.close(),
]);
}
}
Retry Logic
Implement automatic retries:
import type { Transport, LogRecord } from "cenglu";
class RetryTransport implements Transport {
private readonly inner: Transport;
private readonly maxRetries: number;
private readonly retryDelay: number;
constructor(
inner: Transport,
options: { maxRetries?: number; retryDelay?: number } = {}
) {
this.inner = inner;
this.maxRetries = options.maxRetries ?? 3;
this.retryDelay = options.retryDelay ?? 1000;
}
write(record: LogRecord, formatted: string, isError: boolean): void {
this.inner.write(record, formatted, isError);
}
async flush(): Promise<void> {
await this.retry(() => this.inner.flush());
}
async close(): Promise<void> {
await this.retry(() => this.inner.close());
}
private async retry<T>(fn: () => Promise<T>): Promise<T> {
let lastError: unknown;
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error;
if (attempt < this.maxRetries) {
await this.sleep(this.retryDelay * Math.pow(2, attempt));
}
}
}
throw lastError;
}
private sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
Testing Transports
Mock Transport
import type { Transport, LogRecord } from "cenglu";
export class MockTransport implements Transport {
public logs: LogRecord[] = [];
public formatted: string[] = [];
write(record: LogRecord, formatted: string, _isError: boolean): void {
this.logs.push(record);
this.formatted.push(formatted);
}
async flush(): Promise<void> {
// No-op
}
async close(): Promise<void> {
// No-op
}
reset(): void {
this.logs = [];
this.formatted = [];
}
}
// Usage in tests
import { createLogger } from "cenglu";
import { test, expect } from "vitest";
test("logs are sent to transport", async () => {
const mock = new MockTransport();
const logger = createLogger({ transports: [mock] });
logger.info("Test message", { userId: 123 });
await logger.flush();
expect(mock.logs).toHaveLength(1);
expect(mock.logs[0].msg).toBe("Test message");
expect(mock.logs[0].context.userId).toBe(123);
});
Test Custom Transport
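import { test, expect, vi } from "vitest";
import type { LogRecord } from "cenglu";
// HttpTransport is the class from the HTTP Transport Example above; import it from wherever you define it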
test("http transport sends batched logs", async () => {
const fetchMock = vi.fn().mockResolvedValue({ ok: true });
global.fetch = fetchMock;
const transport = new HttpTransport({
url: "https://logs.example.com",
batchSize: 2,
});
const record = {
level: "info",
msg: "test",
time: Date.now(),
context: {},
} as LogRecord;
// Write two logs (triggers batch)
transport.write(record, '{"msg":"test"}', false);
transport.write(record, '{"msg":"test"}', false);
await transport.flush();
expect(fetchMock).toHaveBeenCalledWith(
"https://logs.example.com",
expect.objectContaining({
method: "POST",
body: expect.stringContaining("test"),
})
);
await transport.close();
});
Best Practices
1. Non-Blocking Writes
Never block the main thread in write():
// Bad: write() waits on network I/O
write(record: LogRecord, formatted: string): void {
await fetch(this.url, { body: formatted }); // ❌ Invalid in a synchronous method, and waiting here would delay every log call
}
// Good: Async write
write(record: LogRecord, formatted: string): void {
this.buffer.push(formatted);
this.flushAsync(); // ✅ Fire and forget
}
private flushAsync(): void {
this.flush().catch(console.error);
}
2. Buffer Logs
Buffer logs for better performance:
private buffer: string[] = [];
write(record: LogRecord, formatted: string): void {
this.buffer.push(formatted);
if (this.buffer.length >= 100) {
this.flushAsync();
}
}
3. Handle Errors Gracefully
Don't crash the application:
async flush(): Promise<void> {
try {
await this.sendLogs();
} catch (error) {
console.error("Transport error:", error);
// Don't throw - logging shouldn't crash the app
}
}
4. Cleanup Resources
Always cleanup in close():
async close(): Promise<void> {
// Stop timers
if (this.timer) {
clearInterval(this.timer);
}
// Flush pending logs
await this.flush();
// Close connections
await this.connection?.close();
}
5. Use Timers Wisely
Don't keep process alive:
this.timer = setInterval(() => {
this.flush().catch(console.error);
}, 5000);
// Allow process to exit
this.timer.unref();
6. Choose Record vs Formatted
Use record for structured data, formatted for strings:
write(record: LogRecord, formatted: string, isError: boolean): void {
// Structured transport (database): use record
database.insert({
level: record.level,
message: record.msg,
context: record.context,
});
// String transport (file, HTTP): use formatted
file.write(formatted + "\n");
}
Performance Considerations
- Batching: Batch multiple logs to reduce network/disk overhead
- Async operations: Use async for I/O operations, but don't block write()
- Buffering: Buffer logs in memory before sending
- Backpressure: Handle slow destinations gracefully
- Resource pooling: Reuse connections when possible
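The backpressure point can be sketched as a bounded buffer that discards the oldest entries when the destination cannot keep up. This is a minimal illustration under stated assumptions, not part of cenglu: `BoundedBufferTransport`, `maxBuffer`, `dropped`, and `size` are hypothetical names, and the `Transport` and `LogRecord` types are trimmed local copies of the interfaces shown earlier so the snippet stands alone.

```typescript
// Trimmed local copies of the interfaces shown earlier, so the sketch stands alone.
// In real code, import Transport and LogRecord from "cenglu" instead.
interface LogRecord {
  level: string;
  msg: string;
  time: number;
  context: Record<string, unknown>;
}

interface Transport {
  write(record: LogRecord, formatted: string, isError: boolean): void;
  flush(): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical transport: bounds memory by discarding the oldest buffered logs.
class BoundedBufferTransport implements Transport {
  private buffer: string[] = [];
  public dropped = 0; // how many logs were discarded because the buffer was full

  constructor(
    private readonly sender: (batch: string[]) => Promise<void>,
    private readonly maxBuffer: number = 1000
  ) {}

  // Number of logs currently waiting to be sent
  get size(): number {
    return this.buffer.length;
  }

  write(_record: LogRecord, formatted: string, _isError: boolean): void {
    if (this.buffer.length >= this.maxBuffer) {
      this.buffer.shift(); // make room by dropping the oldest entry
      this.dropped++;
    }
    this.buffer.push(formatted);
  }

  async flush(): Promise<void> {
    if (this.buffer.length === 0) {
      return;
    }
    const batch = this.buffer.splice(0);
    try {
      await this.sender(batch);
    } catch (error) {
      // Report and move on; logging should not crash the app
      console.error("Failed to send batch:", error);
    }
  }

  async close(): Promise<void> {
    await this.flush();
  }
}
```

Dropping the oldest entries favors recent logs; dropping the newest, or sampling, may suit other destinations better. Reporting `dropped` periodically makes any loss visible.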
Common Patterns
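One pattern the `isError` argument of `write()` makes possible is routing error-level records to a separate destination. The sketch below is illustrative only: `SplitByErrorTransport` and `MemoryTransport` are hypothetical names, the interfaces are trimmed local copies of the ones shown earlier, and which levels the logger flags as errors is left to the logger.

```typescript
// Trimmed local copies of the interfaces shown earlier, so the sketch stands alone.
// In real code, import Transport and LogRecord from "cenglu" instead.
interface LogRecord {
  level: string;
  msg: string;
  time: number;
  context: Record<string, unknown>;
}

interface Transport {
  write(record: LogRecord, formatted: string, isError: boolean): void;
  flush(): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical wrapper: error-level records go to `errors`, everything else to `others`.
class SplitByErrorTransport implements Transport {
  constructor(
    private readonly errors: Transport,
    private readonly others: Transport
  ) {}

  write(record: LogRecord, formatted: string, isError: boolean): void {
    const target = isError ? this.errors : this.others;
    target.write(record, formatted, isError);
  }

  async flush(): Promise<void> {
    await Promise.all([this.errors.flush(), this.others.flush()]);
  }

  async close(): Promise<void> {
    await Promise.all([this.errors.close(), this.others.close()]);
  }
}

// Minimal in-memory transport, used here only to demonstrate the routing.
class MemoryTransport implements Transport {
  public lines: string[] = [];

  write(_record: LogRecord, formatted: string, _isError: boolean): void {
    this.lines.push(formatted);
  }

  async flush(): Promise<void> {}

  async close(): Promise<void> {}
}

const errorSink = new MemoryTransport();
const defaultSink = new MemoryTransport();
const split = new SplitByErrorTransport(errorSink, defaultSink);

const sample: LogRecord = { level: "info", msg: "demo", time: Date.now(), context: {} };
split.write(sample, "request ok", false);
split.write({ ...sample, level: "error" }, "request failed", true);
// errorSink.lines now holds ["request failed"]; defaultSink.lines holds ["request ok"]
```

In practice the two sinks could be any transports from this page, for example a database transport for errors and a batched HTTP transport for the rest.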
Conditional Transport
Enable transport based on conditions:
class ConditionalTransport implements Transport {
constructor(
private readonly inner: Transport,
private readonly condition: () => boolean
) {}
write(record: LogRecord, formatted: string, isError: boolean): void {
if (this.condition()) {
this.inner.write(record, formatted, isError);
}
}
async flush(): Promise<void> {
await this.inner.flush();
}
async close(): Promise<void> {
await this.inner.close();
}
}
// Usage
const transport = new ConditionalTransport(
new HttpTransport({ url: "..." }),
() => process.env.NODE_ENV === "production"
);
Transform Transport
Transform logs before sending:
class TransformTransport implements Transport {
constructor(
private readonly inner: Transport,
private readonly transform: (record: LogRecord) => LogRecord
) {}
write(record: LogRecord, formatted: string, isError: boolean): void {
const transformed = this.transform(record);
// Note: formatted still reflects the original record; re-format it here if the inner transport relies on the string
this.inner.write(transformed, formatted, isError);
}
async flush(): Promise<void> {
await this.inner.flush();
}
async close(): Promise<void> {
await this.inner.close();
}
}
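The transform wrapper above forwards the original `formatted` string, so a transport that sends the string would not see the transformed fields. A variant that re-serializes the record might look like the following sketch. The names `redact`, `SENSITIVE_KEYS`, and `ReformattingTransformTransport` are hypothetical, the interfaces are trimmed local copies of the ones shown earlier, and `JSON.stringify` is an assumed stand-in for whatever formatter the logger is configured with.

```typescript
// Trimmed local copies of the interfaces shown earlier, so the sketch stands alone.
// In real code, import Transport and LogRecord from "cenglu" instead.
interface LogRecord {
  level: string;
  msg: string;
  time: number;
  context: Record<string, unknown>;
}

interface Transport {
  write(record: LogRecord, formatted: string, isError: boolean): void;
  flush(): Promise<void>;
  close(): Promise<void>;
}

// Hypothetical list of context keys to mask before logs leave the process.
const SENSITIVE_KEYS = ["password", "token", "authorization"];

// Returns a copy of the record with sensitive context values masked.
function redact(record: LogRecord): LogRecord {
  const context: Record<string, unknown> = {};
  for (const key of Object.keys(record.context)) {
    const sensitive = SENSITIVE_KEYS.indexOf(key.toLowerCase()) !== -1;
    context[key] = sensitive ? "[REDACTED]" : record.context[key];
  }
  return { ...record, context };
}

// Variant of the transform wrapper that re-serializes the transformed record.
// Assumption: the inner transport accepts plain JSON; the logger's own
// formatter is not reapplied here.
class ReformattingTransformTransport implements Transport {
  constructor(
    private readonly inner: Transport,
    private readonly transform: (record: LogRecord) => LogRecord
  ) {}

  write(record: LogRecord, _formatted: string, isError: boolean): void {
    const transformed = this.transform(record);
    this.inner.write(transformed, JSON.stringify(transformed), isError);
  }

  async flush(): Promise<void> {
    await this.inner.flush();
  }

  async close(): Promise<void> {
    await this.inner.close();
  }
}
```

Wrapping a transport as `new ReformattingTransformTransport(inner, redact)` keeps the masking in one place, at the cost of serializing each record a second time.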