using MarketData.MarketDataModel; using MarketData.DataAccess; using MarketData.Utils; namespace MarketData.Helper { public class AnalystRatingsThreadHelper : ThreadHelper { public AnalystRatingsThreadHelper(String symbol, DateTime modified, DateTime maxAnalystRatingsDate, ManualResetEvent resetEvent) { Symbol = symbol; Modified = modified; ResetEvent = resetEvent; MaxAnalystRatingsDate = maxAnalystRatingsDate; } public DateTime MaxAnalystRatingsDate { get; private set; } } // **************************************************************************************************************************************** public class AnalystRatingsMarketDataHelper : MarketDataHelperBase { private static int MaxThreads = 10; private static int SLEEP_TIME_MS = 500; public AnalystRatingsMarketDataHelper() { } public bool UpdateAnalystRatings() { return false; } public bool UpdateMissingAnalystRatings() { Profiler profiler = new Profiler(); try { List symbols = PricingDA.GetSymbols(); DateTime maxRatingsDate = AnalystRatingsDA.GetMaxDateNoZacks(); Queue = symbols; // ShuffleQueue(); // shuffle up the symbols Index = -1; ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads]; for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++) resetEvents[eventIndex] = new ManualResetEvent(true); MDTrace.WriteLine(String.Format("Queuing UpdateMissingAnalystRatings fetches ...")); DateTime modified = DateTime.Now; while (true) { ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents); ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents); if (null == PeekQueueItem() && 0 == busyEvents.Length) { MDTrace.WriteLine(LogLevel.DEBUG, String.Format("UpdateMissingAnalystRatings queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length)); break; } for (int index = 0; index < availableEvents.Length; index++) { String symbol = GetQueueItem(); if (null != symbol) { availableEvents[index].Reset(); AnalystRatingsThreadHelper analystRatingsThreadHelper = new AnalystRatingsThreadHelper(symbol, maxRatingsDate, modified, availableEvents[index]); ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateMissingAnalystRating, analystRatingsThreadHelper); try { Thread.Sleep(SLEEP_TIME_MS); } catch (Exception) {; } } else { busyEvents = GetBusyEvents(resetEvents); if (busyEvents.Length != availableEvents.Length) { ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length]; Array.Copy(busyEvents, resizedEvents, busyEvents.Length); resetEvents = resizedEvents; } break; } } // for MDTrace.WriteLine(LogLevel.DEBUG, "UpdateMissingAnalystRatings waiting for free slots..."); if (resetEvents.Length > 0) WaitHandle.WaitAny(resetEvents); if (null == PeekQueueItem()) resetEvents = ResizeEvents(resetEvents); } // while MDTrace.WriteLine(LogLevel.DEBUG, "UpdateMissingAnalystRatings completed."); return true; } finally { MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateMissingAnalystRatings]Done, total took {0}(ms)", profiler.End())); } } public void ThreadPoolCallbackUpdateMissingAnalystRating(Object analystRatingThreadHelperContext) { AnalystRatingsThreadHelper threadHelper = (AnalystRatingsThreadHelper)analystRatingThreadHelperContext; try { MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load missing analyst rating, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); UpdateMissingAnalystRatingEx(threadHelper.Symbol, threadHelper.MaxAnalystRatingsDate, threadHelper.Modified); MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load missing analyst rating, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); } finally { threadHelper.ResetEvent.Set(); } } public void UpdateMissingAnalystRatingEx(String symbol, DateTime maxAnalystRatingsDate, DateTime modified) { try { AnalystRatings analystRatings = MarketDataHelper.GetAnalystRatingsMarketBeat(symbol); if (null == analystRatings || 0 == analystRatings.Count) { return; } analystRatings = new AnalystRatings((from AnalystRating analystRating in analystRatings where analystRating.Date >= maxAnalystRatingsDate.Date select analystRating).ToList()); AnalystRatings duplicateList = new AnalystRatings(); foreach (AnalystRating analystRating in analystRatings) { if (AnalystRatingsDA.ContainsAnalystRating(analystRating)) { MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Already have analyst rating for {0} on {1} from brokerage firm {2}", analystRating.Symbol, analystRating.Date.ToShortDateString(), analystRating.BrokerageFirm)); duplicateList.Add(analystRating); } } analystRatings = new AnalystRatings(analystRatings.Except(duplicateList).ToList()); foreach (AnalystRating analystRating in analystRatings) { MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Inserting Analyst Rating for {0} on {1} from brokerage firm {2}", analystRating.Symbol, analystRating.Date.ToShortDateString(), analystRating.BrokerageFirm)); } AnalystRatingsDA.InsertAnalystRatings(analystRatings); } catch (Exception exception) { MDTrace.WriteLine(LogLevel.DEBUG, $"UpdateMissingAnalystRatingEx : Exception {exception.ToString()}"); } } } }