
Akka Programming (20): Fault Tolerance (Part 1)

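As mentioned when we introduced the actor system, every actor is the supervisor of its children, and every actor defines a supervisor strategy for handling failures. Once defined, this strategy cannot be changed afterwards; it is an integral part of the structure of the actor system.
Fault Handling in Practice
Let's start with an example that shows one way to handle data store errors, a typical source of failure in real applications. What a real application can do when the data store is unavailable depends on the application; in this example we use a best-effort reconnect approach.
Below is the source code of the example. It is fairly long and deserves a careful read; ideally, run it and follow the log output to understand what happens: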

import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import scala.language.postfixOps // enables `15 seconds` without feature warnings
import akka.util.Timeout
import akka.event.LoggingReceive
import akka.pattern.{ask, pipe}
import com.typesafe.config.ConfigFactory

/**
 * Runs the sample
 */
object FaultHandlingDocSample extends App {

  import Worker._

  val config = ConfigFactory.parseString( """
      akka.loglevel = "DEBUG"
      akka.actor.debug {
      receive = on
      lifecycle = on
      }
      """)

  val system = ActorSystem("FaultToleranceSample", config)
  val worker = system.actorOf(Props[Worker], name = "worker")
  val listener = system.actorOf(Props[Listener], name = "listener")
  // start the work and listen on progress
  // note that the listener is used as sender of the tell,
  // i.e. it will receive replies from the worker
  worker.tell(Start, sender = listener)
}

/**
 * Listens on progress from the worker and shuts down the system when enough
 * work has been done.
 */
class Listener extends Actor with ActorLogging {

  import Worker._

  // If we don’t get any progress within 15 seconds then the service is unavailable
  context.setReceiveTimeout(15 seconds)

  def receive = {
    case Progress(percent) =>
      log.info("Current progress: {} %", percent)
      if (percent >= 100.0) {
        log.info("That’s all, shutting down")
        context.system.terminate() // was shutdown() before Akka 2.4
      }
    case ReceiveTimeout =>
      // No progress within 15 seconds, ServiceUnavailable
      log.error("Shutting down due to unavailable service")
      context.system.terminate() // was shutdown() before Akka 2.4
  }
}

object Worker {

  case object Start

  case object Do

  final case class Progress(percent: Double)

}

/**
 * Worker performs some work when it receives the `Start` message.
 * It will continuously notify the sender of the `Start` message
 * of current `Progress`. The `Worker` supervises the `CounterService`.
 */
class Worker extends Actor with ActorLogging {

  import Worker._
  import CounterService._

  implicit val askTimeout = Timeout(5 seconds)
  // Stop the CounterService child if it throws ServiceUnavailable
  override val supervisorStrategy = OneForOneStrategy() {
    case _: CounterService.ServiceUnavailable => Stop
  }
  // The sender of the initial Start message will continuously be notified
  // about progress
  var progressListener: Option[ActorRef] = None
  val counterService = context.actorOf(Props[CounterService], name = "counter")
  val totalCount = 51

  // Use this Actor's Dispatcher as ExecutionContext
  import context.dispatcher

  def receive = LoggingReceive {
    case Start if progressListener.isEmpty =>
      progressListener = Some(sender())
      context.system.scheduler.schedule(Duration.Zero, 1 second, self, Do)
    case Do =>
      counterService ! Increment(1)
      counterService ! Increment(1)
      counterService ! Increment(1)
      // Send current progress to the initial sender
      counterService ? GetCurrentCount map {
        case CurrentCount(_, count) => Progress(100.0 * count / totalCount)
      } pipeTo progressListener.get
  }
}

object CounterService {

  final case class Increment(n: Int)

  case object GetCurrentCount

  final case class CurrentCount(key: String, count: Long)

  class ServiceUnavailable(msg: String) extends RuntimeException(msg)

  private case object Reconnect

}

/**
 * Adds the value received in `Increment` message to a persistent
 * counter. Replies with `CurrentCount` when it is asked for `CurrentCount`.
 * `CounterService` supervises `Storage` and `Counter`.
 */
class CounterService extends Actor {

  import CounterService._
  import Counter._
  import Storage._

  // Restart the storage child when StorageException is thrown.
  // After 3 restarts within 5 seconds it will be stopped.
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 3,
    withinTimeRange = 5 seconds) {
    case _: Storage.StorageException => Restart
  }
  val key = self.path.name
  var storage: Option[ActorRef] = None
  var counter: Option[ActorRef] = None
  var backlog = IndexedSeq.empty[(ActorRef, Any)]
  val MaxBacklog = 10000

  // Use this Actor's Dispatcher as ExecutionContext
  import context.dispatcher

  override def preStart(): Unit = {
    initStorage()
  }

  /**
   * The child storage is restarted in case of failure, but after 3 restarts,
   * and still failing it will be stopped. Better to back-off than continuously
   * failing. When it has been stopped we will schedule a Reconnect after a delay.
   * Watch the child so we receive Terminated message when it has been terminated.
   */
  def initStorage(): Unit = {
    storage = Some(context.watch(context.actorOf(Props[Storage], name = "storage")))
    // Tell the counter, if any, to use the new storage
    counter foreach {
      _ ! UseStorage(storage)
    }
    // We need the initial value to be able to operate
    storage.get ! Get(key)
  }

  def receive = LoggingReceive {
    case Entry(k, v) if k == key && counter == None =>
      // Reply from Storage of the initial value, now we can create the Counter
      val c = context.actorOf(Props(classOf[Counter], key, v))
      counter = Some(c)
      // Tell the counter to use current storage
      c ! UseStorage(storage)
      // and send the buffered backlog to the counter
      for ((replyTo, msg) <- backlog) c.tell(msg, sender = replyTo)
      backlog = IndexedSeq.empty
    case msg@Increment(n) => forwardOrPlaceInBacklog(msg)

    case msg@GetCurrentCount => forwardOrPlaceInBacklog(msg)
    case Terminated(actorRef) if Some(actorRef) == storage =>
      // After 3 restarts the storage child is stopped.
      // We receive Terminated because we watch the child, see initStorage.
      storage = None
      // Tell the counter that there is no storage for the moment
      counter foreach {
        _ ! UseStorage(None)
      }
      // Try to re-establish storage after a while
      context.system.scheduler.scheduleOnce(10 seconds, self, Reconnect)
    case Reconnect =>
      // Re-establish storage after the scheduled delay
      initStorage()
  }

  def forwardOrPlaceInBacklog(msg: Any): Unit = {
    // We need the initial value from storage before we can start delegating to
    // the counter. Before that we place the messages in a backlog, to be sent
    // to the counter when it is initialized.
    counter match {
      case Some(c) => c forward msg
      case None =>
        if (backlog.size >= MaxBacklog)
          throw new ServiceUnavailable(
            "CounterService not available, lack of initial value")
        backlog :+= (sender() -> msg)
    }
  }
}

object Counter {

  final case class UseStorage(storage: Option[ActorRef])

}

/**
 * The in memory count variable that will send current
 * value to the `Storage`, if there is any storage
 * available at the moment.
 */
class Counter(key: String, initialValue: Long) extends Actor {

  import Counter._
  import CounterService._
  import Storage._

  var count = initialValue
  var storage: Option[ActorRef] = None

  def receive = LoggingReceive {
    case UseStorage(s) =>
      storage = s
      storeCount()
    case Increment(n) =>
      count += n
      storeCount()
    case GetCurrentCount =>
      sender() ! CurrentCount(key, count)
  }

  def storeCount(): Unit = {
    // Delegate dangerous work, to protect our valuable state.
    // We can continue without storage.
    storage foreach {
      _ ! Store(Entry(key, count))
    }
  }
}

object DummyDB {

  import Storage.StorageException

  private var db = Map[String, Long]()

  @throws(classOf[StorageException])
  def save(key: String, value: Long): Unit = synchronized {
    if (11 <= value && value <= 14)
      throw new StorageException("Simulated store failure " + value)
    db += (key -> value)
  }

  @throws(classOf[StorageException])
  def load(key: String): Option[Long] = synchronized {
    db.get(key)
  }
}

object Storage {

  final case class Store(entry: Entry)

  final case class Get(key: String)

  final case class Entry(key: String, value: Long)

  class StorageException(msg: String) extends RuntimeException(msg)

}

/**
 * Saves key/value pairs to persistent storage when receiving `Store` message.
 * Replies with current value when receiving `Get` message.
 * Will throw StorageException if the underlying data store is out of order.
 */
class Storage extends Actor {

  import Storage._

  val db = DummyDB

  def receive = LoggingReceive {
    case Store(Entry(key, count)) => db.save(key, count)
    case Get(key) => sender() ! Entry(key, db.load(key).getOrElse(0L))
  }
}

This example defines five actors: Worker, Listener, CounterService, Counter and Storage. The figure below shows the message flow when the system runs normally (no failures):
[Figure: message flow during normal operation]

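Worker is the parent (supervisor) of CounterService, and CounterService is the parent (supervisor) of Counter and Storage; these supervision relationships are shown in light red in the figure, while white denotes references. Worker references Listener and Listener references Worker, but there is no parent-child relationship between them. Likewise, Counter references Storage, but Counter is not the supervisor of Storage.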

The normal flow is as follows:

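Step     Description
1        The progress Listener tells the Worker to start working.
2        The Worker schedules its work by periodically sending Do messages to itself.
3, 4, 5  When the Worker receives Do, it tells its child CounterService to increment the counter three times. CounterService forwards each Increment message to Counter, which updates its counter variable and sends the current value to Storage to be saved.
6, 7     The Worker asks CounterService for the current counter value and pipes the result to the Listener.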
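The progress values the Listener receives in steps 6 and 7 follow from simple arithmetic. The plain-Scala sketch below (no Akka needed) reuses only the formula `100.0 * count / totalCount` and the constant `totalCount = 51` from Worker. It assumes that every Do tick adds exactly 3 to the counter and that no Increment is lost; `ProgressArithmetic` is a hypothetical helper, not part of the example.

```scala
// Plain-Scala sketch of the progress arithmetic used by Worker (no Akka needed).
// Assumption: each Do tick adds exactly 3 to the counter and no Increment is lost.
object ProgressArithmetic {
  val totalCount = 51       // same constant as Worker.totalCount
  val incrementsPerTick = 3 // Worker sends three Increment(1) per Do

  // Same formula as in Worker: 100.0 * count / totalCount
  def progressAfter(ticks: Int): Double =
    100.0 * (ticks * incrementsPerTick) / totalCount

  // First tick at which the Listener sees progress >= 100 %
  def ticksToFinish: Int =
    Iterator.from(1).find(t => progressAfter(t) >= 100.0).get

  def main(args: Array[String]): Unit = {
    for (t <- 1 to 3) println(f"after tick $t: ${progressAfter(t)}%.2f %%")
    println(s"ticks needed: $ticksToFinish")
  }
}
```

Under these assumptions the Listener reaches 100 % after 17 ticks. Because the first Do is scheduled with `Duration.Zero` and the rest once per second, that is roughly 16 seconds after start.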

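The figure below shows what happens when a failure occurs. In this example, Worker and CounterService act as supervisors and each defines its own supervisor strategy: Worker stops CounterService when it fails with ServiceUnavailable, and CounterService restarts Storage when it fails with StorageException.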

[Figure: message flow when Storage fails]

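Failure flow

Step        Description
1           Storage throws StorageException.
2           CounterService, the supervisor of Storage, restarts Storage when StorageException is thrown, as its strategy prescribes.
3, 4, 5, 6  Storage keeps failing and is restarted.
7           After 3 failures and restarts within 5 seconds, Storage is stopped by its supervisor, CounterService.
8           CounterService also watches Storage, so it receives a Terminated message once Storage has been stopped,
9, 10, 11   and tells Counter that there is no Storage for the moment.
12          CounterService schedules a Reconnect message to itself after a delay (10 seconds in the code).
13, 14      When it receives the Reconnect message, it creates a new Storage
15, 16      and tells Counter to use the new Storage.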
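Steps 3 to 7 come from `OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds)`. In the example, `DummyDB.save` throws for the four values 11 to 14, so Storage fails four times in quick succession and the fourth failure inside the window leads to Stop. The sketch below is our own simplified, plain-Scala model of that rule, not Akka's implementation; `RestartBudget`, `RestartChild` and `StopChild` are hypothetical names. It assumes the window opens at the first failure and is reset by the first failure that arrives after the window has expired.

```scala
// Simplified model (not Akka code) of the rule behind
// OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 5 seconds).
// Assumption: the window opens at the first failure and is reset by a
// failure that arrives after the window has expired.
sealed trait RestartDecision
case object RestartChild extends RestartDecision
case object StopChild extends RestartDecision

final class RestartBudget(maxNrOfRetries: Int, windowMillis: Long) {
  private var windowStart: Option[Long] = None
  private var retries = 0

  // Called once for each failure of the child, at time `nowMillis`
  def onFailure(nowMillis: Long): RestartDecision = windowStart match {
    case Some(start) if nowMillis - start <= windowMillis =>
      retries += 1
      if (retries <= maxNrOfRetries) RestartChild else StopChild
    case _ =>
      // First failure, or the previous window expired: open a new window
      windowStart = Some(nowMillis)
      retries = 1
      RestartChild
  }
}

object RestartBudgetDemo {
  def main(args: Array[String]): Unit = {
    val budget = new RestartBudget(maxNrOfRetries = 3, windowMillis = 5000)
    // Four StorageExceptions in quick succession, like Store of values 11..14
    val decisions = Seq(0L, 100L, 1000L, 1100L).map(budget.onFailure)
    println(decisions) // List(RestartChild, RestartChild, RestartChild, StopChild)
  }
}
```

If the same four failures were spread over more than 5 seconds, the model would keep restarting, which is why the strategy combines a retry count with a time window.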

A log from one run of the example is provided here for reference.


Akka Programming (19): Actor Initialization Patterns

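Actors provide many lifecycle hooks that can be used to implement various initialization patterns. During the lifetime of an ActorRef, an actor may go through several restarts; each restart replaces the old actor instance with a fresh one. This is invisible from the outside: code outside the actor only sees the ActorRef, which stays the same across restarts.
You can think of each new instance as an "incarnation". Some initialization is needed for every incarnation, while other initialization should happen only when the very first instance is created. The following sections describe several initialization patterns.
Initialization via constructor
Initializing in the constructor has several benefits. First, it makes it possible to keep state that does not change during the whole lifetime of the actor in val fields, which makes the implementation more robust. The constructor is invoked for every incarnation, so the implementation can always assume that the internal state has been properly initialized. This is also a drawback: sometimes we do not want some internal state to be reset on restart, for example to preserve child actors across restarts. In that case the following pattern helps.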

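Initialization via preStart
The preStart() hook of an actor is called directly only once, when the first instance is initialized, that is, when its ActorRef is created. On restart, preStart() is called from postRestart(), so unless postRestart() is overridden, preStart() also runs for every incarnation. By overriding postRestart() we can disable this behavior and ensure that preStart() is called only once.
One use of this pattern is to avoid creating new ActorRefs for child actors during restarts: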

import akka.actor.Actor

// Hypothetical actor name; shows the "initialize once in preStart" pattern
class InitOnceActor extends Actor {
  override def preStart(): Unit = {
    // Initialize children here
  }

  // Overriding postRestart to disable the call to preStart()
  // after restarts
  override def postRestart(reason: Throwable): Unit = ()

  // The default implementation of preRestart() stops all the children
  // of the actor. To opt out of stopping the children, we
  // have to override preRestart()
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    // Keep the call to postStop(), but no stopping of children
    postStop()
  }

  def receive: Receive = Actor.emptyBehavior
}

Note that the child actors are still restarted, but no new ActorRefs are created for them.
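The default wiring between these hooks, and the effect of overriding postRestart(), can be reproduced in plain Scala without Akka. This is a simplified model written for illustration, not Akka's Actor trait: `LifecycleModel`, `RestartDriver`, `DefaultModel` and `InitOnceModel` are hypothetical names, and Akka's real default preRestart() also stops the children, which the model leaves out.

```scala
import scala.collection.mutable.ListBuffer

// Simplified plain-Scala model of the default hook wiring (not Akka's Actor trait)
trait LifecycleModel {
  def log: ListBuffer[String]
  def preStart(): Unit = log += "preStart"
  def postStop(): Unit = log += "postStop"
  // Default: the failing instance runs postStop() (Akka also stops children here)
  def preRestart(reason: Throwable): Unit = postStop()
  // Default: the fresh instance calls preStart() again
  def postRestart(reason: Throwable): Unit = preStart()
}

// Hypothetical driver mimicking the creation and one restart of an actor
object RestartDriver {
  def create[A <: LifecycleModel](first: A): A = { first.preStart(); first }

  def restart[A <: LifecycleModel](old: A, fresh: => A): A = {
    val reason = new RuntimeException("simulated failure")
    old.preRestart(reason)   // hook on the old instance
    val next = fresh         // a new instance replaces the old one
    next.postRestart(reason) // hook on the new instance
    next
  }
}

class DefaultModel(val log: ListBuffer[String]) extends LifecycleModel

// Mirrors the override shown above: no preStart() after restarts
class InitOnceModel(val log: ListBuffer[String]) extends LifecycleModel {
  override def postRestart(reason: Throwable): Unit = ()
}

object LifecycleDemo {
  def main(args: Array[String]): Unit = {
    val log = ListBuffer.empty[String]
    RestartDriver.restart(RestartDriver.create(new DefaultModel(log)), new DefaultModel(log))
    println(log) // ListBuffer(preStart, postStop, preStart)
  }
}
```

With `InitOnceModel` the same sequence logs preStart only once, which is exactly what the postRestart() override is for.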

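Initialization via message passing
Sometimes it is impossible to pass all the information needed for initialization through the constructor, for example when there are circular dependencies. In that case the actor can listen for an initialization message and use become() or a finite-state-machine state transition to switch from the uninitialized to the initialized state: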

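import akka.actor.Actor

// Hypothetical actor that waits for an "init" message before serving requests
class InitByMessageActor extends Actor {
  var initializeMe: Option[String] = None

  override def receive: Receive = {
    case "init" =>
      initializeMe = Some("Up and running")
      context.become(initialized, discardOld = true)
  }

  def initialized: Receive = {
    case "U OK?" => initializeMe foreach { sender() ! _ }
  }
}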

Akka Programming (18): Extending Actors Using PartialFunction Chaining

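Sometimes you want to share common behavior among several actors, or compose one actor's behavior from several smaller functions.
An actor's receive method returns Actor.Receive, which is a type alias for PartialFunction[Any, Unit]. Partial functions can be chained with PartialFunction#orElse, so you can chain together as many of them as you like. Keep in mind, however, that the first match wins; this matters whenever several of the chained partial functions can handle the same type of message.
For example, suppose you have a set of actors that are either Producers or Consumers, yet sometimes an actor needs the behavior of both. Instead of duplicating code, you can extract the shared behaviors into traits and compose the Producer and Consumer behaviors from them: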

import akka.actor.{Actor, ActorLogging, ActorRef}

trait ProducerBehavior {
  this: Actor =>

  val producerBehavior: Receive = {
    case GiveMeThings =>
      sender() ! Give("thing")
  }
}

trait ConsumerBehavior {
  this: Actor with ActorLogging =>

  val consumerBehavior: Receive = {
    case ref: ActorRef =>
      ref ! GiveMeThings
    case Give(thing) =>
      log.info("Got a thing! It's {}", thing)
  }
}

class Producer extends Actor with ProducerBehavior {
  def receive = producerBehavior
}

class Consumer extends Actor with ActorLogging with ConsumerBehavior {
  def receive = consumerBehavior
}

class ProducerConsumer extends Actor with ActorLogging
  with ProducerBehavior with ConsumerBehavior {
  def receive = producerBehavior orElse consumerBehavior
}

// protocol
case object GiveMeThings
final case class Give(thing: Any)
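The "first match wins" rule can be observed without Akka, because Actor.Receive is just PartialFunction[Any, Unit]. In this plain-Scala sketch the handlers return a String instead of Unit so the winning handler is visible; `OrElseDemo` and its handler names are hypothetical.

```scala
// Plain Scala: orElse tries the left partial function first and only falls
// back to the right one for inputs the left one is not defined at.
object OrElseDemo {
  val intHandler: PartialFunction[Any, String] = {
    case n: Int => s"intHandler got $n"
  }

  val anyHandler: PartialFunction[Any, String] = {
    case n: Int => s"anyHandler got Int $n" // shadowed whenever intHandler comes first
    case other  => s"anyHandler got $other"
  }

  // First match wins: intHandler is consulted before anyHandler
  val combined: PartialFunction[Any, String] = intHandler orElse anyHandler
  // Same handlers, opposite order: anyHandler now wins for Int messages too
  val reversed: PartialFunction[Any, String] = anyHandler orElse intHandler

  def main(args: Array[String]): Unit = {
    println(combined(42))   // intHandler got 42
    println(combined("hi")) // anyHandler got hi
    println(reversed(42))   // anyHandler got Int 42
  }
}
```

In ProducerConsumer the two behaviors handle disjoint messages, so the order of `orElse` does not matter there; it only matters when handlers overlap, as in `reversed`.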

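Akka Programming (17): What Happens When an Actor Fails

An actor may throw an exception while processing a message, for example when a database operation fails.
What happens to the message
If an exception is thrown while a message is being processed (that is, after it has been taken out of the mailbox and handed to the current behavior), that message is lost. It is important to understand that it is not put back into the mailbox. So if you want to retry processing the message, you have to take care of it yourself in your exception handling. Also make sure you limit the number of retries, to avoid an endless loop.
What happens to the mailbox
If an exception is thrown while a message is being processed, nothing happens to the mailbox. If the actor is restarted, the same mailbox is used, so all messages already in it are kept.
What happens to the actor
If code within an actor throws an exception, the actor is suspended and its supervisor handles the failure according to its strategy: the actor may be resumed, restarted or stopped.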
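A bounded retry, as recommended above, can be done inside the message handler before the exception ever reaches the supervisor. The sketch below is a hypothetical plain-Scala helper, not an Akka API: `BoundedRetry.retry` runs an operation at most `maxAttempts` times and returns the last failure once the limit is reached.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper (not part of Akka): retry `op` at most `maxAttempts` times.
// Called from inside receive, it handles a failure locally instead of letting
// the exception escape, which would lose the current message.
object BoundedRetry {
  def retry[A](maxAttempts: Int)(op: => A): Try[A] = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")

    @annotation.tailrec
    def loop(attempt: Int): Try[A] = Try(op) match {
      case s @ Success(_)                      => s
      case Failure(_) if attempt < maxAttempts => loop(attempt + 1)
      case f @ Failure(_)                      => f // limit reached: give up
    }

    loop(1)
  }
}
```

This retries synchronously, so the actor is busy while retrying; it only suits short operations. An alternative is to send the message back to `self` together with an attempt counter, which keeps the actor responsive but needs the same upper limit.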


Akka Programming (16): Killing an Actor

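You can also kill an actor forcibly by sending it a Kill message. This makes the actor throw an ActorKilledException, which triggers a failure: the actor suspends its operation and its supervisor decides how to handle the failure according to its strategy (see Akka Programming (4): Actor Supervision and Monitoring).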

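The supervisor has four options:

Resume the subordinate, keeping its current internal state.
Restart the subordinate, resetting its state to the initial state.
Stop the subordinate permanently.
Escalate the failure, thereby failing itself, so that its own supervisor handles it.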

import akka.actor.Kill

// kill the 'victim' actor
victim ! Kill
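The four options listed above correspond to the directives Resume, Restart, Stop and Escalate in akka.actor.SupervisorStrategy. The sketch below uses all four in one strategy; the exception-to-directive mapping and the retry limits are an arbitrary illustration, and `DirectiveSupervisor` is a hypothetical name. For reference, Akka's default strategy (`SupervisorStrategy.defaultDecider`) maps ActorKilledException to Stop, so under the default strategy a Kill message stops the actor.

```scala
import akka.actor.{Actor, OneForOneStrategy, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import scala.concurrent.duration._

object DirectiveSupervisor {
  // Illustrative mapping only: one exception type per directive
  val strategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // keep the child's current state
      case _: NullPointerException     => Restart  // replace it with a fresh instance
      case _: IllegalArgumentException => Stop     // terminate the child permanently
      case _: Exception                => Escalate // fail this supervisor; its parent decides
    }
}

// Hypothetical supervisor actor that applies the strategy to its children
class DirectiveSupervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy = DirectiveSupervisor.strategy

  def receive: Receive = Actor.emptyBehavior
}
```

Keeping the strategy in the companion object lets its decider be checked without starting an ActorSystem.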

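Akka Programming (15): Stashing Messages Temporarily

The Stash trait lets an actor temporarily set aside (stash) messages that its current behavior cannot or should not handle. Right before switching behavior with context.become or context.unbecome, the actor can unstash them, which prepends them to its mailbox ahead of the other messages. This way the stashed messages are processed in the same order in which they were originally received.

Here is an example of using stash: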

import akka.actor.{Actor, Stash}

class ActorWithProtocol extends Actor with Stash {
  def receive = {
    case "open" =>
      unstashAll()
      context.become({
        case "write" => // do writing...
        case "close" =>
          unstashAll()
          context.unbecome()
        case msg => stash()
      }, discardOld = false) // stack on top instead of replacing
    case msg => stash()
  }
}

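Calling stash() adds the current message (the one the actor received last) to the actor's stash. Note that the same message must not be stashed twice; doing so throws an IllegalStateException. Stashing can also fail if the stash is bounded and full, in which case a StashOverflowException is thrown. The capacity of the stash is configured with the stash-capacity setting.
unstashAll() enqueues the stashed messages into the actor's mailbox, ahead of the other messages; if a bounded mailbox overflows, a MessageQueueAppendFailedException is thrown.
If you need a stash without a capacity limit, use the UnboundedStash trait.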
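The ordering guarantee described at the start of this section can be modelled in plain Scala. `StashModel` is a hypothetical, single-threaded simplification of ActorWithProtocol above, not Akka's implementation: a queue stands in for the mailbox, and unstashAll() puts the stashed messages back at the front of that queue in their original order.

```scala
import scala.collection.mutable

// Plain-Scala model (not Akka's implementation) of stash/unstashAll ordering
final class StashModel {
  private val mailbox = mutable.Queue.empty[String]
  private val stashed = mutable.ArrayBuffer.empty[String]
  private var open = false
  val processed = mutable.ArrayBuffer.empty[String]

  def send(msg: String): Unit = mailbox.enqueue(msg)

  // Prepend the stashed messages, keeping their order, ahead of everything else
  private def unstashAll(): Unit = {
    val rest = mailbox.toList
    mailbox.clear()
    mailbox ++= stashed
    mailbox ++= rest
    stashed.clear()
  }

  // Mirrors ActorWithProtocol: while closed, everything except "open" is stashed
  private def handle(msg: String): Unit =
    if (!open) {
      msg match {
        case "open" => processed += msg; open = true; unstashAll()
        case other  => stashed += other
      }
    } else {
      msg match {
        case "close" => processed += msg; open = false; unstashAll()
        case "write" => processed += msg
        case other   => stashed += other
      }
    }

  def run(): Unit = while (mailbox.nonEmpty) handle(mailbox.dequeue())
}
```

Sending "write", "write", "open", "close" in that order yields the processing order open, write, write, close: both writes arrived before "open", were stashed, and are replayed ahead of "close".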