Play Frameworkとpekko-quartz-schedulerで動かしているTyped ActorにDIする

現在私が業務で携わっているプロジェクトでは、バッチ処理の定期実行にpekko-quartz-schedulerというライブラリを使っている。 簡単に言うと、Actorに対し決められたタイミングでメッセージを送ることでバッチ処理を定期実行させるためのライブラリだ。 バッチ処理を行うActorの間ではあまり複雑なメッセージのやりとりが行われることもないためTyped Actorに移行するメリットより学習コストが大きいと判断されたためか、プロジェクトではこれまでバッチ処理のためのActorを書くときにずっとClassic Actorで書いていた。

しかし、Typed Actorがstableになってからもう5年くらい経っておりいい加減移行すべきではないかと思った。 ただ、Play Frameworkが標準で採用しているDIコンテナであるGoogle GuiceとTyped Actorを組み合わせて使う場合の情報があまり見つからず、少し調べるのに時間がかかった。 今回はPlay Framework, Google Guiceを利用している前提でpekko-quartz-schedulerでClassic ActorからTyped Actorに移行する際の勘所を書いていく。 Typed Actor自体については説明すると長くなってしまうため、各自AkkaやApache Pekkoのドキュメントを確認してほしい。

なお、今回はpekko-quartz-schedulerを使う前提で書いているが、akka-quartz-schedulerを使う場合もおそらく設定のキー名やimportをpekkoからakkaに変更するくらいの違いしか無いと思われる。

サンプルコードはここに置いている。

QuartzSchedulerTypedExtensionを使う

Classic Actorに定期的にメッセージを送るためにはQuartzSchedulerExtensionインスタンスが必要になるが、これをQuartzSchedulerTypedExtensionに変更する必要がある。 私のプロジェクトでは以下のようなクラスを定義し、Actorに送るメッセージのスケジューリングを送るクラスに対しSchedulerSettingを注入している。

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import org.apache.pekko.extension.quartz.QuartzSchedulerTypedExtension

import javax.inject.Inject

class SchedulerSetting @Inject()(actorSystem: ActorSystem) {
  val scheduler: QuartzSchedulerTypedExtension =
    QuartzSchedulerTypedExtension(actorSystem.toTyped)
}

なお、QuartzSchedulerTypedExtensionQuartzSchedulerExtensionを継承しておりClassic Actorからも同じインターフェースで利用できる。 なので、仮に私のプロジェクトのようにQuartzSchedulerExtensionインスタンスが既に多くのClassic Actorに関する設定を行うクラスから参照されてしまっていたとしても、それらのClassic Actorや設定を行うクラスを作り直す必要は一切なく、新しく作られるTyped Actorと共存することができる。

親ActorへのDI

まず、Actorを以下のように定義する。

package scheduler

import com.google.inject.{AbstractModule, Provides}
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport}

import javax.inject.Inject

object SampleActor extends ActorModule {
  sealed trait SampleActorMessage
  case object Hello extends SampleActorMessage

  override type Message = SampleActorMessage

  @Provides
  def create(sampleService: SampleService): Behavior[SampleActorMessage] =
    Behaviors.receiveMessage { case Hello =>
      println("SampleActor received Hello.")
      sampleService.exec()
      Behaviors.same
    }
}

class SampleService @Inject() () {
  def exec(): Unit = println("SampleService was executed.")
}

Typed ActorではActorはclassではなくBehaviorを返すメソッドをSingleton Objectに定義することでふるまいを定義するようになったため、DIしようにもどう注入すればいいのか謎だったがProvides Methodを使うようだった。 これで、Behavior[SampleActorMessage]インスタンスGuiceで生成しようとすると、SampleActor.create()が呼び出されるようになる。 Provides Methodの引数がGuiceにより生成できるオブジェクトだけであれば、依存関係の解決も勝手にやってくれる。

SampleActorでoverrideしているMessageという型エイリアスについてはこのActorをバインディングする際に関わってくるため、後で説明する。

次に、このActorに定期的にメッセージを送るためのスケジュールを行うためのクラスを見てみよう。

package scheduler

import com.google.inject.{AbstractModule, Provides}
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport}

import javax.inject.Inject

class SampleActorScheduler @Inject() (
    schedulerSetting: SchedulerSetting,
    sampleActor: ActorRef[SampleActor.SampleActorMessage]
) {
  schedulerSetting.scheduler.scheduleTyped(
    "Every5Seconds",
    sampleActor,
    SampleActor.Hello
  )
}

class SampleActorModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindTypedActor(SampleActor, "SampleActor")
    bind(classOf[SampleActorScheduler]).asEagerSingleton()
  }
}

Classic ActorへのメッセージのスケジュールではQuartzSchedulerExtension.schedule()を呼び出していたが、Typed ACtorの場合はQuartzSchedulerTypedExtension.scheduleTyped()を使う。 SampleActorSchedulerActorRefを注入しているやり方に注目してほしいのだが、Classic ActorのActorRefを注入する際は名前付きバインディングを使っていたのが、そうではなくなっている。 Classic ActorではbindActor()によりActorRefを生成し名前付きバインディングを行なっていたが、Typed ActorのActorRefを生成しバインディングするbindTypedActor()は名前付きバインディングを提供しないのだ。 第2引数に渡した文字列は、ただ生成したActorの名前に使われるだけである。

bindTypedActor()は第1引数に受け取ったオブジェクトからActorRef[Message]型の値を返すProvides Methodを探し、そのメソッドに必要な依存関係を引数に渡してBehavior[Message]を生成し、Actorを生成してくれる。 このMessage型は何かといえば、Actorに関する説明で後回しにしていた型エイリアスであり、今回はSampleActorMessageの型エイリアスになる。 つまり、今回はSampleActorの中からBehavior[SampleActorMessage]型の値を返すProvides Methodを探してActorを生成することになる。

最後に、ここはClassic Actorと何も変更点が無いのだが、プロジェクトによってやり方が違うだろうからapplication.confの中身も一応載せて解説しておく。

pekko {
  quartz {
    schedules {
      Every5Seconds {
        description = "5秒ごとに実行"
        expression = "*/5 * * ? * *"
      }
    }
  }
}

play.modules.enabled += "scheduler.SampleActorModule"

SampleActorModuleを有効化することで、アプリケーションの起動時にActorの生成とバインディング, SampleActorSchedulerの初期化処理が走り、その中でActorに5秒ごとにHelloが送られるようスケジュールしているので、sbt runを実行すると5秒ごとに以下のようなログが流れるだろう。

SampleActor received Hello.
SampleService was executed.
SampleActor received Hello.
SampleService was executed.
SampleActor received Hello.
SampleService was executed.

...

状態を持つActorへのDI

ここまではakka-quartz-schedulerやPlay Frameworkのドキュメントを調べればわかるのだが、ここから先の話はどこにもベストプラクティスが書いていなかったので、あくまで私はこうやっているという話になる。

次にClassic Actorで以下のように記述されていたActorへDIを行うことを考える。

package scheduler

import com.google.inject.AbstractModule
import org.apache.pekko.actor.{Actor, ActorRef}
import play.api.libs.concurrent.PekkoGuiceSupport

import java.time.LocalDateTime
import javax.inject.{Inject, Named}
import scala.collection.mutable

class StatefulClassicActor @Inject() (sampleService: SampleService)
    extends Actor {
  private var i: Int = 1
  private val messageHistory
      : mutable.Buffer[(StatefulClassicActor.Message, LocalDateTime)] =
    mutable.Buffer.empty

  override def receive: Receive = { case StatefulClassicActor.Hello =>
    println(i)
    println(messageHistory.mkString(",\n"))
    sampleService.exec()

    i += 1
    messageHistory += ((StatefulClassicActor.Hello, LocalDateTime.now()))
  }
}

object StatefulClassicActor {
  sealed trait Message
  case object Hello extends Message
}

class StatefulClassicActorScheduler @Inject() (
    schedulerSetting: SchedulerSetting,
    @Named("StatefulClassicActor") statefulClassicActor: ActorRef
) {
  schedulerSetting.scheduler.schedule(
    "Every5Seconds",
    statefulClassicActor,
    StatefulClassicActor.Hello
  )
}

class StatefulClassicActorModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindActor[StatefulClassicActor]("StatefulClassicActor")
    bind(classOf[StatefulClassicActorScheduler]).asEagerSingleton()
  }
}

これも、application.confStatefulClassicActorModuleを有効化してsbt runすれば以下のようなログが得られるだろう。

1

SampleService was executed.
2
(Hello,2024-06-05T20:29:40.021417)
SampleService was executed.
3
(Hello,2024-06-05T20:29:40.021417),
(Hello,2024-06-05T20:29:45.007236)
SampleService was executed.

DIをする必要があり、尚且つ状態を持っているというのが今回のミソだ。 Typed ActorではClassic Actorがインスタンス変数として持っていた内部状態をBehaviorを生成する関数の引数として持ち回ることでmutableなオブジェクトやvarを使わずとも状態を持つことができるようになっている。 しかし、ProvidesMethodはGuiceが生成できる型の値以外は引数にとれないため、IntSeq[(StatefulClassicActor.Message, LocalDateTime)]といった型の状態を管理しなければならないとなると、そのままでは困ってしまう。

このような場合、まずGuiceが生成できる型の値だけを引数にとるメソッドを定義し、そのメソッドの中で状態を持ち回るヘルパー関数を定義し、最後にヘルパー関数に初期状態を渡して呼び出してやるとよい。

package scheduler

import com.google.inject.{AbstractModule, Provides}
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport}

import java.time.LocalDateTime
import javax.inject.Inject

object StatefulTypedActor extends ActorModule {
  sealed trait StatefulTypedActorMessage
  case object Hello extends StatefulTypedActorMessage

  override type Message = StatefulTypedActorMessage

  @Provides
  def create(
      sampleService: SampleService
  ): Behavior[StatefulTypedActorMessage] = {
    def loop(
        i: Int,
        messageHistory: Seq[(StatefulTypedActorMessage, LocalDateTime)]
    ): Behavior[StatefulTypedActorMessage] =
      Behaviors.receiveMessage { case m @ Hello =>
        println(i)
        println(messageHistory.mkString(",\n"))
        sampleService.exec()
        loop(i + 1, messageHistory :+ (m, LocalDateTime.now()))
      }

    loop(1, Vector())
  }
}

class StatefulTypedActorScheduler @Inject() (
    schedulerSetting: SchedulerSetting,
    statefulTypedActor: ActorRef[StatefulTypedActor.StatefulTypedActorMessage]
) {
  schedulerSetting.scheduler.scheduleTyped(
    "Every5Seconds",
    statefulTypedActor,
    StatefulTypedActor.Hello
  )
}

class StatefulTypedActorModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindTypedActor(StatefulTypedActor, "StatefulTypedActor")
    bind(classOf[StatefulTypedActorScheduler]).asEagerSingleton()
  }
}

特定のActorの子ActorにDIする

ここまではActorの親子関係について考えず、子を持たないActorにDIすることを考えてきた。 しかし、当然特定のActorから生成されるActorに依存性を注入したいこともあるだろう。 この場合、親Actorと同じようにbindTypedActor()により依存性を注入させると、ある問題が発生する。

package scheduler

import com.google.inject.{AbstractModule, Provides}
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport}

import javax.inject.Inject

object WrongParentActor extends ActorModule {
  sealed trait WrongParentActorMessage
  case object Hello extends WrongParentActorMessage

  override type Message = WrongParentActorMessage

  @Provides
  def create(
      child: ActorRef[WrongChildActor.WrongChildActorMessage]
  ): Behavior[WrongParentActorMessage] =
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage { case Hello =>
        println(ctx.self.path)
        child ! WrongChildActor.Hello
        Behaviors.same
      }
    }
}

object WrongChildActor extends ActorModule {
  sealed trait WrongChildActorMessage
  case object Hello extends WrongChildActorMessage

  override type Message = WrongChildActorMessage

  @Provides
  def create(sampleService: SampleService): Behavior[WrongChildActorMessage] = {
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage { case Hello =>
        println(ctx.self.path)
        sampleService.exec()
        Behaviors.same
      }
    }
  }
}

class WrongActorScheduler @Inject() (
    schedulerSetting: SchedulerSetting,
    wrongParentActor: ActorRef[WrongParentActor.WrongParentActorMessage]
) {
  schedulerSetting.scheduler.scheduleTyped(
    "Every5Seconds",
    wrongParentActor,
    WrongParentActor.Hello
  )
}

class WrongActorModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindTypedActor(WrongParentActor, "WrongParentActor")
    bindTypedActor(WrongChildActor, "WrongChildActor")
    bind(classOf[WrongActorScheduler]).asEagerSingleton()
  }
}

このコードでは親のActorと子のActorにパスを出力させているが、以下のようにWrongParentActorWrongChildActorが親子関係になっていないことがわかる。 特定のActor同士を親子関係にしたいのであれば、子ActorはGuiceにより生成するのではなく、親Actorが自ら生成しなければならない。 第一、この方法ではActorを動的に生成できないし、一度親からstopさせてしまったらもう一度生成できない。

pekko://application/user/WrongParentActor
pekko://application/user/WrongChildActor
SampleService was executed.
pekko://application/user/WrongParentActor
pekko://application/user/WrongChildActor
SampleService was executed.
pekko://application/user/WrongParentActor
pekko://application/user/WrongChildActor
SampleService was executed.

愚直にやるならば、次のように親Actorに子Actorが必要とする依存関係を注入してしまい、親Actorから子Actorにバケツリレーするという方法があるだろう。

package scheduler

import com.google.inject.{AbstractModule, Provides}
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport}

import java.util.UUID
import javax.inject.Inject

object ParentActor extends ActorModule {
  sealed trait ParentActorMessage
  case object Hello extends ParentActorMessage

  override type Message = ParentActorMessage

  @Provides
  def create(sampleService: SampleService): Behavior[ParentActorMessage] =
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage { case Hello =>
        println(ctx.self.path)
        // 同時に同じ名前で複数のActorを生成できないため、UUIDを名前の一部とする
        val child = ctx.spawn(
          ChildActor.create(sampleService),
          s"ChildActor${UUID.randomUUID()}"
        )
        child ! ChildActor.Hello
        Behaviors.same
      }
    }
}

object ChildActor {
  sealed trait Message
  case object Hello extends Message

  def create(sampleService: SampleService): Behavior[Message] =
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage { case Hello =>
        println(ctx.self.path)
        sampleService.exec()
        Behaviors.same
      }
    }
}

class ParentActorScheduler @Inject() (
    schedulerSetting: SchedulerSetting,
    parentActor: ActorRef[ParentActor.ParentActorMessage]
) {
  schedulerSetting.scheduler.scheduleTyped(
    "Every5Seconds",
    parentActor,
    ParentActor.Hello
  )
}

class ParentActorModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindTypedActor(ParentActor, "ParentActor")
    bind(classOf[ParentActorScheduler]).asEagerSingleton()
  }
}

実際、動かすと以下のようにChildActorがParentActorの子になっている。 なお、本来はちゃんとChildActorはstopしないとメモリ上に残り続けてしまうが、今回は説明の簡略化のため無視している。

pekko://application/user/ParentActor
pekko://application/user/ParentActor/ChildActorf5d2e045-f2cd-4fa6-bed7-386b48b3c536
SampleService was executed.
pekko://application/user/ParentActor
pekko://application/user/ParentActor/ChildActor95d59545-c9a5-4e2f-b655-3e5594549d24
SampleService was executed.
pekko://application/user/ParentActor
pekko://application/user/ParentActor/ChildActor1c83857f-9b9d-440c-9750-fbebfe1a04e9
SampleService was executed.

ただ、見ていてわかるかもしれないが、これは筋のいい方法ではない。 親Actorが自分で使っている依存関係と子Actorに渡したいだけの依存関係の区別がぱっと見でつかないし、もし子Actorが追加で依存関係を必要としたら親Actorまで修正を入れなければならない。 親子関係が更にネストした状態でそんなことはやりたくないだろう。

これに対するベストプラクティスは探したのだが見つからなかったので、私は以下のように子ActorのBehaviorを生成するメソッドを呼び出すだけのヘルパークラスを用意することで解決している。

package scheduler

import com.google.inject.{AbstractModule, Provides}
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport}

import java.util.UUID
import javax.inject.Inject

object ImprovedParentActor extends ActorModule {
  sealed trait ImprovedParentActorMessage
  case object Hello extends ImprovedParentActorMessage

  override type Message = ImprovedParentActorMessage

  @Provides
  def create(
      behaviorGenerator: BehaviorGenerator
  ): Behavior[ImprovedParentActorMessage] = Behaviors.setup { ctx =>
    Behaviors.receiveMessage { case Hello =>
      println(ctx.self.path)
      val child = ctx.spawn(
        behaviorGenerator.create(),
        s"ImprovedChildActor${UUID.randomUUID()}"
      )
      child ! ImprovedChildActor.Hello
      Behaviors.same
    }
  }
}

object ImprovedChildActor {
  sealed trait Message
  case object Hello extends Message

  def create(sampleService: SampleService): Behavior[Message] =
    Behaviors.setup { ctx =>
      Behaviors.receiveMessage { case Hello =>
        println(ctx.self.path)
        sampleService.exec()
        Behaviors.same
      }
    }
}

class BehaviorGenerator @Inject() (sampleService: SampleService) {
  def create(): Behavior[ImprovedChildActor.Message] =
    ImprovedChildActor.create(sampleService)
}

class ImprovedParentActorScheduler @Inject() (
    schedulerSetting: SchedulerSetting,
    improvedParentActor: ActorRef[
      ImprovedParentActor.ImprovedParentActorMessage
    ]
) {
  schedulerSetting.scheduler.scheduleTyped(
    "Every5Seconds",
    improvedParentActor,
    ImprovedParentActor.Hello
  )
}

class ImprovedParentActorModule extends AbstractModule with PekkoGuiceSupport {
  override def configure(): Unit = {
    bindTypedActor(ImprovedParentActor, "ImprovedParentActor")
    bind(classOf[ImprovedParentActorScheduler]).asEagerSingleton()
  }
}
pekko://application/user/ImprovedParentActor
pekko://application/user/ImprovedParentActor/ImprovedChildActor4dc53dcd-0411-44f5-9f2f-9c08b66cbd87
SampleService was executed.
pekko://application/user/ImprovedParentActor
pekko://application/user/ImprovedParentActor/ImprovedChildActor33053a72-2db4-4e16-ac7f-56157ce18c5f
SampleService was executed.
pekko://application/user/ImprovedParentActor
pekko://application/user/ImprovedParentActor/ImprovedChildActorcb8dfc8a-5c06-4e2e-9d03-78d3f9963f58
SampleService was executed.

このようにすれば、もしImprovedChildActorが新たな依存関係を必要としたとしても修正が必要なのはImprovedChildActorBehaviorGeneratorのみだ。 更に子孫のActorが登場したとしても、同様に孫用のBehaviorGeneratorにあたるクラスを用意し、そのクラスをBehaviorGeneratorにDIしてやれば対応できる。

ただ、くどいようだが私が見落としているだけでベストプラクティスがどこかに書かれているかもしれないので、見つけたら教えてほしい。