using MarketData.MarketDataModel; using MarketData.DataAccess; using MarketData.Utils; namespace MarketData.Helper { public class InsiderTransactionThreadHelper : ThreadHelper { public InsiderTransactionThreadHelper(String symbol, ManualResetEvent resetEvent) : base(symbol, resetEvent) { } public int YearGreaterEqual { get; set; } = -1; } public class InsiderTransactionMarketDataHelper : MarketDataHelperBase { private static int MaxThreads = 10; // 10 threads avoids receiving HTTP Response 429 (Too many requests) private static int WAIT_TIME_MS=500; // wait between request private UpdateManager UpdateManager = new UpdateManager(); public InsiderTransactionMarketDataHelper() { } public bool LoadInsiderTransactions(String symbol) { return LoadInsiderTransactions(new String[]{symbol}.ToList()); } public bool LoadInsiderTransactions(List symbols) { Profiler profiler=new Profiler(); try { if(null==symbols || 0==symbols.Count)return false; Queue=symbols; Index=-1; ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads]; for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true); MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] Queuing LoadInsiderTransactions Load {symbols.Count} symbols MaxThreads:{MaxThreads}..."); while (true) { ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents); ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents); if (null == PeekQueueItem() && 0==busyEvents.Length) { MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] queue contains {0} items, busy events {busyEvents.Length}, all done."); break; } for (int index = 0; index < availableEvents.Length; index++) { String symbol=GetQueueItem(); if (null != symbol) { availableEvents[index].Reset(); ThreadHelper threadHelper = new ThreadHelper(symbol,availableEvents[index]); ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper); try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait } else { busyEvents=GetBusyEvents(resetEvents); if(busyEvents.Length!=availableEvents.Length) { ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length]; Array.Copy(busyEvents, resizedEvents,busyEvents.Length); resetEvents = resizedEvents; } break; } } // for MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] waiting for free slots..."); if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents); if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents); } // while MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] completed."); return true; } finally { MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions] End, total took {0}(ms)",profiler.End())); } } public bool LoadInsiderTransactionsYearGreaterEqual(List symbols,int yearGreaterEqual) { Profiler profiler=new Profiler(); try { if(null==symbols || 0==symbols.Count)return false; Queue=symbols; Index=-1; ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads]; for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true); MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] Queuing LoadInsiderTransactions Load {symbols} Year>={yearGreaterEqual} MaxThreads:{MaxThreads}..."); while (true) { ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents); ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents); if (null == PeekQueueItem() && 0==busyEvents.Length) { MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] queue contains {0} items, busy events {busyEvents.Length}, all done."); break; } for (int index = 0; index < availableEvents.Length; index++) { String symbol=GetQueueItem(); if (null != symbol) { availableEvents[index].Reset(); InsiderTransactionThreadHelper threadHelper = new InsiderTransactionThreadHelper(symbol, availableEvents[index]); threadHelper.UpdateManager=UpdateManager; threadHelper.YearGreaterEqual=yearGreaterEqual; ThreadPool.QueueUserWorkItem(ThreadPoolCallbackYearGreaterEqual, threadHelper); try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait } else { busyEvents=GetBusyEvents(resetEvents); if(busyEvents.Length!=availableEvents.Length) { ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length]; Array.Copy(busyEvents, resizedEvents,busyEvents.Length); resetEvents = resizedEvents; } break; } } // for MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] waiting for free slots..."); if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents); if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents); } // while MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] completed."); return true; } finally { MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactionsYearGreaterEqual] End, total took {0}(ms)",profiler.End())); } } private void ThreadPoolCallback(Object threadHelperContext) { ThreadHelper threadHelper = (ThreadHelper)threadHelperContext; MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); LoadInsiderTransactionsSymbolEx(threadHelper.Symbol); MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} ended for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); threadHelper.ResetEvent.Set(); } private static void LoadInsiderTransactionsSymbolEx(String symbol) { symbol = symbol.ToUpper(); MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0}", symbol)); InsiderTransactions insiderTransactions = MarketDataHelper.GetInsiderTransactions(symbol); if (null == insiderTransactions || 0 == insiderTransactions.Count) { MDTrace.WriteLine(LogLevel.DEBUG,String.Format("No insider transactions for {0}", symbol)); return; } MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions, Saving {0} records for {1}", insiderTransactions.Count, symbol)); InsiderTransactionDA.InsertInsiderTransactions(insiderTransactions); MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions - Done."); } private void ThreadPoolCallbackYearGreaterEqual(Object threadHelperContext) { InsiderTransactionThreadHelper threadHelper = (InsiderTransactionThreadHelper)threadHelperContext; MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); LoadInsiderTransactionsYearGreaterEqualEx(threadHelper.Symbol,threadHelper.YearGreaterEqual); MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} ended for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol)); threadHelper.UpdateManager.Add(threadHelper.Symbol); threadHelper.ResetEvent.Set(); } private static void LoadInsiderTransactionsYearGreaterEqualEx(String symbol, int yearGreaterEqual) { symbol = symbol.ToUpper(); MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0} years>={1}", symbol,yearGreaterEqual)); InsiderTransactions insiderTransactions = MarketDataHelper.GetInsiderTransactionsYear(symbol, yearGreaterEqual); if (null == insiderTransactions || 0 == insiderTransactions.Count) { MDTrace.WriteLine(LogLevel.DEBUG,String.Format("No insider transactions for {0} years>={1}", symbol,yearGreaterEqual)); return; } MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions, Saving {0} records for {1} years>={2}", insiderTransactions.Count, symbol, yearGreaterEqual)); InsiderTransactionDA.DeleteInsiderTransactionsYearsGreaterEqual(symbol, yearGreaterEqual); InsiderTransactionDA.InsertInsiderTransactions(insiderTransactions); MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions - Done."); } } }