diff --git a/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs b/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs
index 8d064a0..95e7729 100755
--- a/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs
+++ b/MarketData/MarketDataLib/DataAccess/HeadlinesDA.cs
@@ -301,39 +301,106 @@ namespace MarketData.DataAccess
}
}
+ ///
+ /// InsertHeadlines - Inserts headlines by batching them up into groups.
+ ///
+ ///
+ ///
public static bool InsertHeadlines(Headlines headlines)
{
- MySqlCommand sqlCommand=null;
+ const int batchSize = 1000;
MySqlConnection sqlConnection=null;
MySqlTransaction sqlTransaction=null;
try
{
if (null == headlines || 0 == headlines.Count) return false;
- StringBuilder sb = new StringBuilder();
sqlConnection = SqlUtils.CreateMySqlConnection(MainDataSource.Instance.LocateDataSource("market_data"));
sqlTransaction = sqlConnection.BeginTransaction(System.Data.IsolationLevel.ReadCommitted);
headlines=new Headlines(headlines.Distinct(new HeadlinesEqualityComparer()).ToList());
- foreach(Headline headline in headlines)
+ foreach (IEnumerable batch in headlines.Chunk(batchSize))
{
- if(!HeadlineExists(headline,sqlConnection,sqlTransaction))InsertHeadline(headline,sqlConnection,sqlTransaction);
+ SafeInsertBatch(batch.ToList(), sqlConnection, sqlTransaction);
}
sqlTransaction.Commit();
return true;
}
catch (Exception exception)
{
+ sqlTransaction?.Rollback(); // This will rollback without explicityl calling Rollback but we'll do this anyway.
MDTrace.WriteLine(LogLevel.DEBUG,exception);
return false;
}
finally
{
- if(null!=sqlCommand)sqlCommand.Dispose();
if(null!=sqlTransaction)sqlTransaction.Dispose();
if(null!=sqlConnection) sqlConnection.Close();
}
}
+ ///
+ /// SafeInsertBatch - This recursive approach allows us to catch exceptions and recursively call SafeInsertBatch continually dividing the
+ /// batch until we find the offensive row.
+ ///
+ ///
+ ///
+ ///
+ private static void SafeInsertBatch(List batch, MySqlConnection sqlConnection, MySqlTransaction sqlTransaction)
+ {
+ try
+ {
+ InsertBatch(batch, sqlConnection, sqlTransaction);
+ }
+ catch (Exception ex)
+ {
+ // If only one row, it's the offender
+ if (batch.Count == 1)
+ {
+ MDTrace.WriteLine(LogLevel.DEBUG, ex);
+ MDTrace.WriteLine(LogLevel.DEBUG, $"Bad row: Symbol={batch[0].Symbol}, Headline={batch[0].Entry}");
+ return;
+ }
+ // Split batch and retry
+ int mid = batch.Count / 2;
+ List firstHalf = batch.Take(mid).ToList();
+ List secondHalf = batch.Skip(mid).ToList();
+ SafeInsertBatch(firstHalf, sqlConnection, sqlTransaction);
+ SafeInsertBatch(secondHalf, sqlConnection, sqlTransaction);
+ }
+ }
+
+ ///
+ /// Inserts a batch of headlines
+ ///
+ ///
+ ///
+ ///
+ private static void InsertBatch(IEnumerable batch, MySqlConnection sqlConnection, MySqlTransaction sqlTransaction)
+ {
+ StringBuilder sb = new StringBuilder();
+ using MySqlCommand sqlCommand = new MySqlCommand();
+ sqlCommand.Connection = sqlConnection;
+ sqlCommand.Transaction = sqlTransaction;
+ DateTime now = DateTime.Now;
+
+ sb.Append("INSERT IGNORE INTO Headlines (symbol, asof, headline, source, modified) VALUES ");
+ int index = 0;
+ foreach (Headline headline in batch)
+ {
+ if (index > 0) sb.Append(",");
+ sb.Append($"(@symbol{index}, @asof{index}, @headline{index}, @source{index}, @modified{index})");
+ sqlCommand.Parameters.AddWithValue($"@symbol{index}", headline.Symbol);
+ sqlCommand.Parameters.AddWithValue($"@asof{index}", headline.Date);
+ sqlCommand.Parameters.AddWithValue($"@headline{index}", headline.Entry);
+ sqlCommand.Parameters.AddWithValue($"@source{index}", headline.Source);
+ DateTime modified = Utility.IsEpoch(headline.Modified) ? now : headline.Modified;
+ sqlCommand.Parameters.AddWithValue($"@modified{index}", modified);
+ index++;
+ }
+ sqlCommand.CommandText = sb.ToString();
+ sqlCommand.ExecuteNonQuery();
+ }
+
///
/// InsertHeadline - This is now parameterized. The MySql driver should handle all escaping etc.,
///