現在私が業務で携わっているプロジェクトでは、バッチ処理の定期実行にpekko-quartz-schedulerというライブラリを使っている。 簡単に言うと、Actorに対し決められたタイミングでメッセージを送ることでバッチ処理を定期実行させるためのライブラリだ。 バッチ処理を行うActorの間ではあまり複雑なメッセージのやりとりが行われることもないためTyped Actorに移行するメリットより学習コストが大きいと判断されたためか、プロジェクトではこれまでバッチ処理のためのActorを書くときにずっとClassic Actorで書いていた。
しかし、Typed Actorがstableになってからもう5年くらい経っておりいい加減移行すべきではないかと思った。 ただ、Play Frameworkが標準で採用しているDIコンテナであるGoogle GuiceとTyped Actorを組み合わせて使う場合の情報があまり見つからず、少し調べるのに時間がかかった。 今回はPlay Framework, Google Guiceを利用している前提でpekko-quartz-schedulerでClassic ActorからTyped Actorに移行する際の勘所を書いていく。 Typed Actor自体については説明すると長くなってしまうため、各自AkkaやApache Pekkoのドキュメントを確認してほしい。
なお、今回はpekko-quartz-schedulerを使う前提で書いているが、akka-quartz-schedulerを使う場合もおそらく設定のキー名やimportをpekkoからakkaに変更するくらいの違いしか無いと思われる。
サンプルコードはここに置いている。
QuartzSchedulerTypedExtensionを使う
Classic Actorに定期的にメッセージを送るためにはQuartzSchedulerExtension
のインスタンスが必要になるが、これをQuartzSchedulerTypedExtension
に変更する必要がある。
私のプロジェクトでは以下のようなクラスを定義し、Actorに送るメッセージのスケジューリングを送るクラスに対しSchedulerSetting
を注入している。
import org.apache.pekko.actor.ActorSystem import org.apache.pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps import org.apache.pekko.extension.quartz.QuartzSchedulerTypedExtension import javax.inject.Inject class SchedulerSetting @Inject()(actorSystem: ActorSystem) { val scheduler: QuartzSchedulerTypedExtension = QuartzSchedulerTypedExtension(actorSystem.toTyped) }
なお、QuartzSchedulerTypedExtension
はQuartzSchedulerExtension
を継承しておりClassic Actorからも同じインターフェースで利用できる。
なので、仮に私のプロジェクトのようにQuartzSchedulerExtension
のインスタンスが既に多くのClassic Actorに関する設定を行うクラスから参照されてしまっていたとしても、それらのClassic Actorや設定を行うクラスを作り直す必要は一切なく、新しく作られるTyped Actorと共存することができる。
親ActorへのDI
まず、Actorを以下のように定義する。
package scheduler import com.google.inject.{AbstractModule, Provides} import org.apache.pekko.actor.typed.scaladsl.Behaviors import org.apache.pekko.actor.typed.{ActorRef, Behavior} import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport} import javax.inject.Inject object SampleActor extends ActorModule { sealed trait SampleActorMessage case object Hello extends SampleActorMessage override type Message = SampleActorMessage @Provides def create(sampleService: SampleService): Behavior[SampleActorMessage] = Behaviors.receiveMessage { case Hello => println("SampleActor received Hello.") sampleService.exec() Behaviors.same } } class SampleService @Inject() () { def exec(): Unit = println("SampleService was executed.") }
Typed ActorではActorはclassではなくBehavior
を返すメソッドをSingleton Objectに定義することでふるまいを定義するようになったため、DIしようにもどう注入すればいいのか謎だったがProvides Methodを使うようだった。
これで、Behavior[SampleActorMessage]
のインスタンスをGuiceで生成しようとすると、SampleActor.create()
が呼び出されるようになる。
Provides Methodの引数がGuiceにより生成できるオブジェクトだけであれば、依存関係の解決も勝手にやってくれる。
SampleActor
でoverrideしているMessage
という型エイリアスについてはこのActorをバインディングする際に関わってくるため、後で説明する。
次に、このActorに定期的にメッセージを送るためのスケジュールを行うためのクラスを見てみよう。
package scheduler import com.google.inject.{AbstractModule, Provides} import org.apache.pekko.actor.typed.scaladsl.Behaviors import org.apache.pekko.actor.typed.{ActorRef, Behavior} import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport} import javax.inject.Inject class SampleActorScheduler @Inject() ( schedulerSetting: SchedulerSetting, sampleActor: ActorRef[SampleActor.SampleActorMessage] ) { schedulerSetting.scheduler.scheduleTyped( "Every5Seconds", sampleActor, SampleActor.Hello ) } class SampleActorModule extends AbstractModule with PekkoGuiceSupport { override def configure(): Unit = { bindTypedActor(SampleActor, "SampleActor") bind(classOf[SampleActorScheduler]).asEagerSingleton() } }
Classic ActorへのメッセージのスケジュールではQuartzSchedulerExtension.schedule()
を呼び出していたが、Typed ACtorの場合はQuartzSchedulerTypedExtension.scheduleTyped()
を使う。
SampleActorScheduler
にActorRef
を注入しているやり方に注目してほしいのだが、Classic ActorのActorRef
を注入する際は名前付きバインディングを使っていたのが、そうではなくなっている。
Classic ActorではbindActor()
によりActorRef
を生成し名前付きバインディングを行なっていたが、Typed ActorのActorRef
を生成しバインディングするbindTypedActor()
は名前付きバインディングを提供しないのだ。
第2引数に渡した文字列は、ただ生成したActorの名前に使われるだけである。
bindTypedActor()
は第1引数に受け取ったオブジェクトからActorRef[Message]
型の値を返すProvides Methodを探し、そのメソッドに必要な依存関係を引数に渡してBehavior[Message]
を生成し、Actorを生成してくれる。
このMessage
型は何かといえば、Actorに関する説明で後回しにしていた型エイリアスであり、今回はSampleActorMessage
の型エイリアスになる。
つまり、今回はSampleActor
の中からBehavior[SampleActorMessage]
型の値を返すProvides Methodを探してActorを生成することになる。
最後に、ここはClassic Actorと何も変更点が無いのだが、プロジェクトによってやり方が違うだろうからapplication.conf
の中身も一応載せて解説しておく。
pekko { quartz { schedules { Every5Seconds { description = "5秒ごとに実行" expression = "*/5 * * ? * *" } } } } play.modules.enabled += "scheduler.SampleActorModule"
SampleActorModule
を有効化することで、アプリケーションの起動時にActorの生成とバインディング, SampleActorScheduler
の初期化処理が走り、その中でActorに5秒ごとにHello
が送られるようスケジュールしているので、sbt run
を実行すると5秒ごとに以下のようなログが流れるだろう。
SampleActor received Hello. SampleService was executed. SampleActor received Hello. SampleService was executed. SampleActor received Hello. SampleService was executed. ...
状態を持つActorへのDI
ここまではakka-quartz-schedulerやPlay Frameworkのドキュメントを調べればわかるのだが、ここから先の話はどこにもベストプラクティスが書いていなかったので、あくまで私はこうやっているという話になる。
次にClassic Actorで以下のように記述されていたActorへDIを行うことを考える。
package scheduler import com.google.inject.AbstractModule import org.apache.pekko.actor.{Actor, ActorRef} import play.api.libs.concurrent.PekkoGuiceSupport import java.time.LocalDateTime import javax.inject.{Inject, Named} import scala.collection.mutable class StatefulClassicActor @Inject() (sampleService: SampleService) extends Actor { private var i: Int = 1 private val messageHistory : mutable.Buffer[(StatefulClassicActor.Message, LocalDateTime)] = mutable.Buffer.empty override def receive: Receive = { case StatefulClassicActor.Hello => println(i) println(messageHistory.mkString(",\n")) sampleService.exec() i += 1 messageHistory += ((StatefulClassicActor.Hello, LocalDateTime.now())) } } object StatefulClassicActor { sealed trait Message case object Hello extends Message } class StatefulClassicActorScheduler @Inject() ( schedulerSetting: SchedulerSetting, @Named("StatefulClassicActor") statefulClassicActor: ActorRef ) { schedulerSetting.scheduler.schedule( "Every5Seconds", statefulClassicActor, StatefulClassicActor.Hello ) } class StatefulClassicActorModule extends AbstractModule with PekkoGuiceSupport { override def configure(): Unit = { bindActor[StatefulClassicActor]("StatefulClassicActor") bind(classOf[StatefulClassicActorScheduler]).asEagerSingleton() } }
これも、application.conf
でStatefulClassicActorModule
を有効化してsbt run
すれば以下のようなログが得られるだろう。
1 SampleService was executed. 2 (Hello,2024-06-05T20:29:40.021417) SampleService was executed. 3 (Hello,2024-06-05T20:29:40.021417), (Hello,2024-06-05T20:29:45.007236) SampleService was executed.
DIをする必要があり、尚且つ状態を持っているというのが今回のミソだ。
Typed ActorではClassic Actorがインスタンス変数として持っていた内部状態をBehavior
を生成する関数の引数として持ち回ることでmutableなオブジェクトやvar
を使わずとも状態を持つことができるようになっている。
しかし、ProvidesMethodはGuiceが生成できる型の値以外は引数にとれないため、Int
やSeq[(StatefulClassicActor.Message, LocalDateTime)]
といった型の状態を管理しなければならないとなると、そのままでは困ってしまう。
このような場合、まずGuiceが生成できる型の値だけを引数にとるメソッドを定義し、そのメソッドの中で状態を持ち回るヘルパー関数を定義し、最後にヘルパー関数に初期状態を渡して呼び出してやるとよい。
package scheduler import com.google.inject.{AbstractModule, Provides} import org.apache.pekko.actor.typed.{ActorRef, Behavior} import org.apache.pekko.actor.typed.scaladsl.Behaviors import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport} import java.time.LocalDateTime import javax.inject.Inject object StatefulTypedActor extends ActorModule { sealed trait StatefulTypedActorMessage case object Hello extends StatefulTypedActorMessage override type Message = StatefulTypedActorMessage @Provides def create( sampleService: SampleService ): Behavior[StatefulTypedActorMessage] = { def loop( i: Int, messageHistory: Seq[(StatefulTypedActorMessage, LocalDateTime)] ): Behavior[StatefulTypedActorMessage] = Behaviors.receiveMessage { case m @ Hello => println(i) println(messageHistory.mkString(",\n")) sampleService.exec() loop(i + 1, messageHistory :+ (m, LocalDateTime.now())) } loop(1, Vector()) } } class StatefulTypedActorScheduler @Inject() ( schedulerSetting: SchedulerSetting, statefulTypedActor: ActorRef[StatefulTypedActor.StatefulTypedActorMessage] ) { schedulerSetting.scheduler.scheduleTyped( "Every5Seconds", statefulTypedActor, StatefulTypedActor.Hello ) } class StatefulTypedActorModule extends AbstractModule with PekkoGuiceSupport { override def configure(): Unit = { bindTypedActor(StatefulTypedActor, "StatefulTypedActor") bind(classOf[StatefulTypedActorScheduler]).asEagerSingleton() } }
特定のActorの子ActorにDIする
ここまではActorの親子関係について考えず、子を持たないActorにDIすることを考えてきた。
しかし、当然特定のActorから生成されるActorに依存性を注入したいこともあるだろう。
この場合、親Actorと同じようにbindTypedActor()
により依存性を注入させると、ある問題が発生する。
package scheduler import com.google.inject.{AbstractModule, Provides} import org.apache.pekko.actor.typed.{ActorRef, Behavior} import org.apache.pekko.actor.typed.scaladsl.Behaviors import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport} import javax.inject.Inject object WrongParentActor extends ActorModule { sealed trait WrongParentActorMessage case object Hello extends WrongParentActorMessage override type Message = WrongParentActorMessage @Provides def create( child: ActorRef[WrongChildActor.WrongChildActorMessage] ): Behavior[WrongParentActorMessage] = Behaviors.setup { ctx => Behaviors.receiveMessage { case Hello => println(ctx.self.path) child ! WrongChildActor.Hello Behaviors.same } } } object WrongChildActor extends ActorModule { sealed trait WrongChildActorMessage case object Hello extends WrongChildActorMessage override type Message = WrongChildActorMessage @Provides def create(sampleService: SampleService): Behavior[WrongChildActorMessage] = { Behaviors.setup { ctx => Behaviors.receiveMessage { case Hello => println(ctx.self.path) sampleService.exec() Behaviors.same } } } } class WrongActorScheduler @Inject() ( schedulerSetting: SchedulerSetting, wrongParentActor: ActorRef[WrongParentActor.WrongParentActorMessage] ) { schedulerSetting.scheduler.scheduleTyped( "Every5Seconds", wrongParentActor, WrongParentActor.Hello ) } class WrongActorModule extends AbstractModule with PekkoGuiceSupport { override def configure(): Unit = { bindTypedActor(WrongParentActor, "WrongParentActor") bindTypedActor(WrongChildActor, "WrongChildActor") bind(classOf[WrongActorScheduler]).asEagerSingleton() } }
このコードでは親のActorと子のActorにパスを出力させているが、以下のようにWrongParentActor
とWrongChildActor
が親子関係になっていないことがわかる。
特定のActor同士を親子関係にしたいのであれば、子ActorはGuiceにより生成するのではなく、親Actorが自ら生成しなければならない。
第一、この方法ではActorを動的に生成できないし、一度親からstopさせてしまったらもう一度生成できない。
pekko://application/user/WrongParentActor pekko://application/user/WrongChildActor SampleService was executed. pekko://application/user/WrongParentActor pekko://application/user/WrongChildActor SampleService was executed. pekko://application/user/WrongParentActor pekko://application/user/WrongChildActor SampleService was executed.
愚直にやるならば、次のように親Actorに子Actorが必要とする依存関係を注入してしまい、親Actorから子Actorにバケツリレーするという方法があるだろう。
package scheduler import com.google.inject.{AbstractModule, Provides} import org.apache.pekko.actor.typed.{ActorRef, Behavior} import org.apache.pekko.actor.typed.scaladsl.Behaviors import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport} import java.util.UUID import javax.inject.Inject object ParentActor extends ActorModule { sealed trait ParentActorMessage case object Hello extends ParentActorMessage override type Message = ParentActorMessage @Provides def create(sampleService: SampleService): Behavior[ParentActorMessage] = Behaviors.setup { ctx => Behaviors.receiveMessage { case Hello => println(ctx.self.path) // 同時に同じ名前で複数のActorを生成できないため、UUIDを名前の一部とする val child = ctx.spawn( ChildActor.create(sampleService), s"ChildActor${UUID.randomUUID()}" ) child ! ChildActor.Hello Behaviors.same } } } object ChildActor { sealed trait Message case object Hello extends Message def create(sampleService: SampleService): Behavior[Message] = Behaviors.setup { ctx => Behaviors.receiveMessage { case Hello => println(ctx.self.path) sampleService.exec() Behaviors.same } } } class ParentActorScheduler @Inject() ( schedulerSetting: SchedulerSetting, parentActor: ActorRef[ParentActor.ParentActorMessage] ) { schedulerSetting.scheduler.scheduleTyped( "Every5Seconds", parentActor, ParentActor.Hello ) } class ParentActorModule extends AbstractModule with PekkoGuiceSupport { override def configure(): Unit = { bindTypedActor(ParentActor, "ParentActor") bind(classOf[ParentActorScheduler]).asEagerSingleton() } }
実際、動かすと以下のようにChildActorがParentActorの子になっている。 なお、本来はちゃんとChildActorはstopしないとメモリ上に残り続けてしまうが、今回は説明の簡略化のため無視している。
pekko://application/user/ParentActor pekko://application/user/ParentActor/ChildActorf5d2e045-f2cd-4fa6-bed7-386b48b3c536 SampleService was executed. pekko://application/user/ParentActor pekko://application/user/ParentActor/ChildActor95d59545-c9a5-4e2f-b655-3e5594549d24 SampleService was executed. pekko://application/user/ParentActor pekko://application/user/ParentActor/ChildActor1c83857f-9b9d-440c-9750-fbebfe1a04e9 SampleService was executed.
ただ、見ていてわかるかもしれないが、これは筋のいい方法ではない。 親Actorが自分で使っている依存関係と子Actorに渡したいだけの依存関係の区別がぱっと見でつかないし、もし子Actorが追加で依存関係を必要としたら親Actorまで修正を入れなければならない。 親子関係が更にネストした状態でそんなことはやりたくないだろう。
これに対するベストプラクティスは探したのだが見つからなかったので、私は以下のように子ActorのBehavior
を生成するメソッドを呼び出すだけのヘルパークラスを用意することで解決している。
package scheduler import com.google.inject.{AbstractModule, Provides} import org.apache.pekko.actor.typed.{ActorRef, Behavior} import org.apache.pekko.actor.typed.scaladsl.Behaviors import play.api.libs.concurrent.{ActorModule, PekkoGuiceSupport} import java.util.UUID import javax.inject.Inject object ImprovedParentActor extends ActorModule { sealed trait ImprovedParentActorMessage case object Hello extends ImprovedParentActorMessage override type Message = ImprovedParentActorMessage @Provides def create( behaviorGenerator: BehaviorGenerator ): Behavior[ImprovedParentActorMessage] = Behaviors.setup { ctx => Behaviors.receiveMessage { case Hello => println(ctx.self.path) val child = ctx.spawn( behaviorGenerator.create(), s"ImprovedChildActor${UUID.randomUUID()}" ) child ! ImprovedChildActor.Hello Behaviors.same } } } object ImprovedChildActor { sealed trait Message case object Hello extends Message def create(sampleService: SampleService): Behavior[Message] = Behaviors.setup { ctx => Behaviors.receiveMessage { case Hello => println(ctx.self.path) sampleService.exec() Behaviors.same } } } class BehaviorGenerator @Inject() (sampleService: SampleService) { def create(): Behavior[ImprovedChildActor.Message] = ImprovedChildActor.create(sampleService) } class ImprovedParentActorScheduler @Inject() ( schedulerSetting: SchedulerSetting, improvedParentActor: ActorRef[ ImprovedParentActor.ImprovedParentActorMessage ] ) { schedulerSetting.scheduler.scheduleTyped( "Every5Seconds", improvedParentActor, ImprovedParentActor.Hello ) } class ImprovedParentActorModule extends AbstractModule with PekkoGuiceSupport { override def configure(): Unit = { bindTypedActor(ImprovedParentActor, "ImprovedParentActor") bind(classOf[ImprovedParentActorScheduler]).asEagerSingleton() } }
pekko://application/user/ImprovedParentActor pekko://application/user/ImprovedParentActor/ImprovedChildActor4dc53dcd-0411-44f5-9f2f-9c08b66cbd87 SampleService was executed. pekko://application/user/ImprovedParentActor pekko://application/user/ImprovedParentActor/ImprovedChildActor33053a72-2db4-4e16-ac7f-56157ce18c5f SampleService was executed. pekko://application/user/ImprovedParentActor pekko://application/user/ImprovedParentActor/ImprovedChildActorcb8dfc8a-5c06-4e2e-9d03-78d3f9963f58 SampleService was executed.
このようにすれば、もしImprovedChildActor
が新たな依存関係を必要としたとしても修正が必要なのはImprovedChildActor
とBehaviorGenerator
のみだ。
更に子孫のActorが登場したとしても、同様に孫用のBehaviorGenerator
にあたるクラスを用意し、そのクラスをBehaviorGenerator
にDIしてやれば対応できる。
ただ、くどいようだが私が見落としているだけでベストプラクティスがどこかに書かれているかもしれないので、見つけたら教えてほしい。