Audit Logging

The package registry can emit audit events to a Kafka-compatible broker for every package operation. This enables analytics on package usage, download patterns, and security monitoring.

Enabling Audit Logging

Set the following environment variables:

export KAFKA_BROKERS="kafka:9092"           # Comma-separated broker list
export KAFKA_AUDIT_TOPIC="audit-events"      # Topic name (auto-created)

When KAFKA_BROKERS is not set, audit logging is disabled and no events are produced.
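The enable/disable rule above can be sketched in a few lines of Python. This is only an illustration, not the registry's actual code: the names AuditConfig and load_audit_config are hypothetical, and both the fallback topic name and the treatment of an empty KAFKA_BROKERS as "unset" are assumptions.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping, Optional


@dataclass(frozen=True)
class AuditConfig:
    """Hypothetical container for the two audit settings."""
    brokers: List[str]
    topic: str


def load_audit_config(env: Mapping[str, str] = os.environ) -> Optional[AuditConfig]:
    """Return the audit settings, or None when audit logging is disabled."""
    raw = env.get("KAFKA_BROKERS", "").strip()
    if not raw:
        # Assumption: an empty value is treated the same as an unset variable.
        return None
    # KAFKA_BROKERS is a comma-separated list; tolerate spaces around entries.
    brokers = [entry.strip() for entry in raw.split(",") if entry.strip()]
    # Assumption: fall back to "audit-events" when the topic is not set.
    topic = env.get("KAFKA_AUDIT_TOPIC", "audit-events")
    return AuditConfig(brokers=brokers, topic=topic)
```

Passing the environment as an argument keeps the function easy to exercise in tests without touching the real process environment.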

Event Schema

Each audit event is a JSON message with the following structure:

{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "event_type": "pypi.package.download",
  "registry": "pypi",
  "package": "requests",
  "version": "2.31.0",
  "filename": "requests-2.31.0-py3-none-any.whl",
  "action": "download",
  "source": "cache",
  "user_agent": "pip/23.0.1",
  "remote_addr": "192.168.1.100",
  "status_code": 200,
  "size": 62574,
  "extra": {}
}

Field Reference

Field        Type      Description
timestamp    ISO 8601  When the event occurred
event_type   string    Specific event identifier (see below)
registry     string    "pypi" or "npm"
package      string    Package name (normalized)
version      string    Package version (when available)
filename     string    Tarball/wheel filename (for downloads)
action       string    High-level action: download, upload, metadata, search, login
source       string    "cache" (served from MinIO) or "upstream" (fetched from upstream)
user_agent   string    Client User-Agent header
remote_addr  string    Client IP address
status_code  int       HTTP response status code
size         int       Response size in bytes
extra        object    Additional key-value metadata
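A consumer may want to sanity-check incoming events against the field reference above. The sketch below is one way to do that; validate_event is a hypothetical helper, and the choice of which fields are mandatory is an assumption (it mirrors the NOT NULL columns of the Iceberg DDL later in this document).

```python
from typing import Any, Dict, List

# Expected Python type for each field, taken from the field reference above.
FIELD_TYPES: Dict[str, type] = {
    "timestamp": str,
    "event_type": str,
    "registry": str,
    "package": str,
    "version": str,
    "filename": str,
    "action": str,
    "source": str,
    "user_agent": str,
    "remote_addr": str,
    "status_code": int,
    "size": int,
    "extra": dict,
}

# Assumption: only these fields are mandatory; all others may be absent or null.
REQUIRED_FIELDS = ("timestamp", "event_type", "registry", "action")


def validate_event(event: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the event looks valid."""
    problems = [f"missing field: {name}" for name in REQUIRED_FIELDS if name not in event]
    for name, value in event.items():
        expected = FIELD_TYPES.get(name)
        if expected is None:
            problems.append(f"unknown field: {name}")
        elif value is not None and not isinstance(value, expected):
            problems.append(
                f"{name}: expected {expected.__name__}, got {type(value).__name__}"
            )
    return problems
```

Returning a list of problems rather than raising on the first one makes it easier to log every defect in a malformed event at once.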

Event Types

PyPI Events

Event Type                      Action    Description
pypi.package.download           download  Package file served from MinIO cache
pypi.package.download.upstream  download  Package fetched from upstream, cached, then served
pypi.package.upload             upload    Package uploaded via twine
pypi.package.metadata           metadata  Package detail page requested
pypi.index.list                 metadata  Root simple index requested

npm Events

Event Type                      Action    Description
npm.package.download            download  Tarball served from MinIO cache
npm.package.download.upstream   download  Tarball fetched from upstream, cached, then served
npm.package.publish             upload    Package published via npm publish
npm.package.metadata            metadata  Package metadata requested
npm.user.login                  login     User login attempt
npm.dist-tags.update            metadata  Dist-tag created/updated/deleted
npm.search                      search    Package search query
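The two tables above amount to a lookup from event type to action, with the registry encoded as the first dotted segment of the event type. A minimal sketch of that lookup follows; EVENT_ACTIONS and describe_event_type are hypothetical names introduced for illustration.

```python
from typing import Dict, Tuple

# Event type -> high-level action, copied from the PyPI and npm tables above.
EVENT_ACTIONS: Dict[str, str] = {
    "pypi.package.download": "download",
    "pypi.package.download.upstream": "download",
    "pypi.package.upload": "upload",
    "pypi.package.metadata": "metadata",
    "pypi.index.list": "metadata",
    "npm.package.download": "download",
    "npm.package.download.upstream": "download",
    "npm.package.publish": "upload",
    "npm.package.metadata": "metadata",
    "npm.user.login": "login",
    "npm.dist-tags.update": "metadata",
    "npm.search": "search",
}


def describe_event_type(event_type: str) -> Tuple[str, str]:
    """Return (registry, action) for a known event type; raise KeyError otherwise."""
    action = EVENT_ACTIONS[event_type]
    # The registry is the prefix before the first dot ("pypi" or "npm").
    registry = event_type.split(".", 1)[0]
    return registry, action
```

A consumer could use this to check that the registry and action fields of an event agree with its event_type.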

Kafka Message Format

Apache Iceberg Table Schema

For long-term analytics, the audit events can be stored in an Apache Iceberg table. Below is the recommended schema:

Table DDL (Spark SQL)

CREATE TABLE audit.package_events (
    timestamp       TIMESTAMP    NOT NULL,
    event_type      STRING       NOT NULL,
    registry        STRING       NOT NULL,
    package         STRING,
    version         STRING,
    filename        STRING,
    action          STRING       NOT NULL,
    source          STRING,
    user_agent      STRING,
    remote_addr     STRING,
    status_code     INT,
    size            BIGINT,
    extra           MAP<STRING, STRING>
)
USING iceberg
PARTITIONED BY (registry, days(timestamp))
LOCATION 's3://audit-data/package_events'
TBLPROPERTIES (
    'format-version' = '2',
    'write.format.default' = 'parquet',
    'write.parquet.compression-codec' = 'zstd'
);

Partition Strategy

The table is partitioned by:

  1. registry - Separates PyPI and npm events for efficient querying
  2. days(timestamp) - Daily partitions using Iceberg’s hidden partitioning

This enables efficient queries like:

-- Most downloaded PyPI packages this week
SELECT package, COUNT(*) as downloads
FROM audit.package_events
WHERE registry = 'pypi'
  AND action = 'download'
  AND timestamp >= current_date - INTERVAL 7 DAYS
GROUP BY package
ORDER BY downloads DESC
LIMIT 20;

-- Cache hit ratio by registry
SELECT
    registry,
    source,
    COUNT(*) as count,
    ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY registry), 1) as pct
FROM audit.package_events
WHERE action = 'download'
GROUP BY registry, source;

-- Hourly download traffic
SELECT
    date_trunc('hour', timestamp) as hour,
    registry,
    COUNT(*) as downloads,
    SUM(size) as total_bytes
FROM audit.package_events
WHERE action = 'download'
GROUP BY 1, 2
ORDER BY 1 DESC;
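For small samples, for example events captured during local testing, the cache hit ratio query can be reproduced without Spark. The following is a rough plain-Python equivalent, assuming the events have already been decoded into dictionaries; cache_hit_ratio is a hypothetical helper, not part of the registry.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Mapping, Tuple


def cache_hit_ratio(events: Iterable[Mapping[str, Any]]) -> Dict[Tuple[str, str], float]:
    """Return the percentage of downloads per (registry, source) pair."""
    # Equivalent of: WHERE action = 'download' GROUP BY registry, source
    counts = Counter(
        (e["registry"], e["source"]) for e in events if e.get("action") == "download"
    )
    # Equivalent of: SUM(COUNT(*)) OVER (PARTITION BY registry)
    totals: Counter = Counter()
    for (registry, _source), n in counts.items():
        totals[registry] += n
    return {
        (registry, source): round(n * 100.0 / totals[registry], 1)
        for (registry, source), n in counts.items()
    }
```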

Avro Schema

For Kafka Connect or schema registry integration, here is the Avro schema:

{
  "type": "record",
  "name": "PackageEvent",
  "namespace": "io.miniohq.registry.audit",
  "fields": [
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "event_type", "type": "string"},
    {"name": "registry", "type": "string"},
    {"name": "package", "type": ["null", "string"], "default": null},
    {"name": "version", "type": ["null", "string"], "default": null},
    {"name": "filename", "type": ["null", "string"], "default": null},
    {"name": "action", "type": "string"},
    {"name": "source", "type": ["null", "string"], "default": null},
    {"name": "user_agent", "type": ["null", "string"], "default": null},
    {"name": "remote_addr", "type": ["null", "string"], "default": null},
    {"name": "status_code", "type": ["null", "int"], "default": null},
    {"name": "size", "type": ["null", "long"], "default": null},
    {"name": "extra", "type": ["null", {"type": "map", "values": "string"}], "default": null}
  ]
}
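Note that the JSON event carries timestamp as an ISO 8601 string, while the Avro schema declares it as timestamp-millis (a long counting milliseconds since the Unix epoch), and Avro map values here are strings. A conversion step along the following lines is therefore needed before Avro encoding. This is a sketch under the assumption that timestamps always carry a UTC offset or a trailing "Z"; iso_to_millis and to_avro_record are hypothetical names.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_millis(ts: str) -> int:
    """Convert an ISO 8601 timestamp with offset to milliseconds since the epoch."""
    # Rewrite a trailing "Z" so datetime.fromisoformat accepts it before Python 3.11.
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    dt = datetime.fromisoformat(ts)
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


def to_avro_record(event: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the event shaped to match the Avro schema above."""
    record = dict(event)
    record["timestamp"] = iso_to_millis(event["timestamp"])
    # The schema declares extra as a map of strings, so stringify keys and values.
    extra = event.get("extra")
    record["extra"] = None if extra is None else {str(k): str(v) for k, v in extra.items()}
    return record
```

The resulting dictionary can then be handed to whichever Avro serializer the pipeline uses.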

Ingestion Pipeline

To stream events from Kafka to Iceberg:

  1. Apache Spark Structured Streaming: Read from Kafka, write to Iceberg table
  2. Kafka Connect + Iceberg Sink: Use the Iceberg sink connector
  3. Apache Flink: Flink SQL with Iceberg catalog integration

Example Spark Structured Streaming job:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    IntegerType,
    LongType,
    MapType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Register an Iceberg catalog named "audit" backed by the S3 warehouse.
# The Iceberg Spark runtime and the Kafka source package must be on the classpath.
spark = SparkSession.builder \
    .config("spark.sql.catalog.audit", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.audit.type", "hadoop") \
    .config("spark.sql.catalog.audit.warehouse", "s3://audit-data") \
    .getOrCreate()

# Schema of the JSON payload carried in each Kafka message value.
schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("event_type", StringType()),
    StructField("registry", StringType()),
    StructField("package", StringType()),
    StructField("version", StringType()),
    StructField("filename", StringType()),
    StructField("action", StringType()),
    StructField("source", StringType()),
    StructField("user_agent", StringType()),
    StructField("remote_addr", StringType()),
    StructField("status_code", IntegerType()),
    StructField("size", LongType()),
    StructField("extra", MapType(StringType(), StringType())),
])

# Parse each message value as JSON and flatten the event fields into columns.
# from_json returns null for a value it cannot parse.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "audit-events") \
    .load() \
    .select(from_json(col("value").cast("string"), schema).alias("event")) \
    .select("event.*")

# Append to the Iceberg table; the checkpoint lets the job resume after a restart.
query = df.writeStream \
    .format("iceberg") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://audit-data/checkpoints") \
    .toTable("audit.package_events")

# Block until the stream stops; without this, a standalone script exits immediately.
query.awaitTermination()

Testing with Tansu

For local development and testing, use Tansu as a lightweight Kafka-compatible broker with in-memory storage:

docker run -d \
  --name tansu \
  -p 9092:9092 \
  -e STORAGE_ENGINE="memory://" \
  -e ADVERTISED_LISTENER_URL="tcp://localhost:9092" \
  ghcr.io/tansu-io/tansu:latest

Then configure the registry:

export KAFKA_BROKERS="localhost:9092"
export KAFKA_AUDIT_TOPIC="audit-events"
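Before starting the registry, it can be handy to confirm that something is listening on the configured broker address. The sketch below uses only the Python standard library and performs a plain TCP connect; it does not speak the Kafka protocol, so success only means the port is open. The helpers parse_brokers and first_reachable are hypothetical, and every entry is assumed to have the form host:port.

```python
import socket
from typing import List, Optional, Tuple


def parse_brokers(brokers: str) -> List[Tuple[str, int]]:
    """Split a comma-separated "host:port" list into (host, port) pairs."""
    result = []
    for entry in brokers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        # Assumption: every entry contains a port after the last colon.
        host, _, port = entry.rpartition(":")
        result.append((host, int(port)))
    return result


def first_reachable(brokers: str, timeout: float = 2.0) -> Optional[Tuple[str, int]]:
    """Return the first broker that accepts a TCP connection, or None."""
    for host, port in parse_brokers(brokers):
        try:
            with socket.create_connection((host, port), timeout=timeout):
                return (host, port)
        except OSError:
            continue
    return None
```

With the Tansu container above running, first_reachable("localhost:9092") should return ("localhost", 9092).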