using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MarketData.DataAccess;
using MarketData.MarketDataModel;
using MarketData.Utils;

namespace MarketData.Cache
{
    /// <summary>
    /// This cache is used in the GainLoss Generators.
    /// It is a process-wide singleton mapping symbol to <see cref="PricesByDate"/>; every access to the
    /// map is serialized on a private lock. A monitor thread started by the constructor periodically
    /// writes cache statistics to the trace log until <see cref="Dispose"/> is called.
    /// </summary>
    public class GLPriceCache
    {
        // Upper bound on concurrent PricingDA calls issued by the Parallel.ForEach loops.
        private const int MaxFetchParallelism = 8;

        // Guards creation and clearing of the singleton. A private object is used so that no code
        // outside this class can contend on (or deadlock against) the same monitor.
        private static readonly object instanceLock = new object();
        private static GLPriceCache instance = null;

        private readonly object thisLock = new object();   // guards priceCache, latestDate and thread fields
        private readonly object fetchLock = new object();  // serializes the database phase of Refresh()
        private readonly int cacheCycle = 300000;          // milliseconds between statistics log entries

        private Dictionary<string, PricesByDate> priceCache = new Dictionary<string, PricesByDate>();
        private DateTime latestDate = Utility.Epoch;
        private Thread cacheMonitorThread = null;
        private volatile bool threadRun = true;

        private GLPriceCache()
        {
            cacheMonitorThread = new Thread(new ThreadStart(ThreadProc));
            cacheMonitorThread.Start();
        }

        /// <summary>
        /// Drops every cached price and re-reads the latest pricing date from the data layer.
        /// </summary>
        public void Clear()
        {
            lock (thisLock)
            {
                priceCache = new Dictionary<string, PricesByDate>();
                RefreshLatestDate();
            }
        }

        /// <summary>
        /// Stops the monitor thread (waiting up to five seconds for it to exit) and releases the
        /// singleton so that the next <see cref="GetInstance"/> call creates a fresh cache.
        /// Calling it again on an already disposed instance is a no-op.
        /// </summary>
        public void Dispose()
        {
            Thread threadToJoin = null;
            lock (thisLock)
            {
                if (!threadRun) return;
                threadRun = false;
                threadToJoin = cacheMonitorThread;
                cacheMonitorThread = null;
            }

            // The singleton reference is owned by instanceLock (the same lock GetInstance uses).
            // The two locks are taken one after the other, never nested, so no lock-order issue exists.
            lock (instanceLock)
            {
                if (ReferenceEquals(instance, this)) instance = null;
            }

            if (threadToJoin != null)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Dispose] Thread state is '{Utility.ThreadStateToString(threadToJoin)}'. Joining...");
                threadToJoin.Join(5000);
            }
            MDTrace.WriteLine(LogLevel.DEBUG, "[GLPriceCache:Dispose] End");
        }

        /// <summary>
        /// Returns the singleton instance, creating it (and starting its monitor thread) on first use.
        /// </summary>
        public static GLPriceCache GetInstance()
        {
            lock (instanceLock)
            {
                if (instance == null)
                {
                    instance = new GLPriceCache();
                }
                return instance;
            }
        }

        /// <summary>
        /// Re-reads the latest pricing date from <c>PricingDA.GetLatestDate()</c>.
        /// </summary>
        public void RefreshLatestDate()
        {
            lock (thisLock)
            {
                latestDate = PricingDA.GetLatestDate();
            }
        }

        /// <summary>
        /// Returns the cached latest pricing date, loading it first if it is still the epoch placeholder.
        /// </summary>
        public DateTime GetLatestDate()
        {
            lock (thisLock)
            {
                if (Utility.IsEpoch(latestDate))
                {
                    RefreshLatestDate();
                }
                return latestDate;
            }
        }

        /// <summary>
        /// Brings every cached symbol up to date with the database.
        /// Phase 1 snapshots the symbols and their cached max dates, phase 2 queries the data layer
        /// without holding the cache lock, phase 3 applies the results to entries whose max date has
        /// not changed since the snapshot.
        /// </summary>
        /// <remarks>
        /// Exceptions thrown by the data layer inside the parallel loop are not caught here and
        /// surface to the caller as an <see cref="AggregateException"/>.
        /// </remarks>
        public void Refresh()
        {
            List<string> symbols;
            Dictionary<string, DateTime> currentMaxDates;
            lock (thisLock)
            {
                symbols = priceCache.Keys.ToList();
                currentMaxDates = priceCache.ToDictionary(kvp => kvp.Key, kvp => kvp.Value.MaxDate);
            }
            if (symbols.Count == 0) return;

            ConcurrentDictionary<string, Prices> fullReloads = new ConcurrentDictionary<string, Prices>();
            ConcurrentDictionary<string, Price> singleUpdates = new ConcurrentDictionary<string, Price>();
            DateTime latestDateFromDb;

            lock (fetchLock)
            {
                Dictionary<string, DateTime> maxDbDates = PricingDA.GetLatestDates(symbols);
                latestDateFromDb = PricingDA.GetLatestDate();
                Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = MaxFetchParallelism }, symbol =>
                {
                    DateTime cachedMax;
                    if (!currentMaxDates.TryGetValue(symbol, out cachedMax)) return;

                    DateTime dbMax;
                    if (maxDbDates.TryGetValue(symbol, out dbMax) && dbMax.Date != cachedMax.Date)
                    {
                        // Database and cache disagree on the newest date: pull the prices starting at the cached max.
                        Prices prices = PricingDA.GetPrices(symbol, cachedMax);
                        if (prices != null) fullReloads[symbol] = prices;
                    }
                    else
                    {
                        // Same newest date (or no database date): re-read that one price in case it changed.
                        Price price = PricingDA.GetPrice(symbol, cachedMax);
                        if (price != null) singleUpdates[symbol] = price;
                    }
                });
            }

            lock (thisLock)
            {
                latestDate = latestDateFromDb;

                foreach (KeyValuePair<string, Prices> kvp in fullReloads)
                {
                    PricesByDate existing;
                    if (!priceCache.TryGetValue(kvp.Key, out existing) || existing.MaxDate != currentMaxDates[kvp.Key]) continue;

                    // Merge into the existing entry rather than replacing it. The fetch above is keyed
                    // on the cached max date, and Add(PortfolioTrades) uses the same two-argument
                    // GetPrices overload as "from this date forward" (NOTE(review): confirm against
                    // PricingDA). Replacing the entry would therefore drop all older history, and would
                    // leave an empty entry when the query returns nothing.
                    foreach (Price price in kvp.Value)
                    {
                        if (price == null) continue;
                        ReplacePrice(existing, price);
                    }
                }

                foreach (KeyValuePair<string, Price> kvp in singleUpdates)
                {
                    PricesByDate pricesByDate;
                    if (priceCache.TryGetValue(kvp.Key, out pricesByDate) && pricesByDate.MaxDate == currentMaxDates[kvp.Key])
                    {
                        ReplacePrice(pricesByDate, kvp.Value);
                    }
                }
            }
            MDTrace.WriteLine(LogLevel.DEBUG, $"Full reloads: {fullReloads.Count}, Single updates: {singleUpdates.Count}");
        }

        /// <summary>
        /// Ensures the cache holds prices for every symbol of <paramref name="portfolioTrades"/> back to
        /// that symbol's earliest trade date, and refreshes the latest price of symbols with open positions.
        /// Database reads run in parallel outside the cache lock; per-symbol fetch errors are logged and skipped.
        /// </summary>
        /// <param name="portfolioTrades">Trades whose symbols should be cached. Null is ignored.</param>
        public void Add(PortfolioTrades portfolioTrades)
        {
            List<string> symbols = portfolioTrades?.Symbols;
            if (symbols == null) return;

            Dictionary<string, DateTime> minTradeDates = symbols.ToDictionary(symbol => symbol, symbol => portfolioTrades.GetMinTradeDate(symbol));

            // Symbols that need an intraday refresh: open positions (no close date).
            // TODO: positions closed today (close price may still be settling) were removed from this set.
            HashSet<string> mutableSymbols = new HashSet<string>(symbols.Where(symbol => portfolioTrades.HasOpenPositions(symbol)));

            Dictionary<string, DateTime> minCacheDates;
            lock (thisLock)
            {
                // DateTime.MaxValue marks "nothing cached". An empty entry is treated the same way,
                // matching the Count == 0 check in GetMinCacheDate.
                minCacheDates = symbols.ToDictionary(
                    symbol => symbol,
                    symbol => (priceCache.TryGetValue(symbol, out PricesByDate cached) && cached.Count > 0)
                        ? cached.MinDate
                        : DateTime.MaxValue);
            }

            ConcurrentDictionary<string, Prices> fetchedPrices = new ConcurrentDictionary<string, Prices>();
            ConcurrentDictionary<string, Price> latestPrices = new ConcurrentDictionary<string, Price>();

            Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = MaxFetchParallelism }, symbol =>
            {
                DateTime minTradeDate = minTradeDates[symbol];
                DateTime minCacheDate = minCacheDates[symbol];
                try
                {
                    // Historical fetch - only when cache is missing or incomplete
                    Prices prices = null;
                    if (minCacheDate == DateTime.MaxValue)
                    {
                        prices = PricingDA.GetPrices(symbol, minTradeDate);
                    }
                    else if (minTradeDate < minCacheDate)
                    {
                        prices = PricingDA.GetPrices(symbol, minCacheDate, minTradeDate);
                    }
                    if (prices != null && prices.Count > 0)
                    {
                        fetchedPrices[symbol] = prices;
                    }

                    // Intraday refresh for symbols with open positions only
                    if (mutableSymbols.Contains(symbol))
                    {
                        Price latestPrice = PricingDA.GetPrice(symbol);
                        if (latestPrice != null) latestPrices[symbol] = latestPrice;
                    }
                }
                catch (Exception ex)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, $"Error fetching prices for {symbol}: {ex.Message}");
                }
            });

            lock (thisLock)
            {
                // Historical prices: idempotent, will not overwrite existing entries
                foreach (KeyValuePair<string, Prices> kvp in fetchedPrices)
                {
                    foreach (Price price in kvp.Value)
                    {
                        Add(price);
                    }
                }

                // Latest prices: unconditional overwrite to capture any intraday updates
                foreach (KeyValuePair<string, Price> kvp in latestPrices)
                {
                    PricesByDate pricesByDate;
                    if (!priceCache.TryGetValue(kvp.Key, out pricesByDate))
                    {
                        pricesByDate = new PricesByDate();
                        priceCache[kvp.Key] = pricesByDate;
                    }
                    ReplacePrice(pricesByDate, kvp.Value);
                }
            }
            MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:Add] Symbols: {symbols.Count}, Mutable: {mutableSymbols.Count}, " + $"Historical fetches: {fetchedPrices.Count}, Intraday updates: {latestPrices.Count}");
        }

        /// <summary>
        /// Adds every price in <paramref name="prices"/> that is not already cached.
        /// </summary>
        /// <param name="prices">Prices to add. Null is ignored.</param>
        public void Add(Prices prices)
        {
            if (prices == null) return;
            lock (thisLock)
            {
                foreach (Price price in prices)
                {
                    Add(price);
                }
            }
        }

        /// <summary>
        /// Fetches and caches the price on <paramref name="pricingDate"/> for each symbol that does not
        /// already have one. Per-symbol fetch errors are logged and skipped.
        /// </summary>
        /// <param name="symbols">Symbols to load. Null or empty is ignored.</param>
        /// <param name="pricingDate">Date whose price is required.</param>
        public void Add(List<string> symbols, DateTime pricingDate)
        {
            if (symbols == null || symbols.Count == 0) return;

            ConcurrentDictionary<string, Price> fetchedPrices = new ConcurrentDictionary<string, Price>();
            Parallel.ForEach(symbols, new ParallelOptions { MaxDegreeOfParallelism = MaxFetchParallelism }, symbol =>
            {
                lock (thisLock)
                {
                    if (ContainsPrice(symbol, pricingDate)) return;
                }
                try
                {
                    Price price = PricingDA.GetPrice(symbol, pricingDate);
                    if (price != null) fetchedPrices[symbol] = price;
                }
                catch (Exception ex)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, $"GLPriceCache: Error fetching price for {symbol} on {pricingDate:yyyy-MM-dd}: {ex.Message}");
                }
            });

            lock (thisLock)
            {
                foreach (KeyValuePair<string, Price> kvp in fetchedPrices)
                {
                    Add(kvp.Value);
                }
            }
        }

        /// <summary>
        /// Adds a single price unless the symbol already has an entry for that date.
        /// </summary>
        /// <param name="price">Price to add. Null is ignored.</param>
        public void Add(Price price)
        {
            if (price == null) return;
            lock (thisLock)
            {
                PricesByDate pricesByDate;
                if (!priceCache.TryGetValue(price.Symbol, out pricesByDate))
                {
                    pricesByDate = new PricesByDate();
                    priceCache[price.Symbol] = pricesByDate;
                }
                if (!pricesByDate.ContainsKey(price.Date))
                {
                    pricesByDate.Add(price.Date, price);
                }
            }
        }

        /// <summary>
        /// Returns the earliest cached date for <paramref name="symbol"/>, or <c>Utility.Epoch</c> when
        /// the symbol is not cached or its entry is empty.
        /// </summary>
        public DateTime GetMinCacheDate(string symbol)
        {
            lock (thisLock)
            {
                PricesByDate symbolPrices;
                if (!priceCache.TryGetValue(symbol, out symbolPrices) || symbolPrices.Count == 0)
                {
                    return Utility.Epoch;
                }
                return symbolPrices.MinDate;
            }
        }

        /// <summary>
        /// Removes the price on <paramref name="date"/> from every cached symbol.
        /// </summary>
        public void RemoveDate(DateTime date)
        {
            lock (thisLock)
            {
                foreach (KeyValuePair<string, PricesByDate> kvp in priceCache)
                {
                    kvp.Value.Remove(date);
                }
            }
        }

        /// <summary>
        /// Returns the cached prices of <paramref name="symbol"/> for the dates produced by
        /// <c>DateGenerator.GenerateHistoricalDates(endDate, dayCount)</c>. Dates with no cached price
        /// are skipped; an empty collection is returned when the symbol is not cached.
        /// </summary>
        public Prices GetPrices(string symbol, DateTime endDate, int dayCount)
        {
            lock (thisLock)
            {
                PricesByDate pricesByDate;
                if (!priceCache.TryGetValue(symbol, out pricesByDate)) return new Prices();

                DateGenerator dateGenerator = new DateGenerator();
                List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(endDate, dayCount);
                Prices result = new Prices();
                foreach (DateTime date in historicalDates)
                {
                    Price price;
                    if (pricesByDate.TryGetValue(date, out price))
                    {
                        result.Add(price);
                    }
                }
                return result;
            }
        }

        /// <summary>
        /// Returns the cached price for <paramref name="symbol"/> on <paramref name="date"/>, or null if absent.
        /// </summary>
        public Price GetPrice(string symbol, DateTime date)
        {
            lock (thisLock)
            {
                PricesByDate pricesByDate;
                if (!priceCache.TryGetValue(symbol, out pricesByDate)) return null;
                Price price;
                return pricesByDate.TryGetValue(date, out price) ? price : null;
            }
        }

        /// <summary>
        /// Returns true when a price for <paramref name="symbol"/> on <paramref name="date"/> is cached.
        /// </summary>
        public bool ContainsPrice(string symbol, DateTime date)
        {
            lock (thisLock)
            {
                PricesByDate pricesByDate;
                if (!priceCache.TryGetValue(symbol, out pricesByDate)) return false;
                return pricesByDate.ContainsKey(date);
            }
        }

        /// <summary>
        /// Returns true only when every symbol in <paramref name="symbols"/> has a cached price on
        /// <paramref name="date"/>. A null or empty list yields false.
        /// </summary>
        public bool ContainsPrice(List<string> symbols, DateTime date)
        {
            if (symbols == null || symbols.Count == 0) return false;
            lock (thisLock)
            {
                foreach (string symbol in symbols)
                {
                    PricesByDate pricesByDate;
                    if (!priceCache.TryGetValue(symbol, out pricesByDate) || !pricesByDate.ContainsKey(date))
                    {
                        return false;
                    }
                }
                return true;
            }
        }

        /// <summary>
        /// Returns true when the cache has an entry (possibly empty) for <paramref name="symbol"/>.
        /// </summary>
        public bool ContainsSymbol(string symbol)
        {
            lock (thisLock)
            {
                return priceCache.ContainsKey(symbol);
            }
        }

        /// <summary>
        /// Removes any price already stored under the new price's date, then adds the new price.
        /// The caller must hold <c>thisLock</c>.
        /// </summary>
        private static void ReplacePrice(PricesByDate pricesByDate, Price price)
        {
            if (pricesByDate.ContainsKey(price.Date))
            {
                pricesByDate.Remove(price.Date);
            }
            pricesByDate.Add(price.Date, price);
        }

        /// <summary>
        /// Total number of cached prices across all symbols.
        /// </summary>
        private long Count()
        {
            lock (thisLock)
            {
                long count = 0;
                foreach (PricesByDate pricesByDate in priceCache.Values)
                {
                    count += pricesByDate.Count;
                }
                return count;
            }
        }

        /// <summary>
        /// Monitor thread body: wakes once a second so that it notices a stop request quickly, and logs
        /// the cache size each time the accumulated wait exceeds <c>cacheCycle</c> milliseconds.
        /// </summary>
        private void ThreadProc()
        {
            int quantums = 0;
            int quantumInterval = 1000;
            long lastCount = 0;
            while (threadRun)
            {
                Thread.Sleep(quantumInterval);
                if (!threadRun) break;
                quantums += quantumInterval;
                if (quantums > cacheCycle)
                {
                    quantums = 0;
                    lock (thisLock)
                    {
                        lastCount = Count();
                        MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:ThreadProc] Symbols: {priceCache.Keys.Count}. Items in cache: {Utility.FormatNumber(lastCount,0,true)}.");
                    }
                }
            }
            MDTrace.WriteLine(LogLevel.DEBUG, $"[GLPriceCache:ThreadProc] Thread ended. Items in cache:{Utility.FormatNumber(lastCount,0,true)}");
        }
    }
}