scala - Akka's actor-based custom Event Bus implementation causes a bottleneck


I'm trying to implement the event bus (pub-sub) pattern on top of Akka's actor model.

The "native" EventBus implementation doesn't meet some of my requirements (e.g. the possibility of retaining the last message in a topic, which is specific to the MQTT protocol; I'm implementing a message broker, https://github.com/butaji/jetmq).

The current interface of my EventBus is the following:

    import akka.actor.ActorRef

    object Bus {
      case class Subscribe(topic: String, actor: ActorRef)
      case class Unsubscribe(topic: String, actor: ActorRef)
      case class Publish(topic: String, payload: Any, retain: Boolean = false)
    }

and usage looks like this:

    val system = ActorSystem("system")
    val bus = system.actorOf(Props[MqttEventBus], name = "bus")
    val device1 = system.actorOf(Props(new DeviceActor(bus)))
    val device2 = system.actorOf(Props(new DeviceActor(bus)))

All devices have a reference to a single Bus actor. The Bus actor is responsible for storing the state of subscriptions and topics (e.g. retained messages).

Device actors can decide internally whether they want to publish, subscribe, or unsubscribe to topics.

After some performance benchmarks, I realized that my current design affects the processing time between publishings and subscriptions, for these reasons:

  1. My EventBus is actually a singleton
  2. This causes a huge queue of processing load for it

How can I distribute (parallelize) the workload for my event bus implementation? Is the current solution a good fit for akka-cluster?

Currently, I'm thinking about routing through several instances of the Bus, as follows:

    val paths = (1 to 5).map(x => {
      system.actorOf(Props[EventBusActor], name = "event-bus-" + x).path.toString
    })

    val bus_publisher = system.actorOf(RoundRobinGroup(paths).props())
    val bus_manager = system.actorOf(BroadcastGroup(paths).props())

where:

  • bus_publisher is responsible for receiving Publish,
  • bus_manager is responsible for receiving Subscribe / Unsubscribe.

This would replicate the subscription state across all buses and reduce the queue per actor by distributing the load.
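To make the intended effect concrete, here is a minimal plain-Scala model of that routing idea (no Akka involved). `BusReplica`, `RoutedBus`, and the string subscriber names are hypothetical and only stand in for the bus actors and `ActorRef`s; it is a sketch of the message flow, not of real actor behaviour.

```scala
// Hypothetical model: one replica stands in for one EventBusActor instance.
final class BusReplica(val id: Int) {
  private var subscribers = Map.empty[String, Set[String]]
  var handled = 0 // number of publishes this replica processed

  def subscribe(topic: String, subscriber: String): Unit =
    subscribers = subscribers.updated(
      topic, subscribers.getOrElse(topic, Set.empty[String]) + subscriber)

  def unsubscribe(topic: String, subscriber: String): Unit =
    subscribers = subscribers.updated(
      topic, subscribers.getOrElse(topic, Set.empty[String]) - subscriber)

  // Returns the subscribers that would receive the payload.
  def publish(topic: String): Set[String] = {
    handled += 1
    subscribers.getOrElse(topic, Set.empty[String])
  }
}

final class RoutedBus(replicaCount: Int) {
  val replicas: Vector[BusReplica] =
    Vector.tabulate(replicaCount)(i => new BusReplica(i + 1))
  private var next = 0

  // Broadcast (the bus_manager role): every replica sees every subscription change.
  def subscribe(topic: String, subscriber: String): Unit =
    replicas.foreach(_.subscribe(topic, subscriber))

  def unsubscribe(topic: String, subscriber: String): Unit =
    replicas.foreach(_.unsubscribe(topic, subscriber))

  // Round-robin (the bus_publisher role): each publish goes to exactly one replica.
  def publish(topic: String): Set[String] = {
    val replica = replicas(next)
    next = (next + 1) % replicas.size
    replica.publish(topic)
  }
}
```

With five replicas and ten publishes, each replica handles two of them while every publish still reaches all subscribers. Note that in this arrangement a retained message would be stored only on the replica that happened to receive the Publish, so retained publishes would presumably have to be broadcast as well.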

You could route inside of your singleton bus instead of outside. Your bus could be responsible for routing messages and establishing topics, while sub-actors could be responsible for distributing the messages. Here is a basic example that demonstrates what I'm describing, but without unsubscribe functionality, duplicate subscription checks, or supervision:

    import scala.collection.mutable
    import akka.actor.{Actor, ActorRef, Props}
    import Bus.{Publish, Subscribe}

    class HashBus extends Actor {
      // One child DistributionActor per topic, created on first subscription.
      val topicActors = mutable.Map.empty[String, ActorRef]

      def createDistributionActor: ActorRef =
        context.actorOf(Props[DistributionActor])

      override def receive = {
        case subscribe: Subscribe =>
          topicActors.getOrElseUpdate(subscribe.topic, createDistributionActor) ! subscribe

        case publish: Publish =>
          // Only a hash lookup happens here; the fan-out is delegated to the child.
          topicActors.get(publish.topic).foreach(_ ! publish)
      }
    }

    class DistributionActor extends Actor {
      val recipients = mutable.ListBuffer.empty[ActorRef]

      override def receive = {
        case Subscribe(_, actorRef) =>
          recipients += actorRef

        case publish: Publish =>
          recipients.foreach(_ ! publish)
      }
    }

This will ensure that your bus actor's mailbox doesn't get saturated, because the bus's job is just to do hash lookups. The DistributionActors will be responsible for mapping over the recipients and distributing the payload. Similarly, the DistributionActor could retain any state for a topic.
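As a rough illustration of that per-topic state, here is a plain-Scala sketch of what one distribution actor might hold if it also kept the retained message. `TopicState` and its methods are hypothetical names, recipients are modelled as strings rather than `ActorRef`s, and replaying the retained payload to new subscribers is my assumption about the desired MQTT-style behaviour.

```scala
// Hypothetical model of the state owned by a single topic's distribution actor.
final case class TopicState(
    recipients: Set[String] = Set.empty,
    retained: Option[Any] = None
) {
  // Adds a recipient; also returns the retained payload to replay to it, if any.
  def subscribe(recipient: String): (TopicState, Option[Any]) =
    (copy(recipients = recipients + recipient), retained)

  // Returns the new state and the recipients that should receive the payload now.
  // The payload is remembered only when the retain flag is set.
  def publish(payload: Any, retain: Boolean): (TopicState, Set[String]) = {
    val nextState = if (retain) copy(retained = Some(payload)) else this
    (nextState, recipients)
  }
}
```

Inside an actor the same transitions would run in `receive`, with the returned recipients and replay payload turned into message sends. Because each topic has its own actor, this state never needs to be shared or locked.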

