Kafka Topology
Overview
This document defines the Apache Kafka topic naming conventions, partitioning strategies, and message routing patterns for the Process Path integration with all bounded contexts.
Topic Subscription Overview
flowchart LR
subgraph Sources["EXTERNAL TOPICS (Consumed)"]
OM_TOPIC[fulfillment.order_management.v1.events]
INV_TOPIC[fulfillment.inventory.v1.events]
WAVE_TOPIC[fulfillment.wave.v1.events]
WES_TOPIC[wes.orchestration.v1.events]
WL_TOPIC[workload.planning.v1.events]
TRACK_TOPIC[tracking.v1.events]
ROBOT_TOPIC[robotics.fleet.v1.events]
PACK_TOPIC[wes.pack.v1.events]
end
subgraph ProcessPath["PROCESS PATH SERVICES"]
ROUTING[routing-service]
ORCH[orchestration-service]
AFE[afe-path-service]
SINGLES[singles-path-service]
BATCH[batch-flow-path-service]
SLAM[slam-operations-service]
end
subgraph Published["PROCESS PATH TOPICS (Published)"]
ROUTING_OUT[process-path.routing.v1.events]
ORCH_OUT[process-path.orchestration.v1.events]
AFE_OUT[process-path.afe.v1.events]
SLAM_OUT[wes.slam.v1.events]
end
OM_TOPIC --> ROUTING
INV_TOPIC --> ROUTING
WAVE_TOPIC --> ROUTING
WES_TOPIC --> ROUTING
WL_TOPIC --> ORCH
TRACK_TOPIC --> ORCH
ROBOT_TOPIC --> ORCH
ROBOT_TOPIC --> AFE
PACK_TOPIC --> SINGLES
PACK_TOPIC --> BATCH
ROUTING --> ROUTING_OUT
ORCH --> ORCH_OUT
AFE --> AFE_OUT
SLAM --> SLAM_OUT
Topic Naming Convention
    {context}.{aggregate}.{event-type}

| Component | Description | Examples |
| --- | --- | --- |
| {context} | Bounded context name (kebab-case) | process-path, order-management |
| {aggregate} | Domain aggregate or entity | routing, capacity, shipments |
| {event-type} | Event category | events, commands, requests |
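The three-part pattern can be checked in code so that malformed names are caught before a topic is created. The following is a minimal sketch, not the project's actual tooling: the class name TopicNames and its build method are hypothetical, and applying the kebab-case rule to all three segments is an assumption. It covers only the three-part pattern, so names that carry an extra segment (such as the v1 in process-path.routing.v1.events) are out of scope.

```java
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not part of the Process Path codebase. */
final class TopicNames {
    // Lowercase kebab-case segment, e.g. "process-path" (assumed rule for every segment).
    private static final Pattern SEGMENT = Pattern.compile("[a-z0-9]+(-[a-z0-9]+)*");

    private TopicNames() {}

    /** Builds "{context}.{aggregate}.{event-type}" after validating each segment. */
    static String build(String context, String aggregate, String eventType) {
        for (String segment : new String[] {context, aggregate, eventType}) {
            if (segment == null || !SEGMENT.matcher(segment).matches()) {
                throw new IllegalArgumentException("Invalid topic segment: " + segment);
            }
        }
        return context + "." + aggregate + "." + eventType;
    }

    public static void main(String[] args) {
        System.out.println(build("process-path", "routing", "events"));
    }
}
```

Running main prints process-path.routing.events; a segment such as "Process Path" is rejected with an IllegalArgumentException.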
Process Path Topics (Published)
Implementation Status: All topics below have been implemented.
Routing Events
| Topic | Service | Description | Partition Key |
| --- | --- | --- | --- |
| process-path.routing.v1.events | routing-service | Shipment routing decisions | shipmentId |

Events Published:
- ShipmentRoutedToPathEvent - Type: com.paklog.processpath.routing.shipment-routed.v1
- ShipmentReroutedEvent - Type: com.paklog.processpath.routing.shipment-rerouted.v1
- PathAssignmentFailedEvent - Type: com.paklog.processpath.routing.path-assignment-failed.v1
Orchestration Events
| Topic | Service | Description | Partition Key |
| --- | --- | --- | --- |
| process-path.orchestration.v1.events | orchestration-service | Capacity, SLA, and workload events | pathId or shipmentId |

Events Published:
- PathCapacityChangedEvent - Type: com.paklog.processpath.orchestration.path-capacity-changed.v1
- WorkloadRebalanceTriggeredEvent - Type: com.paklog.processpath.orchestration.workload-rebalance-triggered.v1
- SurgeDetectedEvent - Type: com.paklog.processpath.orchestration.surge-detected.v1
- SLAPriorityEscalatedEvent - Type: com.paklog.processpath.orchestration.sla-priority-escalated.v1
- SLABreachImminentEvent - Type: com.paklog.processpath.orchestration.sla-breach-imminent.v1
AFE Buffer Events
| Topic | Service | Description | Partition Key |
| --- | --- | --- | --- |
| process-path.afe.v1.events | afe-path-service | Buffer state changes | resourceId |

Events Published:
- WallCapacityChangedEvent - Type: com.paklog.processpath.afe.wall-capacity-changed.v1
- TrayCirculationImbalanceEvent - Type: com.paklog.processpath.afe.tray-circulation-imbalance.v1
- DischargeJamDetectedEvent - Type: com.paklog.processpath.afe.discharge-jam-detected.v1
SLAM Events
| Topic | Service | Description | Partition Key |
| --- | --- | --- | --- |
| wes.slam.v1.events | slam-operations-service | SLAM completion events | shipmentId |

Events Published:
- SLAMCompletedEvent - Type: com.paklog.wes.slam.completed.v1
Topics Consumed by Process Path
Implementation Status: All consumers below have been implemented.
Order Management
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| fulfillment.order_management.v1.events | routing-service | process-path-routing-service | OrderReleased, OrderCancelled |
Inventory Service
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| fulfillment.inventory.v1.events | routing-service | process-path-routing-service | InventoryAllocated, AllocationShortage |
WES Orchestration
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| wes.orchestration.v1.events | routing-service | process-path-routing-service | WorkReleaseAuthorized, CircuitBreakerStateChanged |
Wave Planning
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| fulfillment.wave.v1.events | routing-service | process-path-routing-service | WaveReleased, WaveCancelled |
Workload Planning
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| workload.planning.v1.events | orchestration-service | process-path-orchestration-service | CapacityForecastUpdated, LaborAvailabilityChanged |
Physical Tracking
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| tracking.v1.events | orchestration-service | process-path-orchestration-service | ShipmentLocationUpdated, DwellTimeExceeded, AnomalyDetected |
Robotics Fleet
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| robotics.fleet.v1.events | orchestration-service | process-path-orchestration-service | RobotTaskCompleted, PodDelivered, RobotCapacity |
| robotics.fleet.v1.events | afe-path-service | afe-path-service | RobotTaskCompleted, PodDelivered |
Pack & Ship
| Topic | Consumer Service | Consumer Group | Events |
| --- | --- | --- | --- |
| wes.pack.v1.events | singles-path-service | singles-path-service | PackingCompleted |
| wes.pack.v1.events | batch-flow-path-service | batch-flow-path-service | PackingCompleted |
Partitioning Strategy
By Shipment ID
Used for shipment lifecycle events to ensure ordering:
    Topic: process-path.routing.events
    Partition Key: shipmentId (e.g., "SHP-123456")
    Partitions: 12

    Distribution:
      - All events for SHP-123456 → Partition 7
      - All events for SHP-123457 → Partition 3
      - Ensures ordered processing per shipment
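Per-shipment ordering follows from the key-to-partition mapping being deterministic: the same key always hashes to the same partition. The sketch below illustrates that property only. Kafka's default partitioner hashes the serialized key bytes with murmur2; this example deliberately substitutes String.hashCode() as a stand-in, so the partition numbers it prints will not match a real cluster. The class and method names are hypothetical.

```java
/** Illustrative only; uses a stand-in hash, not Kafka's murmur2. */
final class PartitionSketch {
    private PartitionSketch() {}

    /**
     * Maps a key to a partition in [0, partitionCount).
     * Stand-in hash: String.hashCode(), NOT the murmur2 hash that Kafka's
     * default partitioner applies to the serialized key bytes.
     */
    static int partitionFor(String key, int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        int partitions = 12;
        // The repeated shipment ID maps to the same partition both times.
        for (String shipmentId : new String[] {"SHP-123456", "SHP-123457", "SHP-123456"}) {
            System.out.println(shipmentId + " -> partition " + partitionFor(shipmentId, partitions));
        }
    }
}
```

Note that changing the partition count changes the mapping, which is why increasing partitions on a keyed topic can break ordering for keys that already have events in flight.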
By Path ID
Used for capacity events to ensure path-level ordering:
    Topic: process-path.capacity.events
    Partition Key: pathId (e.g., "PATH-AFE-01")
    Partitions: 6

    Distribution:
      - All events for PATH-AFE-01 → Partition 2
      - All events for PATH-SINGLES-01 → Partition 4
By Resource ID
Used for buffer events:
    Topic: process-path.buffer.events
    Partition Key: resourceId (wallId, loopId, etc.)
    Partitions: 6
Topic Configuration
Standard Event Topics
    topics:
      process-path.routing.events:
        partitions: 12
        replication-factor: 3
        retention-ms: 604800000  # 7 days
        cleanup-policy: delete
        min-insync-replicas: 2

      process-path.capacity.events:
        partitions: 6
        replication-factor: 3
        retention-ms: 259200000  # 3 days
        cleanup-policy: delete
        min-insync-replicas: 2
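The retention-ms values in these configurations are plain millisecond counts, which are easy to mistype. As a small sketch (the class name RetentionMillis is hypothetical), the values can be derived from a day count with java.time.Duration and compared against the literals in the configuration:

```java
import java.time.Duration;

/** Hypothetical helper that converts a retention period in days to milliseconds. */
final class RetentionMillis {
    private RetentionMillis() {}

    static long days(long days) {
        return Duration.ofDays(days).toMillis();
    }

    public static void main(String[] args) {
        // Retention periods that appear in this document's topic configurations.
        for (long d : new long[] {1, 3, 7, 14, 30}) {
            System.out.println(d + " days = " + days(d) + " ms");
        }
    }
}
```

For example, days(7) yields 604800000 and days(30) yields 2592000000, matching the values used for the routing and alert topics.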
High-Volume Topics
    topics:
      physical-tracking.movements.shipments:
        partitions: 24
        replication-factor: 3
        retention-ms: 86400000  # 1 day
        cleanup-policy: delete
        min-insync-replicas: 2
        compression-type: lz4
Alert Topics
    topics:
      process-path.sla.breaches:
        partitions: 6
        replication-factor: 3
        retention-ms: 2592000000  # 30 days
        cleanup-policy: delete
        min-insync-replicas: 2
Consumer Group Configuration
Process Path Consumer Groups
| Consumer Group | Topics | Instances | Offset Reset |
| --- | --- | --- | --- |
| process-path-orders | order-management.orders.* | 4 | latest |
| process-path-inventory | inventory.allocation.* | 3 | latest |
| process-path-wes | wes.orchestration.* | 3 | latest |
| process-path-waves | wave-planning.waves.* | 2 | latest |
| process-path-workload | workload-planning.* | 2 | latest |
| process-path-tracking | physical-tracking.* | 6 | latest |
| process-path-robotics | robotics.* | 3 | latest |
| process-path-tasks | task-execution.tasks.* | 3 | latest |
| process-path-pack | pack-ship.* | 3 | latest |
Consumer Configuration
    consumer:
      group-id: process-path-orders
      enable-auto-commit: false
      auto-offset-reset: latest
      max-poll-records: 500
      max-poll-interval-ms: 300000
      session-timeout-ms: 30000
      heartbeat-interval-ms: 10000
      isolation-level: read_committed
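The heartbeat and session timeout settings are related: Kafka's consumer documentation states that heartbeat.interval.ms must be lower than session.timeout.ms and is typically set no higher than one third of it. The sketch below checks those two rules for a given pair of values. The class ConsumerTimingCheck is hypothetical and is not a Kafka API; it is one way such a check could be written for a configuration review.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sanity check for consumer timing settings; not a Kafka API. */
final class ConsumerTimingCheck {
    private ConsumerTimingCheck() {}

    /** Returns a list of problems; an empty list means both rules hold. */
    static List<String> validate(long heartbeatIntervalMs, long sessionTimeoutMs) {
        List<String> problems = new ArrayList<>();
        if (heartbeatIntervalMs >= sessionTimeoutMs) {
            problems.add("heartbeat-interval-ms must be lower than session-timeout-ms");
        } else if (heartbeatIntervalMs * 3 > sessionTimeoutMs) {
            problems.add("heartbeat-interval-ms is usually at most one third of session-timeout-ms");
        }
        return problems;
    }

    public static void main(String[] args) {
        // Values from the consumer configuration in this document.
        System.out.println(validate(10000, 30000));
    }
}
```

With the documented values (10000 and 30000) the heartbeat interval is exactly one third of the session timeout, so the check reports no problems.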
Message Flow Patterns
Request-Reply (Synchronous)
For operations requiring immediate response:
sequenceDiagram
participant PP as Process Path Service
participant REQ as wes.orchestration.capacity.requests
participant WES as WES Orchestration Engine
participant RES as process-path.responses
PP->>REQ: Produce(requestId, payload)
Note right of PP: Headers:<br/>reply-to: process-path.responses<br/>correlation-id: {uuid}
REQ->>WES: Consume
WES->>WES: Process Request
WES->>RES: Produce(requestId, response)
Note right of WES: correlation-id: {uuid}
RES->>PP: Consume Response
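On the requesting side, the correlation-id header is what ties a message on the reply topic back to the caller that is waiting for it. The following is a minimal in-memory sketch of that bookkeeping, with no Kafka client involved: the class ReplyCorrelator, its method names, and the "capacity-granted" payload are all hypothetical placeholders. In a real service, register would be called before producing the request, and complete would be called from the consumer of process-path.responses.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical bookkeeping for pending request-reply exchanges. */
final class ReplyCorrelator {
    // Pending requests, keyed by the correlation-id header value.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Registers a pending request; call this before producing the request message. */
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> reply = new CompletableFuture<>();
        if (pending.putIfAbsent(correlationId, reply) != null) {
            throw new IllegalStateException("Duplicate correlation-id: " + correlationId);
        }
        return reply;
    }

    /** Completes the matching request; returns false if no request is waiting for this id. */
    boolean complete(String correlationId, String payload) {
        CompletableFuture<String> reply = pending.remove(correlationId);
        return reply != null && reply.complete(payload);
    }

    public static void main(String[] args) {
        ReplyCorrelator correlator = new ReplyCorrelator();
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = correlator.register(correlationId);
        // Simulates the reply-topic consumer receiving the response.
        correlator.complete(correlationId, "capacity-granted");
        System.out.println(reply.join());
    }
}
```

A production version would also need a timeout for requests that never receive a reply, for example via CompletableFuture.orTimeout, so that entries do not accumulate in the map.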
Event Streaming (Asynchronous)
For standard event processing:
flowchart LR
PP[Process Path Service] -->|shipmentId| TOPIC[process-path.routing.v1.events]
TOPIC --> TASK[Task Execution]
TOPIC --> PICK[Pick Execution]
TOPIC --> ANALYTICS[Analytics Service]
Fan-Out Pattern
For events consumed by multiple services:
flowchart TD
EVENT[ShipmentRoutedToPathEvent]
TOPIC[process-path.routing.v1.events]
EVENT --> TOPIC
TOPIC --> TASK[task-execution-service<br/>group: process-path-tasks]
TOPIC --> PICK[pick-execution-service<br/>group: process-path-pick]
TOPIC --> ANALYTICS[analytics-service<br/>group: process-path-analytics]
TOPIC --> DASH[operations-dashboard<br/>group: process-path-dashboard]
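Fan-out works because each consumer group tracks its own offset into the same log, so every group sees every event independently of the others. The sketch below models that with an in-memory list and a per-group offset map. It is a simplified illustration under stated assumptions: there is a single partition, the class FanOutSketch is hypothetical, and a new group starts from offset 0 (closer to an earliest reset than to the latest setting used elsewhere in this document).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of one topic partition read by several consumer groups. */
final class FanOutSketch {
    // The topic, modeled as an append-only log.
    private final List<String> log = new ArrayList<>();
    // Next offset to read, tracked separately for each consumer group.
    private final Map<String, Integer> offsets = new HashMap<>();

    void publish(String event) {
        log.add(event);
    }

    /** Returns the events this group has not yet seen and advances only that group's offset. */
    List<String> poll(String group) {
        int from = offsets.getOrDefault(group, 0); // assumption: new groups start at offset 0
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(group, log.size());
        return batch;
    }

    public static void main(String[] args) {
        FanOutSketch topic = new FanOutSketch();
        topic.publish("ShipmentRoutedToPathEvent");
        // Both groups receive the same event; neither affects the other's position.
        System.out.println(topic.poll("process-path-tasks"));
        System.out.println(topic.poll("process-path-analytics"));
    }
}
```

Instances that share a group name would instead split the partitions between them, which is the difference between scaling one consumer and adding a new subscriber.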
Dead Letter Queue (DLQ)
DLQ Topics
    topics:
      process-path.routing.events.dlq:
        partitions: 3
        replication-factor: 3
        retention-ms: 2592000000  # 30 days

      process-path.orders.dlq:
        partitions: 3
        replication-factor: 3
        retention-ms: 2592000000
DLQ Routing
    // Consume released orders; any processing failure is routed to the DLQ.
    @KafkaListener(topics = "order-management.orders.released")
    public void handleOrder(ConsumerRecord<String, OrderReleasedEvent> record) {
        try {
            processOrder(record.value());
        } catch (Exception e) {
            sendToDlq(record, e);
        }
    }

    // Publish the failed record to "<source topic>.dlq", keeping the original key
    // so that DLQ records for the same entity stay on the same partition.
    private void sendToDlq(ConsumerRecord<?, ?> record, Exception e) {
        var dlqRecord = new ProducerRecord<>(
            record.topic() + ".dlq",
            record.key(),
            createDlqPayload(record, e)
        );
        dlqProducer.send(dlqRecord);
    }
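The routing code calls createDlqPayload without showing what it records. One plausible shape, sketched here without any Kafka dependency, is to capture where the failed record came from and why it failed, so that it can be inspected or replayed later. Everything in this block is an assumption for illustration: the class DlqSupport, its methods, and the dlq.* field names are hypothetical and are not taken from the Process Path codebase.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helpers for building dead-letter metadata. */
final class DlqSupport {
    private DlqSupport() {}

    /** Derives the DLQ topic name using the ".dlq" suffix convention. */
    static String dlqTopic(String sourceTopic) {
        return sourceTopic + ".dlq";
    }

    /** Collects the origin and failure cause of a record (field names are assumptions). */
    static Map<String, String> failureMetadata(String topic, int partition, long offset, Exception error) {
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put("dlq.original-topic", topic);
        metadata.put("dlq.original-partition", Integer.toString(partition));
        metadata.put("dlq.original-offset", Long.toString(offset));
        metadata.put("dlq.exception-class", error.getClass().getName());
        // String.valueOf guards against exceptions that carry no message.
        metadata.put("dlq.exception-message", String.valueOf(error.getMessage()));
        return metadata;
    }
}
```

With a Kafka client, the same fields could be attached as record headers instead of being embedded in the payload, which leaves the original value bytes untouched for replay.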
Schema Registry
Subject Naming
Value schemas are registered under the topic name with a -value suffix. Examples:
- process-path.routing.events-value
- order-management.orders.released-value
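The examples are consistent with the topic-name subject strategy used by Confluent Schema Registry, where the subject is the topic name plus -value or -key. Assuming that strategy is in use here (the document does not state it), subject names can be derived as in the sketch below. The class SubjectNames is hypothetical.

```java
/** Hypothetical helper; assumes the topic-name subject strategy. */
final class SubjectNames {
    private SubjectNames() {}

    /** Subject under which a topic's value schema is registered. */
    static String valueSubject(String topic) {
        return topic + "-value";
    }

    /** Subject under which a topic's key schema is registered, if keys use a schema. */
    static String keySubject(String topic) {
        return topic + "-key";
    }

    public static void main(String[] args) {
        System.out.println(valueSubject("process-path.routing.events"));
    }
}
```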
Compatibility Mode
    schema-registry:
      compatibility: BACKWARD
      subjects:
        process-path.routing.events-value:
          compatibility: BACKWARD_TRANSITIVE
        physical-tracking.movements.shipments-value:
          compatibility: FORWARD  # High-frequency, less strict
Monitoring
Topic Metrics
| Metric | Description | Alert Threshold |
| --- | --- | --- |
| kafka_topic_partition_log_size | Partition size | > 10GB |
| kafka_consumer_group_lag | Consumer lag | > 10000 |
| kafka_producer_request_latency_avg | Produce latency | > 100ms |
| kafka_consumer_fetch_latency_avg | Consume latency | > 50ms |
Consumer Lag Alerts
    alerts:
      - name: process-path-high-consumer-lag
        expr: kafka_consumer_group_lag{group=~"process-path-.*"} > 10000
        duration: 5m
        severity: warning

      - name: process-path-critical-consumer-lag
        expr: kafka_consumer_group_lag{group=~"process-path-.*"} > 100000
        duration: 2m
        severity: critical
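Consumer lag is the gap between a partition's log end offset and the group's committed offset. The sketch below computes that gap and maps it onto the two thresholds used in the alert rules (10000 for warning, 100000 for critical). It is a simplified illustration: the class LagSeverity is hypothetical, and the check is instantaneous, so it ignores the 5m and 2m durations that the alert rules require before firing.

```java
/** Hypothetical classifier mirroring the lag alert thresholds. */
final class LagSeverity {
    enum Level { OK, WARNING, CRITICAL }

    private LagSeverity() {}

    /** Lag for one partition: messages written but not yet committed by the group. */
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    /** Applies the thresholds from the alert rules, ignoring their durations. */
    static Level classify(long lag) {
        if (lag > 100_000) {
            return Level.CRITICAL;
        }
        if (lag > 10_000) {
            return Level.WARNING;
        }
        return Level.OK;
    }

    public static void main(String[] args) {
        long current = lag(250_000, 235_000); // 15000 messages behind
        System.out.println(current + " -> " + classify(current));
    }
}
```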
Operational Procedures
Adding a New Topic
- Define topic configuration in infrastructure-as-code
- Create topic in non-prod environments
- Register schema in Schema Registry
- Deploy consumer/producer changes
- Create topic in production
- Enable monitoring and alerts
Partition Rebalancing
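    # Check partition assignment and lag for the group
    kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
      --describe --group process-path-orders

    # Reset committed offsets to the current position.
    # Note: this does not itself trigger a rebalance; rebalances happen when
    # members join or leave the group. The group must have no active members,
    # and --reset-offsets needs a topic scope (--all-topics or --topic).
    kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
      --group process-path-orders --reset-offsets --to-current --all-topics --execute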
Topic Retention Adjustment
    # Increase retention for debugging
    kafka-configs.sh --bootstrap-server kafka:9092 \
      --entity-type topics --entity-name process-path.routing.events \
      --alter --add-config retention.ms=1209600000  # 14 days
Security
ACL Configuration
    # Process Path Service
    User:process-path-service has:
      - READ on topic process-path.* (Consumer)
      - WRITE on topic process-path.* (Producer)
      - READ on group process-path-* (Consumer Groups)
      - READ on topic order-management.orders.* (Consumer)
      - READ on topic inventory.allocation.* (Consumer)
      - READ on topic wes.orchestration.* (Consumer)
      ...
Encryption
- In-transit: TLS 1.3 for all broker connections
- At-rest: Encryption managed by cloud provider (AWS KMS)