using MarketData.MarketDataModel;
using MarketData.Utils;
using MarketData.DataAccess;
using System.Collections.Concurrent;

namespace MarketData.Cache
{
    /// <summary>
    /// GLPriceCache - price cache used by the Gain/Loss generator, which is used by the
    /// MarketData server and the user interface.
    /// A background timer ticks every 5 minutes; every 144th tick (12 hours) the entire cache is
    /// cleared, so the next call to <see cref="Add(PortfolioTrades)"/> reloads prices from the database.
    /// Symbols with open positions always get their most recent price re-read from the database.
    /// All access to the cache dictionary is serialized through a single private lock.
    /// </summary>
    public class GLPriceCache : IDisposable
    {
        // -- Singleton ------------------------------------------------------------
        private static readonly object instanceLock = new object();
        private static GLPriceCache instance = null;

        // -- Disposal -------------------------------------------------------------
        private volatile bool disposed = false;
        private int refreshInProgress = 0; // 1 while Refresh() or an eviction is running

        // -- State ----------------------------------------------------------------
        private readonly Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
        private readonly ConcurrentDictionary<string, object> symbolFetchLocks = new ConcurrentDictionary<string, object>();
        private readonly object cacheLock = new object();
        private DateTime latestDate = Utility.Epoch;

        // -- Background refresh ---------------------------------------------------
        private readonly TimeSpan cacheCycle = TimeSpan.FromMinutes(5);
        private Timer refreshTimer = null;
        private int tickCount = 0;

        // 144 ticks x 5 minutes per tick = 720 minutes = 12 hours between full evictions.
        private const int evictionTickInterval = 144;

        // -- Parallelism ----------------------------------------------------------
        private static readonly int maxParallelDbCalls = ResolveMaxParallelDbCalls();

        /// <summary>
        /// Returns the process-wide cache instance, creating it on first use or after the
        /// previous instance has been disposed.
        /// </summary>
        /// <returns>The live singleton instance.</returns>
        public static GLPriceCache GetInstance()
        {
            lock (instanceLock)
            {
                // Never hand out an instance whose timer has already been torn down.
                if (instance == null || instance.disposed)
                {
                    instance = new GLPriceCache();
                }
                return instance;
            }
        }

        /// <summary>
        /// Computes the degree of parallelism for database fetches: three per processor, capped at 32.
        /// </summary>
        private static int ResolveMaxParallelDbCalls()
        {
            return Math.Min(Math.Max(1, Environment.ProcessorCount) * 3, 32);
        }

        // -- Constructor ----------------------------------------------------------
        private GLPriceCache()
        {
            refreshTimer = new Timer(OnCacheRefreshTick, null, cacheCycle, cacheCycle);
        }

        /// <summary>
        /// Stops the background timer and releases the singleton slot.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Dispose implementation. Safe to call more than once.
        /// </summary>
        /// <param name="disposing">True when called from <see cref="Dispose()"/>.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (disposed)
            {
                return;
            }
            disposed = true;
            if (!disposing)
            {
                return;
            }

            Timer timerToDispose;
            lock (instanceLock)
            {
                timerToDispose = refreshTimer;
                refreshTimer = null;
                // Only clear the slot if it still points at this instance; GetInstance() may
                // already have replaced a disposed instance with a fresh one.
                if (ReferenceEquals(instance, this))
                {
                    instance = null;
                }
            }

            // Dispose timer immediately; no blocking wait
            timerToDispose?.Dispose();
            MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] Disposed.");
        }

        // -- Background tick ------------------------------------------------------
        /// <summary>
        /// Timer callback: logs cache size and triggers a full eviction every
        /// <c>evictionTickInterval</c> ticks. Never lets an exception escape to the timer thread.
        /// </summary>
        private void OnCacheRefreshTick(object state)
        {
            if (disposed)
            {
                return;
            }
            try
            {
                int symbolCount;
                long itemCount;
                lock (cacheLock)
                {
                    symbolCount = priceCache.Count;
                    itemCount = CountInternal();
                }
                // Log outside the lock so tracing I/O never blocks cache readers.
                MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Tick] Symbols: {symbolCount}. " + $"Items in cache: {Utility.FormatNumber(itemCount, 0, true)}.");

                // Timer callbacks may overlap, so the counter is advanced atomically.
                if (Interlocked.Increment(ref tickCount) % evictionTickInterval == 0)
                {
                    EvictStaleSymbols();
                }
            }
            catch (Exception ex)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Tick] [ERROR] Unhandled exception: {ex}");
            }
        }

        // -- Eviction -------------------------------------------------------------
        /// <summary>
        /// Clears the whole cache and the per-symbol fetch locks. Skipped when a refresh
        /// (or another eviction) is currently running.
        /// </summary>
        private void EvictStaleSymbols()
        {
            if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Evict] Skipped — refresh in progress.");
                return;
            }
            try
            {
                int count;
                lock (cacheLock)
                {
                    count = priceCache.Count;
                    priceCache.Clear();
                    symbolFetchLocks.Clear();
                }
                MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Evict] Cache cleared. {count} symbols evicted.");
            }
            finally
            {
                Interlocked.Exchange(ref refreshInProgress, 0);
            }
        }

        // -- Public API -----------------------------------------------------------
        /// <summary>
        /// Ensures the cache holds prices for every symbol in the portfolio back to that symbol's
        /// earliest trade date, and re-reads the latest price for symbols with open positions.
        /// Symbols without open positions whose cached range already reaches back to the earliest
        /// trade date are skipped without a database call.
        /// </summary>
        /// <param name="portfolioTrades">The trades to cache prices for. Null is ignored.</param>
        public void Add(PortfolioTrades portfolioTrades)
        {
            if (portfolioTrades == null)
            {
                return;
            }
            List<string> symbols = portfolioTrades.Symbols;
            if (symbols == null || symbols.Count == 0)
            {
                return;
            }

            // Map each symbol to its earliest trade date
            Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(symbol => symbol, symbol => portfolioTrades.GetMinTradeDate(symbol));

            // Only open positions need intraday price refresh
            HashSet<string> mutableSymbols = new HashSet<string>(symbols.Where(symbol => portfolioTrades.HasOpenPositions(symbol)));

            // Pre-filter under a single lock acquisition: keep a symbol when it is mutable or
            // when the cache does not yet reach back to its earliest trade date.
            List<string> symbolsToProcess;
            lock (cacheLock)
            {
                symbolsToProcess = symbols
                    .Where(symbol => mutableSymbols.Contains(symbol) || minTradeDates[symbol] < MinCachedDateOrMaxValue(symbol))
                    .ToList();
            }
            if (symbolsToProcess.Count == 0)
            {
                return; // nothing to do
            }

            ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
            ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();
            // Symbols whose historical fetch covered only the range before an existing cache entry.
            ConcurrentDictionary<string, bool> rangeOnlyFetches = new ConcurrentDictionary<string, bool>();

            Parallel.ForEach(symbolsToProcess, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
            {
                if (disposed)
                {
                    return;
                }
                DateTime minTradeDate = minTradeDates[symbol];

                // Per-symbol lock: concurrent callers asking for the same symbol queue up here
                // instead of issuing duplicate database fetches.
                object symbolLock = symbolFetchLocks.GetOrAdd(symbol, _ => new object());
                lock (symbolLock)
                {
                    try
                    {
                        // Read the cache state only after acquiring the symbol lock so that it
                        // reflects both fetches by other callers and any eviction that happened
                        // while this thread was waiting.
                        DateTime minCacheDate;
                        lock (cacheLock)
                        {
                            minCacheDate = MinCachedDateOrMaxValue(symbol);
                        }

                        // Historical fetch only if cache is missing or incomplete
                        Prices prices = null;
                        bool rangeOnly = false;
                        if (minCacheDate == DateTime.MaxValue)
                        {
                            prices = PricingDA.GetPrices(symbol, minTradeDate);
                        }
                        else if (minTradeDate < minCacheDate)
                        {
                            prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
                            rangeOnly = true;
                        }
                        if (prices != null && prices.Count > 0)
                        {
                            fetchedPrices[symbol] = prices;
                            if (rangeOnly)
                            {
                                rangeOnlyFetches[symbol] = true;
                            }
                        }

                        // Intraday refresh for mutable symbols only
                        if (mutableSymbols.Contains(symbol))
                        {
                            Price latestPrice = PricingDA.GetPrice(symbol);
                            if (latestPrice != null)
                            {
                                latestPrices[symbol] = latestPrice;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] [ERROR] Failed fetching prices for {symbol}: {ex.Message}");
                    }
                }
            });

            lock (cacheLock)
            {
                // Historical prices — idempotent, do not overwrite existing entries
                foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
                {
                    // A range-only fetch extends an existing entry backwards. If that entry was
                    // evicted in the meantime, merging the fragment would leave a gap that the
                    // MinDate check can no longer detect; drop it so the next Add reloads fully.
                    if (rangeOnlyFetches.ContainsKey(kvp.Key) && !priceCache.ContainsKey(kvp.Key))
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] Discarded partial history for {kvp.Key}; cache entry was evicted during fetch.");
                        continue;
                    }
                    foreach (Price price in kvp.Value)
                    {
                        AddInternal(price);
                    }
                }

                // Latest prices — unconditional overwrite to capture intraday updates
                foreach (KeyValuePair<string, Price> kvp in latestPrices)
                {
                    PricesByDate pricesByDate;
                    if (!priceCache.TryGetValue(kvp.Key, out pricesByDate))
                    {
                        pricesByDate = new PricesByDate();
                        priceCache[kvp.Key] = pricesByDate;
                    }
                    ReplacePrice(pricesByDate, kvp.Value);
                }
            }

            // Only log if we actually fetched or updated something
            if (fetchedPrices.Count > 0 || latestPrices.Count > 0)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] Symbols processed: {symbolsToProcess.Count}, Mutable: {mutableSymbols.Count}, " + $"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
            }
        }

        /// <summary>
        /// Adds each price in the collection to the cache. Existing (symbol, date) entries are kept.
        /// </summary>
        /// <param name="prices">Prices to add. Null is ignored.</param>
        public void Add(Prices prices)
        {
            if (prices == null)
            {
                return;
            }
            // One lock acquisition for the whole batch instead of one per price.
            lock (cacheLock)
            {
                foreach (Price price in prices)
                {
                    AddInternal(price);
                }
            }
        }

        /// <summary>
        /// Fetches and caches the price on <paramref name="pricingDate"/> for every symbol that
        /// does not already have a cached price on that date.
        /// </summary>
        /// <param name="symbols">Symbols to load. Null or empty is ignored.</param>
        /// <param name="pricingDate">The date to load a price for.</param>
        public void Add(List<string> symbols, DateTime pricingDate)
        {
            if (symbols == null || symbols.Count == 0)
            {
                return;
            }

            ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
            Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
            {
                if (disposed)
                {
                    return;
                }
                lock (cacheLock)
                {
                    if (ContainsPriceInternal(symbol, pricingDate))
                    {
                        return;
                    }
                }
                try
                {
                    Price price = PricingDA.GetPrice(symbol, pricingDate);
                    if (price != null)
                    {
                        fetchedPrices[symbol] = price;
                    }
                }
                catch (Exception ex)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] [ERROR] Failed fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
                }
            });

            lock (cacheLock)
            {
                foreach (KeyValuePair<string, Price> kvp in fetchedPrices)
                {
                    AddInternal(kvp.Value);
                }
            }
        }

        /// <summary>
        /// Adds a single price to the cache. An existing entry for the same symbol and date is kept.
        /// </summary>
        /// <param name="price">The price to add. Null is ignored.</param>
        public void Add(Price price)
        {
            lock (cacheLock)
            {
                AddInternal(price);
            }
        }

        /// <summary>
        /// Brings every cached symbol up to date with the database. Symbols whose latest database
        /// date differs from the cached maximum date get the newer prices loaded; all others get
        /// the price on their cached maximum date re-read. Returns immediately when another
        /// refresh or an eviction is already running.
        /// </summary>
        public void Refresh()
        {
            if (Interlocked.CompareExchange(ref refreshInProgress, 1, 0) == 1)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Refresh] Skipped — refresh already in progress.");
                return;
            }
            try
            {
                List<string> symbols;
                Dictionary<string, DateTime> currentMaxDates;
                lock (cacheLock)
                {
                    symbols = priceCache.Keys.ToList();
                    currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
                }
                if (symbols.Count == 0)
                {
                    return;
                }

                ConcurrentDictionary<string, Prices> fullReloads = new ConcurrentDictionary<string, Prices>();
                ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();

                // Fetch outside the cache lock — no timeout risk holding cacheLock over I/O
                Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
                DateTime latestDateFromDb = PricingDA.GetLatestDate();

                Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = maxParallelDbCalls }, symbol =>
                {
                    if (disposed)
                    {
                        return;
                    }
                    DateTime cachedMax;
                    if (!currentMaxDates.TryGetValue(symbol, out cachedMax))
                    {
                        return;
                    }
                    try
                    {
                        DateTime dbMax;
                        if (maxDbDates.TryGetValue(symbol, out dbMax) && dbMax.Date != cachedMax.Date)
                        {
                            Prices prices = PricingDA.GetPrices(symbol, cachedMax);
                            if (prices != null)
                            {
                                fullReloads[symbol] = prices;
                            }
                        }
                        else
                        {
                            Price price = PricingDA.GetPrice(symbol, cachedMax);
                            if (price != null)
                            {
                                singleUpdates[symbol] = price;
                            }
                        }
                    }
                    catch (Exception ex)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Refresh] [ERROR] Failed refreshing {symbol}: {ex.Message}");
                    }
                });

                lock (cacheLock)
                {
                    latestDate = latestDateFromDb;

                    foreach (KeyValuePair<string, Prices> kvp in fullReloads)
                    {
                        // Apply only when the entry is unchanged since the snapshot was taken.
                        PricesByDate existing;
                        if (priceCache.TryGetValue(kvp.Key, out existing) && existing.MaxDate == currentMaxDates[kvp.Key])
                        {
                            // Merge rather than replace: the fetch was requested starting at the
                            // cached maximum date, so replacing the entry would discard the
                            // older history already cached for this symbol.
                            // NOTE(review): assumes GetPrices(symbol, date) returns prices from
                            // that date onward, as its use in Add(PortfolioTrades) suggests.
                            foreach (Price price in kvp.Value)
                            {
                                if (price != null)
                                {
                                    ReplacePrice(existing, price);
                                }
                            }
                        }
                    }

                    foreach (KeyValuePair<string, Price> kvp in singleUpdates)
                    {
                        PricesByDate pricesByDate;
                        if (priceCache.TryGetValue(kvp.Key, out pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key])
                        {
                            ReplacePrice(pricesByDate, kvp.Value);
                        }
                    }
                }

                MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Refresh] Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
            }
            finally
            {
                Interlocked.Exchange(ref refreshInProgress, 0);
            }
        }

        /// <summary>
        /// Returns the latest pricing date, loading it from the database the first time it is needed.
        /// </summary>
        /// <returns>The latest pricing date known to the cache.</returns>
        public DateTime GetLatestDate()
        {
            lock (cacheLock)
            {
                if (!Utility.IsEpoch(latestDate))
                {
                    return latestDate;
                }
            }

            // Query outside the lock so a slow database call does not block cache readers.
            DateTime fetched = PricingDA.GetLatestDate();

            lock (cacheLock)
            {
                // Keep a value that another thread stored while this one was querying.
                if (Utility.IsEpoch(latestDate))
                {
                    latestDate = fetched;
                }
                return latestDate;
            }
        }

        /// <summary>
        /// Re-reads the latest pricing date from the database.
        /// </summary>
        public void RefreshLatestDate()
        {
            // Query outside the lock so a slow database call does not block cache readers.
            DateTime fetched = PricingDA.GetLatestDate();
            lock (cacheLock)
            {
                latestDate = fetched;
            }
        }

        /// <summary>
        /// Returns the earliest cached date for a symbol.
        /// </summary>
        /// <param name="symbol">The symbol to look up.</param>
        /// <returns>The earliest cached date, or <c>Utility.Epoch</c> when nothing is cached for the symbol.</returns>
        public DateTime GetMinCacheDate(string symbol)
        {
            lock (cacheLock)
            {
                PricesByDate symbolPrices;
                if (!priceCache.TryGetValue(symbol, out symbolPrices) || symbolPrices.Count == 0)
                {
                    return Utility.Epoch;
                }
                return symbolPrices.MinDate;
            }
        }

        /// <summary>
        /// Returns the cached prices for a symbol on the historical dates generated from
        /// <paramref name="endDate"/> and <paramref name="dayCount"/>. Dates with no cached price are omitted.
        /// </summary>
        /// <param name="symbol">The symbol to look up.</param>
        /// <param name="endDate">The end date passed to the date generator.</param>
        /// <param name="dayCount">The number of days passed to the date generator.</param>
        /// <returns>The matching cached prices; empty when the symbol is not cached.</returns>
        public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
        {
            lock (cacheLock)
            {
                PricesByDate pricesByDate;
                if (!priceCache.TryGetValue(symbol, out pricesByDate))
                {
                    return new Prices();
                }

                DateGenerator dateGenerator = new DateGenerator();
                List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);

                Prices result = new Prices();
                foreach (DateTime date in historicalDates)
                {
                    Price price;
                    if (pricesByDate.TryGetValue(date, out price))
                    {
                        result.Add(price);
                    }
                }
                return result;
            }
        }

        /// <summary>
        /// Returns the cached price for a symbol on a date.
        /// </summary>
        /// <param name="symbol">The symbol to look up.</param>
        /// <param name="date">The pricing date.</param>
        /// <returns>The cached price, or null when the symbol or the date is not cached.</returns>
        public Price GetPrice(string symbol, DateTime date)
        {
            lock (cacheLock)
            {
                PricesByDate pricesByDate;
                if (!priceCache.TryGetValue(symbol, out pricesByDate))
                {
                    return null;
                }
                Price price;
                return pricesByDate.TryGetValue(date, out price) ? price : null;
            }
        }

        /// <summary>
        /// Indicates whether a price is cached for the symbol on the date.
        /// </summary>
        /// <param name="symbol">The symbol to look up.</param>
        /// <param name="date">The pricing date.</param>
        /// <returns>True when a price is cached.</returns>
        public bool ContainsPrice(string symbol, DateTime date)
        {
            lock (cacheLock)
            {
                return ContainsPriceInternal(symbol, date);
            }
        }

        /// <summary>
        /// Indicates whether every symbol in the list has a cached price on the date.
        /// </summary>
        /// <param name="symbols">The symbols to check.</param>
        /// <param name="date">The pricing date.</param>
        /// <returns>True when all symbols have a price; false for a null or empty list.</returns>
        public bool ContainsPrice(List<string> symbols, DateTime date)
        {
            if (symbols == null || symbols.Count == 0)
            {
                return false;
            }
            lock (cacheLock)
            {
                foreach (string symbol in symbols)
                {
                    if (!ContainsPriceInternal(symbol, date))
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// Indicates whether any prices are cached for the symbol.
        /// </summary>
        /// <param name="symbol">The symbol to look up.</param>
        /// <returns>True when the symbol has a cache entry.</returns>
        public bool ContainsSymbol(string symbol)
        {
            lock (cacheLock)
            {
                return priceCache.ContainsKey(symbol);
            }
        }

        // -- Private helpers ------------------------------------------------------

        /// <summary>
        /// AddInternal - MUST BE CALLED UNDER CACHELOCK.
        /// Adds the price unless the symbol already has an entry for that date.
        /// </summary>
        private void AddInternal(Price price)
        {
            if (price == null)
            {
                return;
            }
            PricesByDate pricesByDate;
            if (!priceCache.TryGetValue(price.Symbol, out pricesByDate))
            {
                pricesByDate = new PricesByDate();
                priceCache[price.Symbol] = pricesByDate;
            }
            if (!pricesByDate.ContainsKey(price.Date))
            {
                pricesByDate.Add(price.Date, price);
            }
        }

        /// <summary>
        /// ReplacePrice - MUST BE CALLED UNDER CACHELOCK.
        /// Stores the price under its date, overwriting any existing entry for that date.
        /// </summary>
        private static void ReplacePrice(PricesByDate pricesByDate, Price price)
        {
            if (pricesByDate.ContainsKey(price.Date))
            {
                pricesByDate.Remove(price.Date);
            }
            pricesByDate.Add(price.Date, price);
        }

        /// <summary>
        /// MinCachedDateOrMaxValue - MUST BE CALLED UNDER CACHELOCK.
        /// Returns the symbol's earliest cached date, or DateTime.MaxValue when the symbol has no
        /// cache entry, so that any trade date compares as earlier than a missing entry.
        /// </summary>
        private DateTime MinCachedDateOrMaxValue(string symbol)
        {
            PricesByDate pricesByDate;
            return priceCache.TryGetValue(symbol, out pricesByDate) ? pricesByDate.MinDate : DateTime.MaxValue;
        }

        /// <summary>
        /// ContainsPriceInternal - MUST BE CALLED UNDER CACHELOCK.
        /// </summary>
        private bool ContainsPriceInternal(string symbol, DateTime date)
        {
            PricesByDate pricesByDate;
            if (!priceCache.TryGetValue(symbol, out pricesByDate))
            {
                return false;
            }
            return pricesByDate.ContainsKey(date);
        }

        /// <summary>
        /// CountInternal - MUST BE CALLED UNDER CACHELOCK.
        /// Returns the total number of cached prices across all symbols.
        /// </summary>
        private long CountInternal()
        {
            long count = 0;
            foreach (PricesByDate pricesByDate in priceCache.Values)
            {
                count += pricesByDate.Count;
            }
            return count;
        }
    }
}