using MarketData.MarketDataModel; using MarketData.Utils; using MarketData.DataAccess; using System.Collections.Concurrent; using System.Collections.Generic; using System; using System.Linq; using System.Threading; using System.Threading.Tasks; namespace MarketData.Cache { public class GLPriceCache : IDisposable { // ── Singleton ──────────────────────────────────────────────────────────── private static readonly object instanceLock = new object(); private static GLPriceCache instance = null; public static GLPriceCache GetInstance() { lock (instanceLock) { if (instance == null) instance = new GLPriceCache(); return instance; } } // ── State ──────────────────────────────────────────────────────────────── private readonly Dictionary priceCache = new Dictionary(); private readonly ConcurrentDictionary symbolFetchLocks = new ConcurrentDictionary(); private readonly object cacheLock = new object(); private DateTime latestDate = Utility.Epoch; // ── Background refresh ─────────────────────────────────────────────────── private readonly TimeSpan cacheCycle = TimeSpan.FromMinutes(5); private Timer refreshTimer = null; private int tickCount = 0; private const int evictionTickInterval = 12; // every 12 ticks x 5 min = 1 hour // ── Parallelism ────────────────────────────────────────────────────────── private static readonly int maxParallelDbCalls = ResolveMaxParallelDbCalls(); private static int ResolveMaxParallelDbCalls() { int @default = Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32); string configured = Environment.GetEnvironmentVariable("GL_PRICE_CACHE_PARALLEL_DB_CALLS"); return int.TryParse(configured, out int parsed) && parsed > 0 ? parsed : @default; } // ── Disposal ───────────────────────────────────────────────────────────── private volatile bool disposed = false; private int refreshInProgress = 0; // ── Constructor ────────────────────────────────────────────────────────── private GLPriceCache() { refreshTimer = new Timer(OnCacheRefreshTick, null, cacheCycle, cacheCycle); } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } protected virtual void Dispose(bool disposing) { if (disposed) return; disposed = true; if (disposing) { Timer timerToDispose; lock (instanceLock) { timerToDispose = refreshTimer; refreshTimer = null; instance = null; } // Block until any in-flight tick completes before disposing using (var waited = new ManualResetEventSlim(false)) { if (timerToDispose != null) timerToDispose.Dispose(waited.WaitHandle); waited.Wait(TimeSpan.FromSeconds(10)); } MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed."); } } // ── Background tick ────────────────────────────────────────────────────── private void OnCacheRefreshTick(object state) { if (disposed) return; try { lock (cacheLock) { long count = CountInternal(); MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Tick] Symbols: {priceCache.Keys.Count}. " + $"Items in cache: {Utility.FormatNumber(count, 0, true)}."); } if (++tickCount % evictionTickInterval == 0) EvictStaleSymbols(); } catch (Exception ex) { MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Tick] [ERROR] Unhandled exception: {ex}"); } } // ── Eviction ───────────────────────────────────────────────────────────── private void EvictStaleSymbols() { if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1) { MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Evict] Skipped — refresh in progress."); return; } try { int count; lock (cacheLock) { count = priceCache.Count; priceCache.Clear(); symbolFetchLocks.Clear(); } MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Evict] Cache cleared. {count} symbols evicted."); } finally { Interlocked.Exchange(ref refreshInProgress, 0); } } // ── Public API ─────────────────────────────────────────────────────────── public void Add(PortfolioTrades portfolioTrades) { List symbols = portfolioTrades.Symbols; Dictionary minTradeDates = symbols.ToDictionary( sym => sym, sym => portfolioTrades.GetMinTradeDate(sym)); // Only open positions need an intraday price refresh. // Closed positions, regardless of close date, have immutable prices — skip the DB call. HashSet mutableSymbols = new HashSet( symbols.Where(sym => portfolioTrades.HasOpenPositions(sym))); Dictionary minCacheDates; lock (cacheLock) { minCacheDates = symbols.ToDictionary( sym => sym, sym => priceCache.ContainsKey(sym) ? priceCache[sym].MinDate : DateTime.MaxValue); } ConcurrentDictionary fetchedPrices = new ConcurrentDictionary(); ConcurrentDictionary latestPrices = new ConcurrentDictionary(); Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol => { if (disposed) return; DateTime minTradeDate = minTradeDates[symbol]; DateTime minCacheDate = minCacheDates[symbol]; // Acquire a per-symbol lock to prevent concurrent clients from issuing // duplicate DB fetches for the same symbol. First thread fetches; // subsequent threads for the same symbol block here, then re-check // the cache on entry and skip the fetch if already populated. object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object()); lock (symbolLock) { try { // Re-check cache state after acquiring symbol lock — another // client thread may have already fetched this symbol while we waited lock (cacheLock) { if (priceCache.ContainsKey(symbol)) minCacheDate = priceCache[symbol].MinDate; } // Historical fetch — only when cache is missing or incomplete Prices prices = null; if (minCacheDate == DateTime.MaxValue) prices = PricingDA.GetPrices(symbol, minTradeDate); else if (minTradeDate < minCacheDate) prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate); if (prices != null && prices.Count > 0) fetchedPrices[symbol] = prices; // Intraday refresh — mutable symbols only if (mutableSymbols.Contains(symbol)) { Price latestPrice = PricingDA.GetPrice(symbol); if (latestPrice != null) latestPrices[symbol] = latestPrice; } } catch (Exception ex) { MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}"); } } }); lock (cacheLock) { // Historical prices — idempotent, will not overwrite existing entries foreach (KeyValuePair kvp in fetchedPrices) foreach (Price price in kvp.Value) AddInternal(price); // Latest prices — unconditional overwrite to capture intraday updates foreach (KeyValuePair kvp in latestPrices) { PricesByDate pricesByDate; if (!priceCache.TryGetValue(kvp.Key, out pricesByDate)) { pricesByDate = new PricesByDate(); priceCache[kvp.Key] = pricesByDate; } if (pricesByDate.ContainsKey(kvp.Value.Date)) pricesByDate.Remove(kvp.Value.Date); pricesByDate.Add(kvp.Value.Date, kvp.Value); } } MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " + $"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}"); } public void Add(Prices prices) { foreach (Price price in prices) Add(price); } public void Add(List symbols, DateTime pricingDate) { if (symbols == null || symbols.Count == 0) return; ConcurrentDictionary fetchedPrices = new ConcurrentDictionary(); Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol => { if (disposed) return; lock (cacheLock) { if (ContainsPriceInternal(symbol, pricingDate)) return; } try { Price price = PricingDA.GetPrice(symbol, pricingDate); if (price != null) fetchedPrices[symbol] = price; } catch (Exception ex) { MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] [ERROR] Failed fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}"); } }); lock (cacheLock) { foreach (KeyValuePair kvp in fetchedPrices) AddInternal(kvp.Value); } } public void Add(Price price) { lock (cacheLock) { AddInternal(price); } } public void Refresh() { if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1) { MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Refresh] Skipped — refresh already in progress."); return; } try { List symbols; Dictionary currentMaxDates; lock (cacheLock) { symbols = priceCache.Keys.ToList(); currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate); } if (symbols.Count == 0) return; ConcurrentDictionary fullReloads = new ConcurrentDictionary(); ConcurrentDictionary singleUpdates = new ConcurrentDictionary(); // Fetch outside the cache lock — no timeout risk holding cacheLock over I/O Dictionary maxDbDates = PricingDA.GetLatestDates(symbols); DateTime latestDateFromDb = PricingDA.GetLatestDate(); Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol => { if (disposed) return; DateTime cachedMax; if (!currentMaxDates.TryGetValue(symbol, out cachedMax)) return; try { DateTime dbMax; if (maxDbDates.TryGetValue(symbol, out dbMax) && dbMax.Date != cachedMax.Date) { Prices prices = PricingDA.GetPrices(symbol, cachedMax); if (prices != null) fullReloads[symbol] = prices.GetPricesByDate(); } else { Price price = PricingDA.GetPrice(symbol, cachedMax); if (price != null) singleUpdates[symbol] = price; } } catch (Exception ex) { MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Refresh] [ERROR] Failed refreshing {symbol}: {ex.Message}"); } }); lock (cacheLock) { latestDate = latestDateFromDb; foreach (KeyValuePair kvp in fullReloads) { PricesByDate existing; if (priceCache.TryGetValue(kvp.Key, out existing) && existing.MaxDate == currentMaxDates[kvp.Key]) { priceCache[kvp.Key] = kvp.Value; } } foreach (KeyValuePair kvp in singleUpdates) { PricesByDate pricesByDate; if (priceCache.TryGetValue(kvp.Key, out pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key]) { if (pricesByDate.ContainsKey(kvp.Value.Date)) pricesByDate.Remove(kvp.Value.Date); pricesByDate.Add(kvp.Value.Date, kvp.Value); } } } MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Refresh] Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}"); } finally { Interlocked.Exchange(ref refreshInProgress, 0); } } public DateTime GetLatestDate() { lock (cacheLock) { if (Utility.IsEpoch(latestDate)) latestDate = PricingDA.GetLatestDate(); return latestDate; } } public void RefreshLatestDate() { lock (cacheLock) { latestDate = PricingDA.GetLatestDate(); } } public DateTime GetMinCacheDate(string symbol) { lock (cacheLock) { PricesByDate symbolPrices; if (!priceCache.TryGetValue(symbol, out symbolPrices) || symbolPrices.Count == 0) return Utility.Epoch; return symbolPrices.MinDate; } } public Prices GetPrices(string symbol, DateTime endDate, int dayCount) { lock (cacheLock) { PricesByDate pricesByDate; if (!priceCache.TryGetValue(symbol, out pricesByDate)) return new Prices(); DateGenerator dateGenerator = new DateGenerator(); List historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount); Prices result = new Prices(); foreach (DateTime date in historicalDates) if (pricesByDate.ContainsKey(date)) result.Add(pricesByDate[date]); return result; } } public Price GetPrice(string symbol, DateTime date) { lock (cacheLock) { PricesByDate pricesByDate; if (!priceCache.TryGetValue(symbol, out pricesByDate)) return null; Price price; return pricesByDate.TryGetValue(date, out price) ? price : null; } } public bool ContainsPrice(string symbol, DateTime date) { lock (cacheLock) { return ContainsPriceInternal(symbol, date); } } public bool ContainsPrice(List symbols, DateTime date) { if (symbols == null || symbols.Count == 0) return false; lock (cacheLock) { foreach (string symbol in symbols) if (!ContainsPriceInternal(symbol, date)) return false; return true; } } public bool ContainsSymbol(string symbol) { lock (cacheLock) { return priceCache.ContainsKey(symbol); } } // ── Private helpers ────────────────────────────────────────────────────── // Must be called under cacheLock private void AddInternal(Price price) { if (price == null) return; PricesByDate pricesByDate; if (!priceCache.TryGetValue(price.Symbol, out pricesByDate)) { pricesByDate = new PricesByDate(); priceCache[price.Symbol] = pricesByDate; } if (!pricesByDate.ContainsKey(price.Date)) pricesByDate.Add(price.Date, price); } // Must be called under cacheLock private bool ContainsPriceInternal(string symbol, DateTime date) { PricesByDate pricesByDate; if (!priceCache.TryGetValue(symbol, out pricesByDate)) return false; return pricesByDate.ContainsKey(date); } // Must be called under cacheLock private long CountInternal() { long count = 0; foreach (PricesByDate pricesByDate in priceCache.Values) count += pricesByDate.Count; return count; } } }