Kafka Topology

Overview

This document defines the Apache Kafka topic naming conventions, partitioning strategies, and message routing patterns for the Process Path integration with all bounded contexts.


Topic Subscription Overview

flowchart LR
    subgraph Sources["EXTERNAL TOPICS (Consumed)"]
        OM_TOPIC[fulfillment.order_management.v1.events]
        INV_TOPIC[fulfillment.inventory.v1.events]
        WAVE_TOPIC[fulfillment.wave.v1.events]
        WES_TOPIC[wes.orchestration.v1.events]
        WL_TOPIC[workload.planning.v1.events]
        TRACK_TOPIC[tracking.v1.events]
        ROBOT_TOPIC[robotics.fleet.v1.events]
        PACK_TOPIC[wes.pack.v1.events]
    end

    subgraph ProcessPath["PROCESS PATH SERVICES"]
        ROUTING[routing-service]
        ORCH[orchestration-service]
        AFE[afe-path-service]
        SINGLES[singles-path-service]
        BATCH[batch-flow-path-service]
        SLAM[slam-operations-service]
    end

    subgraph Published["PROCESS PATH TOPICS (Published)"]
        ROUTING_OUT[process-path.routing.v1.events]
        ORCH_OUT[process-path.orchestration.v1.events]
        AFE_OUT[process-path.afe.v1.events]
        SLAM_OUT[wes.slam.v1.events]
    end

    OM_TOPIC --> ROUTING
    INV_TOPIC --> ROUTING
    WAVE_TOPIC --> ROUTING
    WES_TOPIC --> ROUTING

    WL_TOPIC --> ORCH
    TRACK_TOPIC --> ORCH
    ROBOT_TOPIC --> ORCH
    ROBOT_TOPIC --> AFE

    PACK_TOPIC --> SINGLES
    PACK_TOPIC --> BATCH

    ROUTING --> ROUTING_OUT
    ORCH --> ORCH_OUT
    AFE --> AFE_OUT
    SLAM --> SLAM_OUT

Topic Naming Convention

Format

{context}.{aggregate}.{version}.{event-type}

| Component | Description | Examples |
|---|---|---|
| {context} | Bounded context name (kebab-case) | process-path, order-management |
| {aggregate} | Domain aggregate or entity | routing, capacity, shipments |
| {version} | Contract version, prefixed with v | v1 |
| {event-type} | Event category | events, commands, requests |
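Every topic in the subscription diagram and the tables of this document carries a version segment (v1) between the aggregate and the event type. The sketch below assumes that versioned form is the rule and shows how a hypothetical TopicNames helper could check names in CI before a topic is provisioned. Its segment pattern also accepts the snake_case in fulfillment.order_management.v1.events. The unversioned names used in some configuration examples later in this document (such as process-path.routing.events) would be rejected.

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of any library): validates topic names against
// the versioned form {context}.{aggregate}.v{N}.{event-type}.
final class TopicNames {
    // One lowercase name segment; allows kebab-case (process-path)
    // and snake_case (order_management).
    private static final String SEGMENT = "[a-z][a-z0-9]*(?:[-_][a-z0-9]+)*";

    // One or more name segments, then a version such as v1, then an event category.
    private static final Pattern TOPIC = Pattern.compile(
        "^" + SEGMENT + "(?:\\." + SEGMENT + ")*\\.v[0-9]+\\.(?:events|commands|requests)$");

    private TopicNames() {}

    static boolean isValid(String topic) {
        return TOPIC.matcher(topic).matches();
    }
}
```

For example, isValid("process-path.routing.v1.events") and isValid("tracking.v1.events") both return true, while isValid("Process-Path.routing.v1.events") returns false because of the uppercase letters.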

Process Path Topics (Published)

Implementation Status: All topics below have been implemented.

Routing Events

| Topic | Service | Description | Partition Key |
|---|---|---|---|
| process-path.routing.v1.events | routing-service | Shipment routing decisions | shipmentId |

Events Published:

Orchestration Events

| Topic | Service | Description | Partition Key |
|---|---|---|---|
| process-path.orchestration.v1.events | orchestration-service | Capacity, SLA, and workload events | pathId or shipmentId |

Events Published:

AFE Buffer Events

| Topic | Service | Description | Partition Key |
|---|---|---|---|
| process-path.afe.v1.events | afe-path-service | Buffer state changes | resourceId |

Events Published:

SLAM Events

| Topic | Service | Description | Partition Key |
|---|---|---|---|
| wes.slam.v1.events | slam-operations-service | SLAM completion events | shipmentId |

Events Published:


Topics Consumed by Process Path

Implementation Status: All consumers below have been implemented.

Order Management

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| fulfillment.order_management.v1.events | routing-service | process-path-routing-service | OrderReleased, OrderCancelled |

Inventory Service

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| fulfillment.inventory.v1.events | routing-service | process-path-routing-service | InventoryAllocated, AllocationShortage |

WES Orchestration

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| wes.orchestration.v1.events | routing-service | process-path-routing-service | WorkReleaseAuthorized, CircuitBreakerStateChanged |

Wave Planning

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| fulfillment.wave.v1.events | routing-service | process-path-routing-service | WaveReleased, WaveCancelled |

Workload Planning

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| workload.planning.v1.events | orchestration-service | process-path-orchestration-service | CapacityForecastUpdated, LaborAvailabilityChanged |

Physical Tracking

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| tracking.v1.events | orchestration-service | process-path-orchestration-service | ShipmentLocationUpdated, DwellTimeExceeded, AnomalyDetected |

Robotics Fleet

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| robotics.fleet.v1.events | orchestration-service | process-path-orchestration-service | RobotTaskCompleted, PodDelivered, RobotCapacity |
| robotics.fleet.v1.events | afe-path-service | afe-path-service | RobotTaskCompleted, PodDelivered |

Pack & Ship

| Topic | Consumer Service | Consumer Group | Events |
|---|---|---|---|
| wes.pack.v1.events | singles-path-service | singles-path-service | PackingCompleted |
| wes.pack.v1.events | batch-flow-path-service | batch-flow-path-service | PackingCompleted |

Partitioning Strategy

By Shipment ID

Used for shipment lifecycle events to ensure ordering:

Topic: process-path.routing.events
Partition Key: shipmentId (e.g., "SHP-123456")
Partitions: 12

Distribution (partition numbers are illustrative; the actual partition is derived from a hash of the key):
- All events for SHP-123456 → Partition 7
- All events for SHP-123457 → Partition 3
- Ensures ordered processing per shipment
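For keyed records, the Kafka Java producer's default partitioning hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. The standalone port below (the class name KeyPartitioner is hypothetical; in real code the producer does this for you) shows why every event for SHP-123456 lands on the same partition. It also shows why changing the partition count from 12 remaps keys: events for one shipment produced before and after the change can end up on different partitions, so per-shipment ordering is not guaranteed across the change.

```java
import java.nio.charset.StandardCharsets;

// Illustrative port of how the Kafka Java producer places keyed records:
// murmur2(serialized key), sign bit cleared, modulo the partition count.
final class KeyPartitioner {
    private KeyPartitioner() {}

    static int partitionFor(String key, int numPartitions) {
        // StringSerializer encodes keys as UTF-8 by default.
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return toPositive(murmur2(bytes)) % numPartitions;
    }

    // Clears the sign bit (safer than Math.abs, which stays negative for Integer.MIN_VALUE).
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // 32-bit murmur2 with Kafka's seed, processing the key 4 bytes at a time.
    static int murmur2(byte[] data) {
        int length = data.length;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = 0x9747b28c ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }
}
```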

By Path ID

Used for capacity events to ensure path-level ordering:

Topic: process-path.capacity.events
Partition Key: pathId (e.g., "PATH-AFE-01")
Partitions: 6

Distribution (partition numbers are illustrative; the actual partition is derived from a hash of the key):
- All events for PATH-AFE-01 → Partition 2
- All events for PATH-SINGLES-01 → Partition 4

By Resource ID

Used for buffer events:

Topic: process-path.buffer.events
Partition Key: resourceId (wallId, loopId, etc.)
Partitions: 6
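Because process-path.orchestration.v1.events is keyed by pathId or shipmentId depending on the event, the key has to be chosen per event type at produce time. A minimal sketch, assuming hypothetical event records (ShipmentRouted, PathCapacityChanged, BufferStateChanged) that stand in for the real event classes, which this document does not show:

```java
// Hypothetical event types standing in for the real Process Path events.
interface ProcessPathEvent {}
record ShipmentRouted(String shipmentId, String pathId) implements ProcessPathEvent {}
record PathCapacityChanged(String pathId, int availableUnits) implements ProcessPathEvent {}
record BufferStateChanged(String resourceId, String state) implements ProcessPathEvent {}

final class PartitionKeys {
    private PartitionKeys() {}

    // Shipment lifecycle -> shipmentId, capacity -> pathId, buffer -> resourceId.
    static String keyFor(ProcessPathEvent event) {
        if (event instanceof ShipmentRouted e) return e.shipmentId();
        if (event instanceof PathCapacityChanged e) return e.pathId();
        if (event instanceof BufferStateChanged e) return e.resourceId();
        // Fail loudly rather than silently producing an unkeyed record.
        throw new IllegalArgumentException("No partition key rule for " + event);
    }
}
```

Failing on unknown event types is a deliberate choice: a record sent with a null key would be spread across partitions and lose the per-entity ordering the strategy relies on.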

Topic Configuration

Standard Event Topics

topics:
  process-path.routing.events:
    partitions: 12
    replication-factor: 3
    retention-ms: 604800000  # 7 days
    cleanup-policy: delete
    min-insync-replicas: 2

  process-path.capacity.events:
    partitions: 6
    replication-factor: 3
    retention-ms: 259200000  # 3 days
    cleanup-policy: delete
    min-insync-replicas: 2

High-Volume Topics

topics:
  physical-tracking.movements.shipments:
    partitions: 24
    replication-factor: 3
    retention-ms: 86400000  # 1 day
    cleanup-policy: delete
    min-insync-replicas: 2
    compression-type: lz4

Alert Topics

topics:
  process-path.sla.breaches:
    partitions: 6
    replication-factor: 3
    retention-ms: 2592000000  # 30 days
    cleanup-policy: delete
    min-insync-replicas: 2
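The YAML above uses hyphenated keys and raw millisecond values. As a sketch, a hypothetical TopicSpec record can derive retention from a Duration and render Kafka's dotted topic-level config names (retention.ms, cleanup.policy, min.insync.replicas, compression.type); a map of this shape is what Kafka's Admin API accepts through NewTopic.configs(...). It also rejects a min.insync.replicas greater than the replication factor, a setting that would make every acks=all write fail.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical topic definition rendering Kafka's dotted topic-level config names.
record TopicSpec(String name, int partitions, short replicationFactor,
                 Duration retention, int minInsyncReplicas, String compressionType) {

    TopicSpec {
        // acks=all writes can never succeed if fewer replicas exist than min.insync.replicas.
        if (minInsyncReplicas > replicationFactor) {
            throw new IllegalArgumentException("min.insync.replicas exceeds replication factor");
        }
    }

    // compressionType may be null when the topic should use the broker default.
    Map<String, String> topicConfigs() {
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("retention.ms", Long.toString(retention.toMillis()));
        configs.put("cleanup.policy", "delete"); // every topic in this document uses delete
        configs.put("min.insync.replicas", Integer.toString(minInsyncReplicas));
        if (compressionType != null) {
            configs.put("compression.type", compressionType);
        }
        return configs;
    }
}
```

For example, Duration.ofDays(7) renders as retention.ms=604800000 and Duration.ofDays(30) as 2592000000, matching the values above. With replication factor 3 and min.insync.replicas 2, acks=all writes keep succeeding while one replica is unavailable.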

Consumer Group Configuration

Process Path Consumer Groups

| Consumer Group | Topics | Instances | Offset Reset |
|---|---|---|---|
| process-path-orders | order-management.orders.* | 4 | latest |
| process-path-inventory | inventory.allocation.* | 3 | latest |
| process-path-wes | wes.orchestration.* | 3 | latest |
| process-path-waves | wave-planning.waves.* | 2 | latest |
| process-path-workload | workload-planning.* | 2 | latest |
| process-path-tracking | physical-tracking.* | 6 | latest |
| process-path-robotics | robotics.* | 3 | latest |
| process-path-tasks | task-execution.tasks.* | 3 | latest |
| process-path-pack | pack-ship.* | 3 | latest |

Consumer Configuration

consumer:
  group-id: process-path-orders
  enable-auto-commit: false
  auto-offset-reset: latest
  max-poll-records: 500
  max-poll-interval-ms: 300000
  session-timeout-ms: 30000
  heartbeat-interval-ms: 10000
  isolation-level: read_committed
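The same settings can be written as plain Kafka consumer properties, using Kafka's documented consumer config names, in a hypothetical OrdersConsumerConfig helper. Key and value deserializers are omitted because this document does not specify them. The check encodes Kafka's guidance that heartbeat.interval.ms be no higher than one third of session.timeout.ms; the values above (10 s and 30 s) sit exactly at that limit.

```java
import java.util.Properties;

// Hypothetical helper: the consumer YAML expressed as Kafka consumer properties.
final class OrdersConsumerConfig {
    private OrdersConsumerConfig() {}

    static Properties build(String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("bootstrap.servers", bootstrapServers);
        p.setProperty("group.id", "process-path-orders");
        // Offsets are committed by the application only after processing succeeds.
        p.setProperty("enable.auto.commit", "false");
        p.setProperty("auto.offset.reset", "latest");
        p.setProperty("max.poll.records", "500");
        p.setProperty("max.poll.interval.ms", "300000");
        p.setProperty("session.timeout.ms", "30000");
        p.setProperty("heartbeat.interval.ms", "10000");
        // Hide records from aborted or still-open transactions.
        p.setProperty("isolation.level", "read_committed");
        validate(p);
        return p;
    }

    // Kafka recommends heartbeat.interval.ms <= session.timeout.ms / 3.
    static void validate(Properties p) {
        long heartbeat = Long.parseLong(p.getProperty("heartbeat.interval.ms"));
        long session = Long.parseLong(p.getProperty("session.timeout.ms"));
        if (heartbeat * 3 > session) {
            throw new IllegalStateException("heartbeat.interval.ms should be <= session.timeout.ms / 3");
        }
    }
}
```

With max.poll.records=500 and max.poll.interval.ms=300000, each polled batch must be processed within 5 minutes, an average budget of 300000 / 500 = 600 ms per record. A consumer that exceeds it is removed from the group and its partitions are reassigned.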

Message Flow Patterns

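Request-Reply (Synchronous)

For operations that need a response before the caller can continue. Kafka delivers both legs asynchronously: the caller sets reply-to and correlation-id headers on the request, then waits, with a timeout, for a reply on process-path.responses that carries the same correlation-id: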

sequenceDiagram
    participant PP as Process Path Service
    participant REQ as wes.orchestration.capacity.requests
    participant WES as WES Orchestration Engine
    participant RES as process-path.responses

    PP->>REQ: Produce(requestId, payload)
    Note right of PP: Headers:<br/>reply-to: process-path.responses<br/>correlation-id: {uuid}
    REQ->>WES: Consume
    WES->>WES: Process Request
    WES->>RES: Produce(requestId, response)
    Note right of WES: correlation-id: {uuid}
    RES->>PP: Consume Response
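On the requesting side, the correlation-id header is what ties a reply back to the waiting caller. Spring for Apache Kafka provides ReplyingKafkaTemplate for this pattern; the sketch below only illustrates the bookkeeping, using a hypothetical ReplyCorrelator class. It registers an id before the request is produced, completes the matching future from the listener on process-path.responses, and times out requests whose reply never arrives.

```java
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical request-side bookkeeping for request-reply over Kafka.
final class ReplyCorrelator<R> {
    // The id to send in the correlation-id header, and the future the caller awaits.
    record Pending<T>(String correlationId, CompletableFuture<T> reply) {}

    private final Map<String, CompletableFuture<R>> pending = new ConcurrentHashMap<>();

    // Call before producing the request; the future fails with TimeoutException if no reply arrives.
    Pending<R> register(Duration timeout) {
        String id = UUID.randomUUID().toString();
        CompletableFuture<R> future = new CompletableFuture<>();
        pending.put(id, future);
        future.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
              .whenComplete((result, error) -> pending.remove(id)); // no leak after timeout
        return new Pending<>(id, future);
    }

    // Call from the response listener with the reply's correlation-id header.
    // Returns false for unknown or already timed-out ids (late or duplicate replies).
    boolean complete(String correlationId, R response) {
        CompletableFuture<R> future = pending.remove(correlationId);
        return future != null && future.complete(response);
    }

    int inFlight() {
        return pending.size();
    }
}
```

Late replies are expected in this pattern, because the responder may finish after the caller has given up; returning false lets the listener log and drop them instead of failing.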

Event Streaming (Asynchronous)

For standard event processing:

flowchart LR
    PP[Process Path Service] -->|shipmentId| TOPIC[process-path.routing.v1.events]
    TOPIC --> TASK[Task Execution]
    TOPIC --> PICK[Pick Execution]
    TOPIC --> ANALYTICS[Analytics Service]

Fan-Out Pattern

For events consumed by multiple services:

flowchart TD
    EVENT[ShipmentRoutedToPathEvent]
    TOPIC[process-path.routing.v1.events]

    EVENT --> TOPIC

    TOPIC --> TASK[task-execution-service<br/>group: process-path-tasks]
    TOPIC --> PICK[pick-execution-service<br/>group: process-path-pick]
    TOPIC --> ANALYTICS[analytics-service<br/>group: process-path-analytics]
    TOPIC --> DASH[operations-dashboard<br/>group: process-path-dashboard]
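Fan-out works because each consumer group keeps its own committed offsets: adding process-path-dashboard does not take events away from process-path-tasks. The toy model below (plain Java, not Kafka client code; FanOutLog is a hypothetical name) captures that for a single partition. Unlike the auto-offset-reset: latest setting used by Process Path consumers, a brand-new group in this model starts from the first event.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one partition read by several consumer groups, each with its own offset.
final class FanOutLog<E> {
    private final List<E> log = new ArrayList<>();
    private final Map<String, Integer> committed = new HashMap<>();

    void append(E event) {
        log.add(event);
    }

    // Returns the events this group has not read yet and advances only this group's offset.
    // A new group starts at offset 0 here (like auto.offset.reset=earliest).
    List<E> poll(String groupId) {
        int from = committed.getOrDefault(groupId, 0);
        List<E> batch = new ArrayList<>(log.subList(from, log.size()));
        committed.put(groupId, log.size());
        return batch;
    }

    long lag(String groupId) {
        return log.size() - committed.getOrDefault(groupId, 0);
    }
}
```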

Dead Letter Queue (DLQ)

DLQ Topics

topics:
  process-path.routing.events.dlq:
    partitions: 3
    replication-factor: 3
    retention-ms: 2592000000  # 30 days

  process-path.orders.dlq:
    partitions: 3
    replication-factor: 3
    retention-ms: 2592000000

DLQ Routing

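import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderReleasedListener {
    // Provisioned DLQ for order events; record.topic() + ".dlq" would not match it
    private static final String ORDERS_DLQ = "process-path.orders.dlq";
    private final KafkaTemplate<Object, Object> dlqProducer;

    public OrderReleasedListener(KafkaTemplate<Object, Object> dlqProducer) {
        this.dlqProducer = dlqProducer;
    }

    @KafkaListener(topics = "order-management.orders.released", groupId = "process-path-orders")
    public void handleOrder(ConsumerRecord<String, OrderReleasedEvent> record) throws Exception {
        try {
            processOrder(record.value());
        } catch (Exception e) {
            sendToDlq(record, e); // park the failing record so the partition keeps moving
        }
    }

    private void sendToDlq(ConsumerRecord<?, ?> record, Exception e) throws Exception {
        // Keep the original key; processOrder and createDlqPayload are defined elsewhere
        var dlqRecord = new ProducerRecord<Object, Object>(
            ORDERS_DLQ, record.key(), createDlqPayload(record, e));
        // Block for the broker ack so the offset is committed only after the DLQ write succeeds
        dlqProducer.send(dlqRecord).get();
    }
}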
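The listener relies on a createDlqPayload helper that this document does not define. A minimal sketch of what such an envelope might carry, using a hypothetical DlqPayload record: enough to locate the original record (topic, partition, offset, key) and to triage the failure. In the listener it would be built from record.topic(), record.partition(), record.offset() and record.key(). Spring for Apache Kafka's DeadLetterPublishingRecoverer is an existing alternative that attaches similar details as record headers.

```java
import java.time.Instant;

// Hypothetical DLQ envelope: where the failed record came from and why it failed.
record DlqPayload(
        String sourceTopic,
        int sourcePartition,
        long sourceOffset,
        String key,
        String errorClass,
        String errorMessage, // may be null when the exception has no message
        Instant failedAt) {

    static DlqPayload of(String topic, int partition, long offset, Object key,
                         Exception error, Instant failedAt) {
        return new DlqPayload(
            topic, partition, offset,
            key == null ? null : key.toString(),
            error.getClass().getName(),
            error.getMessage(),
            failedAt);
    }
}
```

Keeping the source partition and offset makes a replay tool straightforward: it can re-read the exact record from the source topic while retention still covers it.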

Schema Registry

Subject Naming

{topic-name}-value

Examples:

Compatibility Mode

schema-registry:
  compatibility: BACKWARD
  subjects:
    process-path.routing.events-value:
      compatibility: BACKWARD_TRANSITIVE
    physical-tracking.movements.shipments-value:
      compatibility: FORWARD  # High-frequency topic: producers may upgrade before consumers
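The compatibility modes differ in which side may upgrade first. BACKWARD means a consumer on the new schema can read data written with the previous schema, so consumers upgrade first. FORWARD means a consumer still on the previous schema can read data written with the new one, so producers upgrade first. The _TRANSITIVE variants check against all earlier versions instead of only the latest. The sketch below is a deliberately simplified model, not the Schema Registry algorithm: a schema is reduced to field names mapped to whether the field has a default, type changes are ignored, and CompatCheck is a hypothetical name.

```java
import java.util.List;
import java.util.Map;

// Simplified model: schema = field name -> hasDefault. Only field additions/removals are modeled.
final class CompatCheck {
    private CompatCheck() {}

    // BACKWARD: a reader on newSchema can decode data written with oldSchema.
    static boolean backward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: a reader on oldSchema can decode data written with newSchema.
    static boolean forward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // BACKWARD_TRANSITIVE: BACKWARD against every earlier version, not just the latest.
    static boolean backwardTransitive(List<Map<String, Boolean>> history, Map<String, Boolean> newSchema) {
        return history.stream().allMatch(old -> backward(old, newSchema));
    }

    // Every field the reader expects must be present in the written data or have a default;
    // extra writer fields are ignored by the reader.
    private static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        return reader.entrySet().stream()
            .allMatch(field -> writer.containsKey(field.getKey()) || field.getValue());
    }
}
```

In this model, adding a field with a default passes BACKWARD, while adding a required field fails BACKWARD but passes FORWARD, because old readers simply ignore it.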

Monitoring

Topic Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| kafka_topic_partition_log_size | Partition size on disk | > 10 GB |
| kafka_consumer_group_lag | Consumer lag | > 10,000 messages |
| kafka_producer_request_latency_avg | Average produce latency | > 100 ms |
| kafka_consumer_fetch_latency_avg | Average consume (fetch) latency | > 50 ms |

Consumer Lag Alerts

alerts:
  - name: process-path-high-consumer-lag
    expr: kafka_consumer_group_lag{group=~"process-path-.*"} > 10000
    duration: 5m
    severity: warning

  - name: process-path-critical-consumer-lag
    expr: kafka_consumer_group_lag{group=~"process-path-.*"} > 100000
    duration: 2m
    severity: critical
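Consumer lag for a partition is the log-end offset minus the group's committed offset. Whether kafka_consumer_group_lag is exported per partition or already summed depends on the exporter in use, so this hypothetical LagCheck sketch sums explicitly and then applies the same 10,000 and 100,000 thresholds as the alert rules. The 5m and 2m durations remain the alerting system's job.

```java
import java.util.Map;

// Hypothetical lag calculation: per-partition lag summed for one group and topic.
final class LagCheck {
    enum Severity { OK, WARNING, CRITICAL }

    private LagCheck() {}

    // Keys are partition numbers; values are offsets.
    static long totalLag(Map<Integer, Long> logEndOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0;
        for (var e : logEndOffsets.entrySet()) {
            // No committed offset: counted from offset 0 in this sketch.
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            total += Math.max(0, e.getValue() - committed);
        }
        return total;
    }

    // Same thresholds as the alert rules above.
    static Severity classify(long lag) {
        if (lag > 100_000) return Severity.CRITICAL;
        if (lag > 10_000) return Severity.WARNING;
        return Severity.OK;
    }
}
```

For example, partitions at log-end offsets 50,000 and 70,000 with committed offsets 45,000 and 60,000 give 5,000 + 10,000 = 15,000 messages of lag, which classifies as WARNING.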

Operational Procedures

Adding a New Topic

  1. Define topic configuration in infrastructure-as-code
  2. Create topic in non-prod environments
  3. Register schema in Schema Registry
  4. Deploy consumer/producer changes
  5. Create topic in production
  6. Enable monitoring and alerts

Partition Rebalancing

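# Check partition assignment, committed offsets, and lag per member
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group process-path-orders

# Reset committed offsets to the current position. This does not trigger a rebalance
# and only succeeds while the group has no active members; a rebalance happens when
# members join or leave (for example when scaling or restarting the consumers).
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group process-path-orders --reset-offsets --to-current --all-topics --execute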

Topic Retention Adjustment

# Increase retention for debugging
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name process-path.routing.events \
  --alter --add-config retention.ms=1209600000  # 14 days

Security

ACL Configuration

# Process Path Service
User:process-path-service has:
  - READ on topic process-path.* (Consumer)
  - WRITE on topic process-path.* (Producer)
  - READ on group process-path-* (Consumer Groups)
  - READ on topic order-management.orders.* (Consumer)
  - READ on topic inventory.allocation.* (Consumer)
  - READ on topic wes.orchestration.* (Consumer)
  ...
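Kafka ACLs do not use regular expressions or globs: a resource pattern is either LITERAL (an exact name, or * for every resource) or PREFIXED (names that start with the given string), and kafka-acls.sh selects the latter with --resource-pattern-type prefixed. A grant written as process-path.* is therefore best read as a prefixed pattern on process-path. The toy model below (not Kafka's authorizer; AclModel is a hypothetical name) shows the difference. Note that slam-operations-service publishes to wes.slam.v1.events, which a process-path. prefix does not cover; the list above does not state whether an elided entry grants it.

```java
import java.util.List;

// Toy model of topic ACL matching with LITERAL and PREFIXED resource patterns.
final class AclModel {
    enum PatternType { LITERAL, PREFIXED }
    enum Operation { READ, WRITE }

    record Grant(Operation op, PatternType type, String pattern) {
        boolean matches(Operation requested, String topic) {
            if (op != requested) return false;
            return switch (type) {
                // "*" is the only wildcard; any other literal must match exactly.
                case LITERAL -> pattern.equals("*") || pattern.equals(topic);
                case PREFIXED -> topic.startsWith(pattern);
            };
        }
    }

    private AclModel() {}

    static boolean allowed(List<Grant> grants, Operation op, String topic) {
        return grants.stream().anyMatch(g -> g.matches(op, topic));
    }
}
```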

Encryption