using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Loads SEC filings for a list of symbols, dispatching up to <see cref="MaxThreads"/>
    /// symbol loads at a time onto the thread pool.
    /// </summary>
    /// <remarks>
    /// The work-queue members (Queue, Index, GetQueueItem, PeekQueueItem) and the wait-handle
    /// helpers (GetAvailableEvents, GetBusyEvents, ResizeEvents) are not defined in this class;
    /// they are expected to come from <see cref="MarketDataHelperBase"/>.
    /// </remarks>
    public class SECFilingMarketDataHelper : MarketDataHelperBase
    {
        // 10 requests per second is what is allowable under SEC.GOV. We'll request 5 symbols per batch.
        // Note: each request may contain subrequests.
        private static readonly int MaxThreads = 5;

        public SECFilingMarketDataHelper()
        {
        }

        /// <summary>
        /// Updates the SEC filings for every symbol in <paramref name="symbols"/>, using one
        /// wait handle per concurrent slot to throttle the number of in-flight loads.
        /// </summary>
        /// <param name="symbols">Symbols to process; assigned to the work queue as-is.</param>
        /// <returns>
        /// <c>false</c> when <paramref name="symbols"/> is null or empty; <c>true</c> once the
        /// dispatch loop has finished.
        /// </returns>
        /// <remarks>
        /// Exceptions raised by the dispatch loop itself are not caught here; only the timing
        /// trace and wait-handle cleanup run on the way out.
        /// </remarks>
        public bool UpdateSECFilings(List<String> symbols)
        {
            Profiler profiler = new Profiler();
            // Keeps every handle created by this call so it can be released in the finally block,
            // even after resetEvents has been replaced by a smaller array.
            ManualResetEvent[] allEvents = null;
            try
            {
                if (symbols == null || symbols.Count == 0)
                {
                    return false;
                }

                Queue = symbols;
                Index = -1;

                // One handle per slot; created signalled, which this method treats as "slot is free".
                ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
                allEvents = resetEvents;
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(true);
                }

                MDTrace.WriteLine("Queuing SEC Filings Load ...");
                while (true)
                {
                    // NOTE(review): presumably these split resetEvents into signalled (free) and
                    // non-signalled (in use) handles - confirm against the base class.
                    ManualResetEvent[] availableEvents = GetAvailableEvents(resetEvents);
                    ManualResetEvent[] busyEvents = GetBusyEvents(resetEvents);

                    // Finished: nothing left to dequeue and no slot is still in use.
                    if (PeekQueueItem() == null && busyEvents.Length == 0)
                    {
                        MDTrace.WriteLine(LogLevel.DEBUG, String.Format("SECFilings queue contains {0} items, busy events {1}, all done.", 0, busyEvents.Length));
                        break;
                    }

                    for (int index = 0; index < availableEvents.Length; index++)
                    {
                        String symbol = GetQueueItem();
                        if (symbol != null)
                        {
                            // Mark the slot as in use before handing it to the worker; the worker
                            // signals it again when it finishes.
                            availableEvents[index].Reset();
                            ThreadHelper threadHelper = new ThreadHelper(symbol, availableEvents[index]);
                            ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper);
                        }
                        else
                        {
                            // Queue is drained: when the busy count differs from the number of slots
                            // that were free at the start of this pass, continue with only the busy
                            // handles so the wait below is not satisfied by an idle slot.
                            busyEvents = GetBusyEvents(resetEvents);
                            if (busyEvents.Length != availableEvents.Length)
                            {
                                ManualResetEvent[] resizedEvents = new ManualResetEvent[busyEvents.Length];
                                Array.Copy(busyEvents, resizedEvents, busyEvents.Length);
                                resetEvents = resizedEvents;
                            }
                            break;
                        }
                    } // for

                    MDTrace.WriteLine(LogLevel.DEBUG, "SECFilings waiting for free slots...");
                    if (resetEvents.Length > 0)
                    {
                        WaitHandle.WaitAny(resetEvents);
                    }
                    if (PeekQueueItem() == null)
                    {
                        // NOTE(review): ResizeEvents is defined elsewhere; presumably it drops the
                        // handles that are no longer in use - confirm.
                        resetEvents = ResizeEvents(resetEvents);
                    }
                } // while

                MDTrace.WriteLine(LogLevel.DEBUG, "SECFilings completed.");
                return true;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateSECFilings] End, total took {0}(ms)", profiler.End()));
                DisposeIdleEvents(allEvents);
            }
        }

        /// <summary>
        /// Releases every wait handle in <paramref name="events"/> that is currently signalled.
        /// </summary>
        /// <remarks>
        /// A non-signalled handle may still be owned by a running worker that will call Set() on
        /// it, so it is deliberately left alone rather than waited on or closed.
        /// </remarks>
        /// <param name="events">Handles created by the dispatch loop; may be null or contain nulls.</param>
        private static void DisposeIdleEvents(ManualResetEvent[] events)
        {
            if (events == null)
            {
                return;
            }

            foreach (ManualResetEvent resetEvent in events)
            {
                // WaitOne(0) polls the state without blocking.
                if (resetEvent != null && resetEvent.WaitOne(0))
                {
                    resetEvent.Dispose();
                }
            }
        }

        /// <summary>
        /// Thread-pool entry point: loads the filings for the symbol carried by the
        /// <c>ThreadHelper</c> and then signals the helper's wait handle.
        /// </summary>
        /// <param name="threadHelperContext">
        /// The <c>ThreadHelper</c> passed to <c>ThreadPool.QueueUserWorkItem</c>.
        /// </param>
        public void ThreadPoolCallbackUpdateSECFiling(Object threadHelperContext)
        {
            ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load SEC Filing, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
                UpdateSECFiling(threadHelper.Symbol);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load SEC Filing, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            }
            finally
            {
                // Always signal the handle, even on failure, so the dispatcher waiting on it
                // is released.
                threadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Looks up the CIK for <paramref name="symbol"/>, fetches its SEC filings and stores them.
        /// </summary>
        /// <remarks>
        /// Returns without storing anything when no CIK is found or no filings are returned.
        /// Exceptions raised during the lookup, fetch or store are logged and swallowed.
        /// </remarks>
        /// <param name="symbol">The symbol whose filings should be refreshed.</param>
        public static void UpdateSECFiling(String symbol)
        {
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "UpdateSECFiling '" + symbol + "'");
                String cik = PricingDA.GetCIKForSymbol(symbol);
                if (cik == null)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, "No CIK for symbol '" + symbol + "'");
                    return;
                }

                MDTrace.WriteLine(LogLevel.DEBUG, "UpdateSECFiling for symbol '" + symbol + "'");
                SECFilings secFilings = MarketDataHelper.GetSECFilings(symbol, cik);
                if (secFilings != null)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, "Got " + secFilings.Count + " filings for symbol '" + symbol + "'");
                    SECFilingDA.InsertOrUpdateSECFilings(secFilings);
                }
            }
            catch (Exception exception)
            {
                // Best effort per symbol: a failure for one symbol must not escape onto the
                // thread-pool thread that invoked it.
                MDTrace.WriteLine(LogLevel.DEBUG, exception);
            }
        }
    }
}