using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MarketData.MarketDataModel;
using MarketData.DataAccess;
using MarketData.Utils;

namespace MarketData.Helper
{
    /// <summary>
    /// Loads SEC filings for a list of symbols. Symbols are processed in batches of
    /// <c>MaxThreads</c>; each symbol in a batch is queued to the thread pool with a
    /// pause between submissions, and the next batch starts only after every work
    /// item of the current batch has signalled completion.
    /// </summary>
    /// <remarks>
    /// The batching cursor (<c>symbols</c>/<c>currentIndex</c>) is instance state, so a
    /// single instance must not run two <c>UpdateSECFilings</c> calls at the same time.
    /// </remarks>
    public class SECFilingMarketDataHelper
    {
        // 10 requests per second is what is allowable under SEC.GOV. We'll request 5 symbols per batch.
        // Note: each request may contain subrequests.
        private const int MaxThreads = 5;

        // Wait between requests, in milliseconds.
        private const int WAIT_TIME_MS = 1000;

        // Symbols of the run in progress and the index of the next symbol to hand out.
        private List<String> symbols;
        private int currentIndex = 0;

        public SECFilingMarketDataHelper()
        {
        }

        /// <summary>
        /// Updates the SEC filings for the given symbols.
        /// </summary>
        /// <param name="symbols">Symbols to process.</param>
        /// <returns>
        /// <c>true</c> when all batches were queued and waited for; <c>false</c> when an
        /// exception was raised on the calling thread (it is logged, not rethrown).
        /// Failures for individual symbols are logged by <see cref="UpdateSECFiling"/>
        /// and do not affect the return value.
        /// </returns>
        public bool UpdateSECFilings(List<String> symbols)
        {
            Profiler profiler = new Profiler();
            try
            {
                this.symbols = symbols;
                ProcessQueuedSymbols();
                return true;
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateSECFilings]Exception {0}", exception.ToString()));
                return false;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateSECFilings]End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Updates the SEC filings for the union of the "Valuations" watch list symbols
        /// and the symbols returned by <c>SECFilingDA.GetDistinctFilingSymbols()</c>.
        /// </summary>
        /// <returns>
        /// <c>true</c> when all batches were queued and waited for; <c>false</c> when an
        /// exception was raised on the calling thread (it is logged, not rethrown).
        /// </returns>
        public bool UpdateSECFilings()
        {
            Profiler profiler = new Profiler();
            try
            {
                IEnumerable<String> watchListSymbols = WatchListDA.GetWatchList("Valuations");   // get the current watchlist symbols
                IEnumerable<String> secFilingSymbols = SECFilingDA.GetDistinctFilingSymbols();   // get the current SEC filing symbols
                // Union already removes duplicates, so no additional Distinct() is needed.
                symbols = watchListSymbols.Union(secFilingSymbols).ToList();
                ProcessQueuedSymbols();
                return true;
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateSECFilings]Exception {0}", exception.ToString()));
                return false;
            }
            finally
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateSECFilings]End, total took {0}(ms)", profiler.End()));
            }
        }

        /// <summary>
        /// Walks <c>symbols</c> from the start in batches: queues one thread pool work
        /// item per symbol, pausing after each submission, then blocks until the whole
        /// batch has signalled completion before taking the next batch.
        /// </summary>
        private void ProcessQueuedSymbols()
        {
            currentIndex = 0;
            while (true)
            {
                List<String> queueSymbols = GetQueueSymbols();
                if (null == queueSymbols || 0 == queueSymbols.Count)
                {
                    break;
                }

                ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
                for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
                {
                    resetEvents[eventIndex] = new ManualResetEvent(false);
                }

                for (int index = 0; index < queueSymbols.Count; index++)
                {
                    ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index], resetEvents[index]);
                    ThreadPool.QueueUserWorkItem(ThreadPoolCallbackUpdateSECFiling, threadHelper);
                    PauseBetweenRequests();   // SEC has a traffic limit
                }

                MDTrace.WriteLine(LogLevel.DEBUG, "Load SEC Filings, waiting for queued items to complete.");
                WaitHandle.WaitAll(resetEvents);

                // Release the OS handles only after every work item has signalled. If an
                // exception skips this point, workers may still be running and must be
                // able to call Set() on their event, so the handles are left alone.
                foreach (ManualResetEvent resetEvent in resetEvents)
                {
                    resetEvent.Close();
                }
            }
        }

        /// <summary>
        /// Sleeps for <c>WAIT_TIME_MS</c>. The pause is best-effort: a failure to sleep
        /// (e.g. the thread being interrupted) is logged and otherwise ignored.
        /// </summary>
        private static void PauseBetweenRequests()
        {
            try
            {
                Thread.Sleep(WAIT_TIME_MS);
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("[UpdateSECFilings]Wait between requests failed: {0}", exception.Message));
            }
        }

        /// <summary>
        /// Returns the next batch of at most <c>MaxThreads</c> symbols starting at
        /// <c>currentIndex</c> and advances <c>currentIndex</c> past them.
        /// </summary>
        /// <returns>The next batch; empty once all symbols have been handed out.</returns>
        private List<String> GetQueueSymbols()
        {
            List<String> queueSymbols = new List<String>();
            int index = currentIndex;
            for (; index < currentIndex + MaxThreads && index < symbols.Count; index++)
            {
                queueSymbols.Add(symbols[index]);
            }
            currentIndex = index;
            return queueSymbols;
        }

        /// <summary>
        /// Thread pool entry point: updates the filings for the symbol carried by the
        /// context and always signals the context's reset event when done.
        /// </summary>
        /// <param name="threadHelperContext">A <c>ThreadHelper</c> holding the symbol and its reset event.</param>
        public void ThreadPoolCallbackUpdateSECFiling(Object threadHelperContext)
        {
            ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load SEC Filing, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
                UpdateSECFiling(threadHelper.Symbol);
                MDTrace.WriteLine(LogLevel.DEBUG, String.Format("Load SEC Filing, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
            }
            finally
            {
                // Signal even on failure so the waiting batch loop cannot block forever.
                threadHelper.ResetEvent.Set();
            }
        }

        /// <summary>
        /// Fetches the SEC filings for one symbol and stores them.
        /// Looks up the symbol's CIK via <c>PricingDA.GetCIKForSymbol</c>; when there is
        /// none, logs that and returns. Otherwise retrieves the filings through
        /// <c>MarketDataHelper.GetSECFilings</c>, logs each one, and passes them to
        /// <c>SECFilingDA.InsertOrUpdateSECFilings</c>.
        /// </summary>
        /// <param name="symbol">The symbol to update.</param>
        /// <remarks>Any exception is logged and swallowed; nothing is thrown to the caller.</remarks>
        public static void UpdateSECFiling(String symbol)
        {
            try
            {
                MDTrace.WriteLine(LogLevel.DEBUG, "UpdateSECFiling '" + symbol + "'");
                String cik = PricingDA.GetCIKForSymbol(symbol);
                if (null == cik)
                {
                    MDTrace.WriteLine(LogLevel.DEBUG, "No CIK for symbol '" + symbol + "'");
                    return;
                }

                MDTrace.WriteLine(LogLevel.DEBUG, "UpdateSECFiling for symbol '" + symbol + "'");
                SECFilings secFilings = MarketDataHelper.GetSECFilings(symbol, cik);
                if (null == secFilings)
                {
                    return;
                }

                // Trace one comma-separated line per filing: symbol, filing date, accession number, sequence.
                for (int index = 0; index < secFilings.Count; index++)
                {
                    SECFiling secFiling = secFilings[index];
                    StringBuilder sb = new StringBuilder();
                    sb.Append(secFiling.Symbol).Append(",");
                    sb.Append(Utility.DateTimeToStringYYYYHMMHDD(secFiling.FilingDate)).Append(",");
                    sb.Append(secFiling.SECAccessionNumber).Append(",");
                    sb.Append(secFiling.Sequence);
                    MDTrace.WriteLine(LogLevel.DEBUG, sb.ToString());
                }

                MDTrace.WriteLine(LogLevel.DEBUG, "Got " + secFilings.Count + " filings for symbol '" + symbol + "'");
                SECFilingDA.InsertOrUpdateSECFilings(secFilings);
            }
            catch (Exception exception)
            {
                MDTrace.WriteLine(LogLevel.DEBUG, exception);
            }
        }
    }
}