using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Loads fundamental data for one symbol or for every symbol returned by
    /// <c>PricingDA.GetSymbols()</c>, fanning the work out over the thread pool with at most
    /// <c>MaxThreads</c> requests in flight.
    /// </summary>
    public class FundamentalMarketDataHelper : MarketDataHelperBase
    {
        // Upper bound on concurrently running fetches (previously (int)ThreadHelperEnum.MaxThreads).
        private static readonly int MaxThreads = 5;

        // Name of the session file handed to UpdateManager.Prepare by both queue runs.
        private const String SessionFileName = "fundamentals_session.txt";

        private readonly UpdateManager updateManager = new UpdateManager();

        public FundamentalMarketDataHelper()
        {
        }

        /// <summary>
        /// Queues fundamental loads using the default source (<c>MarketDataHelper.GetFundamental</c>).
        /// </summary>
        /// <param name="symbol">A single symbol to load, or null to load every symbol from <c>PricingDA.GetSymbols()</c>.</param>
        /// <returns>Always true when the queue run finishes without throwing.</returns>
        public bool LoadFundamentals(String symbol = null)
        {
            // 1000 ms between requests; the original author lowered this from 2000 ms for a test run against TOR.
            return RunFundamentalsQueue(symbol, ThreadPoolCallbackLoadFundamental, 1000, "LoadFundamentals");
        }

        /// <summary>
        /// Queues fundamental loads using the FinViz source (<c>MarketDataHelper.GetFundamentalFinViz</c>).
        /// </summary>
        /// <param name="symbol">A single symbol to load, or null to load every symbol from <c>PricingDA.GetSymbols()</c>.</param>
        /// <returns>Always true when the queue run finishes without throwing.</returns>
        public bool LoadFundamentalsFinViz(String symbol = null)
        {
            // 250 ms pause between requests.
            return RunFundamentalsQueue(symbol, ThreadPoolCallbackLoadFundamentalFinViz, 250, "LoadFundamentalsFinViz");
        }

        /// <summary>
        /// Shared scheduling loop behind <see cref="LoadFundamentals"/> and <see cref="LoadFundamentalsFinViz"/>.
        /// Builds the work queue, removes symbols already recorded by the update manager, then keeps
        /// handing symbols to free slots until the queue is empty and no slot is busy.
        /// </summary>
        /// <param name="symbol">Single symbol to load, or null for all symbols.</param>
        /// <param name="workerCallback">Thread-pool callback that performs one symbol load.</param>
        /// <param name="requestDelayMs">Pause, in milliseconds, after each work item is queued.</param>
        /// <param name="operationName">Name written into the end-of-run timing log line.</param>
        private bool RunFundamentalsQueue(String symbol, WaitCallback workerCallback, int requestDelayMs, String operationName)
        {
            Profiler profiler = new Profiler();
            try
            {
                // NOTE(review): the generic arguments were missing in the original text ("new List()");
                // List<String> assumes Queue is a List<String> - confirm against MarketDataHelperBase.
                if (symbol == null)
                {
                    Queue = PricingDA.GetSymbols();
                }
                else
                {
                    (Queue = new List<String>()).Add(symbol);
                }
                updateManager.Prepare(SessionFileName);
                // Skip everything the update manager already lists in its entries.
                Queue = Queue.Except(new List<String>(updateManager.Entries)).ToList();
                Index = -1;

                // One event per slot; every slot starts signaled, i.e. available.
                ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(true);
                }
                MDTrace.WriteLine("Queuing fundamentals ...");
                DateTime modified = DateTime.Now;

                while (true)
                {
                    ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                    ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);
                    if (null == PeekQueueItem() && 0 == busyEvents.Length)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Fundamentals queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                        break;
                    }
                    for (int index = 0; index < availableEvents.Length; index++)
                    {
                        String nextSymbol = GetQueueItem();
                        if (null != nextSymbol)
                        {
                            // Mark the slot busy before the work item can run; the worker sets it again when done.
                            availableEvents[index].Reset();
                            ThreadHelper fundamentalThreadHelper = new ThreadHelper(nextSymbol, modified, availableEvents[index]);
                            fundamentalThreadHelper.UpdateManager = updateManager;
                            ThreadPool.QueueUserWorkItem(workerCallback, fundamentalThreadHelper);
                            PauseBetweenRequests(requestDelayMs);
                        }
                        else
                        {
                            // Nothing left to hand out: narrow the wait set to the slots that are still busy.
                            // NOTE(review): the comparison is against the available count, as in the original.
                            busyEvents = GetBusyEvents(resetEvents);
                            if (busyEvents.Length != availableEvents.Length)
                            {
                                ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                                Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                                resetEvents = resizedEvents;
                            }
                            break;
                        }
                    }
                    MDTrace.WriteLine(LogLevel.DEBUG, "Fundamentals waiting for free slots...");
                    // WaitAny must not be handed an empty array, hence the length guard.
                    if (resetEvents.Length > 0)
                    {
                        WaitHandle.WaitAny(resetEvents);
                    }
                    if (null == PeekQueueItem())
                    {
                        resetEvents = ResizeEvents(resetEvents);
                    }
                }
                MDTrace.WriteLine(LogLevel.DEBUG, "Fundamentals completed.");
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[{0}]End, total took {1}(ms)", operationName, profiler.End()));
                // NOTE(review): updateManager is an instance field and is disposed after every run;
                // presumably Prepare() re-initializes it on the next call - confirm before reusing one helper instance.
                updateManager.Dispose();
            }
        }

        /// <summary>
        /// Sleeps between queued requests. An exception from the sleep is deliberately ignored,
        /// exactly as the original inline try/catch did: the pause is best-effort only.
        /// </summary>
        private static void PauseBetweenRequests(int milliseconds)
        {
            try
            {
                Thread.Sleep(milliseconds);
            }
            catch (Exception)
            {
                // Best-effort throttle; carry on scheduling.
            }
        }

        // ******************************************************************************************
        // ****************************** T H R E A D   C A L L B A C K *****************************
        // ******************************************************************************************

        /// <summary>
        /// Thread-pool entry point for one symbol using the default source.
        /// </summary>
        /// <param name="fundamentalThreadHelperContext">The <c>ThreadHelper</c> queued by the scheduling loop.</param>
        public void ThreadPoolCallbackLoadFundamental(Object fundamentalThreadHelperContext)
        {
            RunFundamentalWorker(fundamentalThreadHelperContext, false);
        }

        /// <summary>
        /// Thread-pool entry point for one symbol using the FinViz source.
        /// </summary>
        /// <param name="fundamentalThreadHelperContext">The <c>ThreadHelper</c> queued by the scheduling loop.</param>
        public void ThreadPoolCallbackLoadFundamentalFinViz(Object fundamentalThreadHelperContext)
        {
            RunFundamentalWorker(fundamentalThreadHelperContext, true);
        }

        /// <summary>
        /// Common worker body: loads one symbol and always signals the slot's event afterwards.
        /// Exceptions are logged and contained here because an exception that escapes a thread-pool
        /// work item is unhandled and terminates the process.
        /// </summary>
        private static void RunFundamentalWorker(Object context, bool useFinViz)
        {
            ThreadHelper threadHelper = (ThreadHelper)context;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load fundamental, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
                // NOTE(review): read via the lower-case 'updateManager' member while the scheduler assigns
                // 'UpdateManager'; kept exactly as in the original - confirm both refer to the same object.
                LoadFundamentalEx(threadHelper.Symbol, threadHelper.Modified, threadHelper.updateManager, useFinViz);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load fundamental, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            }
            catch (Exception exception)
            {
                // The symbol is not added to the update manager on this path.
                MDTrace.WriteLine(LogLevel.DEBUG, "Unhandled exception loading fundamental for '" + threadHelper.Symbol + "'");
                MDTrace.WriteLine(LogLevel.DEBUG, "Exception was " + exception.ToString());
            }
            finally
            {
                // Release the slot no matter how the load ended, otherwise the scheduler waits forever.
                threadHelper.ResetEvent.Set();
            }
        }

        // ******************************************************************************************
        // ******************************************************************************************

        /// <summary>
        /// Loads the fundamental for one symbol unless a usable record already exists for
        /// <paramref name="modified"/>. Used by the worker callbacks; it can also be called directly
        /// to debug a single symbol fetch without going through the queue.
        /// </summary>
        /// <param name="symbol">Symbol to load.</param>
        /// <param name="modified">Timestamp passed to the <c>FundamentalDA</c> lookups for this run.</param>
        /// <param name="updateManager">Receives the symbol once it needs no further work in this session.</param>
        /// <param name="useFinViz">True to fetch through <see cref="LoadFundamentalExFinViz"/>, false for <see cref="LoadFundamentalEx(String)"/>.</param>
        public static void LoadFundamentalEx(String symbol, DateTime modified, UpdateManager updateManager, bool useFinViz = false)
        {
            CompanyProfile companyProfile = CompanyProfileDA.GetCompanyProfile(symbol);
            if (null != companyProfile && !companyProfile.IsEquity)
            {
                // Non-equity symbols are recorded as handled without fetching.
                updateManager.Add(symbol);
                return;
            }

            bool needsFetch = !FundamentalDA.CheckFundamentalModifiedOn(symbol, modified);
            if (!needsFetch)
            {
                // Even if we have the record, the source may not have given us some values (e.g. Yahoo
                // sometimes returns zero for EnterpriseValue). A missing record or a zero
                // EnterpriseValue therefore triggers a re-fetch.
                Fundamental fundamental = FundamentalDA.GetFundamental(symbol, modified);
                needsFetch = null == fundamental || 0.00 == fundamental.EnterpriseValue;
            }

            if (needsFetch)
            {
                bool result = useFinViz ? LoadFundamentalExFinViz(symbol) : LoadFundamentalEx(symbol);
                // Only a successful fetch is recorded, so a failed one is retried on a later run.
                if (result)
                {
                    updateManager.Add(symbol);
                }
                return;
            }

            MDTrace.WriteLine(LogLevel.DEBUG, "Already have fundamental for '" + symbol + "' on " + Utility.DateTimeToStringMMHDDHYYYY(modified));
            updateManager.Add(symbol);
        }

        /// <summary>
        /// Fetches the fundamental for <paramref name="symbol"/> from the default source, merges in
        /// the stored record and inserts the result.
        /// </summary>
        /// <returns>True on success; false when nothing was returned or any exception occurred (logged).</returns>
        public static bool LoadFundamentalEx(String symbol)
        {
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Loading fundamental data for '" + symbol + "'");
                Fundamental fundamental = MarketDataHelper.GetFundamental(symbol);
                if (null == fundamental)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, "Error loading fundmental for '" + symbol + "'");
                    return false;
                }
                MergeAndStoreFundamental(symbol, fundamental);
                return true;
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Failed to load funamental for '" + symbol + "'");
                MDTrace.WriteLine(LogLevel.DEBUG, "Exception was " + exception.ToString());
                return false;
            }
        }

        /// <summary>
        /// Fetches the fundamental for <paramref name="symbol"/> from FinViz, fills in the next
        /// earnings date when <c>EarningsAnnouncementsDA</c> has one, merges in the stored record and
        /// inserts the result.
        /// </summary>
        /// <returns>True on success; false when nothing was returned or any exception occurred (logged).</returns>
        public static bool LoadFundamentalExFinViz(String symbol)
        {
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Loading fundamental data for '" + symbol + "'");
                Fundamental fundamental = MarketDataHelper.GetFundamentalFinViz(symbol);
                if (null == fundamental)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, "Error loading fundmental for '" + symbol + "'");
                    return false;
                }
                DateTime? nextEarningsDate = EarningsAnnouncementsDA.GetNextEarningsDate(symbol);
                if (null != nextEarningsDate)
                {
                    fundamental.NextEarningsDate = nextEarningsDate.Value;
                }
                MergeAndStoreFundamental(symbol, fundamental);
                return true;
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "Failed to load funamental for '" + symbol + "'");
                MDTrace.WriteLine(LogLevel.DEBUG, "Exception was " + exception.ToString());
                return false;
            }
        }

        /// <summary>
        /// Merges the stored record into the freshly fetched one, inserts it and traces the result.
        /// </summary>
        private static void MergeAndStoreFundamental(String symbol, Fundamental fundamental)
        {
            Fundamental priorFundamental = FundamentalDA.GetFundamental(symbol);
            // If any fields are missing on this run then carry forward fields from the previous run.
            fundamental.MergeFrom(priorFundamental);
            FundamentalDA.InsertFundamental(fundamental);
            MDTrace.WriteLine(LogLevel.DEBUG, Fundamental.Header);
            MDTrace.WriteLine(LogLevel.DEBUG, fundamental.ToString());
        }
    }
}