Update threading model for company profile
This commit is contained in:
@@ -1,6 +1,4 @@
|
||||
using System;
|
||||
using System.Threading;
|
||||
using MarketData.MarketDataModel;
|
||||
using MarketData.MarketDataModel;
|
||||
using MarketData.DataAccess;
|
||||
using MarketData.Utils;
|
||||
|
||||
@@ -19,111 +17,106 @@ namespace MarketData.Helper
|
||||
ResetEvent=resetEvent;
|
||||
}
|
||||
}
|
||||
public class CompanyProfileMarketDataHelper
|
||||
|
||||
public class CompanyProfileMarketDataHelper : MarketDataHelperBase<CompanyProfile>
|
||||
{
|
||||
private static int MaxThreads = (int)ThreadHelperEnum.MaxThreads;
|
||||
private CompanyProfiles companyProfiles;
|
||||
private int currentIndex = 0;
|
||||
private static int SLEEP_TIME_MS = 500;
|
||||
|
||||
public CompanyProfileMarketDataHelper()
|
||||
{
|
||||
}
|
||||
public bool UpdateCompanyProfile(String symbol)
|
||||
{
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
companyProfiles=new CompanyProfiles();
|
||||
CompanyProfile companyProfile=CompanyProfileDA.GetCompanyProfile(symbol);
|
||||
if(null==companyProfile)return false;
|
||||
companyProfiles.Add(companyProfile);
|
||||
currentIndex = 0;
|
||||
while (true)
|
||||
{
|
||||
CompanyProfiles queueProfiles = GetQueueProfiles();
|
||||
if (null == queueProfiles || 0 == queueProfiles.Count) break;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[queueProfiles.Count];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
|
||||
{
|
||||
resetEvents[eventIndex] = new ManualResetEvent(false);
|
||||
}
|
||||
for (int index = 0; index < queueProfiles.Count; index++)
|
||||
{
|
||||
CompanyProfileThreadHelper companyProfileThreadHelper = new CompanyProfileThreadHelper(queueProfiles[index],resetEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallback, companyProfileThreadHelper);
|
||||
}
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Load company profile, waiting for queued items to complete.");
|
||||
WaitHandle.WaitAll(resetEvents);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfiles]End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
|
||||
public bool UpdateCompanyProfiles()
|
||||
{
|
||||
return UpdateCompanyProfile();
|
||||
}
|
||||
|
||||
public bool UpdateCompanyProfile(String symbol=null)
|
||||
{
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
companyProfiles=CompanyProfileDA.GetCompanyProfiles();
|
||||
currentIndex = 0;
|
||||
while (true)
|
||||
{
|
||||
CompanyProfiles queueProfiles = GetQueueProfiles();
|
||||
if (null == queueProfiles || 0 == queueProfiles.Count) break;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[queueProfiles.Count];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)
|
||||
{
|
||||
resetEvents[eventIndex] = new ManualResetEvent(false);
|
||||
}
|
||||
for (int index = 0; index < queueProfiles.Count; index++)
|
||||
{
|
||||
CompanyProfileThreadHelper companyProfileThreadHelper = new CompanyProfileThreadHelper(queueProfiles[index],resetEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallback, companyProfileThreadHelper);
|
||||
}
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"Load company profile, waiting for queued items to complete.");
|
||||
WaitHandle.WaitAll(resetEvents);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfiles]End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
private CompanyProfiles GetQueueProfiles()
|
||||
{
|
||||
CompanyProfiles queueProfiles = new CompanyProfiles();
|
||||
int index = currentIndex;
|
||||
for (; index < currentIndex + MaxThreads && index < companyProfiles.Count; index++)
|
||||
Profiler profiler=new Profiler();
|
||||
try
|
||||
{
|
||||
queueProfiles.Add(companyProfiles[index]);
|
||||
if(null==symbol)
|
||||
{
|
||||
Queue=CompanyProfileDA.GetCompanyProfiles();
|
||||
}
|
||||
else
|
||||
{
|
||||
CompanyProfile companyProfile=CompanyProfileDA.GetCompanyProfile(symbol);
|
||||
if(null==companyProfile)return false;
|
||||
Queue = new CompanyProfile[]{companyProfile}.ToList();
|
||||
}
|
||||
Index=-1;
|
||||
ManualResetEvent[] resetEvents = new ManualResetEvent[MaxThreads];
|
||||
for (int eventIndex = 0; eventIndex < resetEvents.Length; eventIndex++)resetEvents[eventIndex] = new ManualResetEvent(true);
|
||||
MDTrace.WriteLine(String.Format("[UpdateCompanyProfile] Queuing company profile fetches ..."));
|
||||
DateTime modified=DateTime.Now;
|
||||
while (true)
|
||||
{
|
||||
ManualResetEvent[] availableEvents=GetAvailableEvents(resetEvents);
|
||||
ManualResetEvent[] busyEvents=GetBusyEvents(resetEvents);
|
||||
if (null == PeekQueueItem() && 0==busyEvents.Length)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfile] queue contains {0} items, busy events {1}, all done.",0,busyEvents.Length));
|
||||
break;
|
||||
}
|
||||
for (int index = 0; index < availableEvents.Length; index++)
|
||||
{
|
||||
CompanyProfile queueItem=GetQueueItem();
|
||||
if (null != queueItem)
|
||||
{
|
||||
availableEvents[index].Reset();
|
||||
CompanyProfileThreadHelper companyProfileThreadHelper = new CompanyProfileThreadHelper(queueItem,resetEvents[index]);
|
||||
ThreadPool.QueueUserWorkItem(ThreadPoolCallback, companyProfileThreadHelper);
|
||||
try { Thread.Sleep(SLEEP_TIME_MS); }catch (Exception) { ;}
|
||||
}
|
||||
else
|
||||
{
|
||||
busyEvents=GetBusyEvents(resetEvents);
|
||||
if(busyEvents.Length!=availableEvents.Length)
|
||||
{
|
||||
ManualResetEvent[] resizedEvents=new ManualResetEvent[busyEvents.Length];
|
||||
Array.Copy(busyEvents, resizedEvents,busyEvents.Length);
|
||||
resetEvents = resizedEvents;
|
||||
}
|
||||
break;
|
||||
}
|
||||
} // for
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[UpdateCompanyProfile] waiting for free slots...");
|
||||
if(resetEvents.Length>0)WaitHandle.WaitAny(resetEvents);
|
||||
if(null==PeekQueueItem())resetEvents=ResizeEvents(resetEvents);
|
||||
} // while
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[UpdateCompanyProfile] completed.");
|
||||
return true;
|
||||
}
|
||||
currentIndex = index;
|
||||
return queueProfiles;
|
||||
}
|
||||
finally
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfile] End, total took {0}(ms)",profiler.End()));
|
||||
}
|
||||
}
|
||||
|
||||
public void ThreadPoolCallback(Object companyProfileThreadHelperContext)
|
||||
{
|
||||
CompanyProfileThreadHelper companyProfileThreadHelper = (CompanyProfileThreadHelper)companyProfileThreadHelperContext;
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load profile, Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile.Symbol));
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfile] Thread {0} started for {1}...", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile.Symbol));
|
||||
LoadCompanyProfileEx(companyProfileThreadHelper.CompanyProfile);
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("Load profile, Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile.Symbol));
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfile] Thread {0} ended for {1}", Thread.CurrentThread.ManagedThreadId, companyProfileThreadHelper.CompanyProfile.Symbol));
|
||||
companyProfileThreadHelper.ResetEvent.Set();
|
||||
}
|
||||
|
||||
private static void LoadCompanyProfileEx(CompanyProfile companyProfile)
|
||||
{
|
||||
CompanyProfile marketCompanyProfile = MarketDataHelper.GetCompanyProfile(companyProfile.Symbol);
|
||||
if (null == marketCompanyProfile)
|
||||
{
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"No company profile for symbol '" + companyProfile.Symbol + "'");
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,"[[UpdateCompanyProfile]] No company profile for symbol '" + companyProfile.Symbol + "'");
|
||||
return;
|
||||
}
|
||||
if(!String.IsNullOrEmpty(marketCompanyProfile.Sector) && !marketCompanyProfile.Sector.Equals("N/A"))companyProfile.Sector=marketCompanyProfile.Sector;
|
||||
if(!String.IsNullOrEmpty(marketCompanyProfile.Industry) && !marketCompanyProfile.Industry.Equals("N/A"))companyProfile.Industry=marketCompanyProfile.Industry;
|
||||
if(!String.IsNullOrEmpty(marketCompanyProfile.Description) && !marketCompanyProfile.Description.Equals("N/A"))companyProfile.Description=marketCompanyProfile.Description;
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("LoadCompanyProfileEx: Updating company profile.....Symbol:{0} Sector:{1} Industry:{2}",companyProfile.Symbol,companyProfile.Sector, companyProfile.Industry));
|
||||
MDTrace.WriteLine(LogLevel.DEBUG,String.Format("[UpdateCompanyProfile] Updating company profile.....Symbol:{0} Sector:{1} Industry:{2}",companyProfile.Symbol,companyProfile.Sector, companyProfile.Industry));
|
||||
CompanyProfileDA.UpdateCompanyProfile(companyProfile);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user