Sunday, November 22, 2015

Presenting the File Tracker

The goal of this project is to track changes in files and deliver those changes as byte arrays, asynchronously, using Akka actors and the Java NIO library.
This is done by registering a directory with the WatchService and filtering the files with a PathMatcher. For each change in a file, the requester receives a byte array reflecting that change.
Currently the project supports only additions to a file, i.e. deleting characters from a file is not supported.
The complete source code can be found here.


Let's dive in.

The Ingredients:

FileSyncIo

This part is copied from the FileAsyncIo project with some adjustments, and it is very handy for reading files asynchronously.
To read the file we use the Java NIO AsynchronousFileChannel. Since we only read the file, we open the channel with the READ option. The AsynchronousFileChannel.read method accepts a destination buffer, the start position, an attachment (here the buffer itself) and a completion handler:
  val p = Promise[Array[Byte]]()
  val buffer = ByteBuffer.allocate(channel.size().toInt)
  channel.read(buffer, position, buffer, onComplete(channel, p))
I really like this implementation of the handler: it completes a promise with the bytes that were read, so the caller consumes the change through the promise's future:
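private def onComplete(channel: AsynchronousFileChannel, p: Promise[Array[Byte]]) = {
    new CompletionHandler[Integer, ByteBuffer]() {
      // res is the number of bytes read into the buffer
      def completed(res: Integer, buffer: ByteBuffer): Unit = {
        p.complete(Try {
          buffer.array().take(res)
        })
        closeSafely(channel)
      }
      // CompletionHandler also requires failed; this body is an assumed completion of the excerpt
      def failed(t: Throwable, buffer: ByteBuffer): Unit = {
        p.failure(t)
        closeSafely(channel)
      }
    }
  }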
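To try the idea in isolation, here is a minimal, self-contained sketch of the same promise-based read. The names AsyncReadSketch and readFrom are made up for this illustration and are not part of the project. The sketch assumes the file is small enough for its size to fit in an Int, and that a single read returns everything up to the end of the file, which is typical for small regular files but not guaranteed.

```scala
import java.nio.ByteBuffer
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler}
import java.nio.file.{Path, StandardOpenOption}
import scala.concurrent.{Future, Promise}

// Hypothetical helper, for illustration only.
object AsyncReadSketch {
  /** Reads the bytes between `position` and the current end of the file. */
  def readFrom(path: Path, position: Long): Future[Array[Byte]] = {
    val p = Promise[Array[Byte]]()
    val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
    val remaining = math.max(0L, channel.size() - position).toInt
    if (remaining == 0) {
      // Nothing new to read: complete right away with an empty array.
      channel.close()
      p.success(Array.empty[Byte])
    } else {
      val buffer = ByteBuffer.allocate(remaining)
      // The buffer is passed twice: as the destination and as the attachment.
      channel.read(buffer, position, buffer, new CompletionHandler[Integer, ByteBuffer] {
        def completed(res: Integer, buf: ByteBuffer): Unit = {
          // res is the number of bytes read, or -1 at end of file.
          p.success(buf.array().take(math.max(res.intValue, 0)))
          channel.close()
        }
        def failed(t: Throwable, buf: ByteBuffer): Unit = {
          p.failure(t)
          channel.close()
        }
      })
    }
    p.future
  }
}
```

Calling AsyncReadSketch.readFrom(path, 6L) on a file that contains "hello world" yields a future holding the bytes of "world".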

WatchServiceActor

The watch service actor uses the WatchService to register a directory and receive create, modify and delete events:
  path.register(watchService, Array(ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY))
The WatchServiceActor reads the events and notifies the registered actor. To avoid sacrificing a thread, the WatchServiceActor sends a Poll message to itself periodically:
def monitor: Receive = {
    case Poll =>
      // poll() returns null right away when no key is ready; take() would block the actor's thread
      Option(watchService.poll()) foreach { key =>
        key.pollEvents() foreach { event =>
          val relativePath = event.context.asInstanceOf[Path]
          val path = contextAbsolutePath(key, relativePath)
          event.kind match {
            case kind: WatchEvent.Kind[_] if monitoredTypes.contains(kind) =>
              notifyActor ! EventOccured(kind, path)
            case _ => // do nothing
          }
        }
        key.reset()
      }
      context.system.scheduler.scheduleOnce(500 millis, self, Poll)
    case Interrupt =>
      context.stop(self)
  }
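Outside of Akka, the non-blocking way to read a WatchService is its poll() method, which returns null instead of waiting when nothing is queued. Below is a small sketch of that pattern; WatchPollSketch, watch and drain are hypothetical names used only for this example, not the project's code.

```scala
import java.nio.file.{FileSystems, Path, WatchEvent, WatchService}
import java.nio.file.StandardWatchEventKinds.{ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY}
import scala.collection.mutable.ListBuffer

// Hypothetical helper, for illustration only.
object WatchPollSketch {
  /** Creates a WatchService and registers `dir` for create, delete and modify events. */
  def watch(dir: Path): WatchService = {
    val ws = FileSystems.getDefault.newWatchService()
    dir.register(ws, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
    ws
  }

  /** Collects the events that are ready right now; never blocks. */
  def drain(ws: WatchService): List[(WatchEvent.Kind[_], Path)] = {
    val events = ListBuffer.empty[(WatchEvent.Kind[_], Path)]
    var key = ws.poll() // null when no key is queued
    while (key != null) {
      val dir = key.watchable().asInstanceOf[Path]
      val it = key.pollEvents().iterator()
      while (it.hasNext) {
        val event = it.next()
        val ctx: Any = event.context()
        ctx match {
          case relative: Path => events += ((event.kind(), dir.resolve(relative)))
          case _ => // OVERFLOW events carry no path
        }
      }
      key.reset() // re-arm the key so it can be queued again
      key = ws.poll()
    }
    events.toList
  }
}
```

A caller would invoke drain on a timer, much like the Poll message above, and receives an empty list whenever nothing has changed since the previous call.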

FileMonitorActor

This actor's purpose is to process the bytes read from the file and send them to the requester. In its constructor it accepts the directory to monitor and a pattern that identifies which files to track. Although the WatchService can handle an arbitrary number of directories, I chose to keep one FileMonitorActor per directory (with its pattern for filtering the monitored files), each with a single WatchServiceActor; I found this easier to manage. To filter the desired files, the FileMonitorActor uses a PathMatcher and accepts a text pattern as a glob (see this link). The actor registers with the watch service in order to be notified about events. To track the files and their changes, it uses two mutable collections:
val filePos = new mutable.HashMap[Path, Long]
val fileQueue = new mutable.HashMap[Path, mutable.Queue[Array[Byte]]] 
  • the first one preserves the last position read from each file.
  • the second maps each file to a queue of byte arrays, one per event. When requested, the actor dequeues the oldest bulk of bytes reflecting a change (FIFO) and sends it to the requester.
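The interplay of the two maps can be sketched without any actors. The class below is a simplified, synchronous stand-in (AppendTracker, process and nextBulk are illustrative names, and it reads with a blocking FileChannel rather than the project's asynchronous reader); it only shows how the saved position and the FIFO queue cooperate.

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Path, StandardOpenOption}
import scala.collection.mutable

// Hypothetical, synchronous stand-in for the actor's bookkeeping.
class AppendTracker {
  private val filePos = new mutable.HashMap[Path, Long]
  private val fileQueue = new mutable.HashMap[Path, mutable.Queue[Array[Byte]]]

  /** Reads the bytes appended since the last call and queues them as one bulk. */
  def process(path: Path): Unit = {
    val channel = FileChannel.open(path, StandardOpenOption.READ)
    try {
      val start = filePos.getOrElse(path, 0L)
      val length = (channel.size() - start).toInt
      if (length > 0) { // a shrunken file is ignored: only additions are tracked
        val buffer = ByteBuffer.allocate(length)
        var eof = false
        while (buffer.hasRemaining && !eof)
          eof = channel.read(buffer, start + buffer.position()) < 0
        val bulk = buffer.array().take(buffer.position())
        fileQueue.getOrElseUpdate(path, mutable.Queue.empty[Array[Byte]]) += bulk
        filePos(path) = start + bulk.length
      }
    } finally channel.close()
  }

  /** Returns the oldest queued bulk for `path`, if there is one. */
  def nextBulk(path: Path): Option[Array[Byte]] =
    fileQueue.get(path).filter(_.nonEmpty).map(_.dequeue())
}
```

If a file receives "abc" and later "def", with process called after each write, two successive nextBulk calls return the bytes of "abc" and then "def".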
The file monitor creates a WatchServiceActor in its constructor and subscribes to its change events:
watchActor = Some(context.actorOf(WatchServiceActor(self)))
and sends a watch request to the WatchServiceActor:
watchActor foreach (_ ! Watch(dir))
Since the WatchService monitors all changes in the directory, the FileMonitorActor receives change events for all of its files and keeps only the relevant ones using the PathMatcher:
case EventOccured(event, path) if matcher.matches(path.getFileName) =>
      event match {
        case ENTRY_CREATE => addFileToQueue(path)
        case ENTRY_MODIFY => process(path)
        case ENTRY_DELETE => removeFileFromQueue(path)
      }
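The reason for matching on path.getFileName rather than on the full path is that a glob `*` does not cross directory separators, so a pattern like *.txt only matches a bare file name. A small sketch of this behaviour follows; GlobFilterSketch and its methods are names invented for the example.

```scala
import java.nio.file.{FileSystems, Path, PathMatcher, Paths}

// Hypothetical helper, for illustration only.
object GlobFilterSketch {
  /** Builds a matcher from a plain glob pattern such as "*.txt". */
  def globMatcher(pattern: String): PathMatcher =
    FileSystems.getDefault.getPathMatcher("glob:" + pattern)

  /** Matches against the last path element only, as the actor does. */
  def isTracked(matcher: PathMatcher, path: Path): Boolean =
    matcher.matches(path.getFileName)
}
```

With the pattern *.txt, isTracked accepts tmp/notes.txt and rejects tmp/notes.log, while calling matcher.matches directly on the path tmp/notes.txt returns false because of the directory component.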

FileMonitoringAggregatorActor

This guy manages the FileMonitorActor actors and aggregates the changes from all monitors. It keeps a buffer that contains all changes from the monitors. Once requested, it sends the requester all accumulated changes and clears the buffer (the buffer keeps growing if it is not consumed, which might cause overflow issues).
This actor accepts a list of directory and pattern pairs:
case class PathPattern(path:String,pattern:String)
It spawns a FileMonitorActor per directory and keeps a map from path to actor (useful for adding and removing paths, and it prevents creating duplicates):
  val monitorActors = paths.collect {
    case p:PathPattern if Files.exists(p.path) =>
    // actor names cannot contain '/', so strip the slashes as the AddPath handler does
    p.path -> context.actorOf(FileMonitorActor(p.path, p.pattern), removeSlashes(p.path))
  }(collection.breakOut):mutable.HashMap[String, ActorRef]
To start monitoring, it accepts an Init message with a boolean flag that determines whether to track files already existing in the directory or only new ones. It also sends itself a periodic message to request the changes from the monitors, and accumulates the answers in a buffer. Once
case object GetNextBulks
is received, it returns to the requester all changes accumulated since the last request:
    case GetNextBulks =>
      sender ! bulksBuffer.toList
      bulksBuffer.clear()
The FileMonitoringAggregatorActor manages the FileMonitorActors: on an AddPath command it simply adds another FileMonitorActor for that path, and on a RemovePath command it simply sends a Stop message to that actor.
  def monitoring: Receive = {

    case GetNextBulks =>
      log.info("Sending back "+bulksBuffer)
      sender ! bulksBuffer.toList
      bulksBuffer.clear()
    case RequestNextBulk =>
      monitorActors.values foreach (_ ! GetBulk)
      context.system.scheduler.scheduleOnce(500 millis, self, RequestNextBulk)
    case bs: ByteBulks =>
      bulksBuffer += bs
    case Stop =>
      monitorActors.values foreach (_ ! Stop)
      context become ready
    case AddPath(p,i)=>
      if (monitorActors.contains(p.path))
        log.warning(s"Request Add ${p.path} is redundant because it is already monitored ")
      else{
        if (Files.exists(p.path)) {
          val m = context.actorOf(FileMonitorActor(p.path, p.pattern), removeSlashes(p.path))
          monitorActors += p.path -> m
          m ! Init(i)
        }else
          log.error(s"Cannot add path. Reason: Directory ${p.path} does not exists")
      }
    case RemovePath(p) =>
      monitorActors.find(_._1 == p) match {
        case Some(a) =>
          log.info(s"Remove ${a._1} from monitor")
          a._2 ! Stop
          monitorActors -= a._1
        case None => log.warning(s"Cannot remove $p. Reason: Not Found ")
      }
  }

Summary

We can use the WatchService to register a directory and be notified of any file change in that directory. A PathMatcher is useful for keeping only the relevant paths/files. In this project we use Akka actors to keep operations non-blocking and to hold the state: the read position and the changes as byte arrays. The FileMonitoringAggregatorActor returns all changes accumulated since the last request (i.e. the GetNextBulks message).
Usage:
import akka.actor.ActorSystem
import akka.util.Timeout
import scala.concurrent.duration._

object ApplicationMain extends App {
  val system = ActorSystem("MyActorSystem")

  val pattern = "*.txt"
  val pathPattern1 = PathPattern("/tmp",pattern)
  val pathPattern2 = PathPattern("/home/avi/Downloads",pattern)
  // used by the ask request shown further down
  implicit val timeout = Timeout(10 seconds)

  val monitorActor = system.actorOf(FileMonitoringAggregatorActor(List(pathPattern1,pathPattern2)))
  monitorActor ! Init(true)
}
Now start making changes; you can see them reflected in the log file. You can also send a request message to see them:
val changes = ask(monitorActor, GetNextBulks).mapTo[List[ByteBulks]]
Hope you enjoyed it.
The complete source code can be found here. Feedback and remarks are always welcome.

Acknowledgments: This project was inspired by: