using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Loads Zacks rank data for one symbol or for every symbol returned by
    /// PricingDA.GetSymbols(), fanning the per-symbol work out to the thread pool.
    /// Each in-flight work item owns one ManualResetEvent "slot": the event is
    /// Reset() before the item is queued and Set() by the worker when it finishes.
    /// </summary>
    public class ZacksRankMarketDataHelper : MarketDataHelperBase
    {
        // Number of event slots created per load; never reassigned, hence readonly.
        // NOTE(review): WaitHandle.WaitAny accepts at most 64 handles - confirm that
        // ThreadHelperEnum.MaxThreads stays within that limit.
        private static readonly int MaxThreads = (int)ThreadHelperEnum.MaxThreads;

        public ZacksRankMarketDataHelper()
        {
        }

        /// <summary>
        /// Queues a Zacks rank load for <paramref name="symbol"/>, or for all symbols
        /// from PricingDA.GetSymbols() when <paramref name="symbol"/> is null, and
        /// blocks until the queue is drained and no slot is reported busy.
        /// </summary>
        /// <param name="symbol">Single symbol to load, or null to load every symbol.</param>
        /// <returns>Always true when the method returns normally; exceptions raised
        /// while scheduling are not caught here and propagate to the caller.</returns>
        public bool LoadZacksRank(String symbol = null)
        {
            Profiler profiler = new Profiler();
            try
            {
                if (symbol == null)
                {
                    Queue = PricingDA.GetSymbols();
                }
                else
                {
                    // The original text read "new List()", which has no type argument and
                    // cannot resolve with only System.Collections.Generic imported.
                    // NOTE(review): assumes Queue is a List<String> - confirm against the base class.
                    (Queue = new List<String>()).Add(symbol);
                }
                Index = -1;

                // Every slot starts signalled (initialState: true).
                // NOTE(review): these events are never disposed in this method.
                ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(true);
                }

                MDTrace.WriteLine("Queuing Zacks rank load ...");
                DateTime modified = DateTime.Now;

                while (true)
                {
                    ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                    ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);

                    // Finished once nothing is left to dequeue and no slot is busy.
                    if (null == PeekQueueItem() && 0 == busyEvents.Length)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Zacks Rank queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                        break;
                    }

                    for (int index = 0; index < availableEvents.Length; index++)
                    {
                        // Dedicated local so the caller-supplied parameter is not overwritten.
                        String nextSymbol = GetQueueItem();
                        if (null != nextSymbol)
                        {
                            // Mark the slot busy before handing it to the worker; the worker
                            // signals it again in ThreadPoolCallback's finally block.
                            availableEvents[index].Reset();
                            ThreadHelper zacksRankThreadHelper = new ThreadHelper(nextSymbol, modified, availableEvents[index]);
                            ThreadPool.QueueUserWorkItem(ThreadPoolCallback, zacksRankThreadHelper);
                        }
                        else
                        {
                            // Queue exhausted: narrow the wait set down to the busy slots so the
                            // WaitAny below is not satisfied immediately by an idle slot.
                            busyEvents = GetBusyEvents(resetEvents);
                            if (busyEvents.Length != availableEvents.Length)
                            {
                                ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                                Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                                resetEvents = resizedEvents;
                            }
                            break;
                        }
                    } // for

                    MDTrace.WriteLine(LogLevel.DEBUG, "Zacks Rank waiting for free slots...");
                    if (resetEvents.Length > 0)
                    {
                        WaitHandle.WaitAny(resetEvents);
                    }
                    if (null == PeekQueueItem())
                    {
                        resetEvents = ResizeEvents(resetEvents);
                    }
                } // while

                MDTrace.WriteLine(LogLevel.DEBUG, "Zacks Rank completed.");
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[LoadZacksRanks]End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Thread pool entry point: loads the Zacks rank for the symbol carried by the
        /// supplied ThreadHelper and then signals the helper's event.
        /// </summary>
        /// <param name="threadHelperContext">The ThreadHelper created by LoadZacksRank.</param>
        public void ThreadPoolCallback(Object threadHelperContext)
        {
            ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load Zacks Rank, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
                LoadZacksRankEx(threadHelper.Symbol, threadHelper.Modified);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load Zacks Rank, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            }
            finally
            {
                // Always signal the slot, even when the load throws, so the scheduling
                // loop is not left waiting on this event.
                threadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Loads the Zacks rank for <paramref name="symbol"/>. The
        /// <paramref name="modified"/> argument is currently ignored: the
        /// modified-date check below is commented out, so the load always runs.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        /// <param name="modified">Timestamp supplied by the caller; unused at present.</param>
        public static void LoadZacksRankEx(String symbol, DateTime modified)
        {
            LoadZacksRankEx(symbol);
            //if (!ZacksRankDA.CheckZacksRankModifiedOn(symbol, modified)) LoadZacksRankEx(symbol);
            //else MDTrace.WriteLine(LogLevel.DEBUG,"Already have Zacks Rank for '"+symbol+"' on "+Utility.DateTimeToStringMMHDDHYYYY(modified));
        }

        /// <summary>
        /// Single threaded method, use for debugging a single symbol fetch without
        /// going through the queue. Fetches the rank via MarketDataHelper.GetZacksRank
        /// and, when a rank is returned, stores it with ZacksRankDA.InsertZacksRank.
        /// Any exception is caught and logged; nothing is rethrown.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        public static void LoadZacksRankEx(String symbol)
        {
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Loading Zacks Rank for '" + symbol + "'");
                ZacksRank zacksRank = MarketDataHelper.GetZacksRank(symbol);
                if (null != zacksRank)
                {
                    ZacksRankDA.InsertZacksRank(zacksRank);
                    // NOTE(review): this logs the AnalystPriceTarget header ahead of a Zacks
                    // rank line - looks carried over from another helper; confirm it is intended.
                    MDTrace.WriteLine(LogLevel.DEBUG, AnalystPriceTarget.Header);
                    MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Zacks rank:{0} {1}", zacksRank.Symbol, zacksRank.Rank));
                }
            }
            catch (Exception exception)
            {
                // Best effort per symbol: a failure is logged and the batch carries on.
                MDTrace.WriteLine(LogLevel.DEBUG, "Failed to load Zacks Rank for '" + symbol + "'");
                MDTrace.WriteLine(LogLevel.DEBUG, "Exception was " + exception.ToString());
            }
        }
    }
}