The package registry can emit audit events to a Kafka-compatible broker for every package operation. This enables analytics on package usage, download patterns, and security monitoring.
To enable audit logging, set the following environment variables:

```shell
export KAFKA_BROKERS="kafka:9092"          # Comma-separated broker list
export KAFKA_AUDIT_TOPIC="audit-events"    # Topic name (auto-created)
```
When KAFKA_BROKERS is not set, audit logging is disabled and no events are produced.
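The configuration rules (a comma-separated broker list, an optional topic name, and audit logging switched off when KAFKA_BROKERS is unset) can be sketched as follows. This is a hypothetical illustration, not the registry's actual code: `load_audit_config` and `AuditConfig` are invented names, and treating an empty `KAFKA_BROKERS` value the same as an unset one is an assumption.

```python
import os
from dataclasses import dataclass
from typing import List, Mapping, Optional

DEFAULT_AUDIT_TOPIC = "audit-events"


@dataclass(frozen=True)
class AuditConfig:
    brokers: List[str]
    topic: str


def load_audit_config(env: Mapping[str, str] = os.environ) -> Optional[AuditConfig]:
    """Return the audit settings, or None when audit logging is disabled."""
    # KAFKA_BROKERS is a comma-separated list; trim spaces and drop empty entries.
    raw = env.get("KAFKA_BROKERS", "")
    brokers = [b.strip() for b in raw.split(",") if b.strip()]
    if not brokers:
        # Unset (or empty, by assumption): no producer is created, no events are sent.
        return None
    # Fall back to the default topic name when KAFKA_AUDIT_TOPIC is not set.
    topic = env.get("KAFKA_AUDIT_TOPIC") or DEFAULT_AUDIT_TOPIC
    return AuditConfig(brokers=brokers, topic=topic)


cfg = load_audit_config({"KAFKA_BROKERS": "kafka-1:9092, kafka-2:9092"})
print(cfg)  # AuditConfig(brokers=['kafka-1:9092', 'kafka-2:9092'], topic='audit-events')
```

Passing the environment as a mapping keeps the function easy to test without touching the real process environment.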
Each audit event is a JSON message with the following structure:
```json
{
  "timestamp": "2024-01-15T10:30:00.000Z",
  "event_type": "pypi.package.download",
  "registry": "pypi",
  "package": "requests",
  "version": "2.31.0",
  "filename": "requests-2.31.0-py3-none-any.whl",
  "action": "download",
  "source": "cache",
  "user_agent": "pip/23.0.1",
  "remote_addr": "192.168.1.100",
  "status_code": 200,
  "size": 62574,
  "extra": {}
}
```
| Field | Type | Description |
|---|---|---|
| timestamp | string (ISO 8601) | When the event occurred |
| event_type | string | Specific event identifier (see below) |
| registry | string | "pypi" or "npm" |
| package | string | Package name (normalized) |
| version | string | Package version (when available) |
| filename | string | Tarball/wheel filename (for downloads) |
| action | string | High-level action: download, upload, metadata, search, login |
| source | string | "cache" (served from MinIO) or "upstream" (fetched from upstream) |
| user_agent | string | Client User-Agent header |
| remote_addr | string | Client IP address |
| status_code | int | HTTP response status code |
| size | int | Response size in bytes |
| extra | object | Additional key-value metadata |
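A producer-side sketch of assembling one event with this shape. `make_audit_event` is a hypothetical helper; emitting `null` for fields that do not apply (several are only present "when available") and defaulting `extra` to `{}` are assumptions based on the example payload, not confirmed registry behavior.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def make_audit_event(
    *,
    event_type: str,
    registry: str,
    action: str,
    package: Optional[str] = None,
    version: Optional[str] = None,
    filename: Optional[str] = None,
    source: Optional[str] = None,
    user_agent: Optional[str] = None,
    remote_addr: Optional[str] = None,
    status_code: Optional[int] = None,
    size: Optional[int] = None,
    extra: Optional[dict] = None,
    now: Optional[datetime] = None,
) -> str:
    """Serialize one audit event as JSON (hypothetical helper)."""
    # Use the current UTC time unless a timezone-aware datetime is supplied.
    ts = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    # ISO 8601 with millisecond precision and a trailing "Z", as in the example.
    timestamp = ts.strftime("%Y-%m-%dT%H:%M:%S") + f".{ts.microsecond // 1000:03d}Z"
    event = {
        "timestamp": timestamp,
        "event_type": event_type,
        "registry": registry,
        "package": package,
        "version": version,
        "filename": filename,
        "action": action,
        "source": source,
        "user_agent": user_agent,
        "remote_addr": remote_addr,
        "status_code": status_code,
        "size": size,
        # Assumption: an empty object rather than null when there is no metadata.
        "extra": extra if extra is not None else {},
    }
    return json.dumps(event)
```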
PyPI event types:

| Event Type | Action | Description |
|---|---|---|
| pypi.package.download | download | Package file served from MinIO cache |
| pypi.package.download.upstream | download | Package fetched from upstream, cached, then served |
| pypi.package.upload | upload | Package uploaded via twine |
| pypi.package.metadata | metadata | Package detail page requested |
| pypi.index.list | metadata | Root simple index requested |
npm event types:

| Event Type | Action | Description |
|---|---|---|
| npm.package.download | download | Tarball served from MinIO cache |
| npm.package.download.upstream | download | Tarball fetched from upstream, cached, then served |
| npm.package.publish | upload | Package published via npm publish |
| npm.package.metadata | metadata | Package metadata requested |
| npm.user.login | login | User login attempt |
| npm.dist-tags.update | metadata | Dist-tag created, updated, or deleted |
| npm.search | search | Package search query |
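Consumers can use the two event-type tables as a consistency check. The sketch below transcribes them into a dictionary; `check_event` is a hypothetical helper, and expecting the event-type prefix to equal `registry` is an assumption drawn from the naming pattern in the tables.

```python
from typing import Any, Dict, List

# event_type -> action, transcribed from the PyPI and npm tables.
EVENT_ACTIONS: Dict[str, str] = {
    "pypi.package.download": "download",
    "pypi.package.download.upstream": "download",
    "pypi.package.upload": "upload",
    "pypi.package.metadata": "metadata",
    "pypi.index.list": "metadata",
    "npm.package.download": "download",
    "npm.package.download.upstream": "download",
    "npm.package.publish": "upload",
    "npm.package.metadata": "metadata",
    "npm.user.login": "login",
    "npm.dist-tags.update": "metadata",
    "npm.search": "search",
}


def check_event(event: Dict[str, Any]) -> List[str]:
    """Return consistency problems for one event (empty list if none found)."""
    problems: List[str] = []
    event_type = event.get("event_type", "")
    expected_action = EVENT_ACTIONS.get(event_type)
    if expected_action is None:
        problems.append(f"unknown event_type {event_type!r}")
    elif event.get("action") != expected_action:
        problems.append(f"action {event.get('action')!r} does not match {expected_action!r}")
    # Every documented event type starts with its registry name ("pypi." or "npm.").
    prefix = event_type.split(".", 1)[0]
    if event.get("registry") != prefix:
        problems.append(f"registry {event.get('registry')!r} does not match prefix {prefix!r}")
    return problems
```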
KAFKA_AUDIT_TOPIC defaults to audit-events when it is not set.

For long-term analytics, the audit events can be stored in an Apache Iceberg table. The recommended schema is:
```sql
CREATE TABLE audit.package_events (
  timestamp   TIMESTAMP NOT NULL,
  event_type  STRING NOT NULL,
  registry    STRING NOT NULL,
  package     STRING,
  version     STRING,
  filename    STRING,
  action      STRING NOT NULL,
  source      STRING,
  user_agent  STRING,
  remote_addr STRING,
  status_code INT,
  size        BIGINT,
  extra       MAP<STRING, STRING>
)
USING iceberg
PARTITIONED BY (registry, days(timestamp))
LOCATION 's3://audit-data/package_events'
TBLPROPERTIES (
  'format-version' = '2',
  'write.format.default' = 'parquet',
  'write.parquet.compression-codec' = 'zstd'
);
```
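Iceberg's hidden partitioning computes partition values from existing columns instead of storing a separate date column. The sketch below approximates the partition tuple produced by `PARTITIONED BY (registry, days(timestamp))`, assuming the documented behavior of Iceberg's `days` transform (whole days since 1970-01-01, UTC). `partition_key` is a hypothetical illustration, not Iceberg code.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def partition_key(registry: str, ts: datetime) -> Tuple[str, int]:
    """Approximate (registry, days(timestamp)) for a timezone-aware timestamp."""
    # days(): whole days since 1970-01-01 UTC. Floor division keeps
    # pre-1970 timestamps on the correct (negative) day.
    days = (ts.astimezone(timezone.utc) - EPOCH) // timedelta(days=1)
    return registry, days


print(partition_key("pypi", datetime(2024, 1, 15, 10, 30, tzinfo=timezone.utc)))  # ('pypi', 19737)
```

Because the partition value is derived from `timestamp`, queries filter on `timestamp` directly and Iceberg maps that predicate onto day partitions, so whole days outside the range can be skipped.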
The table is partitioned by:

- registry: separates PyPI and npm events for efficient querying.
- days(timestamp): daily partitions using Iceberg's hidden partitioning.

This enables efficient queries like:
```sql
-- Most downloaded PyPI packages this week
SELECT package, COUNT(*) AS downloads
FROM audit.package_events
WHERE registry = 'pypi'
  AND action = 'download'
  AND timestamp >= current_date - INTERVAL 7 DAYS
GROUP BY package
ORDER BY downloads DESC
LIMIT 20;

-- Cache hit ratio by registry
SELECT
  registry,
  source,
  COUNT(*) AS count,
  ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (PARTITION BY registry), 1) AS pct
FROM audit.package_events
WHERE action = 'download'
GROUP BY registry, source;

-- Hourly download traffic
SELECT
  date_trunc('hour', timestamp) AS hour,
  registry,
  COUNT(*) AS downloads,
  SUM(size) AS total_bytes
FROM audit.package_events
WHERE action = 'download'
GROUP BY 1, 2
ORDER BY 1 DESC;
```
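To sanity-check the cache hit ratio query on a small sample without Spark, the same computation can be sketched in plain Python: count downloads per (registry, source) and divide by each registry's total, which is what `SUM(COUNT(*)) OVER (PARTITION BY registry)` does in SQL. `cache_hit_ratio` is a hypothetical helper that takes events as dictionaries shaped like the JSON messages.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple


def cache_hit_ratio(events: Iterable[dict]) -> Dict[Tuple[str, str], float]:
    """Percentage of downloads per (registry, source), like the SQL query."""
    # COUNT(*) ... WHERE action = 'download' GROUP BY registry, source
    counts = Counter(
        (e["registry"], e["source"]) for e in events if e.get("action") == "download"
    )
    # SUM(COUNT(*)) OVER (PARTITION BY registry): downloads per registry.
    totals: Counter = Counter()
    for (registry, _source), n in counts.items():
        totals[registry] += n
    # Python's round() rounds half to even, so ties can differ from SQL ROUND by 0.1.
    return {key: round(n * 100.0 / totals[key[0]], 1) for key, n in counts.items()}
```

For example, three cached PyPI downloads and one upstream PyPI download yield 75.0 for ("pypi", "cache") and 25.0 for ("pypi", "upstream").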
For Kafka Connect or schema registry integration, here is the Avro schema:
```json
{
  "type": "record",
  "name": "PackageEvent",
  "namespace": "io.miniohq.registry.audit",
  "fields": [
    {"name": "timestamp", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {"name": "event_type", "type": "string"},
    {"name": "registry", "type": "string"},
    {"name": "package", "type": ["null", "string"], "default": null},
    {"name": "version", "type": ["null", "string"], "default": null},
    {"name": "filename", "type": ["null", "string"], "default": null},
    {"name": "action", "type": "string"},
    {"name": "source", "type": ["null", "string"], "default": null},
    {"name": "user_agent", "type": ["null", "string"], "default": null},
    {"name": "remote_addr", "type": ["null", "string"], "default": null},
    {"name": "status_code", "type": ["null", "int"], "default": null},
    {"name": "size", "type": ["null", "long"], "default": null},
    {"name": "extra", "type": ["null", {"type": "map", "values": "string"}], "default": null}
  ]
}
```
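The JSON events carry timestamp as an ISO 8601 string, while the Avro schema declares it as a long with the timestamp-millis logical type (milliseconds since the Unix epoch). A minimal conversion sketch, assuming events arrive as the JSON documented earlier; `to_avro_record` is hypothetical, and coercing extra values to strings is an assumption made to satisfy the map-of-strings type.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
REQUIRED = ("event_type", "registry", "action")
OPTIONAL = ("package", "version", "filename", "source", "user_agent",
            "remote_addr", "status_code", "size", "extra")


def to_avro_record(event: Dict[str, Any]) -> Dict[str, Any]:
    """Map a JSON audit event onto the PackageEvent Avro record layout."""
    # Parse the ISO 8601 string; "Z" is rewritten for Python < 3.11 compatibility.
    ts = datetime.fromisoformat(event["timestamp"].replace("Z", "+00:00"))
    record: Dict[str, Any] = {
        # timestamp-millis: integer milliseconds since the epoch (no float rounding).
        "timestamp": (ts - EPOCH) // timedelta(milliseconds=1),
    }
    for name in REQUIRED:
        record[name] = event[name]  # non-null in the schema; KeyError if missing
    for name in OPTIONAL:
        record[name] = event.get(name)  # ["null", ...] union: absent -> None
    if record["extra"] is not None:
        # Assumption: coerce values to str to match {"type": "map", "values": "string"}.
        record["extra"] = {str(k): str(v) for k, v in record["extra"].items()}
    return record
```

The resulting dictionary can be handed to any Avro writer that accepts Python dicts; field order in the dict does not matter to Avro.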
To stream events from Kafka into the Iceberg table, you can run a Spark Structured Streaming job such as the following:
```python
# Requires the Iceberg Spark runtime and the Spark Kafka connector
# (spark-sql-kafka-0-10) on the classpath, e.g. via spark-submit --packages.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    IntegerType,
    LongType,
    MapType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

# Register an Iceberg catalog named "audit" backed by a Hadoop-style warehouse.
spark = (
    SparkSession.builder
    .config("spark.sql.catalog.audit", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.audit.type", "hadoop")
    .config("spark.sql.catalog.audit.warehouse", "s3://audit-data")
    .getOrCreate()
)

# Schema of the JSON audit event payload.
schema = StructType([
    StructField("timestamp", TimestampType()),
    StructField("event_type", StringType()),
    StructField("registry", StringType()),
    StructField("package", StringType()),
    StructField("version", StringType()),
    StructField("filename", StringType()),
    StructField("action", StringType()),
    StructField("source", StringType()),
    StructField("user_agent", StringType()),
    StructField("remote_addr", StringType()),
    StructField("status_code", IntegerType()),
    StructField("size", LongType()),
    StructField("extra", MapType(StringType(), StringType())),
])

# Read raw Kafka records and parse the JSON payload in the "value" column.
events = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "audit-events")
    .load()
    .select(from_json(col("value").cast("string"), schema).alias("event"))
    .select("event.*")
)

# Append parsed events to the Iceberg table; the checkpoint tracks Kafka offsets.
query = (
    events.writeStream
    .format("iceberg")
    .outputMode("append")
    .option("checkpointLocation", "s3://audit-data/checkpoints")
    .toTable("audit.package_events")
)

# toTable() starts the query asynchronously; block so the driver keeps running.
query.awaitTermination()
```
For local development and testing, use Tansu as a lightweight Kafka-compatible broker with in-memory storage:
```shell
docker run -d \
  --name tansu \
  -p 9092:9092 \
  -e STORAGE_ENGINE="memory://" \
  -e ADVERTISED_LISTENER_URL="tcp://localhost:9092" \
  ghcr.io/tansu-io/tansu:latest
```
Then configure the registry:
```shell
export KAFKA_BROKERS="localhost:9092"
export KAFKA_AUDIT_TOPIC="audit-events"
```