September 25, 2016

ReliableDictionaryのWrite速度の調査

前回ReliableDictionaryの走査速度を調査するときに、Writeするのがすごく遅かったので手元のMacBookとParty Clusterで計測した。

結果

件数 Macでの所要時間[ms] PartyClusterでの所要時間[ms]
1 183 191
100 306 217
1k 926 318
10k 10289 2774
100k 122100 32880
1M - -

結果

100万件の挿入/更新は前回と同様にサービスが途中で再起動してしまので取れてない。

考察

まず、Party Clusterのほうが速い。Party ClusterのインスタンスはARMみたかんじStandard_D1かなあ。

でもそれはどうでもよくて、1000件くらいまでは結構すんなり入るけど、1万件になると急に時間がかかってるように見えるのが気になる。これは、Keyでソートしてるせいかな?(余談だけど、Keyの順序をランダムに挿入しても、Enumeratorで取り出すと順番に取得できる。)

あと1件あたりの時間も気になる。1件挿入/更新するのに180[ms]かかるってのどうよ?

あ、でも、10万件の時の所要時間を1件単位にならすと1[ms]ぐらいなのでそこまで悪くないのかもしれない。

次のような計算をして、初期時間を省いた1件あたりにかかる時間を出してみた。

(N件のタイム - 1件のタイム) / (N件 - 1)          N := 100, 1000, 10000, 100000, 100000

件数 Macでの所要時間[ms] PartyClusterでの所要時間[ms]
100 1.24 0.262
1k 0.689 0.112
10k 1.04 0.273
100k 1.24 0.335

うん、割とそろってる気がする。ストレージ書き込みや他クラスターと調停してるわけだからこのくらいバラつきは誤差内だと思う。

まとめ

トランザクションのロックかけるとこに時間がかかってる(200[ms]弱)ぽい。そりゃそうだ。

それを差し引けば1件あたりの挿入はそんなに遅くない(250[ns]くらい)

100万件つっこむの難しい。たぶんReliableQueueのほうはつっこめる気がするから、いったんキューイングしてから順々にReliableDictionaryに書き込むバージョンを明日試してみようと思う。

使用コード

using System;
using System.Collections.Generic;
using System.Fabric;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data.Collections;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;
using System.Runtime.Serialization;
using System.Diagnostics;
using Serilog;

namespace StatefullCollectionSample
{
  [DataContract]
  public class Sample
  {
    [DataMember]
    public long Id { get; set; }
    [DataMember]
    public string Text { get; set; }
    [DataMember]
    public DateTime DateTime { get; set; }

    public static IEnumerable<Sample> CreateSamples(int size)
    {
      var seed = 1024;
      var rand = new Random(seed);
      var anchorDate = new DateTime(2016, 1, 1).ToBinary();
      return Enumerable.Range(1, size).Select(i => new Sample
      {
        Id = i,
        Text = $"Sample Text {i}",
        DateTime = DateTime.FromBinary(anchorDate + rand.Next(-1000000, 1000000) * 3000000000)
      }).ToList();
    }
  }
  internal sealed class StatefullCollectionSample : StatefulService
  {
    public StatefullCollectionSample(StatefulServiceContext context)
      : base(context)
    { }

    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
      return new ServiceReplicaListener[0];
    }
    IEnumerable<IEnumerable<T>> Div<T>(IEnumerable<T> items, int size)
    {
      var count = items.Count();
      var pageMax = count / size + Math.Min(count % size, 1);
      for (var i = 0; i < pageMax; i++)
      {
        yield return items.Skip(size * i).Take(size);
      }
    }
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
      var log = new LoggerConfiguration().WriteTo.Logentries("<API_KEY>").CreateLogger();
      var collection = await this.StateManager.GetOrAddAsync<IReliableDictionary<long, Sample>>("Samples");
      var Samples = Sample.CreateSamples(1000000);
      var sw = new Stopwatch();

      log.Information("initialzed!");
      while (true)
      {
        foreach (var size in new int[] { 1, 100, 1000, 10000, 100000, 1000000 })
        {
          sw.Start();
          var samples = Samples.Take(size);
          int unitSize = 50000, max = size / unitSize + Math.Min(size % unitSize, 1), count = 0;
          foreach(var unit in Div(samples, unitSize))
          {
            cancellationToken.ThrowIfCancellationRequested();
            using (var tx = this.StateManager.CreateTransaction())
            {
              var tasks = unit.Select(s => 
                collection.AddOrUpdateAsync(tx, s.Id, s, (key, value) => value)).ToArray();
              Task.WaitAll(tasks);
              await tx.CommitAsync();
            }
            log.Information($"size={size}, progress={++count}/{max}");
            ServiceEventSource.Current.ServiceMessage(this, $"size={size}, progress={count}/{max}");
          }

          sw.Stop();
          log.Information($"Complete (size={size}, time={sw.ElapsedMilliseconds}[ms])");
          ServiceEventSource
            .Current
            .ServiceMessage(this, $"Complete (size={size}, time={sw.ElapsedMilliseconds}[ms])");
          sw.Reset();
          await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }

        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
      }
    }
  }
}

© iwate 2016