package net.conejos.communication
import com.rabbitmq.client._
import scala.actors.Actor
import scala.actors.Actor._
import Builder._
import Builder.ExchangeBuilder._
import Builder.QueueBuilder._
object Communication {
/**
 * Wrapper around a raw byte payload.
 *
 * Note that the generated `equals`/`hashCode` treat the `Array` field by
 * reference, as for any JVM array, so two messages with equal contents but
 * distinct arrays are not equal.
 *
 * @param array the payload bytes
 */
case class Message(array: Array[Byte])
/**
 * Completion notice carrying an actor reference.
 *
 * @param actor presumably the actor that finished its work — confirm against the senders
 */
case class Done(actor: Actor)
/**
 * Bundles what is needed to wire a queue to an exchange.
 *
 * @param exchange   builder used to declare the exchange
 * @param queue      builder used to declare the queue
 * @param routingKey routing key used when binding the queue to the exchange
 */
case class Config(
  exchange: ExchangeBuilder,
  queue: QueueBuilder,
  routingKey: String
)
/**
 * Actor that relays every [[Message]] it receives to `worker`.
 *
 * Constructing a `Dispatcher` has side effects: it starts `worker` and then
 * starts itself.
 *
 * @param config exchange/queue/routing-key settings associated with this dispatcher
 * @param worker actor that receives the forwarded messages
 */
case class Dispatcher(config: Config, worker: Actor) extends Actor {

  /**
   * Message loop: forwards each [[Message]] unchanged to `worker` and prints
   * a line when a [[Done]] arrives. Other messages are not handled here.
   */
  def act: Unit = {
    val relay: PartialFunction[Any, Unit] = {
      case incoming: Message => worker ! incoming
      case Done(_)           => println("work complete1")
    }
    loop {
      react(relay)
    }
  }

  // Start-up happens at construction: the worker is started first, then this dispatcher.
  worker.start
  start
}
/**
 * Base class that opens a broker connection and channel at construction time
 * and, on `start`, consumes from a queue and hands each delivery to a
 * [[Dispatcher]].
 *
 * `setupParams`, `connect` and `setupDispatcher` are all invoked while this
 * class is being constructed, so implementations should not depend on
 * subclass fields that are initialised later.
 */
abstract class AbstractSetup {

  /** Credentials chain used to build the connection parameters. */
  def setupParams: Param

  // Host and port handed to `connect`. The connection is opened during
  // construction with whatever values these hold at that moment.
  var broker = "localhost"
  var port = AMQP.PROTOCOL.PORT

  val connFactory = new ConnectionFactory(setupParams.asConnectionParams)

  // Both values are implicit so they can be picked up by calls in this class
  // that take them implicitly.
  private[AbstractSetup] implicit val (conn, channel) = connect

  /**
   * Opens a connection to `broker`:`port` and creates one channel on it.
   *
   * @return the new connection together with its channel
   */
  def connect: (Connection, Channel) = {
    val connection = connFactory.newConnection(broker, port)
    (connection, connection.createChannel)
  }

  val dispatcher = setupDispatcher

  /** Supplies the dispatcher that will receive the consumed messages. */
  def setupDispatcher: Dispatcher

  /**
   * Declares the queue and the exchange from the dispatcher's config, binds
   * them with the configured routing key, and registers a consumer that sends
   * every delivery body to the dispatcher and then acknowledges it.
   */
  def start: Unit = {
    val settings = dispatcher.config
    // Queue is declared before the exchange.
    val queueName = settings.queue.declare
    val exchangeName = settings.exchange.declare
    channel.queueBind(queueName, exchangeName, settings.routingKey)

    val consumer = new DefaultConsumer(channel) {
      override def handleDelivery(
        tag: String,
        env: Envelope,
        props: AMQP.BasicProperties,
        body: Array[Byte]
      ): Unit = {
        val deliveryTag = env.getDeliveryTag
        dispatcher ! Message(body)
        // Acknowledge only this delivery (multiple = false), right after handing it off.
        channel.basicAck(deliveryTag, false)
      }
    }
    channel.basicConsume(queueName, consumer)

    // NOTE(review): the message text has a typo and no separator before the
    // queue name; left untouched because it is runtime output.
    println("Wating for messages on Queue" + queueName)
  }
}
/**
 * One link in a chain of string values built with `%`, e.g.
 * `user % pass % vHost`.
 *
 * @param value      this link's value; must not be null
 * @param preceeding the link that came before this one, if any
 */
case class Param(value: String, preceeding: Option[Param]) {
  require(value != null)

  /** Appends `value` to the chain, returning the new last link. */
  def %(value: String): Param = Param(value, Some(this))

  /**
   * Interprets the chain as connection credentials.
   *
   * A chain of one value is the user; two values are user and password;
   * three values are user, password and virtual host. Missing parts fall
   * back to the `ConnectionParameters` defaults. Any other chain shape
   * fails via `error`.
   */
  private[Communication] def asConnectionParams = {
    // `value` is always the most recently appended element, so the chain is
    // read backwards starting from `preceeding`.
    val (user, pass, vHost) = preceeding match {
      case None =>
        (value, ConnectionParameters.DEFAULT_PASS, ConnectionParameters.DEFAULT_VHOST)
      case Some(Param(user, None)) =>
        (user, value, ConnectionParameters.DEFAULT_VHOST)
      case Some(Param(pass, Some(Param(user, None)))) =>
        (user, pass, value)
      case _ =>
        error("ConnectionParameters configuration Failed")
    }

    new ConnectionParameters {
      setUsername(user)
      setPassword(pass)
      setVirtualHost(vHost)
    }
  }
}
/**
 * Lifts a plain string into a single-element [[Param]] chain so that
 * expressions such as `"user" % "pass"` can start from a string literal.
 *
 * The result type is stated explicitly: without it the conversion is not
 * eligible at use sites that appear earlier in the same file.
 *
 * @param s the first value of the chain
 * @return a `Param` holding `s` with no preceding link
 */
implicit def s2Param(s: String): Param = Param(s, None)
}