Commit d206eabe authored by CompileNix's avatar CompileNix

add RequestJobImporter module

parent ade5f2b0
......@@ -10,25 +10,24 @@ using Compilenix.HttpHeaderSurvey.Implementation.Shared;
using Compilenix.HttpHeaderSurvey.Integration.DataAccess.Entitys;
using Compilenix.HttpHeaderSurvey.Integration.Domain;
using Compilenix.HttpHeaderSurvey.Integration.Domain.DataTransferObjects;
using JetBrains.Annotations;
namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
{
public class DataTransferObjectConverter : IDataTransferObjectConverter
{
    /// <summary>
    /// Reads a delimited text file into a <see cref="DataTable"/> using a TPL-Dataflow pipeline
    /// (buffer line -> parse line -> append row). The first line of the file is treated as the header.
    /// </summary>
    /// <param name="filePath">Path of the UTF-8 encoded csv file.</param>
    /// <param name="seperator">Column delimiter character.</param>
    /// <returns>A table with one column per header field and one row per well-formed line.</returns>
    [ItemNotNull]
    [NotNull]
    private static async Task<DataTable> ConvertCsvToDataTable([NotNull] string filePath, char seperator)
    {
        var dataTable = new DataTable();
        using (var streamReader = new StreamReader(filePath, Encoding.UTF8))
        {
            typeof(DataTransferObjectConverter).Log().Debug("Loading csv file");
            dataTable.Columns.AddRange(GetDataColumnsFromCsvHeader(seperator, streamReader));

            var blockOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = Environment.ProcessorCount * 2, MaxDegreeOfParallelism = Environment.ProcessorCount * 2 };
            // Loop-invariant: the column count is fixed once the header has been read.
            var coloumCount = dataTable.Columns.Count;

            // NOTE(review): the bufferBlock declaration was elided by the diff hunk; reconstructed here — confirm against the full file.
            var bufferBlock = new BufferBlock<string>(blockOptions);

            var processLineBock = new TransformBlock<string, DataRow>(
                line =>
                {
                    var row = default(DataRow);
                    try
                    {
                        // NewRow must be serialized against concurrent table access.
                        dataTable.ThreadSafeAction(() => { row = dataTable.NewRow(); });
                        if (line == null)
                        {
                            return row;
                        }
                        var columnValues = line.Split(seperator);
                        if (columnValues.Length == coloumCount)
                        {
                            // ReSharper disable once CoVariantArrayConversion
                            row.ItemArray = columnValues;
                        }
                    }
                    catch
                    {
                        // ignored on purpose: a malformed line yields an empty/null row
                        // and must not fault the whole dataflow pipeline.
                    }
                    return row;
                }, blockOptions);

            var completeBlock = new ActionBlock<DataRow>(
                row =>
                {
                    if (row == null)
                    {
                        typeof(DataTransferObjectConverter).Log().Debug("Got null row");
                        return;
                    }
                    dataTable.ThreadSafeAction(() => dataTable.Rows?.Add(row));
                }, blockOptions);

            bufferBlock.LinkTo(processLineBock);
            processLineBock.LinkTo(completeBlock);
#pragma warning disable 4014
            // Manual completion propagation (LinkTo above does not propagate completion).
            bufferBlock.Completion?.ContinueWith(t => processLineBock.Complete());
            processLineBock.Completion?.ContinueWith(t => completeBlock.Complete());
#pragma warning restore 4014

            while (!streamReader.EndOfStream)
            {
                var readLineAsync = streamReader.ReadLineAsync();
                if (readLineAsync == null)
                {
                    continue;
                }
                // SendAsync respects BoundedCapacity and throttles the reader.
                var sendAsync = bufferBlock.SendAsync(await readLineAsync);
                if (sendAsync != null)
                {
                    await sendAsync;
                }
            }
            bufferBlock.Complete();
            // ReSharper disable once PossibleNullReferenceException
            await bufferBlock.Completion;
            // ReSharper disable once PossibleNullReferenceException
            await processLineBock.Completion;
            // ReSharper disable once PossibleNullReferenceException
            await completeBlock.Completion;
        }
        typeof(DataTransferObjectConverter).Log().Debug("Csv file loaded");
        return dataTable;
    }

    /// <summary>Builds the table columns from the csv header line; null when the stream is empty.</summary>
    private static DataColumn[] GetDataColumnsFromCsvHeader(char seperator, [NotNull] TextReader streamReader)
    {
        return streamReader.ReadLine()?.Split(seperator).Select(column => new DataColumn(column)).ToArray();
    }

    /// <summary>
    /// Loads request jobs from a csv file. Rows whose field count does not match the number of
    /// <see cref="NewRequestJobDto"/> properties are skipped; the result is mapped to entities.
    /// </summary>
    /// <param name="filePath">Path of the csv file.</param>
    /// <param name="seperator">Column delimiter character.</param>
    public async Task<IEnumerable<RequestJob>> RequestJobsFromCsv(string filePath, char seperator)
    {
        var jobs = new List<NewRequestJobDto>();
        var dataTable = await ConvertCsvToDataTable(filePath, seperator);
        if (dataTable.Rows == null)
        {
            return new List<RequestJob>();
        }
        var rows = new DataRow[dataTable.Rows.Count];
        dataTable.Rows.CopyTo(rows, 0);
        dataTable.Dispose();
        GarbageCollectionUtils.CollectNow();
        // Hoisted: reflecting over the dto type is loop-invariant; the original resolved it once per row.
        var dtoPropertyCount = typeof(NewRequestJobDto).GetProperties().Length;
        Parallel.ForEach(
            rows, new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount }, row =>
            {
                if (row != null && row.ItemArray.Length == dtoPropertyCount)
                {
                    jobs.ThreadSafeAction(
                        () => jobs.Add(
                            new NewRequestJobDto
                            {
                                Method = row.ItemArray[0]?.ToString(),
                                HttpVersion = row.ItemArray[1]?.ToString(),
                                IsRunOnce = bool.Parse(row.ItemArray[2]?.ToString() ?? "true"),
                                Uri = row.ItemArray[3]?.ToString()
                            }));
                }
            });
        // ReSharper disable once AssignNullToNotNullAttribute
        return MappingUtils.MapRange<RequestJob>(jobs);
    }
}
......
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Compilenix.HttpHeaderSurvey.Implementation.Shared.IoC;
using Compilenix.HttpHeaderSurvey.Integration.DataAccess;
using Compilenix.HttpHeaderSurvey.Integration.DataAccess.Entitys;
using Compilenix.HttpHeaderSurvey.Integration.Domain;
using Compilenix.HttpHeaderSurvey.Integration.Domain.Modules;
using JetBrains.Annotations;
namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
{
[UsedImplicitly]
public class RequestJobImporter : IRequestJobImporter
{
    [NotNull]
    private readonly IUnitOfWork _unit;

    public RequestJobImporter([NotNull] IUnitOfWork unit)
    {
        _unit = unit;
    }

    /// <summary>
    /// Imports all request jobs from a csv file, attaching the given request headers to every job.
    /// Jobs are pushed through a small dataflow fan-out (4 bounded workers) to limit memory use.
    /// </summary>
    /// <param name="filePath">Path of the csv file.</param>
    /// <param name="requestHeaders">Headers attached to each imported job.</param>
    /// <param name="delimiter">Csv column delimiter.</param>
    public async Task FromCsvAsync(string filePath, IEnumerable<RequestHeader> requestHeaders, char delimiter = ',')
    {
        var jobsFromCsv = await new DataTransferObjectConverter().RequestJobsFromCsv(filePath, delimiter);
        // Materialize once: the sequence is consumed concurrently by every ImportAsync call below,
        // so a lazy IEnumerable would be enumerated many times (and possibly unsafely).
        var headers = requestHeaders.ToList();
        var input = new BufferBlock<RequestJob>();
        var processingList = new List<ActionBlock<RequestJob>>();
        for (var i = 0; i < 4; i++)
        {
            var item = new ActionBlock<RequestJob>(
                async job =>
                {
                    if (job != null)
                    {
                        // async all the way: the original blocked a pool thread with .Wait().
                        await ImportAsync(job, headers);
                    }
                }, new ExecutionDataflowBlockOptions
                {
                    BoundedCapacity = 3,
                    TaskScheduler = new ConcurrentExclusiveSchedulerPair().ConcurrentScheduler,
                    MaxDegreeOfParallelism = Environment.ProcessorCount
                });
            input.LinkTo(item);
            // LinkTo does not propagate completion; forward it manually.
            input.Completion?.ContinueWith(task => item.Complete());
            processingList.Add(item);
        }
        foreach (var requestJob in jobsFromCsv)
        {
            // ReSharper disable once PossibleNullReferenceException
            while (!await input.SendAsync(requestJob)) { }
        }
        // ReSharper disable once RedundantAssignment
        jobsFromCsv = null;
        input.Complete();
        // await instead of Task.WaitAll: never block the thread inside an async method.
        await Task.WhenAll(processingList.Select(x => x.Completion));
    }

    /// <summary>Imports request jobs from a csv file using the module's default request headers.</summary>
    public async Task FromCsvAsync(string filePath, char delimiter = ',')
    {
        var module = _unit.Resolve<IRequestHeaderModule>();
        await FromCsvAsync(filePath, await module.GetDefaultRequestHeadersAsync(), delimiter);
    }

    /// <summary>
    /// Persists a single request job: upserts every non-null header, attaches the persisted
    /// headers to the job and upserts the job itself, all inside a fresh unit of work.
    /// </summary>
    public async Task ImportAsync(RequestJob requestJob, IEnumerable<RequestHeader> headers)
    {
        using (var unit = IoC.Resolve<IUnitOfWork>())
        {
            // Hoisted: the original resolved the module once per header in the loop.
            var headerModule = unit.Resolve<IRequestHeaderModule>();
            var headersToAdd = new List<RequestHeader>();
            foreach (var header in headers)
            {
                if (header != null)
                {
                    headersToAdd.Add(await headerModule.AddOrUpdateAsync(header));
                }
            }
            // AddOrUpdateAsync may yield null; keep only persisted headers.
            requestJob.Headers = headersToAdd.Where(x => x != null).ToList();
            await unit.Resolve<IRequestJobModule>().AddOrUpdateAsync(requestJob);
        }
    }

    /// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
    public void Dispose()
    {
        _unit.Dispose();
    }
}
}
\ No newline at end of file
......@@ -2,6 +2,7 @@
<packages>
<package id="AutoMapper" version="5.2.0" targetFramework="net461" />
<package id="JetBrains.Annotations" version="10.2.1" targetFramework="net461" />
<package id="log4net" version="2.0.7" targetFramework="net461" />
<package id="System.Threading.Tasks.Dataflow" version="4.7.0" targetFramework="net461" />
</packages>
\ No newline at end of file
......@@ -41,6 +41,7 @@
<Compile Include="DataTransferObjects\NewRequestJobDTO.cs" />
<Compile Include="IApplicationConfigurationCollection.cs" />
<Compile Include="IDataTransferObjectConverter.cs" />
<Compile Include="IRequestJobImporter.cs" />
<Compile Include="IRequestJobWorker.cs" />
<Compile Include="Modules\IBaseModule.cs" />
<Compile Include="Modules\IResponseErrorModule.cs" />
......
namespace Compilenix.HttpHeaderSurvey.Integration.Domain.DataTransferObjects
using System.Diagnostics.CodeAnalysis;
namespace Compilenix.HttpHeaderSurvey.Integration.Domain.DataTransferObjects
{
[SuppressMessage("ReSharper", "UnusedAutoPropertyAccessor.Global")]
public class NewRequestJobDto
{
public string Method { get; set; }
......
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Compilenix.HttpHeaderSurvey.Integration.DataAccess.Entitys;
using JetBrains.Annotations;
namespace Compilenix.HttpHeaderSurvey.Integration.Domain
{
[SuppressMessage("ReSharper", "UnusedMemberInSuper.Global")]
public interface IDataTransferObjectConverter
{
    /// <summary>Reads request jobs from a delimited csv file.</summary>
    /// <param name="filePath">Path of the csv file; the first line is the header.</param>
    /// <param name="seperator">Column delimiter character.</param>
    /// <returns>The parsed request jobs; never null.</returns>
    [ItemNotNull]
    [NotNull]
    Task<IEnumerable<RequestJob>> RequestJobsFromCsv([NotNull] string filePath, char seperator);
}
}
\ No newline at end of file
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Compilenix.HttpHeaderSurvey.Integration.DataAccess.Entitys;
using JetBrains.Annotations;
namespace Compilenix.HttpHeaderSurvey.Integration.Domain
{
[SuppressMessage("ReSharper", "UnusedMemberInSuper.Global")]
public interface IRequestJobImporter : IDisposable
{
    /// <summary>Imports request jobs from a csv file using the implementation's default request headers.</summary>
    /// <param name="filePath">Path of the csv file.</param>
    /// <param name="delimiter">Csv column delimiter.</param>
    [NotNull]
    Task FromCsvAsync([NotNull] string filePath, char delimiter = ',');
    /// <summary>Imports request jobs from a csv file, attaching the given headers to every imported job.</summary>
    /// <param name="filePath">Path of the csv file.</param>
    /// <param name="requestHeaders">Headers attached to each imported job; no element may be null.</param>
    /// <param name="delimiter">Csv column delimiter.</param>
    [NotNull]
    Task FromCsvAsync([NotNull] string filePath, [ItemNotNull] [NotNull] IEnumerable<RequestHeader> requestHeaders, char delimiter = ',');
    /// <summary>Persists a single request job together with the given headers.</summary>
    /// <param name="requestJob">The job to persist.</param>
    /// <param name="headers">Headers to attach to the job before persisting.</param>
    [NotNull]
    Task ImportAsync([NotNull] RequestJob requestJob, [NotNull] IEnumerable<RequestHeader> headers);
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment