Fifo Module – RAG Knowledge Base

Chunk 1: Module Overview
Metadata: module=Fifo, type=overview, location=CMH.HFA.Accounting.Orchestration/src/Fifo, pattern=FIFO-queue, purpose=eventOrdering, storage=DynamoDB
Purpose: First-In-First-Out event ordering and deduplication system for policy-related events; prevents race conditions and maintains data consistency
Scope: Six components: Lambda functions for ingestion/completion, service layer for queue management, EventBridge publishing, repository abstractions, domain models
Stack: AWS Lambda, DynamoDB (queue storage), EventBridge (publishing), MD5 hashing (deduplication), conditional updates (concurrency control)
Key Concepts: Sequential event processing, policy-level isolation, deduplication, race condition prevention, status state machine, TTL-based cleanup, optimistic concurrency

Chunk 2: FifoLambda Entry Points
Metadata: component=FifoLambda, type=Lambda, operations=2, pattern=orchestrationEndpoints
Description: Lambda exposing two endpoints for the FIFO queue lifecycle
Operations:
- Received: Handles incoming policy events; validates PolicyNumber and EventID, generates a timestamp, and delegates to FifoService.EnqueueAsync
- Completed: Receives a FifoEventDto, marks the event as completed, and delegates to FifoService.DequeueAsync, which starts the next queued event
Validation: PolicyNumber is required and EventID must be a non-empty Guid; throws ArgumentException/ArgumentNullException on failure
Lambda Config: Execution role, 256 MB memory, 30 sec timeout
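
A minimal Python sketch of the Received-endpoint validation, assuming the request arrives as a dict with keys `PolicyNumber` and `EventID`. The module itself is .NET, so `ArgumentError` and `ArgumentNullError` here are hypothetical stand-ins for `ArgumentException` and `ArgumentNullException`, and `validate_received` is an illustrative name, not the real handler.

```python
import uuid
from datetime import datetime, timezone


class ArgumentError(ValueError):
    """Hypothetical stand-in for .NET ArgumentException."""


class ArgumentNullError(ArgumentError):
    """Hypothetical stand-in for ArgumentNullException (a subclass, as in .NET)."""


def validate_received(request: dict) -> tuple[str, uuid.UUID, str]:
    """Validate a Received request and stamp it with a UTC timestamp.

    Returns (policy_number, event_id, timestamp) for the enqueue step.
    """
    policy_number = request.get("PolicyNumber")
    if policy_number is None:
        raise ArgumentNullError("PolicyNumber is required")
    if not str(policy_number).strip():
        raise ArgumentError("PolicyNumber must not be blank")

    raw_id = request.get("EventID")
    if raw_id is None:
        raise ArgumentNullError("EventID is required")
    try:
        event_id = uuid.UUID(str(raw_id))
    except ValueError as exc:
        raise ArgumentError(f"EventID is not a valid GUID: {raw_id!r}") from exc
    if event_id.int == 0:  # mirrors a Guid.Empty check
        raise ArgumentError("EventID must not be the empty GUID")

    # The timestamp becomes the sort key; fixed microsecond precision keeps
    # string order identical to chronological order.
    timestamp = datetime.now(timezone.utc).isoformat(timespec="microseconds")
    return str(policy_number), event_id, timestamp
```

Validation runs before any DynamoDB access, so malformed requests fail fast without touching the queue.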

Chunk 3: Event Enqueue Processing
Metadata: operation=EnqueueAsync, service=FifoService, pattern=conditionalProcessing, deduplication=MD5
Purpose: Adds events to a policy-specific queue, deduplicates them, and triggers immediate processing if the policy has no active events
Steps:
1. Compute the deduplication key: MD5 of {PolicyNumber}.{EventPayload}
2. Query existing events for the policy
3. Exit if an event with the same key is found
4. Create an EventTableEntry (status PENDING or PROCESSING)
5. If no active events → save as PROCESSING and publish to EventBridge
6. Otherwise → save as PENDING and run the race condition guard
Deduplication: Prevents double-processing caused by retries or replays
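
The enqueue steps above can be sketched in Python with an in-memory table instead of DynamoDB. This is an illustration only: `InMemoryFifo`, `Entry`, and the field names are hypothetical, "active" is assumed to mean any event not yet COMPLETED, and the race condition guard (Chunk 4) is left as a comment.

```python
import hashlib
from dataclasses import dataclass


@dataclass
class Entry:
    policy_number: str
    timestamp: str
    event_id: str
    status: str
    dedup_key: str
    body: str


def dedup_key(policy_number: str, payload: str) -> str:
    # MD5 over "{PolicyNumber}.{EventPayload}", as lowercase hex.
    return hashlib.md5(f"{policy_number}.{payload}".encode("utf-8")).hexdigest()


class InMemoryFifo:
    """Hypothetical single-process stand-in for FifoService.EnqueueAsync."""

    def __init__(self) -> None:
        self.table: dict[str, list[Entry]] = {}  # PolicyNumber -> entries
        self.published: list[str] = []  # event IDs "sent" to EventBridge

    def enqueue(self, policy_number: str, event_id: str, timestamp: str, payload: str) -> str:
        key = dedup_key(policy_number, payload)
        existing = self.table.setdefault(policy_number, [])  # query the policy's events

        if any(e.dedup_key == key for e in existing):  # duplicate: log and exit
            return "DUPLICATE"

        # Assumption: "active" means any event that is not yet COMPLETED.
        has_active = any(e.status != "COMPLETED" for e in existing)
        status = "PENDING" if has_active else "PROCESSING"
        existing.append(Entry(policy_number, timestamp, event_id, status, key, payload))

        if status == "PROCESSING":
            self.published.append(event_id)  # fast path: publish immediately
        # else: the real service runs the race condition guard here
        return status
```

Queuing behind any non-COMPLETED event, rather than only behind a PROCESSING one, keeps a new arrival from overtaking older PENDING events.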

Chunk 4: Race Condition Guard
Metadata: mechanism=raceConditionGuard, pattern=doubleCheck, atomicity=conditional
Scenario: A new event is saved as PENDING, but the PROCESSING event it was queued behind finishes before the save is visible, so DequeueAsync finds no PENDING event to start and the new event would otherwise wait indefinitely
Logic: Re-query the policy's events, check for an active PROCESSING event, and find the earliest PENDING event
Resolution: If nothing is PROCESSING and the current event is the earliest PENDING → atomically update its status to PROCESSING and publish to EventBridge; a ConditionalCheckFailedException means another instance already started it
Purpose: Closes the timing window so the next event starts automatically
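
A Python sketch of the double-check, assuming the re-queried events for one policy are passed in as a list. `ConditionalCheckFailed` is a hypothetical stand-in for DynamoDB's `ConditionalCheckFailedException`, and `conditional_set_status` only mimics an UpdateItem call with a status condition; in the real service that atomicity comes from DynamoDB, not from this function.

```python
from dataclasses import dataclass
from typing import Callable


class ConditionalCheckFailed(Exception):
    """Hypothetical stand-in for DynamoDB's ConditionalCheckFailedException."""


@dataclass
class Entry:
    event_id: str
    timestamp: str  # ISO 8601 sort key; string order == chronological order
    status: str


def conditional_set_status(entry: Entry, expected: str, new: str) -> None:
    # Mimics UpdateItem with a ConditionExpression on the current status.
    if entry.status != expected:
        raise ConditionalCheckFailed(f"{entry.event_id}: status is {entry.status}")
    entry.status = new


def race_condition_guard(
    policy_events: list[Entry], current_id: str, publish: Callable[[str], None]
) -> bool:
    """Re-check after saving a PENDING event; start it if nothing else will."""
    # 1. Is some event for this policy still PROCESSING?
    if any(e.status == "PROCESSING" for e in policy_events):
        return False  # that event's completion will start the next one
    # 2. Find the earliest PENDING event.
    pending = sorted(
        (e for e in policy_events if e.status == "PENDING"), key=lambda e: e.timestamp
    )
    if not pending or pending[0].event_id != current_id:
        return False  # an earlier event goes first
    # 3. Claim it atomically; losing to another instance is not an error.
    try:
        conditional_set_status(pending[0], "PENDING", "PROCESSING")
    except ConditionalCheckFailed:
        return False
    publish(pending[0].event_id)
    return True
```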

Chunk 5: Event Dequeue Processing
Metadata: operation=DequeueAsync, service=FifoService, pattern=sequentialProcessing, cleanup=TTL
Steps:
1. Look up the completed event by EventID (EventIdIndex GSI)
2. Update its status → COMPLETED and set a 30-day TTL
3. Query PENDING events for the same policy, ordered chronologically
4. Take the earliest PENDING event → conditional update → PROCESSING
5. Deserialize its event body → extract type and source
6. Publish the next event to EventBridge
TTL: DateTimeOffset.UtcNow.AddDays(30).ToUnixTimeSeconds()
Conditional Update: Ensures that only one instance moves the next event to PROCESSING
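
The dequeue steps can be sketched in Python over an in-memory list standing in for the table. This assumes the stored body carries `EventType` and `Source` keys (per Chunk 18); `dequeue`, `Entry`, and the `publish` callback are hypothetical, and `KeyError` stands in for `KeyNotFoundException`.

```python
import json
import time
from dataclasses import dataclass
from typing import Callable, Optional

TTL_SECONDS = 30 * 24 * 60 * 60  # 30 days


class ConditionalCheckFailed(Exception):
    """Hypothetical stand-in for DynamoDB's ConditionalCheckFailedException."""


@dataclass
class Entry:
    policy_number: str
    timestamp: str
    event_id: str
    status: str
    body: str
    time_to_live: Optional[int] = None


def set_status(entry: Entry, expected: str, new: str) -> None:
    # Conditional update on the current status (atomic in DynamoDB, not here).
    if entry.status != expected:
        raise ConditionalCheckFailed(entry.event_id)
    entry.status = new


def dequeue(table: list[Entry], event_id: str, publish: Callable[..., None]) -> Optional[str]:
    # 1. Find the completed event (the real code queries the EventIdIndex GSI).
    done = next((e for e in table if e.event_id == event_id), None)
    if done is None:
        raise KeyError(event_id)
    # 2. Mark COMPLETED and stamp a TTL 30 days out, in Unix epoch seconds.
    done.status = "COMPLETED"
    done.time_to_live = int(time.time()) + TTL_SECONDS
    # 3-4. Earliest PENDING event for the same policy, claimed conditionally.
    pending = sorted(
        (e for e in table if e.policy_number == done.policy_number and e.status == "PENDING"),
        key=lambda e: e.timestamp,
    )
    if not pending:
        return None
    nxt = pending[0]
    try:
        set_status(nxt, "PENDING", "PROCESSING")
    except ConditionalCheckFailed:
        return None  # another instance already started it
    # 5-6. Pull routing fields from the stored body and publish.
    body = json.loads(nxt.body)
    publish(source=body["Source"], detail_type=body["EventType"], event_id=nxt.event_id)
    return nxt.event_id
```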

Chunk 6: EventTableEntry Model
Metadata: model=EventTableEntry, storage=DynamoDB, keyStructure=composite, ttlEnabled=true
Keys: PK=PolicyNumber, SK=Timestamp (ISO 8601)
Properties: EventId (Guid, GSI key), Status (PENDING/PROCESSING/COMPLETED), EventBody (JSON), DeduplicationKey (MD5), TimeToLive
GSI: EventIdIndex for direct lookup by EventId (a single key-based query, no table scan)
Purpose: Policy-scoped queries, event ID lookup, automatic cleanup
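
A Python sketch of the model and its DynamoDB item shape. The attribute names are taken from the keys and properties listed above; the dataclass itself and `to_item` are hypothetical, and the item uses DynamoDB's low-level attribute-value format (`S` for strings, `N` for numbers sent as strings).

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class EventTableEntry:
    policy_number: str       # partition key (PK)
    timestamp: str           # sort key (SK), ISO 8601
    event_id: uuid.UUID      # also the EventIdIndex GSI key
    status: str              # PENDING | PROCESSING | COMPLETED
    event_body: str          # serialized JSON of the original event
    deduplication_key: str   # 32-char MD5 hex
    time_to_live: Optional[int] = None  # epoch seconds; set only on COMPLETED

    def to_item(self) -> dict:
        """Render as a low-level DynamoDB item (attribute-value map)."""
        item = {
            "PolicyNumber": {"S": self.policy_number},
            "Timestamp": {"S": self.timestamp},
            "EventId": {"S": str(self.event_id)},
            "Status": {"S": self.status},
            "EventBody": {"S": self.event_body},
            "DeduplicationKey": {"S": self.deduplication_key},
        }
        if self.time_to_live is not None:
            # DynamoDB TTL reads a Number attribute holding epoch seconds.
            item["TimeToLive"] = {"N": str(self.time_to_live)}
        return item
```

Omitting `TimeToLive` entirely for active events, rather than writing a placeholder value, is what keeps PENDING and PROCESSING items from ever expiring.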

Chunk 7: Status State Machine
Metadata: stateMachine=eventStatus, states=3, transitions=sequential
States: PENDING → PROCESSING → COMPLETED (a new event enters directly as PROCESSING when its policy has no active events)
Constraints: At most one PROCESSING event per policy; PENDING events are processed in FIFO order
Purpose: Enforces sequential processing and prevents concurrent modifications to the same policy
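
A small Python sketch of the state machine as a transition table plus the per-policy constraint. The names `Status`, `ALLOWED`, and `transition` are hypothetical; FIFO ordering among PENDING events is enforced by the queries in Chunks 4 and 5, not by this check.

```python
from enum import Enum


class Status(Enum):
    PENDING = "PENDING"
    PROCESSING = "PROCESSING"
    COMPLETED = "COMPLETED"


# Only forward, one-step transitions are legal.
ALLOWED = {
    Status.PENDING: {Status.PROCESSING},
    Status.PROCESSING: {Status.COMPLETED},
    Status.COMPLETED: set(),  # terminal; TTL cleanup removes the item later
}


def transition(current: Status, new: Status, other_statuses: list[Status]) -> Status:
    """Validate one transition.

    `other_statuses` holds the statuses of the other events for the same policy,
    so the check can enforce at most one PROCESSING event per policy.
    """
    if new not in ALLOWED[current]:
        raise ValueError(f"illegal transition {current.value} -> {new.value}")
    if new is Status.PROCESSING and Status.PROCESSING in other_statuses:
        raise ValueError("another event for this policy is already PROCESSING")
    return new
```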

Chunk 8: Deduplication Mechanism
Metadata: feature=deduplication, algorithm=MD5, scope=perPolicy, collision=veryLow
Hash Input: {PolicyNumber}.{EventPayload}
Output: 32-character lowercase hex string
Action: Duplicate detected → log and exit
Use Cases: Lambda retries, EventBridge replay, double-sends
Limitation: Scope is per policy; a new event is compared only against events currently stored for the same policy, so events already removed by TTL are no longer checked
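
The key computation is small enough to show directly. This Python sketch uses `hashlib.md5` over the `{PolicyNumber}.{EventPayload}` string; `deduplication_key` and `is_duplicate` are hypothetical names, and UTF-8 encoding of the input is an assumption.

```python
import hashlib


def deduplication_key(policy_number: str, event_payload: str) -> str:
    """MD5 of "{PolicyNumber}.{EventPayload}" as a 32-char lowercase hex string."""
    raw = f"{policy_number}.{event_payload}".encode("utf-8")
    return hashlib.md5(raw).hexdigest()  # hexdigest() is already lowercase


def is_duplicate(key: str, existing_keys: set[str]) -> bool:
    # existing_keys holds the keys of events stored for the same policy only.
    return key in existing_keys
```

Because the hash covers the raw payload text, two semantically equal payloads serialized differently (key order, whitespace) produce different keys; MD5 is used here for fingerprinting, not for security.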

Chunk 9: Concurrency Control Mechanisms
Metadata: concurrency=optimistic, mechanism=conditionalUpdates, exception=ConditionalCheckFailedException
Pattern: UpdateItemAsync with a condition on the expected current status
Scenarios: Multiple concurrent Lambda instances, cold starts, rapid arrivals, completion races
Failure Handling: Log the ConditionalCheckFailedException and back off without retrying, since another instance has already made the transition
Atomic Guarantee: Only one instance succeeds per transition
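
To illustrate the "only one instance succeeds" guarantee, this Python sketch races several threads (standing in for Lambda instances) against one conditional update. `Table.update_if` is a hypothetical toy: a lock makes its check-and-set atomic the way a single DynamoDB UpdateItem with a ConditionExpression is atomic.

```python
import threading


class ConditionalCheckFailed(Exception):
    """Hypothetical stand-in for DynamoDB's ConditionalCheckFailedException."""


class Table:
    """Toy table: each conditional update is atomic, like one UpdateItem call."""

    def __init__(self) -> None:
        self._items: dict[str, str] = {}  # event_id -> status
        self._lock = threading.Lock()

    def put(self, event_id: str, status: str) -> None:
        with self._lock:
            self._items[event_id] = status

    def get(self, event_id: str) -> str:
        with self._lock:
            return self._items[event_id]

    def update_if(self, event_id: str, expected: str, new: str) -> None:
        # Check-and-set under one lock mirrors a "Status = :expected" condition.
        with self._lock:
            if self._items.get(event_id) != expected:
                raise ConditionalCheckFailed(event_id)
            self._items[event_id] = new


def try_start(table: Table, event_id: str, winners: list[str], name: str) -> None:
    try:
        table.update_if(event_id, "PENDING", "PROCESSING")
        winners.append(name)
    except ConditionalCheckFailed:
        pass  # log and back off: another instance already made the transition


def race(instances: int) -> tuple[list[str], str]:
    """Simulate several instances trying to start the same PENDING event."""
    table = Table()
    table.put("e1", "PENDING")
    winners: list[str] = []
    threads = [
        threading.Thread(target=try_start, args=(table, "e1", winners, f"lambda-{i}"))
        for i in range(instances)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return winners, table.get("e1")
```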

Chunk 10: EventBridge Integration
Metadata: component=EventBridgePublisher, interface=IEventPublisher, destination=AWSEventBridge, pattern=publishSubscribe
Purpose: Publishes events for downstream Step Functions or other consumers
EventBridgeDto: Source, DetailType, Payload
Publishing: Immediate if the policy has no active events; otherwise deferred until the queued event becomes the earliest PENDING event and is started by DequeueAsync or the race condition guard
Integration: EventBridgeHelper + Lambda logging
Benefits: Decouples the FIFO queue from its consumers and supports multi-consumer fan-out
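
A Python sketch of how an EventBridgeDto could map onto one entry of an EventBridge PutEvents request. `Source`, `DetailType`, `Detail`, and `EventBusName` are real PutEvents entry fields; the dataclass, `to_put_events_entry`, and the `"default"` bus name are assumptions for illustration.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass
class EventBridgeDto:
    source: str
    detail_type: str
    payload: Any


def to_put_events_entry(dto: EventBridgeDto, event_bus_name: str = "default") -> dict:
    """Map an EventBridgeDto onto one PutEvents request entry."""
    return {
        "Source": dto.source,
        "DetailType": dto.detail_type,
        # Detail must be a JSON object serialized as a string.
        "Detail": json.dumps(dto.payload),
        "EventBusName": event_bus_name,
    }
```

With boto3 the entry would be sent as `boto3.client("events").put_events(Entries=[entry])`. PutEvents can reject individual entries without raising, so a publisher should check `FailedEntryCount` in the response.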

Chunk 11: DynamoDB Table Structure
Metadata: storage=DynamoDB, tableName=pattern, indexing=GSI, consistency=eventual
Table: {ENVIRONMENT}-fifo-poc-table
Primary Key: PK=PolicyNumber, SK=Timestamp
GSI: EventIdIndex (EventID lookup)
Query Patterns: Policy queue depth, PENDING filtering, EventID lookup; all served by key or index queries, avoiding table scans
TTL: Automatic deletion 30 days after completion
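
A Python sketch of the table-name pattern and the two query shapes, built as parameter dicts for the low-level DynamoDB Query API (`boto3.client("dynamodb").query(**params)`). The attribute names `PolicyNumber`, `EventId`, and `Status` follow Chunk 6 but are assumptions about the real schema, as is using a strongly consistent read on the base table (per Chunks 16 and 17).

```python
import os


def table_name() -> str:
    """Resolve "{ENVIRONMENT}-fifo-poc-table"; fail fast if ENVIRONMENT is unset."""
    env = os.environ.get("ENVIRONMENT")
    if not env:
        raise RuntimeError("ENVIRONMENT environment variable is not set")
    return f"{env}-fifo-poc-table"


def policy_queue_query(policy_number: str, pending_only: bool = False) -> dict:
    """Query parameters for one policy's events, oldest first."""
    params = {
        "TableName": table_name(),
        "KeyConditionExpression": "#pk = :pk",
        "ExpressionAttributeNames": {"#pk": "PolicyNumber"},
        "ExpressionAttributeValues": {":pk": {"S": policy_number}},
        "ScanIndexForward": True,  # ascending Timestamp sort key = FIFO order
        "ConsistentRead": True,    # allowed on the base table
    }
    if pending_only:
        # "Status" is a DynamoDB reserved word, so it needs a placeholder name.
        params["FilterExpression"] = "#st = :pending"
        params["ExpressionAttributeNames"]["#st"] = "Status"
        params["ExpressionAttributeValues"][":pending"] = {"S": "PENDING"}
    return params


def event_id_query(event_id: str) -> dict:
    """Query parameters for the EventIdIndex GSI lookup."""
    return {
        "TableName": table_name(),
        "IndexName": "EventIdIndex",
        "KeyConditionExpression": "#id = :id",
        "ExpressionAttributeNames": {"#id": "EventId"},
        "ExpressionAttributeValues": {":id": {"S": event_id}},
        # No ConsistentRead: global secondary indexes are eventually consistent only.
    }
```

A FilterExpression is applied after items are read, so PENDING filtering still reads the whole policy partition; with short per-policy queues that cost stays small.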

Chunk 12: Policy-Level Isolation
Metadata: isolation=policyBased, concurrency=perPolicy, scalability=horizontal
Isolation: Each policy has an independent queue; different policies are processed concurrently
Partition Strategy: PK=PolicyNumber
Scaling: No fixed limit on the number of policies and no global locks; a single very active policy concentrates traffic on one partition key (hot partition risk)
Benefits: High throughput, isolated failures, predictable performance

Chunk 13: TTL-Based Cleanup Strategy
Metadata: cleanup=TTL, retention=30days, cost=zero, trigger=statusCompleted
TTL Field: TimeToLive (Unix epoch seconds)
Set When: Status → COMPLETED
Not Set: PENDING / PROCESSING (active events never expire)
Deletion Timing: DynamoDB deletes expired items asynchronously (typically within ≈48 hours of expiry), not at the exact TTL time
Purpose: Auto-clean completed events while keeping a 30-day audit trail
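
A Python equivalent of `DateTimeOffset.UtcNow.AddDays(30).ToUnixTimeSeconds()`, restricted to COMPLETED events. `ttl_for` and `is_expired` are hypothetical helpers; the optional `now` argument exists only to make the calculation testable.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RETENTION = timedelta(days=30)


def ttl_for(status: str, now: Optional[datetime] = None) -> Optional[int]:
    """Epoch-seconds TTL for an event; only COMPLETED events get one."""
    if status != "COMPLETED":
        return None  # PENDING/PROCESSING items must never expire
    if now is None:
        now = datetime.now(timezone.utc)
    return int((now + RETENTION).timestamp())


def is_expired(ttl: Optional[int], now_epoch: int) -> bool:
    # Expired items stay readable until DynamoDB's background deletion runs,
    # so readers that need exactness can filter on TimeToLive themselves.
    return ttl is not None and ttl <= now_epoch
```

For a completion at 2024-01-01T00:00:00Z (epoch 1704067200), the TTL is 1704067200 + 2,592,000 = 1706659200.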

Chunk 14: Error Handling Strategies
Metadata: errorHandling=graceful, logging=comprehensive, propagation=selective
Duplicate Detection: Log and ignore
Race Conditions: Catch ConditionalCheckFailedException
Missing Events: Throw KeyNotFoundException
Concurrent Updates: Log that another instance won the transition, then skip
Function Errors: Logged; the workflow continues
Environment Errors: Throw if ENVIRONMENT is missing

Chunk 15: Lambda Orchestration Use Cases
Metadata: useCases=policyServicing, frequency=perTransaction, criticality=dataIntegrity
Policy Updates: Endorsements, renewals, and cancellations applied in order
Billing Events: Sequential processing preserves accounting integrity
State Transitions: Prevents race conditions
Event Replay: Deduplication prevents double processing
Distributed Processing: Multiple Lambda instances, with per-policy ordering maintained
Workflow: Step Functions consumers can rely on the previous event for the same policy having completed

Chunk 16: Performance Characteristics
Metadata: performance=latency, throughput=high, consistency=eventual
Latency Contributors: DynamoDB query, conditional update, EventBridge publish, Lambda cold start
Fast Path: Empty queue → query + save + publish
Slow Path: Queued → query + save; later, once the preceding event completes, conditional update + publish
Throughput: Limited within a single policy's partition; scales horizontally across policies
Optimization: Consistent reads, parallel processing across policies

Chunk 17: Repository Abstraction
Metadata: pattern=repository, interface=IFifoRepository, implementation=DynamoDbFifoRepository
Operations: GetEventByIdAsync, QueryByPolicyNumberAsync, SaveEventAsync, UpdateStatusAsync
Benefits: Isolates DynamoDB access, enables unit testing, allows the storage backend to be replaced later
Configuration: Table name from environment, consistent reads, conditional expressions
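
A Python sketch of the same abstraction using `typing.Protocol`, with an in-memory implementation of the kind a unit test might inject. The method names mirror the four listed operations, but their signatures, including the `expected` status argument on `update_status`, are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


class ConditionalCheckFailed(Exception):
    """Hypothetical stand-in for DynamoDB's ConditionalCheckFailedException."""


@dataclass
class Event:
    policy_number: str
    timestamp: str
    event_id: str
    status: str


class FifoRepository(Protocol):
    """Hypothetical Python mirror of IFifoRepository's four operations."""

    def get_event_by_id(self, event_id: str) -> Optional[Event]: ...
    def query_by_policy_number(self, policy_number: str) -> list[Event]: ...
    def save_event(self, event: Event) -> None: ...
    def update_status(self, event_id: str, expected: str, new: str) -> None: ...


class InMemoryFifoRepository:
    """Test double: same contract, no DynamoDB."""

    def __init__(self) -> None:
        self._by_id: dict[str, Event] = {}

    def get_event_by_id(self, event_id: str) -> Optional[Event]:
        return self._by_id.get(event_id)

    def query_by_policy_number(self, policy_number: str) -> list[Event]:
        # Oldest first, matching an ascending sort-key query.
        events = [e for e in self._by_id.values() if e.policy_number == policy_number]
        return sorted(events, key=lambda e: e.timestamp)

    def save_event(self, event: Event) -> None:
        self._by_id[event.event_id] = event

    def update_status(self, event_id: str, expected: str, new: str) -> None:
        event = self._by_id.get(event_id)
        if event is None or event.status != expected:
            raise ConditionalCheckFailed(event_id)
        event.status = new


def count_pending(repo: FifoRepository, policy_number: str) -> int:
    # Service code depends only on the protocol, so either backend can be injected.
    return sum(e.status == "PENDING" for e in repo.query_by_policy_number(policy_number))
```

`InMemoryFifoRepository` satisfies the protocol structurally, without inheriting from it, which is what lets tests swap it in for a DynamoDB-backed class.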

Chunk 18: Event Body Preservation
Metadata: storage=eventBody, format=JSON, purpose=replay, completeness=full
Content: Serialized EventMessage including EventID, EventType, Source, Payload
Purpose: Enables replay, debugging, and an audit trail
Deserialization: Extracts type and source so the next event can be published to EventBridge with the correct Source and DetailType
Cleanup: TTL deletes after 30 days
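
A Python sketch of storing and reading back the event body. The field names come from the Content line above; the dataclass, the helper names, and the mapping of `EventType` to DetailType are assumptions.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class EventMessage:
    EventID: str
    EventType: str
    Source: str
    Payload: Any


def serialize_body(message: EventMessage) -> str:
    # The full message is stored so the event can be replayed or audited later.
    return json.dumps(asdict(message))


def routing_fields(event_body: str) -> tuple[str, str]:
    """Return (source, detail_type) for republishing a stored event."""
    data = json.loads(event_body)
    return data["Source"], data["EventType"]
```

Because the whole message is kept, a stored body can be turned back into an `EventMessage` for replay, not just mined for routing fields.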

Chunk 19: Search Queries Supported
Metadata: type=queryPatterns, purpose=RAGRetrieval
Sample Queries:
- "How does the FIFO queue ensure sequential processing?"
- "What deduplication mechanism prevents duplicate processing?"
- "How are race conditions handled?"
- "What is the FIFO event status state machine?"
- "How long are completed events retained?"
- "What happens when two events arrive simultaneously for the same policy?"
- "How does FIFO integrate with EventBridge?"
- "What DynamoDB key structure is used?"
- "How are events ordered within a policy queue?"
- "What triggers automatic cleanup?"
- "How does a conditional update prevent race conditions?"
- "Which Lambda functions are exposed?"
- "How is the event deduplication key calculated?"
- "What happens if an event is enqueued while another is processing?"
- "How does the race condition guard work?"
- "What exception indicates a concurrent update conflict?"
- "How are policy-level queues isolated?"
- "What is the TTL strategy for completed events?"