using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.Utils;
using MarketData.Helper;
using MarketData.Numerical;
using MarketData.DataAccess;

namespace MarketData.Cache
{
    /// <summary>
    /// Abstraction over the pricing data store used by <see cref="GBPriceCache"/>, so that the
    /// data source can be replaced through <see cref="GBPriceCache.PricingDataAccess"/>.
    /// </summary>
    public interface IPricingDataAccess
    {
        Price GetPrice(string symbol, DateTime date);
        Prices GetPrices(string symbol, DateTime maxDate, DateTime minDate);
        DateTime GetLatestDateOnOrBefore(string symbol, DateTime date);
    }

    /// <summary>
    /// Default <see cref="IPricingDataAccess"/> implementation; every call is forwarded to the
    /// static <c>PricingDA</c> methods of the same name.
    /// </summary>
    internal class RealPricingDA : IPricingDataAccess
    {
        public Price GetPrice(string symbol, DateTime date) => PricingDA.GetPrice(symbol, date);

        public Prices GetPrices(string symbol, DateTime maxDate, DateTime minDate) => PricingDA.GetPrices(symbol, maxDate, minDate);

        public DateTime GetLatestDateOnOrBefore(string symbol, DateTime date) => PricingDA.GetLatestDateOnOrBefore(symbol, date);
    }

    /// <summary>
    /// Groups the three cache dictionaries so that they can be swapped together with a single
    /// reference assignment. Once a snapshot has been published it must not be mutated; writers
    /// build a new snapshot instead.
    /// </summary>
    internal class CacheSnapshot
    {
        /// <summary>Historical prices, keyed by symbol and then by date.</summary>
        public Dictionary<String, PricesByDate> PriceCache { get; }

        /// <summary>Latest ("real time") price, keyed by symbol.</summary>
        public Dictionary<String, Price> RealTimePriceCache { get; }

        /// <summary>
        /// Keys (symbol + formatted date) for which the data access layer returned no price, so
        /// that the same lookup is not repeated.
        /// </summary>
        public Dictionary<String, bool> NullCache { get; }

        public CacheSnapshot(
            Dictionary<String, PricesByDate> priceCache,
            Dictionary<String, Price> realTimePriceCache,
            Dictionary<String, bool> nullCache)
        {
            PriceCache = priceCache;
            RealTimePriceCache = realTimePriceCache;
            NullCache = nullCache;
        }
    }

    /// <summary>
    /// Process-wide price cache. Readers work against an immutable <see cref="CacheSnapshot"/>
    /// without taking a lock; writers take <c>thisLock</c>, build a new snapshot and publish it.
    /// A monitor thread periodically evicts old prices.
    /// </summary>
    public class GBPriceCache : IDisposable
    {
        // Upon eviction, remove all data older than (max cached date - EVICTION_DAYCOUNT historical dates).
        private static readonly int EVICTION_DAYCOUNT = 252;

        // Guards priceCacheInstance. A private object is used because typeof(GBPriceCache) is
        // reachable by any code in the process and could be locked by unrelated callers.
        private static readonly object instanceLock = new object();
        private static GBPriceCache priceCacheInstance = null;

        private readonly Object thisLock = new Object();                      // serializes all snapshot writers
        private readonly DateGenerator dateGenerator = new DateGenerator();
        private readonly int cacheRefreshAfter = 120000;                      // 2 minutes
        private readonly SemaphoreSlim fetchSemaphore = new SemaphoreSlim(8); // max 8 concurrent DB fetches
        private Thread cacheMonitorThread = null;
        private volatile bool threadRun = true;
        private volatile CacheSnapshot snapshot;                              // volatile: read lock-free by readers

        public IPricingDataAccess PricingDataAccess { get; set; } = new RealPricingDA();

        /// <summary>
        /// Creates an empty cache and starts the monitor thread.
        /// </summary>
        protected GBPriceCache()
        {
            snapshot = CreateEmptySnapshot();
            cacheMonitorThread = new Thread(new ThreadStart(ThreadProc));
            cacheMonitorThread.Start();
        }

        /// <summary>
        /// Returns the shared cache instance, creating it on first use (or after the previous
        /// instance has been disposed).
        /// </summary>
        /// <returns>The singleton <see cref="GBPriceCache"/>.</returns>
        public static GBPriceCache GetInstance()
        {
            lock (instanceLock)
            {
                if (null == priceCacheInstance)
                {
                    priceCacheInstance = new GBPriceCache();
                }
                return priceCacheInstance;
            }
        }

        /// <summary>
        /// Discards all cached prices, real-time prices and null markers.
        /// </summary>
        public void Clear()
        {
            lock (thisLock)
            {
                snapshot = CreateEmptySnapshot();
            }
        }

        /// <summary>
        /// Shuts down the cache and stops the monitor thread. This is a full application-level
        /// shutdown. Callers should always access the cache via GetInstance() and not hold
        /// long-lived references to the instance.
        /// </summary>
        public void Dispose()
        {
            Thread monitorThread;
            lock (thisLock)
            {
                if (!threadRun) return; // already disposed
                threadRun = false;
                monitorThread = cacheMonitorThread;
                cacheMonitorThread = null;
            }

            // The singleton reference is guarded by the same lock GetInstance() uses, and is only
            // cleared when it actually refers to the instance being disposed.
            lock (instanceLock)
            {
                if (ReferenceEquals(priceCacheInstance, this))
                {
                    priceCacheInstance = null;
                }
            }

            // Join outside thisLock: the monitor thread takes thisLock during eviction, so joining
            // while holding it could block for the whole timeout.
            if (null != monitorThread)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "[GBPriceCache:Dispose] Joining monitor thread...");
                monitorThread.Join(5000);
            }
        }

        /// <summary>
        /// Returns the price for <paramref name="symbol"/> on <paramref name="date"/>, falling back
        /// to the price on the latest date on or before <paramref name="date"/> reported by the
        /// data access layer.
        /// </summary>
        /// <returns>The price found, or null when none is available.</returns>
        public Price GetPriceOrLatestAvailable(String symbol, DateTime date)
        {
            Price price = GetPrice(symbol, date);
            if (null != price) return price;

            DateTime latestPricingDate = PricingDataAccess.GetLatestDateOnOrBefore(symbol, date);
            price = GetPrice(symbol, latestPricingDate);
            if (null != price) return price;

            fetchSemaphore.Wait();
            try
            {
                price = PricingDataAccess.GetPrice(symbol, latestPricingDate);
            }
            finally
            {
                fetchSemaphore.Release();
            }
            if (null != price) AddPrice(price);
            return price;
        }

        /// <summary>
        /// Returns the cached real-time price for <paramref name="symbol"/>, fetching and caching it
        /// via <c>MarketDataHelper.GetLatestPrice</c> on a miss.
        /// </summary>
        /// <returns>The latest price, or null when the helper returns none.</returns>
        public Price GetRealtimePrice(String symbol)
        {
            Price cached;
            if (snapshot.RealTimePriceCache.TryGetValue(symbol, out cached))
            {
                return cached;
            }

            Price price = MarketDataHelper.GetLatestPrice(symbol);
            if (null != price)
            {
                lock (thisLock)
                {
                    CacheSnapshot current = snapshot;
                    Dictionary<String, Price> newRealtime = new Dictionary<String, Price>(current.RealTimePriceCache);
                    // Indexer rather than Add: another thread may have cached this symbol already.
                    newRealtime[symbol] = price;
                    UpdateSnapshot(current.PriceCache, newRealtime, current.NullCache);
                }
            }
            return price;
        }

        /// <summary>
        /// Returns the price for <paramref name="symbol"/> on the date part of
        /// <paramref name="date"/>, loading it through <see cref="PricingDataAccess"/> on a cache
        /// miss. Misses that yield no price are remembered so the lookup is not repeated.
        /// </summary>
        /// <returns>The price, or null when none exists.</returns>
        public Price GetPrice(String symbol, DateTime date)
        {
            date = date.Date;

            Price cached;
            if (TryGetCachedPrice(snapshot, symbol, date, out cached)) return cached;

            String key = symbol + Utility.DateTimeToStringMMHDDHYYYY(date);
            if (snapshot.NullCache.ContainsKey(key))
            {
                return null;
            }

            Price price;
            fetchSemaphore.Wait();
            try
            {
                price = PricingDataAccess.GetPrice(symbol, date);
            }
            finally
            {
                fetchSemaphore.Release();
            }

            if (null == price)
            {
                lock (thisLock)
                {
                    CacheSnapshot current = snapshot;
                    Dictionary<String, bool> newNullCache = new Dictionary<String, bool>(current.NullCache);
                    // Indexer rather than Add: another thread may have recorded the same miss.
                    newNullCache[key] = true;
                    UpdateSnapshot(current.PriceCache, current.RealTimePriceCache, newNullCache);
                }
                return null;
            }

            AddPrice(price);

            // Look the price up again under the requested symbol/date: the loaded price is cached
            // under its own Symbol/Date, and only a match on the requested key is returned.
            return TryGetCachedPrice(snapshot, symbol, date, out cached) ? cached : null;
        }

        /// <summary>
        /// Returns prices for <paramref name="symbol"/> between the two dates. The arguments are
        /// swapped when supplied in the wrong order.
        /// </summary>
        public Prices GetPrices(String symbol, DateTime earlierDate, DateTime laterDate)
        {
            DateGenerator localDateGenerator = new DateGenerator();
            if (laterDate < earlierDate)
            {
                DateTime tempDate = earlierDate;
                earlierDate = laterDate;
                laterDate = tempDate;
            }
            List<DateTime> datesList = localDateGenerator.GenerateHistoricalDates(earlierDate, laterDate);
            datesList = datesList.Where(x => x >= earlierDate).ToList();
            return GetPrices(symbol, laterDate, datesList.Count);
        }

        /// <summary>
        /// Returns up to <paramref name="dayCount"/> prices for <paramref name="symbol"/>, most
        /// recent first, drawn from the historical dates generated from
        /// <paramref name="startDate"/>. Dates that are neither cached nor marked as missing are
        /// loaded in one range query.
        /// </summary>
        public Prices GetPrices(String symbol, DateTime startDate, int dayCount)
        {
            // 60 extra dates are requested beyond dayCount, as in the original implementation.
            List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(startDate, dayCount + 60);

            CacheSnapshot current = snapshot;
            List<DateTime> missingDates = new List<DateTime>();
            foreach (DateTime historicalDate in historicalDates)
            {
                Price ignored;
                if (TryGetCachedPrice(current, symbol, historicalDate, out ignored)) continue;
                String key = symbol + Utility.DateTimeToStringMMHDDHYYYY(historicalDate);
                if (!current.NullCache.ContainsKey(key))
                {
                    missingDates.Add(historicalDate);
                }
            }

            if (missingDates.Count > 0)
            {
                DateTime minDate = missingDates.Min();
                DateTime maxDate = missingDates.Max();
                Prices loadedPrices;
                fetchSemaphore.Wait();
                try
                {
                    loadedPrices = PricingDataAccess.GetPrices(symbol, maxDate, minDate);
                }
                finally
                {
                    fetchSemaphore.Release();
                }
                if (null != loadedPrices)
                {
                    AddPrices(loadedPrices);
                }
            }

            // Re-read the snapshot once so the result is assembled from one consistent view.
            current = snapshot;
            Prices prices = new Prices();
            PricesByDate pricesByDate;
            if (current.PriceCache.TryGetValue(symbol, out pricesByDate))
            {
                foreach (DateTime historicalDate in historicalDates)
                {
                    if (pricesByDate.ContainsKey(historicalDate))
                    {
                        prices.Add(pricesByDate[historicalDate]);
                    }
                }
            }

            List<Price> ordered = prices.OrderByDescending(x => x.Date).ToList();
            return new Prices(ordered.Take(dayCount).ToList());
        }

        /// <summary>
        /// Adds a single price to the cache; null is ignored and an existing entry for the same
        /// symbol and date is kept.
        /// </summary>
        private void AddPrice(Price price)
        {
            if (null == price) return;
            AddPrices(new[] { price });
        }

        /// <summary>
        /// Adds a batch of prices with a single snapshot swap. Published <c>PricesByDate</c>
        /// instances are never modified: each affected symbol gets a fresh copy, so lock-free
        /// readers never observe a dictionary that is being written to.
        /// </summary>
        private void AddPrices(IEnumerable<Price> pricesToAdd)
        {
            lock (thisLock)
            {
                CacheSnapshot current = snapshot;
                Dictionary<String, PricesByDate> newCache = null;        // created lazily on the first real change
                HashSet<String> copiedSymbols = new HashSet<String>();   // symbols already copied for this batch

                foreach (Price price in pricesToAdd)
                {
                    if (null == price) continue;

                    PricesByDate pricesByDate;
                    Dictionary<String, PricesByDate> lookup = newCache ?? current.PriceCache;
                    bool symbolKnown = lookup.TryGetValue(price.Symbol, out pricesByDate);
                    if (symbolKnown && pricesByDate.ContainsKey(price.Date)) continue; // first entry wins

                    if (null == newCache)
                    {
                        newCache = new Dictionary<String, PricesByDate>(current.PriceCache);
                    }
                    if (!copiedSymbols.Contains(price.Symbol))
                    {
                        PricesByDate copy = new PricesByDate();
                        if (symbolKnown)
                        {
                            foreach (var kvp in pricesByDate)
                            {
                                copy.Add(kvp.Key, kvp.Value);
                            }
                        }
                        newCache[price.Symbol] = copy;
                        copiedSymbols.Add(price.Symbol);
                        pricesByDate = copy;
                    }
                    pricesByDate.Add(price.Date, price);
                }

                if (null != newCache)
                {
                    UpdateSnapshot(newCache, current.RealTimePriceCache, current.NullCache);
                }
            }
        }

        /// <summary>
        /// Returns true when a price for <paramref name="symbol"/> on exactly
        /// <paramref name="date"/> is cached. The date is not normalized here.
        /// </summary>
        public bool ContainsPrice(String symbol, DateTime date)
        {
            PricesByDate pricesByDate;
            return snapshot.PriceCache.TryGetValue(symbol, out pricesByDate) && pricesByDate.ContainsKey(date);
        }

        /// <summary>
        /// Evicts cached prices dated earlier than <paramref name="onOrBeforeDate"/> and resets the
        /// null cache. Prices dated exactly <paramref name="onOrBeforeDate"/> are retained.
        /// </summary>
        /// <param name="onOrBeforeDate">Eviction cutoff.</param>
        /// <param name="collect">When true, forces a garbage collection afterwards.</param>
        public void ClearCacheOnOrBefore(DateTime onOrBeforeDate, bool collect = false)
        {
            lock (thisLock)
            {
                UpdateSnapshot(BuildEvictedPriceCache(onOrBeforeDate), snapshot.RealTimePriceCache, new Dictionary<String, bool>());
            }
            // Collect after releasing the lock so other writers are not blocked by the GC.
            if (collect) GC.Collect();
        }

        /// <summary>
        /// Builds a new price dictionary containing only prices dated on or after
        /// <paramref name="onOrBeforeDate"/>; symbols left with no prices are dropped.
        /// </summary>
        private Dictionary<String, PricesByDate> BuildEvictedPriceCache(DateTime onOrBeforeDate)
        {
            Dictionary<String, PricesByDate> newPriceCache = new Dictionary<String, PricesByDate>();
            foreach (KeyValuePair<String, PricesByDate> entry in snapshot.PriceCache)
            {
                PricesByDate filteredPrices = new PricesByDate();
                foreach (var kvp in entry.Value)
                {
                    if (kvp.Key >= onOrBeforeDate)
                    {
                        filteredPrices.Add(kvp.Key, kvp.Value);
                    }
                }
                if (filteredPrices.Count > 0)
                {
                    newPriceCache.Add(entry.Key, filteredPrices);
                }
            }
            return newPriceCache;
        }

        /// <summary>
        /// Looks up a price in the given snapshot without touching the <c>snapshot</c> field again,
        /// so the symbol and date lookups are performed against the same view.
        /// </summary>
        private static bool TryGetCachedPrice(CacheSnapshot source, String symbol, DateTime date, out Price price)
        {
            price = null;
            PricesByDate pricesByDate;
            if (!source.PriceCache.TryGetValue(symbol, out pricesByDate)) return false;
            if (!pricesByDate.ContainsKey(date)) return false;
            price = pricesByDate[date];
            return true;
        }

        /// <summary>
        /// Monitor loop. Wakes every second so that shutdown is noticed promptly, and once more
        /// than <c>cacheRefreshAfter</c> milliseconds have accumulated it evicts old prices and
        /// resets the real-time and null caches. Nothing is evicted or reset while the price cache
        /// is empty.
        /// </summary>
        private void ThreadProc()
        {
            int quantums = 0;
            int quantumInterval = 1000;
            while (threadRun)
            {
                Thread.Sleep(quantumInterval);
                if (!threadRun) break;
                quantums += quantumInterval;
                if (quantums <= cacheRefreshAfter) continue;
                quantums = 0;

                bool evicted = false;
                lock (thisLock)
                {
                    if (!threadRun) break; // Dispose() ran while this thread was waiting for the lock

                    DateTime maxDate = snapshot.PriceCache.Values.SelectMany(p => p.Keys).DefaultIfEmpty(DateTime.MinValue).Max();
                    if (maxDate != DateTime.MinValue)
                    {
                        DateTime evictBefore = dateGenerator.GenerateHistoricalDates(maxDate, EVICTION_DAYCOUNT).Min();
                        int beforeCount = snapshot.PriceCache.Values.Sum(p => p.Count);
                        Dictionary<String, PricesByDate> newCache = BuildEvictedPriceCache(evictBefore);
                        int afterCount = newCache.Values.Sum(p => p.Count);
                        int removed = beforeCount - afterCount;
                        MDTrace.WriteLine(LogLevel.DEBUG, $"GBPriceCache eviction: removed {removed} prices (before={beforeCount}, after={afterCount}) on or before {evictBefore.ToShortDateString()}");
                        UpdateSnapshot(newCache, new Dictionary<String, Price>(), new Dictionary<String, bool>());
                        evicted = true;
                    }
                }
                // Collect after releasing the lock so writers are not blocked by the GC.
                if (evicted) GC.Collect();
            }
        }

        /// <summary>
        /// Publishes a new snapshot. Callers that derive the new dictionaries from the current
        /// snapshot must hold <c>thisLock</c>.
        /// </summary>
        private void UpdateSnapshot(Dictionary<String, PricesByDate> newPriceCache, Dictionary<String, Price> newRealtimePriceCache, Dictionary<String, bool> newNullCache)
        {
            snapshot = new CacheSnapshot(newPriceCache, newRealtimePriceCache, newNullCache);
        }

        /// <summary>Creates a snapshot with three empty dictionaries.</summary>
        private static CacheSnapshot CreateEmptySnapshot()
        {
            return new CacheSnapshot(
                new Dictionary<String, PricesByDate>(),
                new Dictionary<String, Price>(),
                new Dictionary<String, bool>());
        }
    }
}