September 23, 2016

ReliableDictionaryの走査速度の調査

調査といってもそんなたいそうなことではない。 デバッグ実行して実行時間測っただけ。
アイテム数を100->1000->10000->100000->1000000と増やしていき、1カラムのフィルタリング(つまりSELECT * FROM [DICT] WHERE <COLUMN_CONDITION>)にかかる時間を測る。

結果

とりあえず結果。

size time[ms] relative value
100 6 1.00
1k 64 10.6
10k 574 95.7
100k 1005 168
1M 5862 977

グラフ

なんか例外吐いてリサイクル

一度に大量にWriteするとServiceが頻繁に例外吐いてリサイクルされる。なんじゃこりゃ。

System.Runtime.InteropServices.COMException’

登録されているサービス タイプの名前が、サービス マニフェストと一致していないときに吐くらしい。

System.Fabric.FabricTransientException

クォーラム損失発生した時に起こるらしい。 以下引用

クォーラムの損失
ステートフル サービスのパーティションのレプリカの過半数がダウンした場合、そのパーティションは “クォーラム損失” と呼ばれる状態になります。 この時点で、Service Fabric はそのパーティションへの書き込み許可を停止し、パーティションの一貫性と信頼性が保たれるようにします。実際には、クライアントのデータが本当は保存されていないのに保存されたものとしてクライアントに通知されないよう、使用不可能な期間を受け入れるようになっています。管理者がそのステートフル サービスのセカンダリ レプリカからの読み取りを許可している場合、この状態の間も読み取り操作の実行を続けられることに注意してください。十分な数のレプリカが復旧するまで、またはクラスターの管理者が Repair-ServiceFabricPartition API.を使って強制的にシステムを使用できるようにするまで、パーティションはクォーラム損失状態のままになります。

警告: プライマリ レプリカがダウンしたときに修復アクションを実行すると、データの損失が発生します。

システム サービスでもクォーラム損失が発生する可能性があり、その影響は問題のあるサービスに固有です。たとえば、ネーム サービスでクォーラム損失が発生すると名前の解決に影響があり、フェールオーバー マネージャー サービスの場合は新しいサービスの作成とフェールオーバーがブロックされます。ユーザー独自のサービスとは異なり、システム サービスは “修復しない” ことをお勧めします。代わりに、単にダウンしたレプリカが復旧するまで待つことをお勧めします。
https://azure.microsoft.com/ja-jp/documentation/articles/service-fabric-disaster-recovery/

所感

Read自体は線形ですね。そりゃそうだ。ただ、かなり遅いですね。
そもそも、そんなに数入れて使うもんじゃないのかなあ…

Readが線形なのはいいんですけどEnumeratorだし。それよりWriteがすごく遅いのが気になる。100万件入れるのにすごく時間がかかったので、Writeについてもうちょい調べよう。

使用コード

using System;
using System.Collections.Generic;
using System.Fabric;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.ServiceFabric.Data.Collections;
using Microsoft.ServiceFabric.Services.Communication.Runtime;
using Microsoft.ServiceFabric.Services.Runtime;
using System.Diagnostics;

namespace FabricSample.Service
{
  internal sealed class Service : StatefulService
  {
    public Service(StatefulServiceContext context)
      : base(context)
    { }
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
      return new ServiceReplicaListener[0];
    }
    IEnumerable<IEnumerable<T>> Div<T>(IEnumerable<T> items, int size)
    {
      var count = items.Count();
      var pageMax = count / size + Math.Min(count % size, 1);
      for(var i = 0; i < pageMax; i++)
      {
        yield return items.Skip(size * i).Take(size);
      }
    }
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
      var collection = 
            await this.StateManager.GetOrAddAsync<IReliableDictionary<long, Item>>("SampleItems");
      var items = SampleItemsFactory.CreateItems(1000000);
      var max = items.Count();
      var sw = new Stopwatch();

      long current = 0;
      using (var tx = StateManager.CreateTransaction())
      {
        current = await collection.GetCountAsync(tx);
      }
      var progress = (double)current / max;
      var size = Math.Max((int)((1- progress) * (1 - progress) * 10000), 100);
      int count = (int)current / size;
      
      // 書き込んでる最中に死ぬので、
      // リサイクルが起こるごとに書き込みサイズを小さくして前回の途中から挿入していく。
      foreach (var _items in Div(items, size).Skip(count))
      {
        sw.Start();
        using (var tx = StateManager.CreateTransaction())
        {
          cancellationToken.ThrowIfCancellationRequested();
          var tasks = _items.Select(item => 
                                      collection.AddOrUpdateAsync(tx, item.Id, item, (key, old) => item));
          Task.WaitAll(tasks.ToArray());
          await tx.CommitAsync();
        }
        sw.Stop();
        ServiceEventSource
          .Current
          .ServiceMessage(this, $"Initialized {++count * size}/{max}  {sw.ElapsedMilliseconds}[ms]");
        sw.Reset();
      }
          
      ServiceEventSource.Current.ServiceMessage(this, $"Initialized All");

      var today = DateTime.Today;
      var thisweek = today.AddDays(7);
      var list = new List<Item>();

      while (true)
      {
        cancellationToken.ThrowIfCancellationRequested();
        sw.Start();
        using (var tx = StateManager.CreateTransaction())
        {
          var @enum = (await collection.CreateEnumerableAsync(tx)).GetAsyncEnumerator();
          while (await @enum.MoveNextAsync(cancellationToken))
          {
            var item = @enum.Current.Value;
            if (today <= item.DateTime && item.DateTime < thisweek)
                list.Add(item);
          }
        }
        sw.Stop();
        ServiceEventSource
          .Current
          .ServiceMessage(this, $"items.count = {list.Count}, time = {sw.ElapsedMilliseconds}[ms]");
        sw.Reset();
        list.Clear();
        await Task.Delay(TimeSpan.FromMilliseconds(100), cancellationToken);
      }
    }
  }
}

© iwate 2016