Commit 622084d9 authored by CompileNix's avatar CompileNix

clean minor stuff in RequestJobWorker

parent e0a7fc87
......@@ -87,7 +87,7 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
itemsGot++;
await targetBlock.SendAsync(job);
await targetBlock.SendAsync(job, token);
}
countOfJobsToProcess -= itemsGot;
......@@ -98,19 +98,29 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
targetBlock.Complete();
}
private static async Task ProcessHttpResponseAsync(HttpResponseMessage jobResult, IUnitOfWork unit, ResponseMessage responseMessage)
{
if (jobResult.Headers == null) throw new AggregateException("http result has no response headers");
if (jobResult.Content?.Headers == null) throw new AggregateException("http result has no response content headers");
var responseHeaderModule = unit.Resolve<IResponseHeaderModule>();
var headers = await responseHeaderModule.GetResponseHeadersFromListAsync(jobResult.Headers, unit);
headers.AddRange(await responseHeaderModule.GetResponseHeadersFromListAsync(jobResult.Content.Headers, unit));
responseMessage.ResponseHeaders = headers;
responseMessage.ProtocolVersion = jobResult.Version?.ToString();
responseMessage.StatusCode = (int)jobResult.StatusCode;
}
public async Task StopAsync()
{
_cancellationTokenSource?.Cancel();
if (_jobCompletedBlock?.Completion != null)
{
await _jobCompletedBlock?.Completion;
}
if (IsThrottling && ThrottlingTask != null)
{
await ThrottlingTask;
}
}
public async Task StartAsync(int countOfJobsToProcess)
......@@ -118,20 +128,18 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
if (!_cancellationTokenSource?.IsCancellationRequested ?? false)
{
InitDataflow();
// ReSharper disable once PossibleNullReferenceException
await ProcessPendingJobs(countOfJobsToProcess, _cancellationTokenSource.Token);
await ProcessPendingJobs(countOfJobsToProcess);
}
}
/// <summary>Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.</summary>
/// <inheritdoc />
public void Dispose()
{
StopAsync().Wait();
_cancellationTokenSource?.Dispose();
}
private async Task CompleteRequestJob((RequestJob job, bool isCompleted) data)
private async Task CompleteRequestJobAsync((RequestJob job, bool isCompleted) data)
{
var job = data.job;
......@@ -184,17 +192,15 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
_processJobBlocks = new List<TransformBlock<RequestJob, (RequestJob job, bool isCompleted)>>();
for (var i = 0; i < Environment.ProcessorCount << 3; i++)
{
// ReSharper disable once AssignNullToNotNullAttribute
_processJobBlocks.Add(new TransformBlock<RequestJob, (RequestJob job, bool isCompleted)>(job => ProcessRequestJob(job), blockOptions));
_processJobBlocks.Add(new TransformBlock<RequestJob, (RequestJob job, bool isCompleted)>((Func<RequestJob, Task<(RequestJob job, bool isCompleted)>>)ProcessRequestJobAsync, blockOptions));
}
_jobCompletedBlock = new ActionBlock<(RequestJob job, bool isCompleted)>(CompleteRequestJob, blockOptions);
_jobCompletedBlock = new ActionBlock<(RequestJob job, bool isCompleted)>(CompleteRequestJobAsync, blockOptions);
foreach (var jobBlock in _processJobBlocks)
{
_inputBufferBlock.LinkTo(jobBlock);
jobBlock.LinkTo(_jobCompletedBlock);
// ReSharper disable once MethodSupportsCancellation
_inputBufferBlock.Completion?.ContinueWith(t => { jobBlock.Complete(); });
}
}
......@@ -204,9 +210,7 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
ThrottlingItemsPerSecond = uint.Parse(config.Get("RequestJobWorkerThrottlingItemsPerSecond") ?? "10");
if (ThrottlingItemsPerSecond == 0)
{
ThrottlingItemsPerSecond = 10;
}
CurrentItemsPerSecond = 0;
......@@ -215,13 +219,9 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
if (CurrentItemsPerSecond != 0)
{
if (CurrentItemsPerSecond < ThrottlingItemsPerSecond)
{
CurrentItemsPerSecond = 0;
}
else
{
CurrentItemsPerSecond -= ThrottlingItemsPerSecond;
}
}
Thread.Sleep(1_000);
......@@ -235,17 +235,14 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
}
}
if (ThrottlingTask == null)
{
ThrottlingTask = new Task(ItemsPerSecondAction);
}
if (ThrottlingTask == null) ThrottlingTask = new Task(ItemsPerSecondAction);
}
private async Task ProcessPendingJobs(int countOfJobsToProcess, CancellationToken token)
private async Task ProcessPendingJobs(int countOfJobsToProcess)
{
if (_inputBufferBlock != null)
{
await FillConsumerAsync(countOfJobsToProcess, _inputBufferBlock, token);
await FillConsumerAsync(countOfJobsToProcess, _inputBufferBlock, _cancellationTokenSource.Token);
if (_inputBufferBlock?.Completion != null)
{
......@@ -266,7 +263,7 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
}
}
private async Task<(RequestJob job, bool isCompleted)> ProcessRequestJob(RequestJob requestJob)
private async Task<(RequestJob job, bool isCompleted)> ProcessRequestJobAsync(RequestJob requestJob)
{
if (IsThrottling)
{
......@@ -284,6 +281,7 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
{
using (var unit = IoC.Resolve<IUnitOfWork>())
{
unit.SaveChanges = false;
requestJob = await unit.Resolve<IRequestJobRepository>().GetWithRequestHeadersAsync(requestJob.Id);
if (requestJob?.Uri == null)
......@@ -325,23 +323,7 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
return (requestJob, isCompleted);
}
if (jobResult.Headers == null)
{
throw new AggregateException("http result has no response headers");
}
if (jobResult.Content?.Headers == null)
{
throw new AggregateException("http result has no response content headers");
}
var responseHeaderModule = unit.Resolve<IResponseHeaderModule>();
var headers = await responseHeaderModule.GetResponseHeadersFromListAsync(jobResult.Headers, unit);
headers.AddRange(await responseHeaderModule.GetResponseHeadersFromListAsync(jobResult.Content.Headers, unit));
responseMessage.ResponseHeaders = headers;
responseMessage.ProtocolVersion = jobResult.Version?.ToString();
responseMessage.StatusCode = (int)jobResult.StatusCode;
await ProcessHttpResponseAsync(jobResult, unit, responseMessage);
unit.Resolve<IResponseMessageRepository>().Add(responseMessage);
......@@ -352,7 +334,7 @@ namespace Compilenix.HttpHeaderSurvey.Implementation.Domain
catch (Exception exception)
{
isCompleted = false;
if (!exception.GetAllMessages().Any(e => e?.Contains($"Cannot insert duplicate key row in object 'dbo.{nameof(ResponseHeader)}s'") ?? false))
if (!exception.GetAllMessages().Any(e => e?.Contains("Cannot insert duplicate key row in object") ?? false))
{
throw;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment