Initial Commit
This commit is contained in:
126
MarketData/MarketDataLib/Helper/AnalystPriceTargetMarketDataHelper.cs
Executable file
126
MarketData/MarketDataLib/Helper/AnalystPriceTargetMarketDataHelper.cs
Executable file
@@ -0,0 +1,126 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading;
|
||||
using MarketData.MarketDataModel;
|
||||
using MarketData.DataAccess;
|
||||
using MarketData.Utils;
|
||||
|
||||
namespace MarketData.Helper
|
||||
{
|
||||
public class AnalystPriceTargetMarketDataHelper : MarketDataHelperBase<String>
|
||||
{
|
||||
private static int MaxThreads = 10; //(int)ThreadHelperEnum.MaxThreads;
|
||||
private static int SLEEP_TIME_MS = 500;
|
||||
|
||||
public AnalystPriceTargetMarketDataHelper()
|
||||
{
|
||||
}
|
||||
public bool LoadAnalystPriceTarget(String symbol=null)
|
||||
{
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
if(symbol==null)Queue=PricingDA.GetSymbols();
|
||||
else (Queue=new List<String>()).Add(symbol);
|
||||
ShuffleQueue(); // shuffle up the symbols
|
||||
Index=-1;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
|
||||
MDTrace.WriteLine(String.Format("Queuing analyst price target fetches ..."));
|
||||
DateTime modified=DateTime.Now;
|
||||
while (true)
|
||||
{
|
||||
ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
|
||||
ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
|
||||
if (null == PeekQueueItem() && 0==busyEvents.Length)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Analyst price target queue contains {0} items, busy events {1}, all done.",0,busyEvents.Length));
|
||||
break;
|
||||
}
|
||||
for (int index = 0; index < availableEvents.Length; index++)
|
||||
{
|
||||
symbol=GetQueueItem();
|
||||
if (null != symbol)
|
||||
{
|
||||
availableEvents[index].Reset();
|
||||
ThreadHelper analystPriceTargetThreadHelper = new ThreadHelper(symbol,modified,availableEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallbackLoadAnalystPriceTarget, analystPriceTargetThreadHelper);
|
||||
try { Thread.Sleep(SLEEP_TIME_MS); }catch (Exception) { ;}
|
||||
}
|
||||
else
|
||||
{
|
||||
busyEvents=GetBusyEvents(resetEvents);
|
||||
if(busyEvents.Length!=availableEvents.Length)
|
||||
{
|
||||
ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
|
||||
Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
|
||||
resetEvents = resizedEvents;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // for
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Analyst price target waiting for free slots...");
|
||||
if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
|
||||
if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
|
||||
} // while
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Analyst price target completed.");
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[LoadAnalystPriceTarget]End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
public void ThreadPoolCallbackLoadAnalystPriceTarget(Object analystPriceTargetThreadHelperContext)
|
||||
{
|
||||
ThreadHelper threadHelper = (ThreadHelper)analystPriceTargetThreadHelperContext;
|
||||
try
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load analyst price target, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
|
||||
LoadAnalystPriceTargetEx(threadHelper.Symbol,threadHelper.Modified);
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load analyst price target, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, threadHelper.Symbol));
|
||||
}
|
||||
finally
|
||||
{
|
||||
threadHelper.ResetEvent.Set();
|
||||
}
|
||||
}
|
||||
// Single threaded method, use for debugging a single symbol fetch without going through the queue
|
||||
public static void LoadAnalystPriceTargetEx(String symbol,DateTime modified)
|
||||
{
|
||||
if (!AnalystPriceTargetDA.CheckAnalystPriceTargetModifiedOn(symbol, modified))
|
||||
{
|
||||
LoadAnalystPriceTargetEx(symbol);
|
||||
}
|
||||
else
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Already have analyst price target for '"+symbol+"' on "+Utility.DateTimeToStringMMHDDHYYYY(modified));
|
||||
}
|
||||
}
|
||||
public static void LoadAnalystPriceTargetEx(String symbol)
|
||||
{
|
||||
try
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Loading analyst price target for for '" + symbol + "'");
|
||||
AnalystPriceTarget analystPriceTarget = MarketDataHelper.GetAnalystPriceTarget(symbol);
|
||||
if (null != analystPriceTarget)
|
||||
{
|
||||
AnalystPriceTargetDA.InsertAnalystPriceTarget(analystPriceTarget);
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"");
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,AnalystPriceTarget.Header);
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,analystPriceTarget.ToString());
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"");
|
||||
}
|
||||
}
|
||||
catch (Exception exception)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Failed to load analyst price target for '" + symbol + "'");
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Exception was " + exception.ToString());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user