Implement SafeBatchInsert for Headlines Insert in the HeadlinesDA
Some checks failed
Build .NET Project / build (push) Has been cancelled
Some checks failed
Build .NET Project / build (push) Has been cancelled
This commit is contained in:
@@ -301,39 +301,106 @@ namespace MarketData.DataAccess
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// InsertHeadlines - Inserts headlines by batching them up into groups.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="headlines"></param>
|
||||||
|
/// <returns></returns>
|
||||||
public static bool InsertHeadlines(Headlines headlines)
|
public static bool InsertHeadlines(Headlines headlines)
|
||||||
{
|
{
|
||||||
MySqlCommand sqlCommand=null;
|
const int batchSize = 1000;
|
||||||
MySqlConnection sqlConnection=null;
|
MySqlConnection sqlConnection=null;
|
||||||
MySqlTransaction sqlTransaction=null;
|
MySqlTransaction sqlTransaction=null;
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (null == headlines || 0 == headlines.Count) return false;
|
if (null == headlines || 0 == headlines.Count) return false;
|
||||||
StringBuilder sb = new StringBuilder();
|
|
||||||
sqlConnection = SqlUtils.CreateMySqlConnection(MainDataSource.Instance.LocateDataSource("market_data"));
|
sqlConnection = SqlUtils.CreateMySqlConnection(MainDataSource.Instance.LocateDataSource("market_data"));
|
||||||
sqlTransaction = sqlConnection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
|
sqlTransaction = sqlConnection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
|
||||||
headlines=new Headlines(headlines.Distinct(new HeadlinesEqualityComparer()).ToList());
|
headlines=new Headlines(headlines.Distinct(new HeadlinesEqualityComparer()).ToList());
|
||||||
foreach(Headline headline in headlines)
|
foreach (IEnumerable<Headline> batch in headlines.Chunk(batchSize))
|
||||||
{
|
{
|
||||||
if(!HeadlineExists(headline,sqlConnection,sqlTransaction))InsertHeadline(headline,sqlConnection,sqlTransaction);
|
SafeInsertBatch(batch.ToList(), sqlConnection, sqlTransaction);
|
||||||
}
|
}
|
||||||
sqlTransaction.Commit();
|
sqlTransaction.Commit();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
catch (Exception exception)
|
catch (Exception exception)
|
||||||
{
|
{
|
||||||
|
sqlTransaction?.Rollback(); // This will rollback without explicityl calling Rollback but we'll do this anyway.
|
||||||
MDTrace.WriteLine(LogLevel.DEBUG,exception);
|
MDTrace.WriteLine(LogLevel.DEBUG,exception);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
{
|
{
|
||||||
if(null!=sqlCommand)sqlCommand.Dispose();
|
|
||||||
if(null!=sqlTransaction)sqlTransaction.Dispose();
|
if(null!=sqlTransaction)sqlTransaction.Dispose();
|
||||||
if(null!=sqlConnection) sqlConnection.Close();
|
if(null!=sqlConnection) sqlConnection.Close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// SafeInsertBatch - This recursive approach allows us to catch exceptions and recursively call SafeInsertBatch continually dividing the
|
||||||
|
/// batch until we find the offensive row.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="batch"></param>
|
||||||
|
/// <param name="sqlConnection"></param>
|
||||||
|
/// <param name="sqlTransaction"></param>
|
||||||
|
private static void SafeInsertBatch(List<Headline> batch, MySqlConnection sqlConnection, MySqlTransaction sqlTransaction)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
InsertBatch(batch, sqlConnection, sqlTransaction);
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
// If only one row, it's the offender
|
||||||
|
if (batch.Count == 1)
|
||||||
|
{
|
||||||
|
MDTrace.WriteLine(LogLevel.DEBUG, ex);
|
||||||
|
MDTrace.WriteLine(LogLevel.DEBUG, $"Bad row: Symbol={batch[0].Symbol}, Headline={batch[0].Entry}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Split batch and retry
|
||||||
|
int mid = batch.Count / 2;
|
||||||
|
List<Headline> firstHalf = batch.Take(mid).ToList();
|
||||||
|
List<Headline> secondHalf = batch.Skip(mid).ToList();
|
||||||
|
SafeInsertBatch(firstHalf, sqlConnection, sqlTransaction);
|
||||||
|
SafeInsertBatch(secondHalf, sqlConnection, sqlTransaction);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Inserts a batch of headlines
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="batch"></param>
|
||||||
|
/// <param name="sqlConnection"></param>
|
||||||
|
/// <param name="sqlTransaction"></param>
|
||||||
|
private static void InsertBatch(IEnumerable<Headline> batch, MySqlConnection sqlConnection, MySqlTransaction sqlTransaction)
|
||||||
|
{
|
||||||
|
StringBuilder sb = new StringBuilder();
|
||||||
|
using MySqlCommand sqlCommand = new MySqlCommand();
|
||||||
|
sqlCommand.Connection = sqlConnection;
|
||||||
|
sqlCommand.Transaction = sqlTransaction;
|
||||||
|
DateTime now = DateTime.Now;
|
||||||
|
|
||||||
|
sb.Append("INSERT IGNORE INTO Headlines (symbol, asof, headline, source, modified) VALUES ");
|
||||||
|
int index = 0;
|
||||||
|
foreach (Headline headline in batch)
|
||||||
|
{
|
||||||
|
if (index > 0) sb.Append(",");
|
||||||
|
sb.Append($"(@symbol{index}, @asof{index}, @headline{index}, @source{index}, @modified{index})");
|
||||||
|
sqlCommand.Parameters.AddWithValue($"@symbol{index}", headline.Symbol);
|
||||||
|
sqlCommand.Parameters.AddWithValue($"@asof{index}", headline.Date);
|
||||||
|
sqlCommand.Parameters.AddWithValue($"@headline{index}", headline.Entry);
|
||||||
|
sqlCommand.Parameters.AddWithValue($"@source{index}", headline.Source);
|
||||||
|
DateTime modified = Utility.IsEpoch(headline.Modified) ? now : headline.Modified;
|
||||||
|
sqlCommand.Parameters.AddWithValue($"@modified{index}", modified);
|
||||||
|
index++;
|
||||||
|
}
|
||||||
|
sqlCommand.CommandText = sb.ToString();
|
||||||
|
sqlCommand.ExecuteNonQuery();
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// InsertHeadline - This is now parameterized. The MySql driver should handle all escaping etc.,
|
/// InsertHeadline - This is now parameterized. The MySql driver should handle all escaping etc.,
|
||||||
/// </summary>
|
/// </summary>
|
||||||
|
|||||||
Reference in New Issue
Block a user