scala - Akka's actor based custom Event Bus implementation causes bottleneck -
i'm trying implement event bus (pub-sub) pattern on top of akka's actors model.
"native" eventbus implementation doesn't meet of requirements (e.g. possibility of retaining last message in topic, it's specific mqtt protocol, i'm implementing message broker https://github.com/butaji/jetmq).
current interface of eventbus following:
object bus { case class subscribe(topic: string, actor: actorref) case class unsubscribe(topic: string, actor: actorref) case class publish(topic: string, payload: any, retain: boolean = false) }
and usage looks this:
val system = actorsystem("system") val bus = system.actorof(props[mqtteventbus], name = "bus") val device1 = system.actorof(props(new deviceactor(bus))) val device2 = system.actorof(props(new deviceactor(bus)))
all devices have reference single bus actor. bus actor responsible storing of state of subscriptions , topics (e.g. retain messages).
device actors inside can decide whatever want publish, subscribe or unsubscribe topics.
after performance benchmarks, realized current design affects processing time between publishings , subscriptions reasons that:
- my eventbus singleton
- it caused huge queue of processing load it
how can distribute (parallelize) workload event bus implementation? current solution fit akka-cluster?
currently, i'm thinking routing through several instances of bus following:
val paths = (1 5).map(x => { system.actorof(props[eventbusactor], name = "event-bus-" + x).path.tostring }) val bus_publisher = system.actorof(roundrobingroup(paths).props()) val bus_manager = system.actorof(broadcastgroup(paths).props())
where:
- bus_publisher responsible getting publish,
- bus_manager responsible getting subscribe / unsubscribe.
and following replicate state across buses , reduce queue per actor distribution of load.
you route inside of singleton bus instead of outside. bus responsible routing messages , establishing topics, while sub-actors responsible distributing messages. basic example demonstrates i'm describing without unsubscribe functionality, duplicate subscription checks, or supervision:
import scala.collection.mutable import akka.actor.{actor, actorref} class hashbus() extends actor { val topicactors = mutable.map.empty[string, actorref] def createdistributionactor = { context.actorof(props[distributionactor]) } override def receive = { case subscribe : subscribe => topicactors.getorelseupdate(subscribe.topic, createdistributionactor) ! subscribe case publish : publish => topicactors.get(topic).foreach(_ ! publish) } } class distributionactor extends actor { val recipients = mutable.list.empty[actorref] override def receive = { case subscribe(topic: string, actorref: actorref) => recipients +: actorref case publish : publish => recipients.map(_ ! publish) } }
this ensure bus actor's mailbox doesn't saturated because bus's job hash lookups. distributionactors responsible mapping on recipients , distributing payload. similarly, distributionactor retain state topic.
Comments
Post a Comment