using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Loads analyst price targets for a single symbol, or for every symbol returned by
    /// <c>PricingDA.GetSymbols()</c>, by queuing one thread-pool work item per symbol.
    /// A fixed array of <see cref="ManualResetEvent"/> instances (one per slot, MaxThreads slots)
    /// is used to track which work items are still running.
    /// </summary>
    public class AnalystPriceTargetMarketDataHelper : MarketDataHelperBase
    {
        // Number of reset events (work slots) allocated per run.
        private static readonly int MaxThreads = 10; //(int)ThreadHelperEnum.MaxThreads;

        // Pause, in milliseconds, taken after each work item is queued.
        private static readonly int SLEEP_TIME_MS = 500;

        public AnalystPriceTargetMarketDataHelper()
        {
        }

        /// <summary>
        /// Queues an analyst price target fetch for <paramref name="symbol"/>, or for all symbols
        /// from <c>PricingDA.GetSymbols()</c> when <paramref name="symbol"/> is null, and blocks
        /// until the queue is empty and no work item is still busy.
        /// </summary>
        /// <param name="symbol">Symbol to load, or null to load every known symbol.</param>
        /// <returns>Always true when the method runs to completion; exceptions propagate to the caller.</returns>
        public bool LoadAnalystPriceTarget(String symbol = null)
        {
            Profiler profiler = new Profiler();
            try
            {
                if (symbol == null)
                {
                    Queue = PricingDA.GetSymbols();
                }
                else
                {
                    // NOTE(review): Queue is declared in the base class (not visible here); the element
                    // type is taken to be String because a String symbol is added to it - confirm.
                    Queue = new List<String> { symbol };
                }
                ShuffleQueue();  // shuffle up the symbols
                Index = -1;

                // Every slot starts signaled, i.e. free to take a work item.
                ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(true);
                }

                MDTrace.WriteLine("Queuing analyst price target fetches ...");
                // One timestamp for the whole run so every symbol is checked against the same "modified" value.
                DateTime modified = DateTime.Now;
                while (true)
                {
                    ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                    ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);
                    if (null == PeekQueueItem() && 0 == busyEvents.Length)
                    {
                        // Nothing left to queue and nothing still running.
                        MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Analyst price target queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                        break;
                    }
                    for (int index = 0; index < availableEvents.Length; index++)
                    {
                        String queuedSymbol = GetQueueItem();
                        if (null != queuedSymbol)
                        {
                            // Mark the slot busy before handing it to the worker; the worker sets it again when done.
                            availableEvents[index].Reset();
                            ThreadHelper analystPriceTargetThreadHelper = new ThreadHelper(queuedSymbol, modified, availableEvents[index]);
                            ThreadPool.QueueUserWorkItem(ThreadPoolCallbackLoadAnalystPriceTarget, analystPriceTargetThreadHelper);
                            try
                            {
                                // Pause between submissions (presumably to throttle requests to the data source).
                                Thread.Sleep(SLEEP_TIME_MS);
                            }
                            catch (Exception exception)
                            {
                                // Best effort: a failed or interrupted pause must not abort the remaining queue.
                                MDTrace.WriteLine(LogLevel.DEBUG, "Analyst price target pause between fetches failed: " + exception.Message);
                            }
                        }
                        else
                        {
                            // Queue drained: from here on only the still-busy events need to be waited on.
                            busyEvents = GetBusyEvents(resetEvents);
                            if (busyEvents.Length != availableEvents.Length)
                            {
                                ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                                Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                                resetEvents = resizedEvents;
                            }
                            break;
                        }
                    } // for
                    MDTrace.WriteLine(LogLevel.DEBUG, "Analyst price target waiting for free slots...");
                    if (resetEvents.Length > 0)
                    {
                        WaitHandle.WaitAny(resetEvents);
                    }
                    if (null == PeekQueueItem())
                    {
                        resetEvents = ResizeEvents(resetEvents);
                    }
                } // while
                MDTrace.WriteLine(LogLevel.DEBUG, "Analyst price target completed.");
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[LoadAnalystPriceTarget]End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Thread-pool entry point: loads the analyst price target for the symbol carried by the
        /// <c>ThreadHelper</c> context and always signals the context's reset event when finished.
        /// </summary>
        /// <param name="analystPriceTargetThreadHelperContext">A <c>ThreadHelper</c> holding the symbol, the run timestamp and the slot's reset event.</param>
        public void ThreadPoolCallbackLoadAnalystPriceTarget(Object analystPriceTargetThreadHelperContext)
        {
            ThreadHelper threadHelper = (ThreadHelper)analystPriceTargetThreadHelperContext;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load analyst price target, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
                LoadAnalystPriceTargetEx(threadHelper.Symbol, threadHelper.Modified);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load analyst price target, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            }
            catch (Exception exception)
            {
                // An exception escaping a thread-pool work item terminates the process, so a failure for
                // one symbol (e.g. in the modified-on check) is logged here instead of being rethrown.
                MDTrace.WriteLine(LogLevel.DEBUG, "Failed to load analyst price target for '" + threadHelper.Symbol + "'");
                MDTrace.WriteLine(LogLevel.DEBUG, "Exception was " + exception.ToString());
            }
            finally
            {
                // Release the slot no matter how the load ended, otherwise the scheduler would wait forever.
                threadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Single threaded method, use for debugging a single symbol fetch without going through the queue.
        /// Loads the analyst price target for <paramref name="symbol"/> unless
        /// <c>AnalystPriceTargetDA.CheckAnalystPriceTargetModifiedOn</c> returns true for
        /// <paramref name="symbol"/> and <paramref name="modified"/>, in which case only a log line is written.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        /// <param name="modified">Date passed to the modified-on check.</param>
        public static void LoadAnalystPriceTargetEx(String symbol, DateTime modified)
        {
            if (!AnalystPriceTargetDA.CheckAnalystPriceTargetModifiedOn(symbol, modified))
            {
                LoadAnalystPriceTargetEx(symbol);
            }
            else
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Already have analyst price target for '" + symbol + "' on " + Utility.DateTimeToStringMMHDDHYYYY(modified));
            }
        }

        /// <summary>
        /// Fetches the analyst price target for <paramref name="symbol"/> via
        /// <c>MarketDataHelper.GetAnalystPriceTarget</c> and, when one is returned, inserts it through
        /// <c>AnalystPriceTargetDA.InsertAnalystPriceTarget</c> and logs it. Any exception is logged and swallowed.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        public static void LoadAnalystPriceTargetEx(String symbol)
        {
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Loading analyst price target for '" + symbol + "'");
                AnalystPriceTarget analystPriceTarget = MarketDataHelper.GetAnalystPriceTarget(symbol);
                if (null != analystPriceTarget)
                {
                    AnalystPriceTargetDA.InsertAnalystPriceTarget(analystPriceTarget);
                    MDTrace.WriteLine(LogLevel.DEBUG, "");
                    MDTrace.WriteLine(LogLevel.DEBUG, AnalystPriceTarget.Header);
                    MDTrace.WriteLine(LogLevel.DEBUG, analystPriceTarget.ToString());
                    MDTrace.WriteLine(LogLevel.DEBUG, "");
                }
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Failed to load analyst price target for '" + symbol + "'");
                MDTrace.WriteLine(LogLevel.DEBUG, "Exception was " + exception.ToString());
            }
        }
    }
}