Fix threading in InsiderTransactionMarketDaatHelper
This commit is contained in:
@@ -1,150 +1,146 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using MarketData.MarketDataModel;
|
||||
using MarketData.MarketDataModel;
|
||||
using MarketData.DataAccess;
|
||||
using MarketData.Utils;
|
||||
|
||||
namespace MarketData.Helper
|
||||
{
|
||||
|
||||
public class InsiderTransactionThreadHelper : ThreadHelper
|
||||
{
|
||||
public InsiderTransactionThreadHelper(String symbol, ManualResetEvent resetEvent)
|
||||
: base(symbol, resetEvent)
|
||||
{
|
||||
}
|
||||
public int YearGreaterEqual { get; set; }
|
||||
public int YearGreaterEqual { get; set; } = -1;
|
||||
}
|
||||
|
||||
public class InsiderTransactionMarketDataHelper
|
||||
public class InsiderTransactionMarketDataHelper : MarketDataHelperBase<String>
|
||||
{
|
||||
private static int MaxThreads = 10; // 10 threads avoids receiving HTTP Response 429 (Too many requests)
|
||||
private static int WAIT_TIME_MS=500; // wait between request
|
||||
private List<String> symbols;
|
||||
private int currentIndex = 0;
|
||||
private UpdateManager UpdateManager = new UpdateManager();
|
||||
|
||||
public InsiderTransactionMarketDataHelper()
|
||||
{
|
||||
}
|
||||
|
||||
public bool LoadInsiderTransactions(String symbol)
|
||||
{
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
symbols = new List<String>();
|
||||
symbols.Add(symbol);
|
||||
currentIndex = 0;
|
||||
while (true)
|
||||
{
|
||||
List<String> queueSymbols = GetQueueSymbols();
|
||||
if (null == queueSymbols || 0 == queueSymbols.Count) break;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
|
||||
{
|
||||
resetEvents[eventIndex] = new ManualResetEvent(false);
|
||||
}
|
||||
for (int index = 0; index < queueSymbols.Count; index++)
|
||||
{
|
||||
ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index], resetEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper);
|
||||
}
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions, waiting for queued items to complete.");
|
||||
WaitHandle.WaitAll(resetEvents);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions]End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
return LoadInsiderTransactions(new String[]{symbol}.ToList<String>());
|
||||
}
|
||||
|
||||
public bool LoadInsiderTransactions(List<String> symbols)
|
||||
{
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
this.symbols = symbols;
|
||||
currentIndex = 0;
|
||||
while (true)
|
||||
{
|
||||
List<String> queueSymbols = GetQueueSymbols();
|
||||
if (null == queueSymbols || 0 == queueSymbols.Count) break;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
|
||||
{
|
||||
resetEvents[eventIndex] = new ManualResetEvent(false);
|
||||
}
|
||||
for (int index = 0; index < queueSymbols.Count; index++)
|
||||
{
|
||||
ThreadHelper threadHelper = new ThreadHelper(queueSymbols[index], resetEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper);
|
||||
try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait
|
||||
}
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions, waiting for queued items to complete.");
|
||||
WaitHandle.WaitAll(resetEvents);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions]End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
if(null==symbols || 0==symbols.Count)return false;
|
||||
Queue=symbols;
|
||||
Index=-1;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] Queuing LoadInsiderTransactions Load {symbols.Count} symbols MaxThreads:{MaxThreads}...");
|
||||
while (true)
|
||||
{
|
||||
ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
|
||||
ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
|
||||
if (null == PeekQueueItem() && 0==busyEvents.Length)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactions] queue contains {0} items, busy events {busyEvents.Length}, all done.");
|
||||
break;
|
||||
}
|
||||
for (int index = 0; index < availableEvents.Length; index++)
|
||||
{
|
||||
String symbol=GetQueueItem();
|
||||
if (null != symbol)
|
||||
{
|
||||
availableEvents[index].Reset();
|
||||
ThreadHelper threadHelper = new ThreadHelper(symbol,availableEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallback, threadHelper);
|
||||
try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait
|
||||
}
|
||||
else
|
||||
{
|
||||
busyEvents=GetBusyEvents(resetEvents);
|
||||
if(busyEvents.Length!=availableEvents.Length)
|
||||
{
|
||||
ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
|
||||
Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
|
||||
resetEvents = resizedEvents;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // for
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] waiting for free slots...");
|
||||
if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
|
||||
if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
|
||||
} // while
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactions] completed.");
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions] End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
|
||||
public bool LoadInsiderTransactionsYearGreaterEqual(List<String> symbols,int yearGreaterEqual)
|
||||
{
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
this.symbols = symbols;
|
||||
currentIndex = 0;
|
||||
|
||||
UpdateManager.Prepare("load_insider_transactions_year.txt", 7); // use max age 7 days
|
||||
this.symbols=this.symbols.Except(new List<String>(UpdateManager.Entries)).ToList();
|
||||
|
||||
while (true)
|
||||
{
|
||||
List<String> queueSymbols = GetQueueSymbols();
|
||||
if (null == queueSymbols || 0 == queueSymbols.Count) break;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[queueSymbols.Count];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
|
||||
{
|
||||
resetEvents[eventIndex] = new ManualResetEvent(false);
|
||||
}
|
||||
for (int index = 0; index < queueSymbols.Count; index++)
|
||||
{
|
||||
InsiderTransactionThreadHelper threadHelper = new InsiderTransactionThreadHelper(queueSymbols[index], resetEvents[index]);
|
||||
threadHelper.UpdateManager=UpdateManager;
|
||||
threadHelper.YearGreaterEqual=yearGreaterEqual;
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackYearGreaterEqual, threadHelper);
|
||||
try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait
|
||||
}
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions, waiting for queued items to complete.");
|
||||
WaitHandle.WaitAll(resetEvents);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactions]End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> GetQueueSymbols()
|
||||
{
|
||||
List<String> queueSymbols = new List<String>();
|
||||
int index = currentIndex;
|
||||
for (; index < currentIndex + MaxThreads && index < symbols.Count; index++)
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
queueSymbols.Add(symbols[index]);
|
||||
if(null==symbols || 0==symbols.Count)return false;
|
||||
Queue=symbols;
|
||||
Index=-1;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] Queuing LoadInsiderTransactions Load {symbols} Year>={yearGreaterEqual} MaxThreads:{MaxThreads}...");
|
||||
while (true)
|
||||
{
|
||||
ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
|
||||
ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
|
||||
if (null == PeekQueueItem() && 0==busyEvents.Length)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,$"[LoadInsiderTransactionsYearGreaterEqual] queue contains {0} items, busy events {busyEvents.Length}, all done.");
|
||||
break;
|
||||
}
|
||||
for (int index = 0; index < availableEvents.Length; index++)
|
||||
{
|
||||
String symbol=GetQueueItem();
|
||||
if (null != symbol)
|
||||
{
|
||||
availableEvents[index].Reset();
|
||||
InsiderTransactionThreadHelper threadHelper = new InsiderTransactionThreadHelper(symbol, availableEvents[index]);
|
||||
threadHelper.UpdateManager=UpdateManager;
|
||||
threadHelper.YearGreaterEqual=yearGreaterEqual;
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackYearGreaterEqual, threadHelper);
|
||||
try { Thread.Sleep(WAIT_TIME_MS); } catch(Exception) { ;} // wait
|
||||
}
|
||||
else
|
||||
{
|
||||
busyEvents=GetBusyEvents(resetEvents);
|
||||
if(busyEvents.Length!=availableEvents.Length)
|
||||
{
|
||||
ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
|
||||
Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
|
||||
resetEvents = resizedEvents;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // for
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] waiting for free slots...");
|
||||
if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
|
||||
if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
|
||||
} // while
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[LoadInsiderTransactionsYearGreaterEqual] completed.");
|
||||
return true;
|
||||
}
|
||||
currentIndex = index;
|
||||
return queueSymbols;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadInsiderTransactionsYearGreaterEqual] End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
|
||||
public void ThreadPoolCallback(Object threadHelperContext)
|
||||
private void ThreadPoolCallback(Object threadHelperContext)
|
||||
{
|
||||
ThreadHelper threadHelper = (ThreadHelper)threadHelperContext;
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
|
||||
@@ -153,7 +149,7 @@ namespace MarketData.Helper
|
||||
threadHelper.ResetEvent.Set();
|
||||
}
|
||||
|
||||
public static void LoadInsiderTransactionsSymbolEx(String symbol)
|
||||
private static void LoadInsiderTransactionsSymbolEx(String symbol)
|
||||
{
|
||||
symbol = symbol.ToUpper();
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0}", symbol));
|
||||
@@ -168,7 +164,7 @@ namespace MarketData.Helper
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Insider Transactions - Done.");
|
||||
}
|
||||
|
||||
public void ThreadPoolCallbackYearGreaterEqual(Object threadHelperContext)
|
||||
private void ThreadPoolCallbackYearGreaterEqual(Object threadHelperContext)
|
||||
{
|
||||
InsiderTransactionThreadHelper threadHelper = (InsiderTransactionThreadHelper)threadHelperContext;
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Insider Transactions Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
|
||||
@@ -178,7 +174,7 @@ namespace MarketData.Helper
|
||||
threadHelper.ResetEvent.Set();
|
||||
}
|
||||
|
||||
public static void LoadInsiderTransactionsYearGreaterEqualEx(String symbol, int yearGreaterEqual)
|
||||
private static void LoadInsiderTransactionsYearGreaterEqualEx(String symbol, int yearGreaterEqual)
|
||||
{
|
||||
symbol = symbol.ToUpper();
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load insider transactions for {0} years>={1}", symbol,yearGreaterEqual));
|
||||
|
||||
Reference in New Issue
Block a user