// Source: marketdata/MarketDataLib/Cache/GBPriceCache.cs (C#; listed by the repository viewer as 324 lines, 10 KiB)

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.Utils;
using MarketData.Helper;
using MarketData.Numerical;
using MarketData.DataAccess;
namespace MarketData.Cache
{
/// <summary>
/// Abstraction over the pricing data source used by <see cref="GBPriceCache"/>.
/// The cache exposes it through a settable property, so the source can be replaced.
/// </summary>
public interface IPricingDataAccess
{
/// <summary>
/// Fetches the price of <paramref name="symbol"/> for <paramref name="date"/>.
/// Callers in this file treat a null result as "no price available".
/// </summary>
Price GetPrice(string symbol, DateTime date);
/// <summary>
/// Fetches the prices of <paramref name="symbol"/> between two dates.
/// Note the parameter order: the upper bound (<paramref name="maxDate"/>) comes
/// before the lower bound (<paramref name="minDate"/>).
/// </summary>
Prices GetPrices(string symbol, DateTime maxDate, DateTime minDate);
/// <summary>
/// Returns a pricing date for <paramref name="symbol"/>; going by the name, the most
/// recent one on or before <paramref name="date"/>.
/// NOTE(review): the value returned when no such date exists is not visible here - confirm.
/// </summary>
DateTime GetLatestDateOnOrBefore(string symbol, DateTime date);
}
/// <summary>
/// Default <see cref="IPricingDataAccess"/> implementation. Every member passes its
/// arguments unchanged to the <c>PricingDA</c> method of the same name.
/// </summary>
internal class RealPricingDA : IPricingDataAccess
{
    /// <summary>Forwards to <c>PricingDA.GetPrice</c>.</summary>
    public Price GetPrice(string symbol, DateTime date)
    {
        return PricingDA.GetPrice(symbol, date);
    }

    /// <summary>Forwards to <c>PricingDA.GetPrices</c>, keeping the max-then-min argument order.</summary>
    public Prices GetPrices(string symbol, DateTime maxDate, DateTime minDate)
    {
        return PricingDA.GetPrices(symbol, maxDate, minDate);
    }

    /// <summary>Forwards to <c>PricingDA.GetLatestDateOnOrBefore</c>.</summary>
    public DateTime GetLatestDateOnOrBefore(string symbol, DateTime date)
    {
        return PricingDA.GetLatestDateOnOrBefore(symbol, date);
    }
}
/// <summary>
/// Groups the three cache dictionaries into one object so that all of them can be
/// swapped by assigning a single reference. The properties are get-only, but the
/// dictionaries they return are ordinary mutable instances.
/// </summary>
internal class CacheSnapshot
{
    /// <summary>Creates a snapshot that holds the three supplied dictionaries as-is (no copies are made).</summary>
    public CacheSnapshot(
        Dictionary<string, PricesByDate> priceCache,
        Dictionary<string, Price> realTimePriceCache,
        Dictionary<string, bool> nullCache)
    {
        this.PriceCache = priceCache;
        this.RealTimePriceCache = realTimePriceCache;
        this.NullCache = nullCache;
    }

    /// <summary>Per-symbol tables of prices keyed by date.</summary>
    public Dictionary<string, PricesByDate> PriceCache { get; }

    /// <summary>Per-symbol real-time prices.</summary>
    public Dictionary<string, Price> RealTimePriceCache { get; }

    /// <summary>Keys (symbol text followed by a date string) for which a lookup found no price.</summary>
    public Dictionary<string, bool> NullCache { get; }
}
public class GBPriceCache : IDisposable
{
// Thread running ThreadProc, which periodically empties the real-time price cache.
private Thread cacheMonitorThread = null;
// Set to false by Dispose to ask the monitor thread to exit.
private volatile bool threadRun = true;
// Serializes snapshot replacement and in-place additions to cached price tables.
private readonly Object thisLock = new Object();
// Current cache contents. Several readers access this field without taking thisLock,
// so it is volatile to make sure they observe the most recently published snapshot.
private volatile CacheSnapshot snapshot;
private readonly DateGenerator dateGenerator = new DateGenerator();
// Singleton instance handed out by GetInstance; reset to null by Dispose.
private static GBPriceCache priceCacheInstance = null;
// Private gate for singleton creation. A Type object is reachable by any code in the
// process, so locking on typeof(GBPriceCache) could contend with unrelated callers.
private static readonly Object instanceLock = new Object();
private int cacheRefreshAfter = 120000; // 2 minutes
private readonly SemaphoreSlim fetchSemaphore = new SemaphoreSlim(8); // max 8 concurrent DB fetches
/// <summary>Data source used for cache misses; replaceable (defaults to <see cref="RealPricingDA"/>).</summary>
public IPricingDataAccess PricingDataAccess { get; set; } = new RealPricingDA();

/// <summary>
/// Creates the cache with three empty dictionaries and starts the monitor thread.
/// Protected: instances are normally obtained through <see cref="GetInstance"/>.
/// </summary>
protected GBPriceCache()
{
    snapshot = new CacheSnapshot(new Dictionary<String, PricesByDate>(), new Dictionary<String, Price>(), new Dictionary<String, bool>());
    cacheMonitorThread = new Thread(new ThreadStart(ThreadProc));
    // The monitor only performs cache housekeeping; as a background thread it cannot
    // keep the process alive when Dispose() was never called.
    cacheMonitorThread.IsBackground = true;
    cacheMonitorThread.Start();
}

/// <summary>
/// Returns the shared cache instance, creating it on first use (or again after the
/// previous instance was disposed).
/// </summary>
/// <returns>The process-wide <see cref="GBPriceCache"/>.</returns>
public static GBPriceCache GetInstance()
{
    lock (instanceLock)
    {
        if (null == priceCacheInstance)
        {
            priceCacheInstance = new GBPriceCache();
        }
        return priceCacheInstance;
    }
}
/// <summary>
/// Drops everything that is cached by publishing a snapshot built from three new,
/// empty dictionaries (historical prices, real-time prices and the null markers).
/// </summary>
public void Clear()
{
    Dictionary<String, PricesByDate> emptyPrices = new Dictionary<String, PricesByDate>();
    Dictionary<String, Price> emptyRealtime = new Dictionary<String, Price>();
    Dictionary<String, bool> emptyNulls = new Dictionary<String, bool>();
    lock (thisLock)
    {
        UpdateSnapshot(emptyPrices, emptyRealtime, emptyNulls);
    }
}
/// <summary>
/// Stops the monitor thread and clears the singleton reference. Does nothing when
/// no singleton is registered or when the cache has already been stopped.
/// Waits up to 5000 ms for the monitor thread to exit.
/// </summary>
public void Dispose()
{
    Thread monitorThread;
    lock (thisLock)
    {
        if (null == priceCacheInstance || !threadRun) return;
        threadRun = false;
        monitorThread = cacheMonitorThread;
        cacheMonitorThread = null;
        priceCacheInstance = null;
    }
    // Join only after thisLock has been released: the monitor thread takes the same
    // lock for its periodic refresh, so joining while holding it could leave both
    // threads waiting on each other until the join timeout expires.
    if (null != monitorThread)
    {
        MDTrace.WriteLine(LogLevel.DEBUG, "[GBPriceCache:Dispose] Joining monitor thread...");
        monitorThread.Join(5000);
    }
}
/// <summary>
/// Rebuilds the historical price cache so that only entries dated on or after
/// <paramref name="onOrBeforeDate"/> remain; symbols left without any entry are
/// removed. The real-time cache and the null markers are carried over untouched.
/// NOTE(review): entries dated exactly on the cut-off are kept, although the method
/// name suggests they would be cleared - confirm which behaviour is intended.
/// </summary>
/// <param name="onOrBeforeDate">Cut-off date; entries strictly earlier than it are dropped.</param>
/// <param name="collect">When true, <c>GC.Collect()</c> is invoked after the swap.</param>
public void ClearCacheOnOrBefore(DateTime onOrBeforeDate, bool collect = false)
{
    lock (thisLock)
    {
        Dictionary<String, PricesByDate> retainedBySymbol = new Dictionary<String, PricesByDate>();
        foreach (KeyValuePair<String, PricesByDate> symbolEntry in snapshot.PriceCache)
        {
            PricesByDate kept = new PricesByDate();
            foreach (KeyValuePair<DateTime, Price> datedPrice in symbolEntry.Value)
            {
                // Skip everything dated before the cut-off.
                if (datedPrice.Key < onOrBeforeDate) continue;
                kept.Add(datedPrice.Key, datedPrice.Value);
            }
            // A symbol with nothing left is not carried into the new cache.
            if (kept.Count == 0) continue;
            retainedBySymbol.Add(symbolEntry.Key, kept);
        }
        UpdateSnapshot(retainedBySymbol, snapshot.RealTimePriceCache, snapshot.NullCache);
        if (collect)
        {
            GC.Collect();
        }
    }
}
/// <summary>
/// Returns the price for <paramref name="symbol"/> on <paramref name="date"/>; when
/// none exists, falls back to the date reported by
/// <c>PricingDataAccess.GetLatestDateOnOrBefore</c>.
/// </summary>
/// <param name="symbol">Symbol to look up.</param>
/// <param name="date">Requested pricing date.</param>
/// <returns>The price found, or null when neither lookup yields one.</returns>
public Price GetPriceOrLatestAvailable(String symbol, DateTime date)
{
    Price price = GetPrice(symbol, date);
    if (null != price) return price;
    // The date lookup goes through the data access layer as well, so it is throttled
    // by the same semaphore as the other data access calls in this class.
    DateTime latestPricingDate;
    fetchSemaphore.Wait();
    try
    {
        latestPricingDate = PricingDataAccess.GetLatestDateOnOrBefore(symbol, date);
    }
    finally
    {
        fetchSemaphore.Release();
    }
    price = GetPrice(symbol, latestPricingDate);
    if (null != price) return price;
    // Last resort: ask the data source directly with the date exactly as reported
    // (GetPrice strips the time-of-day part before querying).
    fetchSemaphore.Wait();
    try
    {
        price = PricingDataAccess.GetPrice(symbol, latestPricingDate);
    }
    finally
    {
        fetchSemaphore.Release();
    }
    if (null != price) AddPrice(price);
    return price;
}
/// <summary>
/// Returns the cached real-time price for <paramref name="symbol"/>, or obtains it
/// from <c>MarketDataHelper.GetLatestPrice</c> and caches a non-null result.
/// </summary>
/// <param name="symbol">Symbol to look up.</param>
/// <returns>The real-time price, or null when the helper returns none.</returns>
public Price GetRealtimePrice(String symbol)
{
    // One TryGetValue against a single dictionary instance: a separate ContainsKey
    // and indexer read could straddle a snapshot swap (the monitor thread replaces
    // the real-time dictionary with an empty one) and throw KeyNotFoundException.
    Price cachedPrice;
    if (snapshot.RealTimePriceCache.TryGetValue(symbol, out cachedPrice))
    {
        return cachedPrice;
    }
    Price price = MarketDataHelper.GetLatestPrice(symbol);
    if (null != price)
    {
        // Copy-and-swap under the lock so a concurrent snapshot replacement is not lost.
        lock (thisLock)
        {
            Dictionary<String, Price> newRealtime = new Dictionary<String, Price>(snapshot.RealTimePriceCache);
            // Indexer assignment: another thread may already have cached this symbol,
            // in which case Add would throw ArgumentException.
            newRealtime[symbol] = price;
            UpdateSnapshot(snapshot.PriceCache, newRealtime, snapshot.NullCache);
        }
    }
    return price;
}
/// <summary>
/// Returns the price for <paramref name="symbol"/> on the calendar day of
/// <paramref name="date"/>, loading it through <c>PricingDataAccess</c> on a cache
/// miss. A miss that yields no price is remembered, so the data source is not asked
/// again for the same symbol and day.
/// </summary>
/// <param name="symbol">Symbol to look up.</param>
/// <param name="date">Requested date; its time-of-day part is discarded.</param>
/// <returns>The price, or null when none is available.</returns>
public Price GetPrice(String symbol, DateTime date)
{
    date = date.Date;
    if (!ContainsPrice(symbol, date))
    {
        String key = symbol + Utility.DateTimeToStringMMHDDHYYYY(date);
        if (snapshot.NullCache.ContainsKey(key))
        {
            return null;
        }
        Price price;
        fetchSemaphore.Wait();
        try
        {
            price = PricingDataAccess.GetPrice(symbol, date);
        }
        finally
        {
            fetchSemaphore.Release();
        }
        if (null == price)
        {
            // Copy-and-swap under the lock so a concurrent snapshot replacement is not lost.
            lock (thisLock)
            {
                Dictionary<String, bool> newNullCache = new Dictionary<String, bool>(snapshot.NullCache);
                // Indexer assignment: a concurrent caller may have recorded the same
                // key already, in which case Add would throw ArgumentException.
                newNullCache[key] = true;
                UpdateSnapshot(snapshot.PriceCache, snapshot.RealTimePriceCache, newNullCache);
            }
            return null;
        }
        AddPrice(price);
    }
    // Resolve the symbol with one lookup on one dictionary instance; ContainsKey plus
    // indexer on a re-read snapshot could throw if the cache is cleared in between.
    PricesByDate pricesByDate;
    if (!snapshot.PriceCache.TryGetValue(symbol, out pricesByDate)) return null;
    if (!pricesByDate.ContainsKey(date)) return null;
    return pricesByDate[date];
}
/// <summary>
/// Returns prices for <paramref name="symbol"/> over a date range. The two dates may
/// be passed in either order. The number of generated dates that are not earlier
/// than the start of the range is used as the day count for the
/// (symbol, startDate, dayCount) overload, anchored at the later date.
/// </summary>
/// <param name="symbol">Symbol to look up.</param>
/// <param name="earlierDate">One end of the range.</param>
/// <param name="laterDate">The other end of the range.</param>
/// <returns>The prices returned by the day-count overload.</returns>
public Prices GetPrices(String symbol, DateTime earlierDate, DateTime laterDate)
{
    // Normalize the range so that rangeStart <= rangeEnd.
    bool alreadyOrdered = !(laterDate < earlierDate);
    DateTime rangeStart = alreadyOrdered ? earlierDate : laterDate;
    DateTime rangeEnd = alreadyOrdered ? laterDate : earlierDate;

    DateGenerator localDateGenerator = new DateGenerator();
    List<DateTime> generatedDates = localDateGenerator.GenerateHistoricalDates(rangeStart, rangeEnd);
    int dayCount = generatedDates.Count(x => x >= rangeStart);
    return GetPrices(symbol, rangeEnd, dayCount);
}
/// <summary>
/// Returns up to <paramref name="dayCount"/> prices for <paramref name="symbol"/>,
/// newest first, taken from the dates generated from <paramref name="startDate"/>.
/// Dates that are neither cached nor marked as having no price are loaded from
/// <c>PricingDataAccess</c> with a single range query.
/// </summary>
/// <param name="symbol">Symbol to look up.</param>
/// <param name="startDate">Date handed to the date generator as the starting point.</param>
/// <param name="dayCount">Maximum number of prices to return.</param>
/// <returns>The prices found, ordered by date descending.</returns>
public Prices GetPrices(String symbol, DateTime startDate, int dayCount)
{
    // 60 more dates than requested are generated; presumably head-room for dates
    // without a price - TODO confirm.
    List<DateTime> historicalDates = dateGenerator.GenerateHistoricalDates(startDate, dayCount + 60);
    List<DateTime> missingDates = new List<DateTime>();
    foreach (DateTime historicalDate in historicalDates)
    {
        if (!ContainsPrice(symbol, historicalDate))
        {
            String key = symbol + Utility.DateTimeToStringMMHDDHYYYY(historicalDate);
            if (!snapshot.NullCache.ContainsKey(key))
            {
                missingDates.Add(historicalDate);
            }
        }
    }
    if (missingDates.Count > 0)
    {
        // One range query spanning the oldest to the newest missing date.
        DateTime minDate = missingDates.Min();
        DateTime maxDate = missingDates.Max();
        fetchSemaphore.Wait();
        Prices loadedPrices;
        try
        {
            loadedPrices = PricingDataAccess.GetPrices(symbol, maxDate, minDate);
        }
        finally
        {
            fetchSemaphore.Release();
        }
        // A null result from the data source is treated as "nothing loaded" instead
        // of failing with a NullReferenceException.
        if (null != loadedPrices)
        {
            foreach (Price price in loadedPrices)
            {
                AddPrice(price);
            }
        }
    }
    Prices prices = new Prices();
    // Resolve the symbol's table once: repeating ContainsKey plus indexer on a re-read
    // snapshot per date could throw if the cache is cleared in between.
    PricesByDate pricesByDate;
    if (snapshot.PriceCache.TryGetValue(symbol, out pricesByDate))
    {
        foreach (DateTime historicalDate in historicalDates)
        {
            if (!pricesByDate.ContainsKey(historicalDate)) continue;
            prices.Add(pricesByDate[historicalDate]);
        }
    }
    List<Price> ordered = prices.OrderByDescending(x => x.Date).ToList();
    return new Prices(ordered.Take(dayCount).ToList());
}
/// <summary>
/// Stores <paramref name="price"/> in the historical cache under its symbol and date.
/// A null price is ignored, and an entry that already exists for the same symbol and
/// date is left as it is. The first price of a symbol publishes a new snapshot with a
/// copied symbol dictionary; later prices are added to the symbol's existing table.
/// NOTE(review): the existing table is modified in place while other methods read it
/// without taking the lock - confirm that PricesByDate tolerates this.
/// </summary>
/// <param name="price">Price to cache; may be null.</param>
private void AddPrice(Price price)
{
    if (null == price) return;
    lock (thisLock)
    {
        if (snapshot.PriceCache.ContainsKey(price.Symbol))
        {
            // Known symbol: add to its table unless the date is already present.
            PricesByDate existingTable = snapshot.PriceCache[price.Symbol];
            if (existingTable.ContainsKey(price.Date)) return;
            existingTable.Add(price.Date, price);
            return;
        }

        // New symbol: build its table, then publish a copy of the symbol dictionary
        // that includes it.
        PricesByDate createdTable = new PricesByDate();
        createdTable.Add(price.Date, price);
        Dictionary<String, PricesByDate> extendedCache = new Dictionary<String, PricesByDate>(snapshot.PriceCache);
        extendedCache.Add(price.Symbol, createdTable);
        UpdateSnapshot(extendedCache, snapshot.RealTimePriceCache, snapshot.NullCache);
    }
}
/// <summary>
/// Reports whether the historical cache holds a price for <paramref name="symbol"/>
/// on <paramref name="date"/>. The date is compared as given (no normalization).
/// </summary>
/// <param name="symbol">Symbol to look up.</param>
/// <param name="date">Date key to look up.</param>
/// <returns>True when an entry exists for the symbol and date.</returns>
public bool ContainsPrice(String symbol, DateTime date)
{
    // Single lookup on one dictionary instance: ContainsKey followed by the indexer
    // on a re-read snapshot could throw KeyNotFoundException if the cache is
    // replaced between the two reads.
    PricesByDate pricesByDate;
    if (!snapshot.PriceCache.TryGetValue(symbol, out pricesByDate)) return false;
    return pricesByDate.ContainsKey(date);
}
/// <summary>
/// Body of the monitor thread. Sleeps in one-second steps so that a stop request is
/// noticed quickly; once the accumulated sleep time exceeds <c>cacheRefreshAfter</c>
/// it publishes a snapshot whose real-time price dictionary is empty, keeping the
/// historical prices and null markers. Returns when <c>threadRun</c> becomes false.
/// </summary>
private void ThreadProc()
{
    const int tickMilliseconds = 1000;
    int elapsedMilliseconds = 0;
    for (;;)
    {
        if (!threadRun) return;
        Thread.Sleep(tickMilliseconds);
        // Re-check after waking up: the stop request may have arrived during the sleep.
        if (!threadRun) return;
        elapsedMilliseconds += tickMilliseconds;
        if (elapsedMilliseconds <= cacheRefreshAfter) continue;
        elapsedMilliseconds = 0;
        lock (thisLock)
        {
            UpdateSnapshot(snapshot.PriceCache, new Dictionary<String, Price>(), snapshot.NullCache);
        }
    }
}
/// <summary>
/// Publishes a new snapshot built from the three supplied dictionaries by assigning
/// it to the <c>snapshot</c> field. This method does not acquire any lock itself.
/// </summary>
/// <param name="newPriceCache">Historical prices per symbol.</param>
/// <param name="newRealtimePriceCache">Real-time prices per symbol.</param>
/// <param name="newNullCache">Markers for lookups that produced no price.</param>
private void UpdateSnapshot(Dictionary<String, PricesByDate> newPriceCache, Dictionary<String, Price> newRealtimePriceCache, Dictionary<String, bool> newNullCache)
{
    CacheSnapshot replacement = new CacheSnapshot(newPriceCache, newRealtimePriceCache, newNullCache);
    snapshot = replacement;
}
}
}