diff --git a/.gitignore b/.gitignore index 338f1b3..c74ab8b 100755 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,10 @@ # Binary files *.pyc +*.jar lionfish/project/ lionfish/target/ -lionfish/lib/ -*.jar +coral/core/project/ +coral/core/target/ # Temporary files *.~lock* diff --git a/coral/README.md b/coral/README.md index d8c88be..b72646a 100644 --- a/coral/README.md +++ b/coral/README.md @@ -21,29 +21,26 @@ The methods returns (in a HTTP response) a serialised **object**: *** -The whole communication begins when a client requests a handshake. It may be performed by a special operation: +The whole communication begins when the client requests a handshake. It may be performed by a special operation: #### handshake -Parameter contains keys: -* _client_key_ `string` - Returned value: **object** Returned value contains keys: -* _coral_key_ `string` +* _status_ `bool` * _client_id_ `string` *** -When keys are successfully exchanged and a client ID is assigned, then a client service is eligible to perform other requests: +When the client ID is assigned, then the client service is eligible to perform other requests: #### sign_in Parameter contains keys: +* _client_id_ `string` * _username_ `string` * _password_ `string` -* _client_id_ `string` Returned value: **object** @@ -71,10 +68,12 @@ Parameter contains keys: Returned value: **object** Returned value contains keys: +* _status_ `bool` * _article_list_ `list[object]` Each element of _article_list_ contains keys: * _article_id_ `string` +* _link_ `string` * _author_ `string` * _title_ `string` * _time_ `int` @@ -84,12 +83,13 @@ Each element of _article_list_ contains keys: #### get_article_details Parameter contains keys: -* _article_id_ `string` * _client_id_ `string` +* _article_id_ `string` Returned value: **object** Returned value contains keys: +* _status_ `bool` * _article_id_ `string` * _body_ `string` @@ -101,20 +101,26 @@ Parameter contains keys: Returned value: **object** Returned value contains keys: +* _status_ `bool` * _feed_list_ `list[object]` Each element of _feed_list_ contains keys: -* _link_ `string` -* _title_ `string` +* _id_ `string` +* _name_ `string` +* _included_tag_list_ `list[string]` +* _excluded_tag_list_ `list[string]` #### create_feed Parameter contains keys: -* _feed_tags_ `list[string]` * _client_id_ `string` +* _name_ `string` +* _included_tag_list_ `list[string]` +* _excluded_tag_list_ `list[string]` Returned value: **object** Returned value contains keys: * _status_ `bool` +* _non_existing_tag_list_ `list[string]` diff --git a/coral/core/build.sbt b/coral/core/build.sbt new file mode 100644 index 0000000..fe40069 --- /dev/null +++ b/coral/core/build.sbt @@ -0,0 +1,16 @@ +name := "coral-core" + +version := "0.9" + +scalaVersion := "2.10.4" + +mainClass in (Compile, run) := Some("com.coral.connector.Launcher") + +resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" + +libraryDependencies ++= Seq("com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.3.3") + +libraryDependencies ++= Seq( + "com.typesafe.akka" %% "akka-actor" % "2.3.2", + "com.typesafe.akka" %% "akka-remote" % "2.3.2" +) diff --git a/coral/core/run.sh b/coral/core/run.sh new file mode 100644 index 0000000..84edf96 --- /dev/null +++ b/coral/core/run.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +sbt << EOF +run +EOF diff --git a/coral/core/src/main/resources/application.conf b/coral/core/src/main/resources/application.conf new file mode 100644 index 0000000..e69de29 diff --git a/coral/core/src/main/resources/cacheWorkerSystem.conf b/coral/core/src/main/resources/cacheWorkerSystem.conf new file mode 100644 index 0000000..f276f0f --- /dev/null +++ b/coral/core/src/main/resources/cacheWorkerSystem.conf @@ -0,0 +1,21 @@ +include "application" + +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + deployment { + /parent/cacheWorkerPool { + router = round-robin-pool + nr-of-instances = 5 + target.nodes = ["akka.tcp://cacheWorkerSystem@localhost:7781"] + } + } + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "localhost", + port = 7781 + } + } +} diff --git a/coral/core/src/main/resources/databaseWorkerSystem.conf b/coral/core/src/main/resources/databaseWorkerSystem.conf new file mode 100644 index 0000000..c23859c --- /dev/null +++ b/coral/core/src/main/resources/databaseWorkerSystem.conf @@ -0,0 +1,21 @@ +include "application" + +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + deployment { + /parent/databaseWorkerPool { + router = round-robin-pool + nr-of-instances = 5 + target.nodes = ["akka.tcp://databaseWorkerSystem@localhost:7782"] + } + } + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "localhost", + port = 7782 + } + } +} diff --git a/coral/core/src/main/resources/masterSystem.conf b/coral/core/src/main/resources/masterSystem.conf new file mode 100644 index 0000000..b2ca127 --- /dev/null +++ b/coral/core/src/main/resources/masterSystem.conf @@ -0,0 +1,14 @@ +include "application" + +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "localhost", + port = 7779 + } + } +} diff --git a/coral/core/src/main/resources/sessionWorkerSystem.conf b/coral/core/src/main/resources/sessionWorkerSystem.conf new file mode 100644 index 0000000..77f4b0b --- /dev/null +++ b/coral/core/src/main/resources/sessionWorkerSystem.conf @@ -0,0 +1,14 @@ +include "application" + +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = "localhost", + port = 7780 + } + } +} diff --git a/coral/core/src/main/scala/com/coral/connector/Launcher.scala b/coral/core/src/main/scala/com/coral/connector/Launcher.scala new file mode 100644 index 0000000..bb2371b --- /dev/null +++ b/coral/core/src/main/scala/com/coral/connector/Launcher.scala @@ -0,0 +1,48 @@ +package com.coral.connector + +import com.coral.utils.Config + +object Launcher { + private def configure(args: Array[String]) = { + for (item <- args) { + val arg = item.split("=") + arg(0) match { + case "--num-of-request-handlers" => { + try { + val value = arg(1).toInt + if (value > 0) { + Config.numberOfRequestHandlers = value + } + } catch { + case e: Exception => println("Invalid parameter: num-of-request-handlers") + } + } + case "--num-of-cache-workers" => { + try { + val value = arg(1).toInt + if (value > 0) { + Config.numberOfCacheWorkers = value + } + } catch { + case e: Exception => println("Invalid parameter: num-of-cache-workers") + } + } + case "--num-of-db-workers" => { + try { + val value = arg(1).toInt + if (value > 0) { + Config.numberOfDatabaseWorkers = value + } + } catch { + case e: Exception => println("Invalid parameter: num-of-db-workers") + } + } + } + } + } + + def main(args: Array[String]) = { + configure(args) + new Thread(WebserviceConnector).start() + } +} diff --git a/coral/core/src/main/scala/com/coral/connector/RequestHandler.scala b/coral/core/src/main/scala/com/coral/connector/RequestHandler.scala new file mode 100644 index 0000000..bbcfbe4 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/connector/RequestHandler.scala @@ -0,0 +1,28 @@ +package com.coral.connector + +import java.net.Socket +import java.util.UUID +import akka.actor._ +import com.coral.utils.IO +import com.coral.messages._ + +class RequestHandler(private val master: ActorRef) extends Actor { + private val uuid = UUID.randomUUID().toString + private implicit var socket: Socket = null + + def handle() = { + // Processes request + val request = IO.receive[Map[String, Any]]() + master ! Request(uuid, request) + } + + def receive = { + case Connection(newSocket) => { + socket = newSocket + handle() + } + case Response(_, _, result) => { + IO.send(result) + } + } +} diff --git a/coral/core/src/main/scala/com/coral/connector/WebserviceConnector.scala b/coral/core/src/main/scala/com/coral/connector/WebserviceConnector.scala new file mode 100644 index 0000000..70c5a6a --- /dev/null +++ b/coral/core/src/main/scala/com/coral/connector/WebserviceConnector.scala @@ -0,0 +1,71 @@ +package com.coral.connector + +import java.net.ServerSocket +import akka.actor.{Props, ActorSystem} +import akka.routing.RoundRobinPool +import com.typesafe.config.ConfigFactory +import com.coral.workers._ +import com.coral.messages.Connection +import com.coral.utils.Config + +object WebserviceConnector extends Runnable { + private val port = Config.webserviceConnectorPort + + // Creates cache workers + private val cacheWorkerSystem = ActorSystem( + "cacheWorkerSystem", ConfigFactory.load("cacheWorkerSystem")) + private val cacheWorkerPool = cacheWorkerSystem.actorOf( + Props(new CacheWorker).withRouter( + RoundRobinPool(Config.numberOfCacheWorkers)), "cacheWorkerPool") + println("Starting " + Config.numberOfCacheWorkers + " cache workers.") + + // Creates database workers + private val databaseWorkerSystem = ActorSystem( + "databaseWorkerSystem", ConfigFactory.load("databaseWorkerSystem")) + private val databaseWorkerPool = databaseWorkerSystem.actorOf( + Props(new DatabaseWorker).withRouter( + RoundRobinPool(Config.numberOfDatabaseWorkers)), "databaseWorkerPool") + println("Starting " + Config.numberOfDatabaseWorkers + " database workers.") + + // Creates session worker + private val sessionWorkerSystem = ActorSystem( + "sessionWorkerSystem", ConfigFactory.load("sessionWorkerSystem")) + private val sessionWorker = sessionWorkerSystem.actorOf(Props(new SessionWorker), "sessionWorker") + println("Starting session worker.") + + // Creates master worker + private val masterSystem = ActorSystem( + "masterSystem", ConfigFactory.load("masterSystem")) + private val master = masterSystem.actorOf(Props(new Master), "master") + println("Starting master.") + + // Creates request handlers + private val requestHandlerSystem = ActorSystem("requestHandlerSystem") + private val requestHandlerPool = requestHandlerSystem.actorOf( + Props(new RequestHandler(master)).withRouter( + RoundRobinPool(Config.numberOfRequestHandlers)), "requestHandlerPool") + println("Starting " + Config.numberOfRequestHandlers + " request handlers.") + + private def handleConnections(serverSocket: ServerSocket) = { + while (true) { + val socket = serverSocket.accept() + requestHandlerPool ! Connection(socket) + } + } + + def run() = { + try { + // Initialises socket + val serverSocket = new ServerSocket(port) + + handleConnections(serverSocket) + serverSocket.close() + } catch { + case e: Exception => { + println(s"Failed to start the Coral webservice connector. Error message: $e") + } + } finally { + println("The Coral webservice connector terminated.") + } + } +} diff --git a/coral/core/src/main/scala/com/coral/messages/Connection.scala b/coral/core/src/main/scala/com/coral/messages/Connection.scala new file mode 100644 index 0000000..39b1e85 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/Connection.scala @@ -0,0 +1,5 @@ +package com.coral.messages + +import java.net.Socket + +case class Connection(socket: Socket) extends Message diff --git a/coral/core/src/main/scala/com/coral/messages/Handshake.scala b/coral/core/src/main/scala/com/coral/messages/Handshake.scala new file mode 100644 index 0000000..d3fd6a2 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/Handshake.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +case class Handshake() extends Message diff --git a/coral/core/src/main/scala/com/coral/messages/Message.scala b/coral/core/src/main/scala/com/coral/messages/Message.scala new file mode 100644 index 0000000..2e041a4 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/Message.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +trait Message diff --git a/coral/core/src/main/scala/com/coral/messages/Request.scala b/coral/core/src/main/scala/com/coral/messages/Request.scala new file mode 100644 index 0000000..b40a7ed --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/Request.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +case class Request(uuid: String, request: Map[String, Any]) extends Message diff --git a/coral/core/src/main/scala/com/coral/messages/Response.scala b/coral/core/src/main/scala/com/coral/messages/Response.scala new file mode 100644 index 0000000..38cf572 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/Response.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +case class Response(uuid: String, request: Map[String, Any], result: Any) extends Message diff --git a/coral/core/src/main/scala/com/coral/messages/SessionDetails.scala b/coral/core/src/main/scala/com/coral/messages/SessionDetails.scala new file mode 100644 index 0000000..5f66be6 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/SessionDetails.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +case class SessionDetails(clientUuid: String) extends Message diff --git a/coral/core/src/main/scala/com/coral/messages/SignIn.scala b/coral/core/src/main/scala/com/coral/messages/SignIn.scala new file mode 100644 index 0000000..66b68f3 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/SignIn.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +case class SignIn(clientUuid: String, userUuid: String) extends Message diff --git a/coral/core/src/main/scala/com/coral/messages/SignOut.scala b/coral/core/src/main/scala/com/coral/messages/SignOut.scala new file mode 100644 index 0000000..57db694 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/messages/SignOut.scala @@ -0,0 +1,3 @@ +package com.coral.messages + +case class SignOut(clientUuid: String) extends Message diff --git a/coral/core/src/main/scala/com/coral/utils/Config.scala b/coral/core/src/main/scala/com/coral/utils/Config.scala new file mode 100755 index 0000000..28afab3 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/utils/Config.scala @@ -0,0 +1,14 @@ +package com.coral.utils + +object Config { + val webserviceConnectorPort = 7778 + val masterSystemPort = 7779 + val sessionWorkerSystemPort = 7780 + val cacheWorkerSystemPort = 7781 + val databaseWorkerSystemPort = 7782 + val cachePort = 7783 + + var numberOfRequestHandlers = 10 + var numberOfCacheWorkers = 10 + var numberOfDatabaseWorkers = 10 +} diff --git a/coral/core/src/main/scala/com/coral/utils/IO.scala b/coral/core/src/main/scala/com/coral/utils/IO.scala new file mode 100644 index 0000000..d923317 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/utils/IO.scala @@ -0,0 +1,65 @@ +package com.coral.utils + +import java.net.Socket +import java.nio.ByteBuffer + +object IO { + // Sends a message to socket + def send(rawData: Any)(implicit socket: Socket) = { + try { + val outputStream = socket.getOutputStream + + // Serializes the data + val serialisedMsg = JSON.serialise(rawData) + + // Prepares length of a outcoming array + val byteBuffer = ByteBuffer.allocate(4) + byteBuffer.putInt(serialisedMsg.length) + val msgLength = byteBuffer.array() + + // Prepares a certain message + val msg: Array[Byte] = msgLength ++ serialisedMsg.getBytes + outputStream.write(msg) + } catch { + case e: Exception => { + println(s"Failed to send data. Error message: $e") + } + } + } + + // Receives a message from socket + def receive[T: Manifest]()(implicit socket: Socket): T = { + try { + val inputStream = socket.getInputStream + + // Gets length of a incoming message + var readBuffer = new Array[Byte](4) + var count = inputStream.read(readBuffer, 0, 4) + + if (count == -1) { + return null.asInstanceOf[T] + } + + val dataLength: Int = ByteBuffer.wrap(readBuffer).getInt + var msg: String = "" + + // Gets a certain message + readBuffer = new Array[Byte](dataLength) + var totalCount = 0 + count = 0 + while (totalCount < dataLength) { + count = inputStream.read(readBuffer, 0, dataLength) + totalCount += count + msg += new String(readBuffer, 0, count) + } + + // Parses msg to data + JSON.deserialise[T](msg) + } catch { + case e: Exception => { + println(s"Failed to receive data. Error message: $e") + } + null.asInstanceOf[T] + } + } +} diff --git a/coral/core/src/main/scala/com/coral/utils/JSON.scala b/coral/core/src/main/scala/com/coral/utils/JSON.scala new file mode 100644 index 0000000..672d27e --- /dev/null +++ b/coral/core/src/main/scala/com/coral/utils/JSON.scala @@ -0,0 +1,35 @@ +package com.coral.utils + +import java.lang.reflect.{Type, ParameterizedType} +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.core.`type`.TypeReference + +object JSON { + private val jsonMapper = new ObjectMapper() + jsonMapper.registerModule(DefaultScalaModule) + + private [this] def typeReference[T: Manifest] = new TypeReference[T] { + override def getType = typeFromManifest(manifest[T]) + } + + private [this] def typeFromManifest(m: Manifest[_]): Type = { + if (m.typeArguments.isEmpty) { m.erasure } + else new ParameterizedType { + def getRawType = m.erasure + def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray + def getOwnerType = null + } + } + + def serialise(rawData: Any): String = { + import java.io.StringWriter + val writer = new StringWriter() + jsonMapper.writeValue(writer, rawData) + writer.toString + } + + def deserialise[T: Manifest](rawData: String) : T = { + jsonMapper.readValue[T](rawData, typeReference[T]) + } +} diff --git a/coral/core/src/main/scala/com/coral/workers/CacheWorker.scala b/coral/core/src/main/scala/com/coral/workers/CacheWorker.scala new file mode 100644 index 0000000..1e8257c --- /dev/null +++ b/coral/core/src/main/scala/com/coral/workers/CacheWorker.scala @@ -0,0 +1,64 @@ +package com.coral.workers + +import java.net.InetSocketAddress +import scala.util.Random +import akka.actor._ +import net.spy.memcached.MemcachedClient +import com.coral.messages._ +import com.coral.utils.Config + +class CacheWorker extends Actor { + private val masterSystemPort = Config.masterSystemPort + private val databaseWorkerSystemPort = Config.databaseWorkerSystemPort + private val cacheClient = new MemcachedClient(new InetSocketAddress("127.0.0.1", Config.cachePort)) + + // Master worker + private val masterWorkerPath = + s"akka.tcp://masterSystem@localhost:$masterSystemPort/user/master" + private val masterWorker = context.actorSelection(masterWorkerPath) + + // Database worker pool system + private val databaseWorkerPath = + s"akka.tcp://databaseWorkerSystem@localhost:$databaseWorkerSystemPort/user/databaseWorkerPool" + private val databaseWorkerPool = context.actorSelection(databaseWorkerPath) + + // Decides whether fetch data from cache or database + def processRequest(request: Request) = { + val requestHash = request.request.hashCode().toString + + val cachedResult = cacheClient.get(requestHash) + if (cachedResult != null) { + // If data exists in the cache + val uuid = request.uuid + val coralMethodName = request.request("coralMethodName").asInstanceOf[String] + println(s"Fetching $coralMethodName request from cache.") + + context.self ! Response(uuid, request.request, cachedResult) + } else { + databaseWorkerPool ! request + } + } + + // Saves response to the cache + def processResponse(response: Response) = { + val coralMethodName = response.request("coralMethodName").asInstanceOf[String] + val requestHash = response.request.hashCode().toString + val result = response.result + + // TODO: make a set of "cacheable" methods + val cachedResult = cacheClient.get(requestHash) + if (coralMethodName == "getFeedList" && cachedResult == null) { + cacheClient.set(requestHash, 3600, result) + } + } + + def receive = { + case req @ Request(uuid, request) => { + processRequest(req) + } + case res @ Response(uuid, request, result) => { + processResponse(res) + masterWorker ! res + } + } +} diff --git a/coral/core/src/main/scala/com/coral/workers/DatabaseWorker.scala b/coral/core/src/main/scala/com/coral/workers/DatabaseWorker.scala new file mode 100644 index 0000000..9bf2a86 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/workers/DatabaseWorker.scala @@ -0,0 +1,322 @@ +package com.coral.workers + +import scala.collection.mutable.ListBuffer +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.actor._ +import akka.pattern.ask +import akka.util.Timeout +import com.coral.messages._ +import com.lionfish.client._ +import com.coral.utils.Config + +class DatabaseWorker extends Actor { + private val sessionWorkerSystemPort = Config.sessionWorkerSystemPort + private implicit val timeout = Timeout(20 seconds) + + // Session worker system + private val sessionWorkerPath = + s"akka.tcp://sessionWorkerSystem@localhost:$sessionWorkerSystemPort/user/sessionWorker" + private val sessionWorker = context.actorSelection(sessionWorkerPath) + + Database.setServerAddress("127.0.0.1") + Database.setServerPort(7777) + private val seqStream = Database.getSequenceStream + private val batchStream = Database.getBatchStream + + private def handshake(): Map[String, Any] = { + var result: Map[String, Any] = null + + try { + val future = sessionWorker ? Handshake() + val clientUuid = Await.result[Any](future, timeout.duration) + .asInstanceOf[String] + + result = Map( + "status" -> true, + "client_id" -> clientUuid + ) + } catch { + case e: Exception => { + println(s"Failed to process handshake. Error message: $e") + result = Map("status" -> false) + } + } + + result + } + + private def signIn(clientUuid: String, + username: String, + password: String): Map[String, Any] = { + var result: Map[String, Any] = null + + try { + seqStream << Database.getByUsername(username) + val userList = seqStream.execute() + .asInstanceOf[List[Map[String, Any]]] + + // Checks if given username exists + if (userList.length == 1 && userList(0).contains("uuid")) { + // TODO: Authentication + val userUuid = userList(0)("uuid").asInstanceOf[String] + + val future = sessionWorker ? SignIn(clientUuid, userUuid) + val status = Await.result[Any](future, timeout.duration) + .asInstanceOf[Boolean] + + result = Map("status" -> status) + } else { + throw new Exception("Unknown username.") + } + } catch { + case e: Exception => { + println(s"Failed to process signIn. Error message: $e") + result = Map("status" -> false) + } + } + + result + } + + private def signOut(clientUuid: String): Map[String, Any] = { + var result: Map[String, Any] = null + + try { + val future = sessionWorker ? SignOut(clientUuid) + val status = Await.result[Any](future, timeout.duration) + .asInstanceOf[Boolean] + + result = Map("status" -> status) + } catch { + case e: Exception => { + println(s"Failed to process signOut. Error message: $e") + result = Map("status" -> false) + } + } + + result + } + + private def getFeedList(clientUuid: String): Map[String, Any] = { + var result: Map[String, Any] = null + + try { + // Gets userUuid from session + val future = sessionWorker ? SessionDetails(clientUuid) + val userUuid = Await.result[Any](future, timeout.duration) + .asInstanceOf[String] + + // Checks if user is signed in + if (userUuid.length > 0) { + val rawResult: ListBuffer[Map[String, Any]] = ListBuffer() + + // Fetches feed nodes + seqStream << Database.getUserFeeds(userUuid) + val feedNodeList = seqStream.execute() + .asInstanceOf[List[List[Map[String, Any]]]](0) + + // Fetches included tag nodes of each feed + val includesRelType = "<>" + for (item <- feedNodeList) { + val uuid = item("uuid").asInstanceOf[String] + batchStream << Database.getChildren(uuid, includesRelType) + } + val includedTagNodeList = batchStream.execute() + .asInstanceOf[List[List[Map[String, Any]]]] + + // Fetches excluded tag nodes of each feed + val excludesRelType = "<>" + for (item <- feedNodeList) { + val uuid = item("uuid").asInstanceOf[String] + batchStream << Database.getChildren(uuid, excludesRelType) + } + val excludedTagNodeList = batchStream.execute() + .asInstanceOf[List[List[Map[String, Any]]]] + + // Builds result + for (i <- 0 to feedNodeList.length - 1) { + var feedDetails: Map[String, Any] = Map( + "id" -> feedNodeList(i)("uuid"), + "name" -> feedNodeList(i)("name") + ) + + var includedTagList: ListBuffer[String] = ListBuffer() + for (item <- includedTagNodeList(i)) { + includedTagList += item("tag").asInstanceOf[String] + } + + var excludedTagList: ListBuffer[String] = ListBuffer() + for (item <- excludedTagNodeList(i)) { + excludedTagList += item("tag").asInstanceOf[String] + } + + feedDetails += "included_tag_list" -> includedTagList.toList + feedDetails += "excluded_tag_list" -> excludedTagList.toList + rawResult += feedDetails + } + + result = Map( + "status" -> true, + "feed_list" -> rawResult.toList + ) + } else { + throw new Exception("User is not signed in.") + } + } catch { + case e: Exception => { + println(s"Failed to process getFeedList. Error message: $e") + result = Map("status" -> false) + } + } + + result + } + + private def createFeed(clientUuid: String, + name: String, + includedTagList: List[String], + excludedTagList: List[String]): Map[String, Any] = { + var result: Map[String, Any] = null + + try { + // Gets userUuid from session + val future = sessionWorker ? SessionDetails(clientUuid) + val userUuid = Await.result[Any](future, timeout.duration) + .asInstanceOf[String] + + // Checks if user is signed in + if (userUuid.length > 0) { + // Checks if the feed name is unique + seqStream << Database.getUserFeeds(userUuid) + val feedNodeList = seqStream.execute() + .asInstanceOf[List[List[Map[String, Any]]]](0) + + for (item <- feedNodeList) { + val feedName = item("name").asInstanceOf[String] + if (feedName == name) { + throw new Exception("A feed with a given name already exists in the database.") + } + } + + // Creates feed node + val modelName = "Feed" + val relType = "<>" + val props: Map[String, Any] = Map( + "name" -> name + ) + seqStream << Database.createNode(modelName, relType, props) + val feedUuid = seqStream.execute() + .asInstanceOf[List[Map[String, Any]]](0)("uuid").asInstanceOf[String] + + var nonExistingTagList: ListBuffer[String] = ListBuffer() + + // Fetches included tag nodes + for (item <- includedTagList) { + batchStream << Database.getByTag(item) + } + val includedTagNodeList = batchStream.execute() + .asInstanceOf[List[Map[String, Any]]] + + // Fetches excluded tag nodes + for (item <- excludedTagList) { + batchStream << Database.getByTag(item) + } + val excludedTagNodeList = batchStream.execute() + .asInstanceOf[List[Map[String, Any]]] + + // Prepares create relationship requests + val includesRelType = "<>" + for (i <- 0 to includedTagList.length - 1) { + if (includedTagNodeList(i) != null) { + val tagNodeUuid = includedTagNodeList(i)("uuid").asInstanceOf[String] + batchStream << Database.createRelationship(feedUuid, tagNodeUuid, includesRelType) + } else { + nonExistingTagList += includedTagList(i) + } + } + + // Prepares create relationship requests + val excludesRelType = "<>" + for (i <- 0 to excludedTagList.length - 1) { + if (excludedTagNodeList(i) != null) { + val tagNodeUuid = excludedTagNodeList(i)("uuid").asInstanceOf[String] + batchStream << Database.createRelationship(feedUuid, tagNodeUuid, excludesRelType) + } else { + nonExistingTagList += excludedTagList(i) + } + } + + // Creates relationship to tag nodes + batchStream.execute() + + result = Map( + "status" -> true, + "non_existing_tag_list" -> nonExistingTagList.toList + ) + } else { + throw new Exception("User is not signed in.") + } + } catch { + case e: Exception => { + println(s"Failed to process createFeed. Error message: $e") + result = Map("status" -> false) + } + } + + result + } + + private def processRequest(request: Map[String, Any]): Any = { + var result: Any = null + + try { + val coralMethodName = request("coralMethodName").asInstanceOf[String] + val requestData = request("data").asInstanceOf[Map[String, Any]] + + println(s"Executing $coralMethodName in a database worker.") + + coralMethodName match { + case "handshake" => { + result = handshake() + } + case "signIn" => { + val clientUuid = requestData("clientUuid").asInstanceOf[String] + val username = requestData("username").asInstanceOf[String] + val password = requestData("password").asInstanceOf[String] + result = signIn(clientUuid, username, password) + } + case "signOut" => { + val clientUuid = requestData("clientUuid").asInstanceOf[String] + result = signOut(clientUuid) + } + case "getFeedList" => { + val clientUuid = requestData("clientUuid").asInstanceOf[String] + result = getFeedList(clientUuid) + } + case "createFeed" => { + val clientUuid = requestData("clientUuid").asInstanceOf[String] + val name = requestData("name").asInstanceOf[String] + val includedTagList = requestData("includedTagList").asInstanceOf[List[String]] + val excludedTagList = requestData("excludedTagList").asInstanceOf[List[String]] + result = createFeed(clientUuid, name, includedTagList, excludedTagList) + } + case _ => throw new NoSuchMethodException(coralMethodName) + } + } catch { + case e: Exception => { + println(s"Failed to process request. Error message: $e") + result = Map("status" -> false) + } + } + + result + } + + def receive = { + case Request(uuid, request) => { + val result = processRequest(request) + sender ! Response(uuid, request, result) + } + } +} diff --git a/coral/core/src/main/scala/com/coral/workers/Master.scala b/coral/core/src/main/scala/com/coral/workers/Master.scala new file mode 100644 index 0000000..38f6517 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/workers/Master.scala @@ -0,0 +1,36 @@ +package com.coral.workers + +import akka.actor._ +import akka.routing._ +import com.typesafe.config.ConfigFactory +import com.coral.messages._ +import com.coral.utils.Config + +// TODO: Use akka configuration! + +class Master extends Actor { + private val cacheWorkerSystemPort = Config.cacheWorkerSystemPort + private var senderMap: Map[String, ActorRef] = Map() + + // Database worker pool system + private val cacheWorkerPath = + s"akka.tcp://cacheWorkerSystem@localhost:$cacheWorkerSystemPort/user/cacheWorkerPool" + private val cacheWorkerPool = context.actorSelection(cacheWorkerPath) + + def processRequest(request: Request) = { + cacheWorkerPool ! request + } + + def receive = { + case req @ Request(uuid, request) => { + // Master stores a remote reference of the request handler for a while + senderMap += uuid -> sender + processRequest(req) + } + case res @ Response(uuid, request, result) => { + // The result is being returned to the request handler + senderMap(uuid) ! res + senderMap -= uuid + } + } +} diff --git a/coral/core/src/main/scala/com/coral/workers/SessionWorker.scala b/coral/core/src/main/scala/com/coral/workers/SessionWorker.scala new file mode 100644 index 0000000..8f118a8 --- /dev/null +++ b/coral/core/src/main/scala/com/coral/workers/SessionWorker.scala @@ -0,0 +1,84 @@ +package com.coral.workers + +import java.util.UUID +import scala.concurrent.Lock +import akka.actor.Actor +import com.coral.messages._ + +class SessionWorker extends Actor { + // Mapping clientUuid -> userUuid + private var sessionMap: scala.collection.mutable.Map[String, String] = + scala.collection.mutable.Map() + private val writeLock = new Lock + + // Registers a new session (clientUuid -> null) + private def registerSession(): String = { + val newClientUuid = UUID.randomUUID().toString + writeLock.acquire() + sessionMap += newClientUuid -> null.asInstanceOf[String] + writeLock.release() + newClientUuid + } + + // Closes the session + private def closeSession(clientUuid: String): Boolean = { + if (sessionMap.contains(clientUuid)) { + sessionMap -= clientUuid + true + } else { + false + } + } + + // Sets userUuid to the session (clientUuid -> userUuid) + private def setUserUuid(clientUuid: String, userUuid: String): Boolean = { + if (sessionMap.contains(clientUuid) && sessionMap(clientUuid) == null) { + writeLock.acquire() + sessionMap(clientUuid) = userUuid + writeLock.release() + true + } else { + false + } + } + + // Clears userUuid in the session + private def clearUserUuid(clientUuid: String): Boolean = { + if (sessionMap.contains(clientUuid) && sessionMap(clientUuid) != null) { + writeLock.acquire() + sessionMap(clientUuid) = null + writeLock.release() + true + } else { + false + } + } + + // Gets userUuid from the session + private def getUserUuid(clientUuid: String): String = { + if (sessionMap.contains(clientUuid) && sessionMap(clientUuid) != null) { + sessionMap(clientUuid) + } else { + "" + } + } + + def receive = { + case Handshake() => { + val clientUuid = registerSession() + sender ! clientUuid + } + case SessionDetails(clientUuid) => { + val result = getUserUuid(clientUuid) + sender ! result + } + case SignIn(clientUuid, userUuid) => { + val result = setUserUuid(clientUuid, userUuid) + sender ! result + } + case SignOut(clientUuid) => { + val result = clearUserUuid(clientUuid) + sender ! result + } + } +} diff --git a/coral/run_webservice.sh b/coral/run_webservice.sh deleted file mode 100755 index 72a81f8..0000000 --- a/coral/run_webservice.sh +++ /dev/null @@ -1 +0,0 @@ -sudo python webservice/start.py diff --git a/coral/webservice/core_connector.py b/coral/webservice/core_connector.py new file mode 100644 index 0000000..d3e773f --- /dev/null +++ b/coral/webservice/core_connector.py @@ -0,0 +1,29 @@ +import socket +from socket_io import IO + + +class CoreConnector(object): + def __init__(self): + self._host = 'localhost' + self._port = 7778 + self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + self._conn.connect((self._host, self._port)) + except Exception as e: + print 'Failed to connect with Coral core. {error}'.format(error=str(e)) + + def disconnect(self): + try: + self._conn.close() + except Exception as e: + print 'Failed to disconnect from Coral core. {error}'.format(error=str(e)) + + def process_request(self, request): + response = None + try: + IO.send(self._conn, request) + response = IO.receive(self._conn) + except Exception, e: + print 'Failed to make a request. {error}'.format(error=str(e)) + return response diff --git a/coral/webservice/socket_io.py b/coral/webservice/socket_io.py new file mode 100644 index 0000000..45642e6 --- /dev/null +++ b/coral/webservice/socket_io.py @@ -0,0 +1,39 @@ +import socket +import struct +import json + + +class IO(object): + @staticmethod + def _read_from_socket(sock, n): + """ + Read exactly n bytes from the socket. + Raise RuntimeError if the connection closed before n bytes were read. + """ + buf = '' + while n > 0: + data = sock.recv(n) + if data == '': + raise RuntimeError('unexpected connection close') + buf += data + n -= len(data) + return buf + + @staticmethod + def send(conn, msg): + """ + Sends data to socket + """ + s = json.dumps(msg) + packed_len = struct.pack('>L', len(s)) # Number of bytes + conn.sendall(packed_len + s) + + @staticmethod + def receive(conn): + """ + Retrieves data from socket and then returns + """ + len_buf = IO._read_from_socket(conn, 4) # Read exactly n bytes + msg_len = struct.unpack('>L', len_buf)[0] + msg_buf = IO._read_from_socket(conn, msg_len) + return json.loads(msg_buf) diff --git a/coral/webservice/start.py b/coral/webservice/start.py index fe9ffe8..a51699a 100644 --- a/coral/webservice/start.py +++ b/coral/webservice/start.py @@ -2,56 +2,74 @@ import json from flask import Flask from flask import request, redirect, Response, url_for -from cryptography import Cryptography +from core_connector import CoreConnector app = Flask(__name__) -crypto = None @app.route('/') def index(): return redirect(url_for('about')) + @app.route('/about') def about(): - return 'Coral Service for Ocean' + return '

Hello!

You have already accessed the Coral webservice.

' + + +# @app.route('/test_connector', methods=['POST']) +# def test_connector(): +# request_data = request.get_json() +# final_response = CoreConnector().process_request(request_data) +# return Response(json.dumps(final_response), mimetype='application/json') + # Webservice methods @app.route('/handshake', methods=['POST']) def handshake(): - request_data = request.get_json() - client_key = request_data['client_key'] - - response_data = { - 'coral_key': crypto.get_coral_key(), - 'client_id': crypto.register_client_key(client_key) + final_request = { + 'coralMethodName': 'handshake', + 'data': {} } - return Response(json.dumps(response_data), mimetype='application/json') + + final_response = CoreConnector().process_request(final_request) + return Response(json.dumps(final_response), mimetype='application/json') + @app.route('/sign_in', methods=['POST']) def sign_in(): request_data = request.get_json() + client_id = request_data['client_id'] username = request_data['username'] password = request_data['password'] - client_id = request_data['client_id'] - # TODO: implementation + final_request = { + 'coralMethodName': 'signIn', + 'data': { + 'clientUuid': client_id, + 'username': username, + 'password': password + } + } - response_data = True + final_response = CoreConnector().process_request(final_request) + return Response(json.dumps(final_response), mimetype='application/json') - response_data = {'status': response_data} - return Response(json.dumps(response_data), mimetype='application/json') @app.route('/sign_out', methods=['POST']) def sign_out(): request_data = request.get_json() client_id = request_data['client_id'] - # TODO: implementation + final_request = { + 'coralMethodName': 'signOut', + 'data': { + 'clientUuid': client_id + } + } - response_data = True + final_response = CoreConnector().process_request(final_request) + return Response(json.dumps(final_response), mimetype='application/json') - response_data = {'status': response_data} - return Response(json.dumps(response_data), mimetype='application/json') @app.route('/get_article_list', methods=['POST']) def get_article_list(): @@ -67,6 +85,8 @@ def get_article_list(): for i in range(0, count): response_data.append({ 'article_id': '974eeacc-87{0}a-11e3-9f3a-2cd05ae1c39b'.format(i % 10), + 'link': 'http://www.deon.pl/inteligentne-zycie/firma-praca-i-kariera/' + 'art,206,wiara-nie-przeszkadza-byc-bogatym.html', 'author': 'Autor {0}'.format(i), 'title': 'Naglowek {0}'.format(i), 'time': 1397664087, @@ -75,8 +95,9 @@ def get_article_list(): '/ecommerce-business/256/news-icon.png' }) - response_data = {'article_list': response_data} - return Response(json.dumps(response_data), mimetype='application/json') + final_response = {'article_list': response_data} + return Response(json.dumps(final_response), mimetype='application/json') + @app.route('/get_article_details', methods=['POST']) def get_article_details(): @@ -91,45 +112,48 @@ def get_article_details(): 'body': 'Tekst 7', } - response_data = {'article_details': response_data} - return Response(json.dumps(response_data), mimetype='application/json') + final_response = {'article_details': response_data} + return Response(json.dumps(final_response), mimetype='application/json') + @app.route('/get_feed_list', methods=['POST']) def get_feed_list(): request_data = request.get_json() client_id = request_data['client_id'] - # TODO: implementation + final_request = { + 'coralMethodName': 'getFeedList', + 'data': { + 'clientUuid': client_id + } + } + + final_response = CoreConnector().process_request(final_request) + return Response(json.dumps(final_response), mimetype='application/json') - response_data = [] - response_data.append({ - 'link': 'http://www.tvn24.pl/', - 'title': 'TVN24.pl - Wiadomosci z kraju i ze swiata' - }) - response_data.append({ - 'link': 'http://www.gry-online.pl/', - 'title': 'GRY-OnLine' - }) - - response_data = {'feed_list': response_data} - return Response(json.dumps(response_data), mimetype='application/json') @app.route('/create_feed', methods=['POST']) def create_feed(): request_data = request.get_json() - feed_tags = request_data['feed_tags'] client_id = request_data['client_id'] + name = request_data['name'] + included_tag_list = request_data['included_tag_list'] + excluded_tag_list = request_data['excluded_tag_list'] + + final_request = { + 'coralMethodName': 'createFeed', + 'data': { + 'clientUuid': client_id, + 'name': name, + 'includedTagList': included_tag_list, + 'excludedTagList': excluded_tag_list + } + } - # TODO: implementation - - response_data = True + final_response = CoreConnector().process_request(final_request) + return Response(json.dumps(final_response), mimetype='application/json') - response_data = {'status': response_data} - return Response(json.dumps(response_data), mimetype='application/json') if __name__ == '__main__': - if not crypto: - crypto = Cryptography() - app.debug = True app.run(host='0.0.0.0', port=14) diff --git a/don_corleone/config/lionfish.config b/don_corleone/config/lionfish.config index 1313cd8..0b2d1ad 100644 --- a/don_corleone/config/lionfish.config +++ b/don_corleone/config/lionfish.config @@ -1,18 +1,26 @@ { "run_script":"lionfish_run.sh", "terminate_script":"lionfish_terminate.sh", -"test_script":"lionfish_test.sh", +"test_script":"lionfish_test.py", "unary": true, -"params":[ -["port", 7777], ["neo4j_port", 0 , "neo4j:port"], -["neo4j_host","", "neo4j:host"] +"default_service_params":{ + "port":7777, + "host":"$node:host" +}, + +"arguments":[ + ["port"], + ["neo4j_port", "$neo4j:port"], + ["neo4j_host", "$neo4j:host"] ] + + , "depends":["neo4j"] diff --git a/don_corleone/config/lionfish_scala.config b/don_corleone/config/lionfish_scala.config new file mode 100644 index 0000000..83269f5 --- /dev/null +++ b/don_corleone/config/lionfish_scala.config @@ -0,0 +1,39 @@ +{ +"run_script":"lionfish_scala_run.sh", +"terminate_script":"lionfish_scala_terminate.sh", +"test_script":"lionfish_scala_test.sh", + + + +"unary": true, + + + +"default_service_params":{ + "port":7771, + "host":"$node:host", + "neo4j-console-port":7478, + "neo4j-path":"/var/lib/neo4j/data/graph.db" +}, + +"arguments":[ + ["port"],["neo4j-console-port"], ["neo4j-path"] +] + + +, + + +"adds": [ + ["neo4j", + {"port":"$lionfish_scala:neo4j-console-port", + "host":"$lionfish_scala:host"} + ], + ["lionfish", + {"port":"$lionfish_scala:port", + "host":"$lionfish_scala:host"} + ] +] + + +} diff --git a/don_corleone/config/neo4j.config b/don_corleone/config/neo4j.config index 39b2baa..8eb0b25 100644 --- a/don_corleone/config/neo4j.config +++ b/don_corleone/config/neo4j.config @@ -1,9 +1,12 @@ { "run_script":"neo4j_run.sh", "terminate_script":"neo4j_terminate.sh", -"test_script":"neo4j_test.sh", - +"test_script":"neo4j_test.py", +"default_service_params":{ + "port":7474, + "host":"$node:host" +}, "unary": true, diff --git a/don_corleone/config/spidercrab_master.config b/don_corleone/config/spidercrab_master.config index 3e9fe24..4f0ae4a 100644 --- a/don_corleone/config/spidercrab_master.config +++ b/don_corleone/config/spidercrab_master.config @@ -7,10 +7,16 @@ "unary": false, - -"params":[ -["number", 1],["sources_urls_file", ""], ["export_stats_to"] +"default_service_params":{ + "number":1 +}, + +"arguments":[ + ["number"], + ["sources_urls_file"], + ["export_stats_to"] ] + , "depends":["lionfish"] diff --git a/don_corleone/config/spidercrab_slave.config b/don_corleone/config/spidercrab_slave.config index d2e5a3c..424068a 100644 --- a/don_corleone/config/spidercrab_slave.config +++ b/don_corleone/config/spidercrab_slave.config @@ -3,12 +3,22 @@ "terminate_script":"spidercrab_slave_terminate.sh", "test_script":"spidercrab_slave_test.sh", + + "unary": false, -"params":[ -["number", 1],["export-cs-to"], ["export_stats_to"] +"default_service_params":{ + "number":1 +}, + + +"arguments":[ + ["number"], + ["export-cs-to"], + ["export_stats_to"] ] + , "depends":["lionfish", "spidercrab_master"] diff --git a/don_corleone/don_corleone_client.py b/don_corleone/don_corleone_client.py index c3876fc..7182ba9 100644 --- a/don_corleone/don_corleone_client.py +++ b/don_corleone/don_corleone_client.py @@ -57,11 +57,11 @@ def create_parser(): run_node_owner = False terminated = False -def install_node(config, run=True): +def install_node(node_config, run=True): global terminated """ Waits for webserver to start """ - while config[MASTER_LOCAL] and os.system("./scripts/don_corleone_test.sh") != 0 and not terminated: + while node_config[MASTER_LOCAL] and os.system("./scripts/don_corleone_test.sh") != 0 and not terminated: logger.info("Still don corleone not running. Try running it yourself using ./scripts/don_corleone_run.sh") time.sleep(1) @@ -70,24 +70,24 @@ def install_node(config, run=True): # Terminating node logger.info("Terminating old responsibilities") - response = urllib2.urlopen(get_don_corleone_url(config)+"/terminate_node?node_id="+config[NODE_ID]).read() + response = urllib2.urlopen(get_don_corleone_url(node_config)+"/terminate_node?node_id="+node_config[NODE_ID]).read() print response logger.info("Registering the node") # Register node - params = urllib.urlencode({"config":json.dumps(config), "node_id":json.dumps(config[NODE_ID]) }) - response = urllib2.urlopen(get_don_corleone_url(config)+"/register_node", params).read() + params = urllib.urlencode({"node_config":json.dumps(node_config), "node_id":json.dumps(node_config[NODE_ID]) }) + response = urllib2.urlopen(get_don_corleone_url(node_config)+"/register_node", params).read() logger.info(response) # Reversed ssh support - if config.get(REVERSED_SSH, False): + if node_config.get(REVERSED_SSH, False): logger.info("Reversed ssh") - response = json.loads(urllib2.urlopen(get_don_corleone_url(config)+"/register_reversed?node_id="+str(config[NODE_ID])).read()) + response = json.loads(urllib2.urlopen(get_don_corleone_url(node_config)+"/register_reversed?node_id="+str(node_config[NODE_ID])).read()) print response cmd = "./scripts/run_reversed_ssh.sh {0} {1} {2} {3} {4}".format(response["result"]["ssh-user"], response["result"]["ssh-host"], \ - response["result"]["ssh-port-redirect"], config[SSH_PORT], response['result']['ssh-port']) + response["result"]["ssh-port-redirect"], node_config[SSH_PORT], response['result']['ssh-port']) logger.info("Running "+cmd) os.system(cmd) @@ -95,33 +95,35 @@ def install_node(config, run=True): time.sleep(1) logger.info("Installing the node") - print config[RESPONSIBILITIES] + print node_config[RESPONSIBILITIES] if not run: logger.info("WARNING: Only installing not running services") service_ids = [] - for id, responsibility in enumerate(config[RESPONSIBILITIES]): + for id, responsibility in enumerate(node_config[RESPONSIBILITIES]): logger.info("Registering "+str(id)+" responsibility "+str(responsibility)) service = responsibility[0] service_parameters = responsibility[1] params = urllib.urlencode\ - ({"service":json.dumps(service),"run":json.dumps(False) , "config":json.dumps(config), - "additional_config":json.dumps(service_parameters), "node_id":json.dumps(config[NODE_ID]), "public_url":json.dumps(config[PUBLIC_URL]) + ({"service":json.dumps(service),"run":json.dumps(False) , "node_config":json.dumps(node_config), + "additional_config":json.dumps(service_parameters), "node_id":json.dumps(node_config[NODE_ID]), + "public_url":json.dumps(node_config[PUBLIC_URL]) }) - print get_don_corleone_url(config) - response = urllib2.urlopen(get_don_corleone_url(config)+"/register_service", params).read() - print response + print get_don_corleone_url(node_config) + print "Registering and waiting for response..." + response = urllib2.urlopen(get_don_corleone_url(node_config)+"/register_service", params).read() + print "Response...", response if has_succeded(response): service_ids.append(json.loads(response)['result']) else: logger.error("NOT REGISTERED SERVICE "+str(responsibility)) - response = json.loads(urllib2.urlopen(get_don_corleone_url(config)+"/get_services").read()) + response = json.loads(urllib2.urlopen(get_don_corleone_url(node_config)+"/get_services").read()) print "Succeded = ", has_succeded(response) @@ -130,8 +132,10 @@ def install_node(config, run=True): for service_id in service_ids: print "Running ",service_id for i in xrange(20): - response = urllib2.urlopen(get_don_corleone_url(config)+\ + print "Calling run service" + response = urllib2.urlopen(get_don_corleone_url(node_config)+\ "/run_service?service_id="+str(service_id)).read() + print "Response..." if not has_succeded(response): logger.error("SHOULDNT HAPPEN FAILED RUNNING") logger.error(response) @@ -202,15 +206,28 @@ def install_node_daemon(): #Run daemon - ONLY PROTOTYPED while True: + while os.path.exists("command_queue_lock"): + print "Locked.." + time.sleep(0.01) + os.system("sudo -u {0} touch command_queue_lock".format(config[USER])) - commands = open("command_queue","r").readlines() - os.system("rm command_queue_lock && rm command_queue && sudo -u {0} touch command_queue".format(config[USER])) + + commands = [] + + if os.path.exists("command_queue"): + commands = open("command_queue","r").readlines() + os.system("sudo rm command_queue") + + + os.system("sudo rm command_queue_lock".format(config[USER])) + for cmd in commands: logger.info("Running remotedly requested command " + str(cmd)) ret = os.system("sudo -u {0} sh -c \"{1}\"".format(config["ssh-user"], cmd)) if ret != 0: logger.info("Failed command") logger.info("Done") + time.sleep(1) if __name__ == "__main__": diff --git a/don_corleone/don_corleone_constants.py b/don_corleone/don_corleone_constants.py index 1111e00..22e8bee 100644 --- a/don_corleone/don_corleone_constants.py +++ b/don_corleone/don_corleone_constants.py @@ -4,7 +4,7 @@ TODO: load from file """ -UPDATE_FREQ = 1 +UPDATE_FREQ = 10 KILL_NODE_COUNTER = 1000*60 / (1000*UPDATE_FREQ) # Kill not answering server if doesn't answer for 60s CONFIG_DIRECTORY = "config" OK = "ok" @@ -15,9 +15,10 @@ CLIENT_CONFIG_SSH_HOST = "public_ssh_domain" CLIENT_CONFIG_SSH_PORT = "ssh-port" - +CONFIG_DEFAULT_SERVICE_PARAMS = "default_service_params" +CONFIG_ADDS = "adds" CONFIG_DEPENDS = "depends" -CONFIG_PARAMS = "params" +CONFIG_ARGUMENTS = "arguments" CONFIG_UNARY = "unary" CONFIG_RUN_SCRIPT = "run_script" CONFIG_TERMINATE_SCRIPT = "terminate_script" @@ -31,11 +32,13 @@ SERVICE_NEO4J = "neo4j" SERVICE_SPIDERCRAB_MASTER = "spidercrab_master" SERVICE_SPIDERCRAB_SLAVE = "spidercrab_slave" +SERVICE_LIONFISH_SCALA = "lionfish_scala" KNOWN_SERVICES = set([SERVICE_LIONFISH, SERVICE_NEO4J, SERVICE_KAFKA, SERVICE_ZOOKEEPER, SERVICE_SPIDERCRAB_MASTER,\ - SERVICE_SPIDERCRAB_SLAVE, SERVICE_TEST_SERVICE + SERVICE_SPIDERCRAB_SLAVE, SERVICE_TEST_SERVICE, + SERVICE_LIONFISH_SCALA ]) @@ -57,12 +60,14 @@ #additionally local tag SERVICE_ID = "service_id" SERVICE_STATUS = "status" +SERVICE_AS_ADDED = "is_added" # If added by another service - cannot be run/terminated #Without http SERVICE_HOME = "home" SERVICE_PORT = "port" SERVICE_RUN_CMD = "run_cmd" SERVICE_NAME = "service" -SERVICE_PARAMETERS = "service_config" # Additional service config +SERVICE_PARAMETERS = "service_params" # Additional service parameters +SERVICE_CONFIG = "service_config" # Service config for given service, normally copied from config/* directory NODE_ID = "node_id" # Id for node SERVICE_LOCAL = "local" @@ -117,4 +122,83 @@ def __str__(self): ERROR_DUPLICATE = DonCorleoneException("error_duplicate") ERROR_FILE_NOT_FOUND = DonCorleoneException("error_file_not_found") ERROR_NOT_SATISFIED_DEPENDENCIES = DonCorleoneException("error_not_satisfied_dependencies") -ERROR_NOT_SATISFIED_DEPENDENCIES_NOT_RUNNING = DonCorleoneException("error_not_satisfied_dependencies_not_running") \ No newline at end of file +ERROR_NOT_SATISFIED_DEPENDENCIES_NOT_RUNNING = DonCorleoneException("error_not_satisfied_dependencies_not_running") + + + +#TODO: move to utils + +import threading + +__author__ = "Mateusz Kobos" + +class RWLock: + """Synchronization object used in a solution of so-called second + readers-writers problem. In this problem, many readers can simultaneously + access a share, and a writer has an exclusive access to this share. + Additionally, the following constraints should be met: + 1) no reader should be kept waiting if the share is currently opened for + reading unless a writer is also waiting for the share, + 2) no writer should be kept waiting for the share longer than absolutely + necessary. + + The implementation is based on [1, secs. 4.2.2, 4.2.6, 4.2.7] + with a modification -- adding an additional lock (C{self.__readers_queue}) + -- in accordance with [2]. + + Sources: + [1] A.B. Downey: "The little book of semaphores", Version 2.1.5, 2008 + [2] P.J. Courtois, F. Heymans, D.L. Parnas: + "Concurrent Control with 'Readers' and 'Writers'", + Communications of the ACM, 1971 (via [3]) + [3] http://en.wikipedia.org/wiki/Readers-writers_problem + """ + + def __init__(self): + self.__read_switch = _LightSwitch() + self.__write_switch = _LightSwitch() + self.__no_readers = threading.Lock() + self.__no_writers = threading.Lock() + self.__readers_queue = threading.Lock() + """A lock giving an even higher priority to the writer in certain + cases (see [2] for a discussion)""" + + def reader_acquire(self): + self.__readers_queue.acquire() + self.__no_readers.acquire() + self.__read_switch.acquire(self.__no_writers) + self.__no_readers.release() + self.__readers_queue.release() + + def reader_release(self): + self.__read_switch.release(self.__no_writers) + + def writer_acquire(self): + self.__write_switch.acquire(self.__no_readers) + self.__no_writers.acquire() + + def writer_release(self): + self.__no_writers.release() + self.__write_switch.release(self.__no_readers) + + +class _LightSwitch: + """An auxiliary "light switch"-like object. The first thread turns on the + "switch", the last one turns it off (see [1, sec. 4.2.2] for details).""" + def __init__(self): + self.__counter = 0 + self.__mutex = threading.Lock() + + def acquire(self, lock): + self.__mutex.acquire() + self.__counter += 1 + if self.__counter == 1: + lock.acquire() + self.__mutex.release() + + def release(self, lock): + self.__mutex.acquire() + self.__counter -= 1 + if self.__counter == 0: + lock.release() + self.__mutex.release() \ No newline at end of file diff --git a/don_corleone/don_corleone.py b/don_corleone/don_corleone_server.py similarity index 71% rename from don_corleone/don_corleone.py rename to don_corleone/don_corleone_server.py index b1116ea..3490c19 100755 --- a/don_corleone/don_corleone.py +++ b/don_corleone/don_corleone_server.py @@ -1,21 +1,41 @@ -""" Server responsible for managing Ocean state. -Ocean admin should be run on every node of our Ocean cluster. +""" +# Server responsible for managing Ocean state. + + + State: prototype: everything is based on dictionaries. I should add classes to parse everything. It is very error prone + + Note: it is not checking and asserting everything partially because of the design, partly because of the limited time resources ;) + + TODO: repalce literals in request.form accessors + + Ocean admin should be run on every node of our Ocean cluster. + +## Key concepts -Protocol: JSONP: -if succeeded: - { - result: JSON(data) - } -if error: - { - error: JSON(data) - } + configuration: node, or service config is a entity deifning its installtion, properties + + params: can change from run to run, defines service parameters like host/port + + arguments: these are arguments **pased** during running to the service, in most cases are derived from params + + +## Protocol: + + JSONP: + if succeeded: + { + result: JSON(data) + } + if error: + { + error: JSON(data) + } Services are added and not tested for accessibility first time. Services are added to service list which is periodically updated (terminated/running) status -TODO: Testing server accessibiliy (ping) and terminating it if not accesible + Assumptions: @@ -23,7 +43,6 @@ --------------- * Every command can fail - it is ok if terminate/run fails - """ import json import threading @@ -32,10 +51,11 @@ from don_corleone_constants import * from don_utils import logger from flask.ext.jsonpify import jsonify +from flask import g #Configuration for services read from CONFIG_DIRECTORY -service_configs = {} +service_config = {} #Nodes in the system #Each node is "config" + "services" @@ -45,14 +65,36 @@ services = [] # Lock for synchronization -services_lock = threading.RLock() -registered_nodes_lock = threading.RLock() +node_services_lock = threading.RLock() # Sttatus of checker daemon status_checker_job_status = "NotRunning" # Default configs for services -default_service_parameters = {} +default_service_params = {} + + +class DummySynchronizer: + def __enter__(self): + pass + + def __exit__(self, type, value, traceback): + pass + + +class Synchronizer: + def __enter__(self): + global services, registered_nodes + services = getattr(g, "_services", None) + registered_nodes = getattr(g, "_registered_nodes", None) + + + + def __exit__(self, type, value, traceback): + global services, registered_nodes + json.dumps(services, ) + +synchronizer = DummySynchronizer() import socket @@ -79,32 +121,32 @@ def get_bare_ip(address): return address def is_service_by_id(service_id): - return filter(lambda x: x[SERVICE_ID] == service_id, services) + with synchronizer: + return filter(lambda x: x[SERVICE_ID] == service_id, services) #TODO: package as a class def get_service_by_id(service_id): """ Returns service given service_id """ - with services_lock: - if not filter(lambda x: x[SERVICE_ID] == service_id, services): - raise ERROR_NOT_REGISTERED_SERVICE - else: - if len(filter(lambda x: x[SERVICE_ID] == service_id, services)) > 1: - logger.error("Duplicated service_id") - exit(1) + if not filter(lambda x: x[SERVICE_ID] == service_id, services): + raise ERROR_NOT_REGISTERED_SERVICE + else: + if len(filter(lambda x: x[SERVICE_ID] == service_id, services)) > 1: + logger.error("Duplicated service_id") + exit(1) - result = filter(lambda x: x[SERVICE_ID] == service_id, services)[0] + result = filter(lambda x: x[SERVICE_ID] == service_id, services)[0] return result def get_node_services(node_id): """ Returns service of given node_id""" - with services_lock: + with synchronizer: # Return is fine in with clause return [s for s in services if s[NODE_ID] == node_id] #TODO: package as a class def add_service(service): """ Adds service """ - with services_lock: + with synchronizer: # Check if no duplicate or duplicate and local if not is_service_by_id(service[SERVICE_ID]) or service[SERVICE_LOCAL]: @@ -124,7 +166,7 @@ def add_service(service): #TODO: package as a class def remove_service(service_id): """ Removes service from registered services """ - with services_lock: + with synchronizer: found_id = None found_service = None #TODO: more functional style @@ -146,7 +188,7 @@ def remove_service(service_id): logger.error("Node not registered, fatal error") raise ERROR_NODE_NOT_REGISTERED else: - with registered_nodes_lock: + with synchronizer: for id, m in enumerate( registered_nodes[found_service[NODE_ID]][NODE_RESPONSIBILITIES]): if m[SERVICE_ID] == service_id: @@ -219,7 +261,7 @@ def update_status(m): cmd = "(cd {0} && {1})".format( os.path.join(m[SERVICE_HOME],"don_corleone"), - "./scripts/{0}".format(service_configs[m[SERVICE]][CONFIG_TEST_SCRIPT])) + "./scripts/{0}".format(service_config[m[SERVICE]][CONFIG_TEST_SCRIPT])) logger.info("Running {0} to check if service is running ".format(cmd)) @@ -228,7 +270,7 @@ def update_status(m): - with services_lock: + with synchronizer: logger.info(("Checking server availability for ",m[SERVICE_ID], "result ", str(status))) logger.info(output) @@ -249,7 +291,7 @@ def status_checker_job(): for m in services: update_status(m) - with registered_nodes_lock: + with synchronizer: for n in registered_nodes.itervalues(): ret = os.system("ping {0} -c 1".format(n[NODE_ADDRESS])) if ret != 0: @@ -291,7 +333,7 @@ def _terminate_service(service_id): """ Terminate service given service_id @returns OK or DonCorleoneExcpetion """ - with services_lock: + with synchronizer: if not get_service_by_id(service_id): raise ERROR_NOT_REGISTERED_SERVICE @@ -312,7 +354,7 @@ def _terminate_service(service_id): status, output = cautious_run_cmd_over_ssh(cmd, m[NODE_ID],m) - with services_lock: + with synchronizer: logger.info(("Terminating service ",service_id, "output", output, "status ",status)) if status == OK: @@ -321,68 +363,90 @@ def _terminate_service(service_id): return status -def _run_service(service_id): - """ Run service given service_id - @returns OK or DonCorleone exception +def resolve_don_value(value, node_id): + """ + @returns Resolved don value, for instance $node:host is service node's host (public_ssh_domain from config.json) """ - with services_lock: - m = get_service_by_id(service_id) - if m[SERVICE_STATUS] == STATUS_RUNNING: - return ERROR_SERVICE_ALREADY_RUNNING + if str(value) == value: + if len(value) == 0: return value - if m[SERVICE_STATUS] != STATUS_TERMINATED: - logger.error("Wrong service status") - exit(1) + if value[0] == "$": + s, config = value[1:].split(":") + print s, config + if s == "node": + if config == "host": + with synchronizer: + return registered_nodes[node_id][NODE_ADDRESS] + else: + raise ERROR_NOT_RECOGNIZED_CONFIGURATION - # Check dependencies - for d in service_configs[m[SERVICE]][CONFIG_DEPENDS]: - try: - s = _get_service(d, None, m[NODE_ID]) - if not s[SERVICE_STATUS] == STATUS_RUNNING: - raise ERROR_NOT_SATISFIED_DEPENDENCIES_NOT_RUNNING - except Exception, e: - logger.error("Failed getting service in checking dependencies "+str(e)) - raise e + return _get_configuration(service_name=s, service_id=None, node_id=node_id, config_name=config) + else: + return value + else: + return value - # Calculate params - params = "" - for p in service_configs[m[SERVICE]][CONFIG_PARAMS]: - if len(p) == 2: - #Non dependant - params += " --{0} {1}".format(p[0], m[SERVICE_PARAMETERS].get(p[0], p[1])) - elif len(p) == 3: - # Dependant - params += " --{0} {1}".format(p[0], _get_configuration(service_name=p[2].split(":")[0], service_id=None,\ - node_id=m[NODE_ID], config_name=p[2].split(":")[1])) - # No default value - elif len(p) == 1 and p[0] in m[SERVICE_PARAMETERS]: - params += " --{0} {1}".format(p[0], m[SERVICE_PARAMETERS][p[0]]) +def _run_service(service_id): + try: + """ Run service given service_id + @returns OK or DonCorleone exception + """ + with synchronizer: + m = get_service_by_id(service_id) + if m[SERVICE_STATUS] == STATUS_RUNNING: + raise ERROR_SERVICE_ALREADY_RUNNING + if m[SERVICE_STATUS] != STATUS_TERMINATED: + logger.error("Wrong service status") + exit(1) - logger.info("Running {0} with params ".format(m[SERVICE_ID], params)) + # Check dependencies + for d in service_config[m[SERVICE]][CONFIG_DEPENDS]: + try: + s = _get_service(d, None, m[NODE_ID]) + if not s[SERVICE_STATUS] == STATUS_RUNNING: + raise ERROR_NOT_SATISFIED_DEPENDENCIES_NOT_RUNNING + except Exception, e: + logger.error("Failed getting service in checking dependencies "+str(e)) + raise e - cmd = "(cd {0} && {1})".format( - os.path.join(m[SERVICE_HOME],"don_corleone"), - "./scripts/run.sh {1} ./scripts/{0} {2}".format(service_configs[m[SERVICE]][CONFIG_RUN_SCRIPT], m[SERVICE_ID], params)) + # Calculate params + params = "" - status, output = cautious_run_cmd_over_ssh(cmd, m[NODE_ID],m) + for p in service_config[m[SERVICE]][CONFIG_ARGUMENTS]: + if p[0] in m[SERVICE_PARAMETERS]: + params += " --{0}={1}".format(p[0], m[SERVICE_PARAMETERS][p[0]]) + else: + params += " --{0}={1}".format(p[0], m[SERVICE_PARAMETERS].get(p[0], resolve_don_value(p[1], m[NODE_ID]))) - with services_lock: - logger.info(("Running service ",service_id, "output", output, "status ",status)) - - update_status(m) - return status + + + + logger.info("Running {0} with params ".format(m[SERVICE_ID], params)) + + cmd = "(cd {0} && {1})".format( + os.path.join(m[SERVICE_HOME],"don_corleone"), + "./scripts/run.sh {1} ./scripts/{0} {2}".format(service_config[m[SERVICE]][CONFIG_RUN_SCRIPT], m[SERVICE_ID], params)) + + status, output = cautious_run_cmd_over_ssh(cmd, m[NODE_ID],m) + + with synchronizer: + logger.info(("Running service ",service_id, "output", output, "status ",status)) + + return status + except Exception, e: + raise DonCorleoneException(pack_error(e)) def _deregister_service(service_id): """ Deregister service given service_id @returns OK or DonCorleone exception """ - with services_lock: + with synchronizer: #TODO: add special handling for local if not get_service_by_id(service_id): @@ -426,14 +490,14 @@ def deregister_service(): def register_node(): try: output = OK - with services_lock: - config = json.loads(request.form['config']) + with synchronizer: + node_config = json.loads(request.form['node_config']) node_id = json.loads(request.form['node_id']) - with registered_nodes_lock: + with synchronizer: if not node_id in registered_nodes: - node = {NODE_ID:node_id, NODE_ADDRESS:config[CLIENT_CONFIG_SSH_HOST],NODE_CONFIG:config, NODE_RESPONSIBILITIES:[], NODE_SSH_HOST:config[CLIENT_CONFIG_SSH_HOST], - NODE_SSH_PORT:config[CLIENT_CONFIG_SSH_PORT], NODE_SSH_USER:config[CLIENT_CONFIG_SSH_USER] + node = {NODE_ID:node_id, NODE_ADDRESS:node_config[CLIENT_CONFIG_SSH_HOST],NODE_CONFIG:node_config, NODE_RESPONSIBILITIES:[], NODE_SSH_HOST:node_config[CLIENT_CONFIG_SSH_HOST], + NODE_SSH_PORT:node_config[CLIENT_CONFIG_SSH_PORT], NODE_SSH_USER:node_config[CLIENT_CONFIG_SSH_USER] } registered_nodes[node_id] = node @@ -457,47 +521,39 @@ def get_service(): except Exception, e: return pack_error(e) -@app.route('/register_service', methods=['POST']) -def register_service(): + +def _register_service(run, service_name, public_url, node_config, local, node_id, forced_service_params): try: - output = OK - with services_lock: - run = json.loads(request.form['run']) - service_name = json.loads(request.form['service']) - public_url = json.loads(request.form['public_url']) - config = json.loads(request.form['config']) - for node_resp in config[NODE_RESPONSIBILITIES]: - if node_resp[0] == service_name: - service_id = node_resp[1].get('service_id', None) + with synchronizer: + # Read in service and copy **parameters** + service_params = dict(default_service_params[service_name]) + service_params.update(forced_service_params) + for key in service_params.iterkeys(): + service_params[key] = resolve_don_value(service_params[key], node_id) - logger.info("Registering "+str(service_id)+ " service_name="+str(service_name)) - # Load default service additional config (like port configuration) - additional_service_parameters = default_service_parameters.get(service_name, {}) - try: - additional_service_parameters.update(json.loads(request.form['additional_config'])) - except Exception, ex: - print request.form['additional_config'] + service_id = None + + for node_resp in node_config[NODE_RESPONSIBILITIES]: + if node_resp[0] == service_name: + service_id = node_resp[1].get('service_id', None) - node_id = json.loads(request.form['node_id']) + logger.info("Registering "+str(service_id)+ " service_name="+str(service_name) +" and run = "+str(run)) - with registered_nodes_lock: + with synchronizer: if not node_id in registered_nodes: raise ERROR_NOT_REGISTERED_NODE - # Check dependencies - for d in service_configs[service_name][CONFIG_DEPENDS]: + for d in service_config[service_name][CONFIG_DEPENDS]: try: s = _get_service(d, None, node_id) except Exception, e: raise ERROR_NOT_SATISFIED_DEPENDENCIES - local = True if 'local' in request.form or additional_service_parameters.get('local', False)\ - else False logger.info("Registering local="+str(local)+" service") @@ -509,7 +565,7 @@ def register_service(): logger.info("Testing "+service_name+"_"+str(services_id_test)) while service_name+"_"+str(services_id_test) in service_list: services_id_test += 1 - service_id = service_name + "_" + str(services_id_test) + service_id = service_name + "_" + str(services_id_test) # Not known service.. @@ -528,30 +584,31 @@ def register_service(): raise ERROR_ALREADY_REGISTERED_SERVICE logger.info("Proceeding to registering {0} {1}".format(service_name, service_id)) - + #Prepare service - service_dict = {SERVICE:service_name, + service_dict = { + SERVICE:service_name, SERVICE_ID:service_id, SERVICE_STATUS:STATUS_TERMINATED, NODE_ID: node_id, NODE_ADDRESS: public_url, SERVICE_LOCAL: local, SERVICE_RUN_CMD:DEFAULT_COMMAND, - SERVICE_HOME:config[CLIENT_CONFIG_HOME], - SERVICE_PARAMETERS: additional_service_parameters - } + SERVICE_HOME:node_config[CLIENT_CONFIG_HOME], + SERVICE_PARAMETERS: service_params + } #Modify service_id to make it unique - + add_service(service_dict) logger.info(("Registering " if not run else "Running and registering ")+str(service_dict)) - update_status(service_dict) + #update_status(service_dict) if run: @@ -562,22 +619,53 @@ def register_service(): logger.info("Running service "+service_dict[SERVICE_ID]+" result "+output_run_service) except DonCorleoneException,e: logger.error("Failed deregistering service " + service_id + " with DonCorleoneException "+str(e)) - output = e + raise e except Exception,e: logger.error("Failed deregistering service " + service_id + " with non expected exception "+str(e)) - output = ERROR_FAILED_SERVICE_RUN + raise ERROR_FAILED_SERVICE_RUN if service_dict[SERVICE_STATUS] != STATUS_RUNNING: - output = ERROR_FAILED_SERVICE_RUN - else: - output = service_id - else: - output=service_id + raise ERROR_FAILED_SERVICE_RUN - return jsonify(result=str(output)) + # Registers (without running!) "adds" services + for service_to_add in service_config[service_name][CONFIG_ADDS]: + logger.info("Adding child service "+str(service_to_add)) + service_to_add_name, service_to_add_params = service_to_add + _register_service(False, service_to_add_name, public_url, node_config, local, node_id, service_to_add_params) + + return service_id + except Exception ,e: + raise DonCorleoneException(pack_error(e)) + +@app.route('/register_service', methods=['POST']) +def register_service(): + try: + + + run = json.loads(request.form['run']) + service_name = json.loads(request.form['service']) + if service_name not in KNOWN_SERVICES: + raise ERROR_NOT_RECOGNIZED_SERVICE + public_url = json.loads(request.form['public_url']) + node_config = json.loads(request.form['node_config']) + + node_id = json.loads(request.form['node_id']) + # Load default service additional config (like port configuration) + additional_service_parameters = {} + if 'additional_config' in request.form: + additional_service_parameters.update(json.loads(request.form['additional_config'])) + local = additional_service_parameters.get("local", False) + + + service_id = \ + _register_service(run, service_name, public_url, node_config, local, node_id, additional_service_parameters) + + + + return jsonify(result=service_id) except Exception, e: return jsonify(error=pack_error(e)) @@ -624,7 +712,7 @@ def run_service(): return jsonify(error=pack_error(e)) except Exception,e: logger.error("Failed running service " + service_id + " with non expected exception "+str(e)) - return jsonify(result=str(ERROR_FAILED_SERVICE_RUN)) + return jsonify(result=pack_error(e)) @@ -632,13 +720,15 @@ def run_service(): @app.route('/get_services') def get_services(): - with services_lock: + running = request.args.get('running') + if running == "true": + return jsonify(result=[s for s in services if s[SERVICE_STATUS] == STATUS_RUNNING]) + else: return jsonify(result=services) @app.route('/get_nodes') def get_nodes(): - with registered_nodes_lock: - return jsonify(result=registered_nodes) + return jsonify(result=registered_nodes) @@ -681,9 +771,9 @@ def _get_configuration(service_name, service_id, config_name, node_id): try: s = _get_service(service_name, service_id, node_id) - return s[SERVICE_PARAMETERS].get(config_name, default_service_parameters[s[SERVICE]][config_name]) + return s[SERVICE_PARAMETERS][config_name] except Exception, e: - raise pack_error(e) + raise DonCorleoneException(pack_error(e)) @@ -696,6 +786,7 @@ def get_configuration(): if not picks the global one (always only one!!). """ try: + logger.info("Getting configuration") # Get input parameters service_name = request.args.get('service_name') @@ -738,7 +829,7 @@ def _terminate_node(node_id): logger.info("Terminating node "+node_id) try: - with services_lock: + with synchronizer: for m in get_node_services(node_id): # Try deregistering the service and delegate errror checking to _run_service try: @@ -752,7 +843,7 @@ def _terminate_node(node_id): logger.error("Failed deregistering service " + m[SERVICE_ID] + " with unexpected error "+str(e)) return jsonify(error=str(ERROR_FAILED_SERVICE_RUN)) - with registered_nodes_lock: + with synchronizer: registered_nodes.pop(node_id) except Exception, e: @@ -803,7 +894,7 @@ def register_reversed(): while os.system("./scripts/test_port.sh "+str(port)) == 0: port += 1 - with registered_nodes_lock: + with synchronizer: registered_nodes[node_id].update({NODE_SSH_HOST: "127.0.0.1", NODE_SSH_PORT: port}) return jsonify(result={"ssh-user":ssh_user_don, "ssh-port":ssh_port_don, "ssh-host":ssh_host_don, "ssh-port-redirect":port}) @@ -835,35 +926,40 @@ def read_configs(): ### Read in configuration files and setup default parameters ### for s in KNOWN_SERVICES: - default_service_parameters[s] = {} + default_service_params[s] = {} config_file = os.path.join(CONFIG_DIRECTORY, s+".config") if os.path.exists(config_file): - service_configs[s] = {CONFIG_DEPENDS: [], CONFIG_PARAMS: []} - service_configs[s].update(json.loads(open(config_file,"r").read())) - if service_configs[s][CONFIG_UNARY]: + + service_config[s] = {CONFIG_DEPENDS: [], CONFIG_ARGUMENTS: [], CONFIG_ADDS: [], CONFIG_DEFAULT_SERVICE_PARAMS: {}} + + logger.info("Reading "+s+" configuration") + + service_config[s].update(json.loads(open(config_file,"r").read())) + + if service_config[s][CONFIG_UNARY]: UNARY_SERVICES.add(s) if s in NONUNARY_SERVICES: NONUNARY_SERVICES.remove(s) else: if s in UNARY_SERVICES: UNARY_SERVICES.remove(s) NONUNARY_SERVICES.add(s) + # Read in configuration + default_service_params[s] = service_config[s][CONFIG_DEFAULT_SERVICE_PARAMS] - params = service_configs[s][CONFIG_PARAMS] - for p in params: - if len(p) == 2: #non dependant - default_service_parameters[s][p[0]] = p[1] - + ## Resolve anchors + #for key in default_service_configs[s].iterkeys(): + # default_service_configs[s][key] = resolve_don_value(default_service_configs[s][key], None) logger.info("Read service configuration") logger.info(s) - logger.info(default_service_parameters[s]) - logger.info(service_configs[s]) + logger.info(default_service_params[s]) + logger.info(service_config[s]) else: logger.warn("No configuration for {0}, writing empty config (default)".format(s)) - service_configs[s] = {} + service_config[s] = {} @app.before_first_request def run_daemons_flask(): @@ -884,4 +980,4 @@ def run_daemons_flask(): if __name__ == '__main__': read_configs() run_daemons() - app.run(port=8881) + app.run(port=8881, processes=3) diff --git a/don_corleone/don_utils.py b/don_corleone/don_utils.py index d555cf5..c6787ce 100755 --- a/don_corleone/don_utils.py +++ b/don_corleone/don_utils.py @@ -3,31 +3,18 @@ import logging import os import sys +from don_corleone_constants import * -SERVICE = "service" -#Service ID is in most cases the same as SERVICE, however if it is local, or if it is multiple_slave it can differ -#For instance hadoop slaves will have service id hadoop_slave:2, whereas local service will have id -#neo4j_local, service id is basically service_name:additional_config ,where service_name can have -#additionally local tag -SERVICE_ID = "service_id" -SERVICE_STATUS = "status" -#Without http -SERVICE_HOME = "home" -SERVICE_PORT = "port" -SERVICE_RUN_CMD = "run_cmd" -SERVICE_NAME = "service" -SERVICE_CONFIG = "service_config" # Additional service config -NODE_ID = "node_id" # Id for node -SERVICE_LOCAL = "local" -STATUS_NOTREGISTERED = "not_registered" -STATUS_TERMINATED = "terminated" -STATUS_RUNNING = "running" +don_test_file_path = os.path.join(os.path.dirname(__file__),"scripts/don_corleone_test.sh") +don_test_file = don_test_file_path +#TODO: repair - why not calling get_configuration from don is available? +#TODO: broken gen_service for local ones + -#TODO: move to another class logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) ch = logging.StreamHandler() @@ -35,7 +22,7 @@ ch.setFormatter(formatter) logger.addHandler(ch) logger.propagate = False -ch_file = logging.FileHandler(os.path.join(os.path.dirname(__file__),"server.log"), ) +ch_file = logging.FileHandler(os.path.join(os.path.dirname(__file__),"don_utils.log"), ) formatter = logging.Formatter('%(funcName)s - %(asctime)s - %(levelname)s - %(message)s') ch_file.setFormatter(formatter) logger.addHandler(ch_file) @@ -65,7 +52,7 @@ def get_all_services(config=None): return resp['result'] -def get_service(services, service_id=None, service_name=None, node_id=None, service_config={}): +def get_service(services, service_id=None, service_name=None, node_id=None, service_params={}): for s in services: if service_id is not None and s[SERVICE_ID] != service_id: continue @@ -74,18 +61,18 @@ def get_service(services, service_id=None, service_name=None, node_id=None, serv if node_id is not None and s[NODE_ID] != node_id: continue - config_cpy = dict(s[SERVICE_CONFIG]) - config_cpy.update(service_config) + config_cpy = dict(s[SERVICE_PARAMETERS]) + config_cpy.update(service_params) - if set(config_cpy) != set(s[SERVICE_CONFIG]): + if set(config_cpy) != set(s[SERVICE_PARAMETERS]): continue return s return None -def get_running_service(service_id=None, node_id=None, service_name=None, service_config={}, \ - config=None, enforce_running=True, enforce_local=False): +def get_running_service(service_id=None, node_id=None, service_name=None, service_params={}, \ + node_config=None, enforce_running=True, enforce_local=False): """ @returns given service if service is running with given optionally service_id or service_name and having parameters specified in params @@ -93,24 +80,26 @@ def get_running_service(service_id=None, node_id=None, service_name=None, servic @note Also if don corleone is not running it pulls config from config.json and assumes running_services==node_responsibilities """ - if service_id is None and service_name is None and len(service_config) == 0: + if service_id is None and service_name is None and len(service_params) == 0: logger.error("Incorrect parameters") return None - if config is None: - config = json.load(open(os.path.join(os.path.dirname(__file__),"config.json"),"r")) + if node_config is None: + node_config = json.load(open(os.path.join(os.path.dirname(__file__),"config.json"),"r")) - if (config[MASTER_LOCAL] and os.system("./scripts/don_corleone_test.sh") != 0) or enforce_local: - logger.error(os.system("./scripts/don_corleone_test.sh")) + if (node_config[MASTER_LOCAL] and os.system(don_test_file) != 0) or enforce_local: + logger.error(enforce_local) logger.error("WARNING: don corleone is not running !! Pulling config from config.json") services = [] - for node_resp in config["node_responsibilities"]: - services.append({"local":True, SERVICE:node_resp[0], SERVICE_ID:node_resp[0], SERVICE_CONFIG:node_resp[1], NODE_ID:node_id}) - return get_service(services, service_id = service_id, node_id=node_id, service_name=service_name, service_config=service_config) + for node_resp in node_config["node_responsibilities"]: + services.append({"local":True, SERVICE:node_resp[0], SERVICE_ID:node_resp[0], SERVICE_PARAMETERS:node_resp[1], NODE_ID:node_id}) + + + return get_service(services, service_id = service_id, node_id=node_id, service_name=service_name, service_params=service_params) # Get running services from don corleone - response = json.loads(run_procedure(config, "get_services")) + response = json.loads(run_procedure(node_config, "get_services")) if not has_succeded(response): logger.error("FAILED DON CORLEONE COMMUNICATION") @@ -119,11 +108,11 @@ def get_running_service(service_id=None, node_id=None, service_name=None, servic services = [] if enforce_running: - services = [s for s in json.loads(run_procedure(config, "get_services"))['result'] if s[SERVICE_STATUS] == STATUS_RUNNING] + services = [s for s in json.loads(run_procedure(node_config, "get_services"))['result'] if s[SERVICE_STATUS] == STATUS_RUNNING] else: - services = [s for s in json.loads(run_procedure(config, "get_services"))['result']] + services = [s for s in json.loads(run_procedure(node_config, "get_services"))['result']] - return get_service(services, service_id = service_id, node_id=node_id, service_name=service_name, service_config=service_config) + return get_service(services, service_id = service_id, node_id=node_id, service_name=service_name, service_params=service_params) @@ -133,7 +122,7 @@ def has_succeded(response): -def get_configuration_query(config_name, node_id=None, service_id=None, service_name=None, service_config={}, config=None): +def get_configuration_query(param_name, node_id=None, service_id=None, service_name=None, service_params={}, config=None): """ More complicated version of get_configuration @@ -145,30 +134,22 @@ def get_configuration_query(config_name, node_id=None, service_id=None, service_ s = get_running_service(service_id=service_id, node_id=node_id, service_name=service_name, \ - service_config=service_config, config=config, enforce_running=False) - - # Try local - if s is None: - s = get_running_service(service_id=service_id, node_id=node_id, service_name=service_name, \ - service_config=service_config, config=config, enforce_running=False,\ - enforce_local=True - ) + service_params=service_params, node_config=config, enforce_running=False) if s is None: logger.error("Not found requested service!") - return None - + raise DonCorleoneException("Not found requested service") # Special handling for config.json if s.get("local", False) is True: logger.error("WARNING: don corleone is not running !! Pulling config from config.json") - if config_name in s[SERVICE_CONFIG]: - return s[SERVICE_CONFIG][config_name] + if param_name in s[SERVICE_PARAMETERS]: + return s[SERVICE_PARAMETERS][param_name] else: raise "Not found configuration. Try adding it to don_corleone/config.json" # Handles request back to server - return _get_configuration_by_id(s[SERVICE_ID], config_name, config) + return _get_configuration_by_id(s[SERVICE_ID], param_name, config) def get_my_config(): @@ -179,7 +160,10 @@ def get_my_node_id(): return config['node_id'] -def get_configuration(service_name, config_name, config=None, service_config={}): + + + +def get_configuration(service_name, config_name, node_config=None, service_params={}): """ @returns configuration config_name for service_name. @@ -188,49 +172,70 @@ def get_configuration(service_name, config_name, config=None, service_config={}) Also if don corleone is not running it pulls config from config.json """ + if node_config is None: + node_config = json.load(open(os.path.join(os.path.dirname(__file__),"config.json"),"r")) - s = get_running_service(service_id=None, service_name=service_name, \ - service_config=service_config, config=config, enforce_running=False) + + # Not running don corleone: getting from config.json + if node_config[MASTER_LOCAL] and os.system(don_test_file) != 0: + logger.error("No don corleone? wuuuut") + print os.system(don_test_file) - # Try local - if s is None: - logger.info("No given service registered on don corleone!") s = get_running_service(service_id=None, service_name=service_name, \ - service_config=service_config, config=config, enforce_running=False,\ - enforce_local=True - ) + service_params=service_params, node_config=node_config, enforce_running=False, + enforce_local=True) - if s is None: - logger.error("Not found requested service!") - return None + if s is None: + logger.error("Not found requested service!") + raise DonCorleoneException("Not found requested service") + # Special handling for config.json + if s.get("local", False) is True: + if config_name in s[SERVICE_PARAMETERS]: + return s[SERVICE_PARAMETERS][config_name] + else: + raise DonCorleoneException("Not found configuration. Try adding it to don_corleone/config.json") - # Special handling for config.json - if s.get("local", False) is True: - if config_name in s[SERVICE_CONFIG]: - return s[SERVICE_CONFIG][config_name] - else: - raise "Not found configuration. Try adding it to don_corleone/config.json" + # Handles request back to server + return _get_configuration_by_id(s[SERVICE_ID], config_name, node_config) + else: + return _get_configuration_by_name(service_name, config_name, node_config) - # Handles request back to server - return _get_configuration_by_id(s[SERVICE_ID], config_name, config) - - +OCEAN_ROOT_MARKER_FILE = '.__ocean_root__' -def _get_configuration_by_id(service_id, config_name, config=None): + +def get_ocean_root_dir(): + """ + Search for the root directory location and return it + """ + # TODO: Store it inside don_corleone config.json? It is inside ;) "home" ;p + lookup_dir = './' + while not os.path.isfile(lookup_dir + OCEAN_ROOT_MARKER_FILE): + lookup_dir += '../' + if os.path.abspath(lookup_dir) == '/': + raise SystemError( + 'Ocean root directory could not be found. ' + 'Is there a ' + OCEAN_ROOT_MARKER_FILE + ' file inside it?' + ) + return os.path.abspath(lookup_dir) + + + + +def _get_configuration_by_name(service_name, config_name, config=None): if config is None: config = json.load(open(os.path.join(os.path.dirname(__file__),"config.json"),"r")) - if config[MASTER_LOCAL] and os.system("./scripts/don_corleone_test.sh") != 0: + if config[MASTER_LOCAL] and os.system(don_test_file) != 0: raise "Error - not running don - should nt call _get_configuration_by_id" - + try: - params = urllib.urlencode({"service_id":service_id, "node_id":config[NODE_ID], "config_name":config_name}) + params = urllib.urlencode({"service_name":service_name, "node_id":config[NODE_ID], "config_name":config_name}) response = json.loads(urllib2.urlopen(get_don_corleone_url(config)+"/get_configuration?%s" % params).read()) # Sometimes it is incompatible @@ -250,21 +255,32 @@ def _get_configuration_by_id(service_id, config_name, config=None): logger.error("Failed get_configuration with error "+str(e)) return None +def _get_configuration_by_id(service_id, config_name, config=None): -OCEAN_ROOT_MARKER_FILE = '.__ocean_root__' + if config is None: + config = json.load(open(os.path.join(os.path.dirname(__file__),"config.json"),"r")) -def get_ocean_root_dir(): - """ - Search for the root directory location and return it - """ - # TODO: Store it inside don_corleone config.json? - lookup_dir = './' - while not os.path.isfile(lookup_dir + OCEAN_ROOT_MARKER_FILE): - lookup_dir += '../' - if os.path.abspath(lookup_dir) == '/': - raise SystemError( - 'Ocean root directory could not be found. ' - 'Is there a ' + OCEAN_ROOT_MARKER_FILE + ' file inside it?' - ) - return os.path.abspath(lookup_dir) \ No newline at end of file + if config[MASTER_LOCAL] and os.system(don_test_file) != 0: + raise "Error - not running don - should nt call _get_configuration_by_id" + + try: + params = urllib.urlencode({"service_id":service_id, "node_id":config[NODE_ID], "config_name":config_name}) + response = json.loads(urllib2.urlopen(get_don_corleone_url(config)+"/get_configuration?%s" % params).read()) + + # Sometimes it is incompatible + if has_succeded(response): + if response['result'] is str or response['result'] is unicode: + response['result'] = response['result'].replace("http","") + response['result'] = response['result'].replace("127.0.0.1", "localhost") + return response['result'] + + logger.error("Failed don corleone get_config with don error "+response['error']) + return None + + except Exception, e: + exc_type, exc_obj, exc_tb = sys.exc_info() + fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1] + print(exc_type, fname, exc_tb.tb_lineno) + logger.error("Failed get_configuration with error "+str(e)) + return None diff --git a/don_corleone/scripts/don_corleone_run.sh b/don_corleone/scripts/don_corleone_run.sh index 2c000fa..d38c50b 100755 --- a/don_corleone/scripts/don_corleone_run.sh +++ b/don_corleone/scripts/don_corleone_run.sh @@ -1,4 +1,5 @@ #!/bin/bash source ./init.sh source ./run_bg_job.sh -gunicorn -c gunicorn_config.py don_corleone:app +#sudo -E python don_corleone_server.p +gunicorn -c gunicorn_config.py don_corleone_server:app diff --git a/don_corleone/scripts/don_corleone_terminate.sh b/don_corleone/scripts/don_corleone_terminate.sh index 56a4d64..fc97564 100755 --- a/don_corleone/scripts/don_corleone_terminate.sh +++ b/don_corleone/scripts/don_corleone_terminate.sh @@ -2,7 +2,7 @@ source ./init.sh -list=$(ps -ef | grep don_corleone:app | grep -v grep | awk '{print $2}') +list=$(ps -ef | grep don_corleone_server:app | grep -v grep | awk '{print $2}') echo "Terminating" echo $list @@ -12,7 +12,7 @@ do echo `cat ocean_password` | sudo -S kill -9 $proc done -list=$(ps -ef | grep don_corleone:app | grep -v grep | awk '{print $2}') +list=$(ps -ef | grep don_corleone_server:app | grep -v grep | awk '{print $2}') if [ -z "$list" ] then echo "Terminating Don Corleone server... OK" diff --git a/don_corleone/scripts/don_corleone_test.sh b/don_corleone/scripts/don_corleone_test.sh index 0e1bf74..7e29150 100755 --- a/don_corleone/scripts/don_corleone_test.sh +++ b/don_corleone/scripts/don_corleone_test.sh @@ -1,3 +1,2 @@ #!/bin/bash -source ./init.sh -(( `ps ax | grep don_corleone:app | wc -l` > 2 )) +(( `ps ax | grep don_corleone_server:app | wc -l` > 2 )) diff --git a/don_corleone/scripts/lionfish_run.sh b/don_corleone/scripts/lionfish_run.sh index 1f58048..b2e5355 100755 --- a/don_corleone/scripts/lionfish_run.sh +++ b/don_corleone/scripts/lionfish_run.sh @@ -1,4 +1,7 @@ #!/bin/bash source ./init.sh echo "Running ODM Server" + sudo -E python2 ../lionfish/python_lionfish/server/odm_server.py ${@:1} + + diff --git a/don_corleone/scripts/lionfish_scala_run.sh b/don_corleone/scripts/lionfish_scala_run.sh new file mode 100755 index 0000000..18458bd --- /dev/null +++ b/don_corleone/scripts/lionfish_scala_run.sh @@ -0,0 +1,11 @@ +#!/bin/bash +source ./init.sh +echo "Running Lionfish Scala" +if [[ ! -f ../lionfish/lionfish.jar ]]; then + echo "Packaging lionfish into lionfish.jar" + cd ../lionfish && sudo -E sbt one-jar + cd ../lionfish && sudo -E find target -type f -name "*-one-jar.jar" -exec mv {} lionfish.jar \; + echo "Packaged lionfish into lionfish.jar" +fi + +cd ../lionfish && sudo java -jar lionfish.jar "${@:1}" diff --git a/don_corleone/scripts/lionfish_scala_terminate.sh b/don_corleone/scripts/lionfish_scala_terminate.sh new file mode 100755 index 0000000..1d1289f --- /dev/null +++ b/don_corleone/scripts/lionfish_scala_terminate.sh @@ -0,0 +1,16 @@ +#!/bin/bash +source ./init.sh +list=$(pgrep -f "lionfish.jar") + +for proc in $list +do + sudo -E kill -9 $proc +done + +list=$(pgrep -f "lionfish.jar") +if [ -z "$list" ] +then + echo "Terminating lionfish.. OK" + exit 0 +fi +exit 1 diff --git a/don_corleone/scripts/lionfish_scala_test.sh b/don_corleone/scripts/lionfish_scala_test.sh new file mode 100755 index 0000000..8953eed --- /dev/null +++ b/don_corleone/scripts/lionfish_scala_test.sh @@ -0,0 +1,13 @@ +#!/bin/bash +source ./init.sh + +list=$(pgrep -f "lionfish.jar") + + +if [ -n "$list" ] +then + echo "Running lionfish scala server" + exit 0 +fi +exit 1 + diff --git a/don_corleone/scripts/lionfish_terminate.sh b/don_corleone/scripts/lionfish_terminate.sh index dac6821..ff726d5 100755 --- a/don_corleone/scripts/lionfish_terminate.sh +++ b/don_corleone/scripts/lionfish_terminate.sh @@ -4,7 +4,7 @@ list=$(pgrep -f "odm_server.py") for proc in $list do - echo `cat ocean_password` | sudo -S kill -9 $proc + sudo -E kill -9 $proc done list=$(pgrep -f "odm_server.py") diff --git a/don_corleone/scripts/lionfish_test.py b/don_corleone/scripts/lionfish_test.py new file mode 100755 index 0000000..a5ef006 --- /dev/null +++ b/don_corleone/scripts/lionfish_test.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +import sys, os +sys.path.append(os.path.abspath(os.path.join(__file__, "../../../lionfish/"))) +sys.path.append(os.path.abspath(os.path.join(__file__, ".."))) +sys.path.append(os.path.abspath(os.path.join(__file__, "../.."))) + +import python_lionfish +from python_lionfish.client.client import Client +from don_corleone import don_utils as du + +def check_lionfish_communication(): + """ + Returns true if lionfish works OK + """ + lionfish_host = du.get_configuration('lionfish', 'host') + lionfish_port = du.get_configuration('lionfish', 'port') + lionfish_client = Client(lionfish_host, lionfish_port) + lionfish_client.connect() + found_instances = lionfish_client.get_by_uuid("xwdjwdwjw") + lionfish_client.disconnect() + return True + + + +if check_lionfish_communication(): + print "Lionfish running" + exit(0) +else: + exit(1) diff --git a/don_corleone/scripts/neo4j_terminate.sh b/don_corleone/scripts/neo4j_terminate.sh index 97e1a2f..8fb686b 100755 --- a/don_corleone/scripts/neo4j_terminate.sh +++ b/don_corleone/scripts/neo4j_terminate.sh @@ -1,3 +1,3 @@ #!/bin/bash source ./init.sh -sudo neo4j stop +sudo -E neo4j stop diff --git a/don_corleone/scripts/neo4j_test.py b/don_corleone/scripts/neo4j_test.py new file mode 100755 index 0000000..728a2df --- /dev/null +++ b/don_corleone/scripts/neo4j_test.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- +import sys, os + + +sys.path.append(os.path.abspath(os.path.join(__file__, "../../.."))) + +from don_corleone import don_utils as du +neo4j_host = du.get_configuration('neo4j', 'host') +neo4j_port = du.get_configuration('neo4j', 'port') + +print "Neo4j configured at ",neo4j_host, " ",neo4j_port + +try: + import socket + sock = socket.socket() + sock.connect((neo4j_host, neo4j_port)) + + print "Neo4j running" + + sock.close() +except: + print "Neo4j is not running" + exit(1) + diff --git a/don_corleone/test.sh b/don_corleone/test.sh new file mode 100755 index 0000000..12b8747 --- /dev/null +++ b/don_corleone/test.sh @@ -0,0 +1 @@ +echo ${@:1} diff --git a/don_corleone/tests/config_test_3.json b/don_corleone/tests/config_test_3.json index ac601e4..cc6c565 100755 --- a/don_corleone/tests/config_test_3.json +++ b/don_corleone/tests/config_test_3.json @@ -3,8 +3,8 @@ "master_local_url":"http://127.0.0.1:8881", "master_local":true, "node_responsibilities":[ - ["neo4j",{"port":7474, "host":"192.168.0.15", "local":true}], - ["lionfish",{"port":7777, "host":"192.168.0.15", "local":true}] + ["neo4j",{"port":7474, "host":"localhost", "local":true}], + ["lionfish",{"port":7777, "host":"localhost", "local":true}] ], "node_id":"staszek", "home":"/home/moje/Projekty/ocean/ocean", diff --git a/don_corleone/tests/jan_test_don_utils.py b/don_corleone/tests/jan_test_don_utils.py index 62c295a..b1f9f94 100644 --- a/don_corleone/tests/jan_test_don_utils.py +++ b/don_corleone/tests/jan_test_don_utils.py @@ -9,12 +9,12 @@ print get_running_service(service_name="spidercrab_master") - print get_running_service(service_id="neo4j_0",service_config={"port":7777}) + print get_running_service(service_id="neo4j_0", service_params={"port":7777}) services=[ {"service":"x", "service_id":"y", "service_config":{"a":1,"b":2}}] - print get_service(services, service_id="y", service_config={"a":1}) + print get_service(services, service_id="y", service_params={"a":1}) print "GET CONFIGURATION TESTS" print get_configuration_query("port", service_name="neo4j") diff --git a/don_corleone/tests/test_3.py b/don_corleone/tests/test_3.py index 984e52b..4dc72af 100644 --- a/don_corleone/tests/test_3.py +++ b/don_corleone/tests/test_3.py @@ -21,7 +21,10 @@ def test3(self): run_node(config, hang=False) print run_procedure(config, "get_services") print count_services(config) - assert(count_services(config) == 2) + + raw_input("Press [enter] to finish test") + + assert(count_services(config, running=True) == 2) print "Terminating don corleone node" # Terminate os.system("scripts/don_corleone_terminate.sh") diff --git a/don_corleone/tests/test_4.py b/don_corleone/tests/test_4.py index 1fabe3e..820c6dc 100644 --- a/don_corleone/tests/test_4.py +++ b/don_corleone/tests/test_4.py @@ -57,8 +57,8 @@ def test4(self): print run_procedure(config, "get_services") - assert( get_running_service(service_name="neo4j", config=config) is not None ) - assert( get_running_service(service_name="lionfish", config=config) is not None ) + assert( get_running_service(service_name="neo4j", node_config=config) is not None ) + assert( get_running_service(service_name="lionfish", node_config=config) is not None ) # response = json.loads(urllib2.urlopen(get_don_corleone_url(config) # +"/terminate_service?service_id=moj_neo4j").read()) diff --git a/don_corleone/tests/test_5.py b/don_corleone/tests/test_5.py new file mode 100644 index 0000000..6fc4009 --- /dev/null +++ b/don_corleone/tests/test_5.py @@ -0,0 +1,40 @@ +import json +import os +import sys +import unittest + +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) + +from run_node import run_node +from don_utils import get_configuration, run_procedure +from test_util import count_services, get_test_config +from terminate_node import terminate_node + +class BasicTests(unittest.TestCase): + + def test5(self): + """ Simple test - testing configuration with lionfish scala if is adding""" + + os.chdir(os.path.abspath("..")) + + # Prepare config file + self.config = get_test_config("config_test_5.json") + config = self.config + run_node(config, hang=False) + print run_procedure(config, "get_services") + print count_services(config) + + raw_input("Press [enter] to finish test") + + assert(count_services(config, running=True) == 3) + + config = self.config + print "Terminating don corleone noode" + terminate_node(config) + # Terminate + os.system("scripts/don_corleone_terminate.sh") + # Check if terminated correctly + assert(os.system("scripts/don_corleone_test.sh") != 0) + +if __name__ == "__main__": + unittest.main() diff --git a/don_corleone/tests/test_util.py b/don_corleone/tests/test_util.py index 4ba78f1..4c9254f 100644 --- a/don_corleone/tests/test_util.py +++ b/don_corleone/tests/test_util.py @@ -36,6 +36,6 @@ def get_test_config(config_name): return config_test -def count_services(config): - response = urllib2.urlopen(config[MASTER_LOCAL_URL] +"/get_services").read() +def count_services(config, running=False): + response = urllib2.urlopen(config[MASTER_LOCAL_URL] +"/get_services?running="+("true" if running else "false")).read() return len(json.loads(response)['result']) diff --git a/graph_workers/graph_defines.py b/graph_workers/graph_defines.py index e488468..656c45f 100644 --- a/graph_workers/graph_defines.py +++ b/graph_workers/graph_defines.py @@ -32,6 +32,7 @@ #root = graph_db.node(0) TAG_TYPE_MODEL_NAME = 'Tag' +FEED_TYPE_MODEL_NAME = 'Feed' CONTENT_TYPE_MODEL_NAME = "Content" NEOUSER_TYPE_MODEL_NAME = "NeoUser" NEWS_WEBSITE_TYPE_MODEL_NAME = "NewsWebsite" diff --git a/graph_workers/neo4j_wrapper.py b/graph_workers/neo4j_wrapper.py deleted file mode 100644 index 9b50aa5..0000000 --- a/graph_workers/neo4j_wrapper.py +++ /dev/null @@ -1,126 +0,0 @@ -""" Quick note about neo4j wrapper : -every node creation copies graph_db to inside of the class -when called update_properties it will use graph_db. -So if graph_db is closed - it fails. -""" -# TODO: implement wrapper (in 3rd iteration), which will check privileges of GraphWorker -# when writing to database and also wrap basic access patterns - - -from datetime import datetime, timedelta -from pytz import timezone -from py2neo import neo4j - -import py2neo -import time - - - - -#TODO: move to neo4j_wrapper to get_all_instances() -def get_all_instances(self, class_name): - """ - @returns lists of all news feeds in the system (as py2neo.node, - note: you can refer to properties by node["property"] and - to id by node._id :) - - if there were no needed fields, they are added :) - """ - query = \ - """ - START root=node(0) - MATCH root-[r:`<>`]->typenode-[q:`<>`]->n - WHERE typenode.name = { class_name } - RETURN n - """ - records = get_records_from_cypher(self.graph_db, query, params={"class_name": class_name}) - if type(records[0]) is py2neo.util.Record: return [r.n for r in records] - elif type(records[0]) is py2neo.neo4j.Node: return records - else: return None - - -def get_records_from_cypher(graph_db, cypher_query, params = None): - """ - Submit cypher cypher_query - @returns list of Records (py2neo class) or list of Nodes - - @warning: This function doesn;t work for all cases . I do not understand py2neo here - - - - TODO: Why is py2neo returning different things ????? - """ - my_batch = neo4j.ReadBatch(graph_db) - my_batch.append_cypher(cypher_query, params) - result = my_batch.submit() - if type(result) is py2neo.neo4j.Node: return [result] - if type(result[0]) is py2neo.neo4j.Node: return result - if type(result[0]) is list: return result[0] - else: return result - - -def count_same_news(graph_db, news_website, news_title): - my_batch = neo4j.ReadBatch(graph_db) - cypher_query = "START root=node(0) \n MATCH root-[r:`<>`]->"+\ - "n-[r2:`<>`]->w" + "\n WHERE n.model_name = "+\ - "\"Content\" \n"+\ - "and w.title = {news_title} \n" + "RETURN count(w)" - my_batch.append_cypher(cypher_query, {"news_title" : news_title.encode("utf8")}) - results = my_batch.submit() - return results[0] - - -def get_type_metanode(graph_db, model_name): - """ - @returns Metanode representing given "model_name" - """ - my_batch = neo4j.ReadBatch(graph_db) - my_batch.append_cypher( - """ - START v=node(0) - MATCH (v)-[]->(n) - WHERE has(n.model_name) - RETURN n - """ - ) - results = my_batch.submit() - metanode = None - for node in results[0]: - if node.n.get_properties()["model_name"] == model_name: - metanode = node.n - break - return metanode - - -def pubdate_to_datetime(pubdate): - """ Wrapper for annoying conversion""" - # Different time formats. Propses .. omg - #d = parser.parse(pubdate) - #d = d.replace(tzinfo=timezone("GMT")) - #return d - # Found better solution, but keeping the code for now - tok = pubdate.replace(",", "").replace(":", " ").split(" ") - try: - d = datetime.strptime(pubdate, "%a, %d %b %Y %H:%M:%S %Z") if \ - len(tok[3]) > 2 else time.strptime(pubdate, "%a, %d %b %y %H:%M:%S %Z") - return d.replace(tzinfo=timezone("GMT")) # make timezone aware for robustness - except: - # PYTHON 2.7 DOESNT SUPPORT %z DIRECTIVE !!!.. omg, official documentation.. - try: - offset = int(pubdate[-5:]) - delta = timedelta(hours=offset / 100) - d = datetime.strptime(pubdate[:-6], "%a, %d %b %Y %H:%M:%S") if \ - len(tok[3]) > 2 else datetime.strptime(pubdate[:-6], "%a, %d %b %y %H:%M:%S") - d -= delta - return d.replace(tzinfo=timezone("GMT")) # make timezone aware for robustness - except Exception, e: - print e - print pubdate - raise Exception("Wrong date format") - - -def datetime_to_pubdate(d): - """ - @returns "%a, %d %b %Y %H:%M:%S %Z" - """ - return d.strftime("%a, %d %b %Y %H:%M:%S %Z") #unify strftime diff --git a/graph_workers/spidercrab.py b/graph_workers/spidercrab.py index 64f199f..e3a8aac 100644 --- a/graph_workers/spidercrab.py +++ b/graph_workers/spidercrab.py @@ -15,15 +15,14 @@ import uuid ### TODO: this line shouldn't be here (it worked on Konrad's laptop?) adding toquickly test sys.path.append(os.path.join(os.path.dirname(__file__), '..')) -sys.path.append(os.path.join(os.path.dirname(__file__), '../lionfish')) +sys.path.append(os.path.join(os.path.dirname(__file__), '../lionfish/python_lionfish/client/')) from don_corleone import don_utils as du from graph_workers.graph_defines import * from graph_workers.graph_utils import * from graph_workers.graph_worker import GraphWorker from graph_workers.privileges import construct_full_privilege -import python_lionfish -from python_lionfish.client import Client +from client import Client # Defining levels to get rid of other loggers info_level = 100 @@ -129,7 +128,7 @@ def __init__( self.required_privileges = construct_full_privilege() # TODO: use configuration! - self.odm_client = Client('localhost', 7777) + self.odm_client = Client('localhost', 7777) self.terminate_event = threading.Event() self.runtime_id = runtime_id self.master_sources_urls_file = master_sources_urls_file @@ -339,7 +338,7 @@ def _check_and_pull_config(self): + str(self.given_config['graph_worker_id']) + '\'!') master_config = du.get_running_service( - service_config={ + service_params={ 'graph_worker_id': self.given_config['graph_worker_id'] }, enforce_running=False @@ -387,7 +386,7 @@ def _check_and_init_db(self): info_level, 'Spidercrab model not found in the database. Creating...' ) - self.odm_client.create_model('Spidercrab') + self.odm_client.create_model_node('Spidercrab') self.logger.log( info_level, 'Spidercrab model created.' diff --git a/graph_workers/tests/web_crawler_test_1_rss_wp.py b/graph_workers/tests/web_crawler_test_1_rss_wp.py deleted file mode 100644 index e1fcc2a..0000000 --- a/graph_workers/tests/web_crawler_test_1_rss_wp.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -web_crawler_test_1_rss_wp.py -This simple test fetches 10 RSS feeds from rss.wp.pl website and then quits. -""" - -import sys -import time - -sys.path.append("../web_crawler") -from web_crawler import WebCrawler - -sys.path.append("..") -from privileges import construct_full_privilege, privileges_bigger_or_equal - - -master_crawler = WebCrawler.create_master ( - privileges = construct_full_privilege(), - start_url = "http://rss.wp.pl/" -) - - -WebCrawler.create_worker ( - master = master_crawler, - privileges = construct_full_privilege(), - max_internal_expansion = 10, - max_database_updates = 10 -) - -master_crawler.run() - -time.sleep(120) -master_crawler.terminate() - diff --git a/graph_workers/tests/web_crawler_test_2_wykop.py b/graph_workers/tests/web_crawler_test_2_wykop.py deleted file mode 100644 index a690e7a..0000000 --- a/graph_workers/tests/web_crawler_test_2_wykop.py +++ /dev/null @@ -1,32 +0,0 @@ -""" -web_crawler_test_2_wykop.py -This test precisely explores www.wykop.pl in search for RSS feeds. -""" - -import sys -import time - -sys.path.append("../web_crawler") -from web_crawler import WebCrawler - -sys.path.append("..") -from privileges import construct_full_privilege, privileges_bigger_or_equal - - -master_crawler = WebCrawler.create_master ( - privileges = construct_full_privilege(), - start_url = "http://www.wykop.pl/" -) - - -WebCrawler.create_worker ( - privileges = construct_full_privilege(), - master = master_crawler, - max_crawling_depth = 3 -) - -master_crawler.run() - -time.sleep(60*60*24*3) -master_crawler.terminate() - diff --git a/graph_workers/tests/web_crawler_test_3_supersurfer.py b/graph_workers/tests/web_crawler_test_3_supersurfer.py deleted file mode 100644 index 991c22b..0000000 --- a/graph_workers/tests/web_crawler_test_3_supersurfer.py +++ /dev/null @@ -1,35 +0,0 @@ -""" -web_crawler_test_2_supersurfer -The name of this test means that crawler will jump often to the distant -locations, increasing his depth quickly. -""" - -import sys -import time - -sys.path.append("../web_crawler") -from web_crawler import WebCrawler - -sys.path.append("..") -from privileges import construct_full_privilege, privileges_bigger_or_equal - - -master_crawler = WebCrawler.create_master ( - privileges = construct_full_privilege(), - start_url = "http://antyweb.pl/" -) - - -WebCrawler.create_worker ( - privileges = construct_full_privilege(), - master = master_crawler, - max_internal_expansion = 5, - max_external_expansion = 3, - max_crawling_depth = 100, -) - -master_crawler.run() - -time.sleep(60*60*24*3) -master_crawler.terminate() - diff --git a/graph_workers/tests/web_crawler_test_4_supersurfer_trap.py b/graph_workers/tests/web_crawler_test_4_supersurfer_trap.py deleted file mode 100644 index 5793b6d..0000000 --- a/graph_workers/tests/web_crawler_test_4_supersurfer_trap.py +++ /dev/null @@ -1,37 +0,0 @@ -""" -web_crawler_test_4_supersurfer.py -This supersurfer is in a trap. -Expanding only 2 first exteral urls from each website (not random choice) -will lead to trap, becouse of repetition of most popular websites urls -like google.com, twitter.com, facebook.com etc. -""" - -import sys -import time - -sys.path.append("../web_crawler") -from web_crawler import WebCrawler - -sys.path.append("..") -from privileges import construct_full_privilege, privileges_bigger_or_equal - - -master_crawler = WebCrawler.create_master ( - privileges = construct_full_privilege(), - start_url = "http://antyweb.pl/" -) - - -WebCrawler.create_worker ( - privileges = construct_full_privilege(), - master = master_crawler, - max_internal_expansion = 5, - max_external_expansion = 2, - max_crawling_depth = 100, -) - -master_crawler.run() - -time.sleep(60*60*24*3) -master_crawler.terminate() - diff --git a/lionfish/build.sbt b/lionfish/build.sbt index 591e1e9..d71c9b3 100755 --- a/lionfish/build.sbt +++ b/lionfish/build.sbt @@ -1,6 +1,10 @@ +import com.github.retronym.SbtOneJar._ + name := "lionfish" -version := "0.9.3" +oneJarSettings + +version := "0.9.5" scalaVersion := "2.10.4" @@ -47,7 +51,7 @@ libraryDependencies += "org.neo4j.app" % "neo4j-server" % "2.0.3" classifier "st //libraryDependencies += "commons-beanutils" % "commons-beanutils-core" % "1.8.0" libraryDependencies ++= Seq( - "org.neo4j.app" % "neo4j-server" % "2.0.2" + "org.neo4j.app" % "neo4j-server" % "2.0.3" ) libraryDependencies ++= Seq("com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.3.3") @@ -58,3 +62,5 @@ libraryDependencies ++= Seq( ) libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.1.4" % "test" + +libraryDependencies += "commons-lang" % "commons-lang" % "2.6" diff --git a/lionfish/project/plugins.sbt b/lionfish/project/plugins.sbt new file mode 100755 index 0000000..8331cfe --- /dev/null +++ b/lionfish/project/plugins.sbt @@ -0,0 +1,7 @@ +resolvers += "Sonatype snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/" + +addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.7.0-SNAPSHOT") + +addSbtPlugin("org.scala-sbt.plugins" % "sbt-onejar" % "0.8") + +logLevel := Level.Warn diff --git a/lionfish/python_lionfish/__init__.py b/lionfish/python_lionfish/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lionfish/python_lionfish/client/__init__.py b/lionfish/python_lionfish/client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/lionfish/python_lionfish/client/client.py b/lionfish/python_lionfish/client/client.py index de2579e..d65522a 100755 --- a/lionfish/python_lionfish/client/client.py +++ b/lionfish/python_lionfish/client/client.py @@ -7,9 +7,20 @@ # The new Lionfish client (works properly only with the new server which is # based on Scala). -# HOST = 'localhost' # 'ocean-lionfish.no-ip.biz' -# PORT = 7777 +sys.path.append(os.path.abspath(os.path.join(__file__, "../../../../"))) +PORT, ADDRESS = 0, "" + +try: + import don_corleone.don_utils as du + ADDRESS = du.get_configuration('lionfish', 'host') + PORT = du.get_configuration('lionfish', 'port') +except Exception, e: + print "FAILED TO FETCH ADDRESS AND PORT CONFIGURATION FROM DON CORLEONE" + print str(e) + exit(1) + + sys.path.append(os.path.join(os.path.dirname(__file__), '../../../graph_workers')) from graph_utils import * @@ -72,7 +83,7 @@ def submit(self): self.count = 0 return results - def __init__(self, address, port): + def __init__(self, address=ADDRESS, port=PORT): self._address = address self._port = port self._conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) @@ -80,7 +91,7 @@ def __init__(self, address, port): try: self._conn.connect((self._address, self._port)) except Exception as e: - raise Exception('Connecting failed. {error}'.format(error=str(e))) + raise Exception('Failed to connect with server. {error}'.format(error=str(e))) def connect(self): pass @@ -88,31 +99,63 @@ def connect(self): def disconnect(self): try: self._conn.close() - logger.log(info_level, 'The client has disconnected.') except Exception as e: - logger.log(error_level, 'Disconnecting failed. {error}' + logger.log(error_level, 'Failed to disconnect from server. {error}' .format(error=str(e))) + raise e def send(self, data): try: send_message(self._conn, data) except Exception, e: logger.log(info_level, 'Not sent data {data}'.format(data=data)) - logger.log(error_level, 'Sending data failed. {error}' + logger.log(error_level, 'Failed to send data. {error}' .format(error=str(e))) + raise e def recv(self): data = None try: data = get_message(self._conn) except Exception as e: - logger.log(error_level, 'Receiving data failed. {error}' + logger.log(error_level, 'Failed to receive data. {error}' .format(error=str(e))) + raise e return data def get_batch(self): return self.Batch(self) + def execute_query(self, query, **params): + """ + Executes query with given params + @param query string + @param params dictionary/keywords + """ + data = { + 'methodName': 'executeQuery', + 'args': { + 'query': query, + 'parameters': params + } + } + + request = { + 'type': 'sequence', + 'tasks': [data] + } + + if inspect.stack()[1][3] == '_get_data_for_batch': + return data + self.send(request) + result = None + try: + result = self.recv() + except: + logger.log(error_level, 'Failed to execute query. {error}' + .format(error=str(e))) + return result[0] + def get_by_uuid(self, uuid, **params): """ Gets a node of given uuid @@ -184,11 +227,33 @@ def get_by_tag(self, tag, **params): result = self.recv() return result[0] + def get_by_username(self, username, **params): + """ + Gets a node by given username + @param username string + """ + data = { + 'methodName': 'getByUsername', + 'args': { + 'username': username + } + } + + request = { + 'type': 'sequence', + 'tasks': [data] + } + + if inspect.stack()[1][3] == '_get_data_for_batch': + return data + self.send(request) + result = self.recv() + return result[0] + def get_by_label(self, label, **params): """ - Gets a node by given model_name and link - @param model_name string - @param link string + Gets a node by given label + @param label string """ data = { 'methodName': 'getByLabel', @@ -286,6 +351,29 @@ def get_instances(self, model_name, children_properties={}, relationship_propert result = self.recv() return result[0] + def get_user_feeds(self, uuid, **params): + """ + Gets all feeds of given user uuid + @param uuid string + """ + data = { + 'methodName': 'getUserFeeds', + 'args': { + 'uuid': uuid + } + } + + request = { + 'type': 'sequence', + 'tasks': [data] + } + + if inspect.stack()[1][3] == '_get_data_for_batch': + return data + self.send(request) + result = self.recv() + return result[0] + def set(self, uuid, **properties): """ Sets properties to a node of given uuid @@ -358,6 +446,31 @@ def delete_properties(self, uuid, **property_keys): self.send(request) self.recv() + def create_model_node(self, model_name, **properties): + """ + Creates a node with properties to the model given by model_name + @param model_name string + @param relationship_type string + @param properties dictionary/keywords + """ + data = { + 'methodName': 'createModelNodes', + 'args': { + 'modelName': model_name + } + } + + request = { + 'type': 'sequence', + 'tasks': [data] + } + + if inspect.stack()[1][3] == '_get_data_for_batch': + return data + self.send(request) + result = self.recv() + return result[0] + def create_node(self, model_name, relationship_type='<>', **properties): """ Creates a node with properties to the model given by model_name @@ -514,46 +627,3 @@ def delete_relationship_properties(self, start_node_uuid, end_node_uuid, **prop_ return data self.send(request) self.recv() - - # def execute_query(self, query_string, **params): - # """ - # Executes query_string with given query_params - # and returns results as python dictionaries - # @param query_string string - # @param params dictionary/keywords - # """ - # data = { - # 'methodName': 'execute_query', - # 'args': [query_string, params] - # } - # - # request = { - # 'type': 'sequence', - # 'tasks': [data] - # } - # - # if inspect.stack()[1][3] == '_get_data_for_batch': - # return data - # self.send(request) - # return self.recv() - # - # def run_query(self, query_string, **params): - # """ - # Runs query_string with given query_params - # @param query_string string - # @param params dictionary/keywords - # """ - # data = { - # 'methodName': 'run_query', - # 'args': [query_string, params] - # } - # - # request = { - # 'type': 'sequence', - # 'tasks': [data] - # } - # - # if inspect.stack()[1][3] == '_get_data_for_batch': - # return data - # self.send(request) - # self.recv() diff --git a/lionfish/run.py b/lionfish/run.py new file mode 100755 index 0000000..1b676ff --- /dev/null +++ b/lionfish/run.py @@ -0,0 +1,7 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + + +""" Script running python or scala based server """ + + diff --git a/lionfish/src/main/scala/com/lionfish/client/Database.scala b/lionfish/src/main/scala/com/lionfish/client/Database.scala index 552f113..7d44d85 100755 --- a/lionfish/src/main/scala/com/lionfish/client/Database.scala +++ b/lionfish/src/main/scala/com/lionfish/client/Database.scala @@ -32,6 +32,25 @@ object Database extends Factory { new SequenceStream(serverAddress, serverPort) } + /** + * Executes Cypher query + * @return list of lists of data + */ + case class executeQuery(private val query: String, + private val parameters: Map[String, Any]) extends Method { + override def getRequest: Map[String, Any] = { + val request = Map( + "methodName" -> "executeQuery", + "args" -> Map( + "query" -> query, + "parameters" -> parameters + ) + ) + + request + } + } + case class getByUuid(private val nodeUuid: String) extends Method { override def getRequest: Map[String, Any] = { val request = Map( @@ -45,7 +64,8 @@ object Database extends Factory { } } - case class getByLink(private val modelName: String, link: String) extends Method { + case class getByLink(private val modelName: String, + private val link: String) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "getByLink", @@ -85,6 +105,19 @@ object Database extends Factory { } } + case class getByUsername(private val username: String) extends Method { + override def getRequest: Map[String, Any] = { + val request = Map( + "methodName" -> "getByUsername", + "args" -> Map( + "username" -> username + ) + ) + + request + } + } + case class getModelNodes() extends Method { override def getRequest: Map[String, Any] = { val request = Map( @@ -96,9 +129,10 @@ object Database extends Factory { } } - case class getChildren(private val parentUuid: String, relationshipType: String, - childrenProperties: Map[String, Any] = Map(), - relationshipProperties: Map[String, Any] = Map()) extends Method { + case class getChildren(private val parentUuid: String, + private val relationshipType: String, + private val childrenProperties: Map[String, Any] = Map(), + private val relationshipProperties: Map[String, Any] = Map()) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "getChildren", @@ -115,8 +149,8 @@ object Database extends Factory { } case class getInstances(private val modelName: String, - childrenProperties: Map[String, Any] = Map(), - relationshipProperties: Map[String, Any] = Map()) extends Method { + private val childrenProperties: Map[String, Any] = Map(), + private val relationshipProperties: Map[String, Any] = Map()) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "getInstances", @@ -174,7 +208,8 @@ object Database extends Factory { } } - case class setProperties(private val uuid: String, properties: Map[String, Any]) + case class setProperties(private val uuid: String, + private val properties: Map[String, Any]) extends Method { override def getRequest: Map[String, Any] = { val request = Map( @@ -189,7 +224,8 @@ object Database extends Factory { } } - case class deleteProperties(private val uuid: String, propertyKeys: List[String]) + case class deleteProperties(private val uuid: String, + private val propertyKeys: List[String]) extends Method { override def getRequest: Map[String, Any] = { val request = Map( @@ -204,8 +240,22 @@ object Database extends Factory { } } - case class createNode(private val modelName: String, relationshipType: String, - properties: Map[String, Any]) extends Method { + case class createModelNode(private val modelName: String) extends Method { + override def getRequest: Map[String, Any] = { + val request = Map( + "methodName" -> "createModelNodes", + "args" -> Map( + "modelName" -> modelName + ) + ) + + request + } + } + + case class createNode(private val modelName: String, + private val relationshipType: String, + private val properties: Map[String, Any]) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "createNodes", @@ -233,9 +283,10 @@ object Database extends Factory { } } - case class createRelationship(private val startNodeUuid: String, endNodeUuid: String, - relationshipType: String, properties: Map[String, Any] = Map()) - extends Method { + case class createRelationship(private val startNodeUuid: String, + private val endNodeUuid: String, + private val relationshipType: String, + private val properties: Map[String, Any] = Map()) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "createRelationships", @@ -251,8 +302,8 @@ object Database extends Factory { } } - case class deleteRelationship(private val startNodeUuid: String, endNodeUuid: String) - extends Method { + case class deleteRelationship(private val startNodeUuid: String, + private val endNodeUuid: String) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "deleteRelationships", @@ -266,9 +317,9 @@ object Database extends Factory { } } - case class setRelationshipProperties(private val startNodeUuid: String, endNodeUuid: String, - properties: Map[String, Any]) - extends Method { + case class setRelationshipProperties(private val startNodeUuid: String, + private val endNodeUuid: String, + private val properties: Map[String, Any]) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "setRelationshipProperties", @@ -283,9 +334,9 @@ object Database extends Factory { } } - case class deleteRelationshipProperties(private val startNodeUuid: String, endNodeUuid: String, - propertyKeys: List[String]) - extends Method { + case class deleteRelationshipProperties(private val startNodeUuid: String, + private val endNodeUuid: String, + private val propertyKeys: List[String]) extends Method { override def getRequest: Map[String, Any] = { val request = Map( "methodName" -> "deleteRelationshipProperties", diff --git a/lionfish/src/main/scala/com/lionfish/client/SequenceStream.scala b/lionfish/src/main/scala/com/lionfish/client/SequenceStream.scala index 0a6d0fb..692f045 100755 --- a/lionfish/src/main/scala/com/lionfish/client/SequenceStream.scala +++ b/lionfish/src/main/scala/com/lionfish/client/SequenceStream.scala @@ -5,23 +5,12 @@ class SequenceStream( protected val serverPort: Int) extends Stream { override def !!(method: Method): Any = { - val result = method.executeSequence().asInstanceOf[List[Any]] - - if (result != null && result.length == 1) { - result(0) - } else { - result - } + method.executeSequence() } override def execute(): Any = { - val result = macroMethod.executeSequence().asInstanceOf[List[Any]] + val result = macroMethod.executeSequence() macroMethod = null - - if (result != null && result.length == 1) { - result(0) - } else { - result - } + result } } diff --git a/lionfish/src/main/scala/com/lionfish/server/Connection.scala b/lionfish/src/main/scala/com/lionfish/server/Connection.scala index 5c9cbbc..8072ae7 100644 --- a/lionfish/src/main/scala/com/lionfish/server/Connection.scala +++ b/lionfish/src/main/scala/com/lionfish/server/Connection.scala @@ -13,6 +13,7 @@ import com.lionfish.messages.Request class Connection(private val master: ActorRef)(private implicit val socket: Socket) extends Runnable { private implicit val timeout = Timeout(600 seconds) private val connectionUuid = UUID.randomUUID().toString + println(s"New connection $connectionUuid") private def disconnect() = { try { diff --git a/lionfish/src/main/scala/com/lionfish/server/DatabaseManager.scala b/lionfish/src/main/scala/com/lionfish/server/DatabaseManager.scala index a23e245..93df925 100755 --- a/lionfish/src/main/scala/com/lionfish/server/DatabaseManager.scala +++ b/lionfish/src/main/scala/com/lionfish/server/DatabaseManager.scala @@ -18,30 +18,28 @@ import com.lionfish.utils.Config // TODO: logging, nicer way of handling errors object DatabaseManager { - private val databasePath = "/data/graph.db" - private var neo4jPath = Config.defaultNeo4jPath - private var neo4jConsolePort = Config.defaultNeo4jConsolePort - private val graphDB = new GraphDatabaseFactory().newEmbeddedDatabase(neo4jPath + databasePath) - - private val globalOperations = GlobalGraphOperations.at(graphDB) - private val cypherEngine = new ExecutionEngine(graphDB) - private var cypherResult: ExecutionResult = null - - private val config = new ServerConfigurator(graphDB.asInstanceOf[GraphDatabaseAPI]) - config.configuration().setProperty( - Configurator.WEBSERVER_PORT_PROPERTY_KEY, neo4jConsolePort - ) - - config.configuration().setProperty( - Configurator.HTTP_LOGGING, Configurator.DEFAULT_HTTP_LOGGING - ) - - private val srv = new WrappingNeoServerBootstrapper(graphDB.asInstanceOf[GraphDatabaseAPI], config) - srv.start() - - // Simple cache of model nodes - private var modelNodes: Map[String, Node] = Map() - initCache() + val databasePath = "/data/graph.db" + var neo4jPath = Config.defaultNeo4jPath + var neo4jConsolePort = Config.defaultNeo4jConsolePort + val graphDB = new GraphDatabaseFactory().newEmbeddedDatabase(neo4jPath + databasePath) + + val globalOperations = GlobalGraphOperations.at(graphDB) + val cypherEngine = new ExecutionEngine(graphDB) + var cypherResult: ExecutionResult = null + + def initNeo4jConsole(){ + val config = new ServerConfigurator(graphDB.asInstanceOf[GraphDatabaseAPI]) + config.configuration().setProperty( + Configurator.WEBSERVER_PORT_PROPERTY_KEY, neo4jConsolePort + ) + + config.configuration().setProperty( + Configurator.HTTP_LOGGING, Configurator.DEFAULT_HTTP_LOGGING + ) + + val srv = new WrappingNeoServerBootstrapper(graphDB.asInstanceOf[GraphDatabaseAPI], config) + srv.start() + } def setNeo4jPath(path: String) = { neo4jPath = path @@ -134,8 +132,17 @@ object DatabaseManager { for (row: Map[String, Any] <- cypherResult) { var rowItems: ListBuffer[Any] = ListBuffer() for (column <- row) { - // TODO: Do not assume that every value is Node - rowItems += parseMap(column._2.asInstanceOf[Node]) + column._2 match { + case node: Node => { + rowItems += parseMap(node) + } + case rel: Relationship => { + rowItems += parseMap(rel) + } + case sth: Any => { + rowItems += sth + } + } } parsedResult += rowItems.toList } @@ -150,26 +157,35 @@ object DatabaseManager { } } - private def initCache() = { + def executeQuery(args: List[Map[String, Any]]): List[Any] = { + var result: List[Any] = null + + val tx = graphDB.beginTx() try { - // Simply gets model nodes - val tx = graphDB.beginTx() - val rawResult = globalOperations.getAllNodesWithLabel(DynamicLabel.label("Model")) - tx.success() + val rawResult: ListBuffer[Any] = ListBuffer() - // Saves result - val it = rawResult.iterator() - while (it.hasNext) { - val node = it.next() - modelNodes += node.getProperty("model_name").asInstanceOf[String] -> node + for (item <- args) { + val query = item("query").asInstanceOf[String] + val params = item("parameters").asInstanceOf[Map[String, Any]] + + // Executes query + val returnedData = executeCypher(query, params) + rawResult += returnedData } - it.close() + tx.success() + result = rawResult.toList } catch { case e: Exception => { val line = e.getStackTrace()(2).getLineNumber - println(s"Initialising cache failed at line $line. Error message: $e") + println(s"Failed to execute the function at line $line. Error message: $e") } + tx.failure() + result = List() + } finally { + tx.close() } + + result } def getByUuid(args: List[Map[String, Any]]): List[Any] = { @@ -286,6 +302,44 @@ object DatabaseManager { result } + def getByUsername(args: List[Map[String, Any]]): List[Any] = { + var result: List[Any] = null + + val tx = graphDB.beginTx() + try { + val rawResult: ListBuffer[Any] = ListBuffer() + val label = DynamicLabel.label("NeoUser") + + // Gets nodes by uuid + for (item <- args) { + // Extracts result + val username = item("username").asInstanceOf[String] + val rawNode = graphDB.findNodesByLabelAndProperty(label, "username", username) + + val it = rawNode.iterator() + if (it.hasNext) { + rawResult += parseMap(it.next()) + } else { + rawResult += null + } + it.close() + } + tx.success() + result = rawResult.toList + } catch { + case e: Exception => { + val line = e.getStackTrace()(2).getLineNumber + println(s"Failed to execute the function at line $line. Error message: $e") + } + tx.failure() + result = List() + } finally { + tx.close() + } + + result + } + def getByLabel(args: List[Map[String, Any]]): List[Any] = { var result: List[Any] = null @@ -717,6 +771,72 @@ object DatabaseManager { null } + def createModelNodes(args: List[Map[String, Any]]): List[Any] = { + var result: List[Map[String, Any]] = null + + val tx = graphDB.beginTx() + try { + val rawResult: ListBuffer[Map[String, Any]] = ListBuffer() + val rootLabel = DynamicLabel.label("Root") + + // Gets raw root + val rawRoot = globalOperations.getAllNodesWithLabel(rootLabel) + + // Extracts root + var root: Node = null + val it = rawRoot.iterator() + if (it.hasNext) { + root = it.next() + } else { + // TODO: for development purposes + val nodeLabel = DynamicLabel.label("Node") + root = graphDB.createNode(rootLabel, nodeLabel) + root.setProperty("uuid", "root") + } + it.close() + + for (item <- args) { + val modelName = item("modelName").asInstanceOf[String] + val modelProps: Map[String, Any] = Map( + "uuid" -> UUID.randomUUID().toString, + "app_label" -> "rss", + "name" -> ("rss:" + modelName), + "model_name" -> modelName + ) + + val nodeLabel = DynamicLabel.label("Node") + val modelLabel = DynamicLabel.label("Model") + val modelNode = graphDB.createNode() + modelNode.addLabel(nodeLabel) + modelNode.addLabel(modelLabel) + + // Sets properties + for ((key, value) <- modelProps) { + modelNode.setProperty(key, value) + } + + // Creates relationship from root + val rel = DynamicRelationshipType.withName("<>") + root.createRelationshipTo(modelNode, rel) + + rawResult += Map("uuid" -> modelProps("uuid")) + } + tx.success() + result = rawResult.toList + } catch { + case e: Exception => { + val line = e.getStackTrace()(2).getLineNumber + println(s"Failed to execute the function at line $line. Error message: $e") + } + tx.failure() + result = List() + } finally { + tx.close() + } + + result + } + // TODO: Add defaults def createNodes(args: List[Map[String, Any]]): List[Any] = { var result: List[Map[String, Any]] = null @@ -724,6 +844,19 @@ object DatabaseManager { val tx = graphDB.beginTx() try { val rawResult: ListBuffer[Map[String, Any]] = ListBuffer() + var modelNodes: Map[String, Node] = Map() + + // Simply gets model nodes + val modelLabel = DynamicLabel.label("Model") + val rawModelResult = globalOperations.getAllNodesWithLabel(modelLabel) + + // Saves model nodes + val it = rawModelResult.iterator() + while (it.hasNext) { + val node = it.next() + modelNodes += node.getProperty("model_name").asInstanceOf[String] -> node + } + it.close() // Creates nodes by given properties for (params <- args) { diff --git a/lionfish/src/main/scala/com/lionfish/server/Launcher.scala b/lionfish/src/main/scala/com/lionfish/server/Launcher.scala index 2cd7bf8..a2b0835 100755 --- a/lionfish/src/main/scala/com/lionfish/server/Launcher.scala +++ b/lionfish/src/main/scala/com/lionfish/server/Launcher.scala @@ -17,6 +17,7 @@ object Launcher { case "--port" => { try { result += "port" -> arg(1).toInt + println("port") } catch { case e: Exception => println("Invalid parameter: port") } @@ -24,6 +25,7 @@ object Launcher { case "--neo4j-path" => { try { result += "neo4j-path" -> arg(1) + println("neo4j-path") } catch { case e: Exception => println("Invalid parameter: neo4j-path") } @@ -31,6 +33,7 @@ object Launcher { case "--neo4j-console-port" => { try { result += "neo4j-console-port" -> arg(1).toInt + println("neo4j-console-port") } catch { case e: Exception => println("Invalid parameter: neo4j-console-port") } @@ -65,7 +68,8 @@ object Launcher { DatabaseManager.setNeo4jConsolePort(params("neo4j-console-port").asInstanceOf[Int]) } - DatabaseManager + + DatabaseManager.initNeo4jConsole() new Thread(Server).start() } } diff --git a/lionfish/src/main/scala/com/lionfish/server/RequestHandler.scala b/lionfish/src/main/scala/com/lionfish/server/RequestHandler.scala index ea257a8..526983c 100755 --- a/lionfish/src/main/scala/com/lionfish/server/RequestHandler.scala +++ b/lionfish/src/main/scala/com/lionfish/server/RequestHandler.scala @@ -19,11 +19,14 @@ class RequestHandler extends Actor { fullArgs += item(0).asInstanceOf[Map[String, Any]] } - println(s"Executing $methodName in batch.") + println(s"Executing $methodName in a batch.") // TODO: Solve this with reflection var rawResult: List[Any] = null methodName match { + case "executeQuery" => { + rawResult = DatabaseManager.executeQuery(fullArgs.toList) + } case "getByUuid" => { rawResult = DatabaseManager.getByUuid(fullArgs.toList) } @@ -33,6 +36,9 @@ class RequestHandler extends Actor { case "getByTag" => { rawResult = DatabaseManager.getByTag(fullArgs.toList) } + case "getByUsername" => { + rawResult = DatabaseManager.getByUsername(fullArgs.toList) + } case "getByLabel" => { rawResult = DatabaseManager.getByLabel(fullArgs.toList) } @@ -60,6 +66,9 @@ class RequestHandler extends Actor { case "deleteProperties" => { rawResult = DatabaseManager.deleteProperties(fullArgs.toList) } + case "createModelNodes" => { + rawResult = DatabaseManager.createModelNodes(fullArgs.toList) + } case "createNodes" => { rawResult = DatabaseManager.createNodes(fullArgs.toList) } @@ -106,11 +115,14 @@ class RequestHandler extends Actor { val methodName = item("methodName").asInstanceOf[String] val args = List(item("args").asInstanceOf[Map[String, Any]]) - println(s"Executing $methodName in sequence.") + println(s"Executing $methodName in a sequence.") // TODO: Solve this with reflection var rawResult: List[Any] = null methodName match { + case "executeQuery" => { + rawResult = DatabaseManager.executeQuery(args) + } case "getByUuid" => { rawResult = DatabaseManager.getByUuid(args) } @@ -120,6 +132,9 @@ class RequestHandler extends Actor { case "getByTag" => { rawResult = DatabaseManager.getByTag(args) } + case "getByUsername" => { + rawResult = DatabaseManager.getByUsername(args) + } case "getByLabel" => { rawResult = DatabaseManager.getByLabel(args) } @@ -147,6 +162,9 @@ class RequestHandler extends Actor { case "deleteProperties" => { rawResult = DatabaseManager.deleteProperties(args) } + case "createModelNodes" => { + rawResult = DatabaseManager.createModelNodes(args) + } case "createNodes" => { rawResult = DatabaseManager.createNodes(args) } @@ -180,7 +198,7 @@ class RequestHandler extends Actor { case e: Exception => { log.error(s"Failed to execute a sequence. Error message: $e") } - null + List() } } @@ -200,7 +218,7 @@ class RequestHandler extends Actor { case e: Exception => { log.error(s"Failed to process a request. Error message: $e") } - null + List() } } diff --git a/mantis_shrimp/scripts/add_logs.py b/mantis_shrimp/scripts/add_logs.py new file mode 100644 index 0000000..e181543 --- /dev/null +++ b/mantis_shrimp/scripts/add_logs.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +import pika +import sys + +""" +Simple script pushing datasets to kafka +""" + +import fnmatch +import os +import sys +from optparse import OptionParser +from nltk.tokenize import * +import logging +import codecs +# Try switching of kafka-python logger +try: + logging.getLogger("pika").setLevel(logging.ERROR) +except: + pass + +from sklearn.feature_extraction.text import CountVectorizer +from sklearn.feature_extraction.text import TfidfVectorizer + +sys.path.append(os.path.join(os.path.dirname(__file__), '../../don_corleone/')) + +from don_utils import get_configuration +import json + + +if __name__ == "__main__": + credentials = pika.PlainCredentials('admin', 'password') + connection = pika.BlockingConnection(pika.ConnectionParameters( + 'localhost', credentials=credentials)) + + + + channel = connection.channel() + + channel.queue_declare(queue='ocean_log') + + + for id in xrange(100): + + print "Sending ", id + + news = {"message":str(id), "temp_field":str(id+10)} + message = json.dumps(news).encode("utf-8") + + channel.basic_publish(exchange='', + routing_key='ocean_log', + body=message) + + + import time + time.sleep(0.1) + + connection.close() diff --git a/mantis_shrimp/src/main/scala/mantisshrimp/main.scala b/mantis_shrimp/src/main/scala/mantisshrimp/main.scala index 8f4a184..8cda744 100644 --- a/mantis_shrimp/src/main/scala/mantisshrimp/main.scala +++ b/mantis_shrimp/src/main/scala/mantisshrimp/main.scala @@ -1,6 +1,9 @@ /** * Main file running system */ + +//TODO: localhost!=127.0.0.1? See to that and replace in config to localhost everything + package mantisshrimp import com.typesafe.config.ConfigFactory @@ -50,7 +53,7 @@ case class Config( mantis_master_name: String = "mantis_master", config_path: String = "mantis.conf" , logging_strategy: String = MantisLiterals.MantisLoggerStdErrConf, - lionfish_host: String = "127.0.0.1", + lionfish_host: String = "localhost", lionfish_port: Int = 7777 ) @@ -265,8 +268,32 @@ object Main extends App{ } + val port = config.port + val hostname = config.host + + val config_string = s""" + akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + } + remote { + enabled-transports = ["akka.remote.netty.tcp"] + netty.tcp { + hostname = $hostname + port = $port + } + } + } + """ + + //Create system + system = ActorSystem(config.actor_system_name, + ConfigFactory.load(ConfigFactory.parseString(config_string))) + + + val config_local = Map[String, String](MantisLiterals.ParentMantisPath->"") val sample_job = system.actorOf(Props(new - MantisNewsDumperLionfish(Map[String, String]("parentMantisPath"->""))), "tmp") + MantisNewsDumperLionfish(config_local)), "tmp") //runMantisShrimp diff --git a/scripts/ocean_exemplary_data.py b/scripts/ocean_exemplary_data.py index 1814f8b..79c4fe3 100755 --- a/scripts/ocean_exemplary_data.py +++ b/scripts/ocean_exemplary_data.py @@ -14,13 +14,14 @@ from py2neo import neo4j sys.path.append(os.path.join(os.path.dirname(__file__), '../don_corleone/')) +sys.path.append(os.path.join(os.path.dirname(__file__), '../lionfish/python_lionfish/client/')) from don_utils import get_configuration sys.path.append('../graph_workers/') lib_path = os.path.abspath('./graph_workers') sys.path.append(lib_path) from graph_workers.graph_defines import * -from odm_client import ODMClient +from client import Client SOURCE_FILE = '../data/contentsource_nodes_exemplary' @@ -67,7 +68,7 @@ print '(You are permitted to run whole system too during this process)' enter = raw_input() - odm_client = ODMClient() + odm_client = Client('localhost', 7777) odm_client.connect() odm_batch = odm_client.get_batch() @@ -97,6 +98,7 @@ try: cs_node = eval(unicode(cs)) cs_node['last_updated'] = 0 + cs_node['description'] = cs_node['description'].encode('utf-8') odm_batch.append( odm_client.create_node, diff --git a/scripts/ocean_init_graph.py b/scripts/ocean_init_graph.py index 4f50c1a..8c34ad8 100755 --- a/scripts/ocean_init_graph.py +++ b/scripts/ocean_init_graph.py @@ -99,6 +99,12 @@ def run_and_return_type_list(): app_label=APP_LABEL, name=APP_LABEL+':'+CONTENT_TYPE_MODEL_NAME, model_name=CONTENT_TYPE_MODEL_NAME + ), + node( + uuid=str(uuid.uuid1()), + app_label=APP_LABEL, + name=APP_LABEL+':'+FEED_TYPE_MODEL_NAME, + model_name=FEED_TYPE_MODEL_NAME ) ] @@ -111,11 +117,11 @@ def run_and_return_type_list(): rel(root, HAS_TYPE_RELATION, type_list[0]), rel(root, HAS_TYPE_RELATION, type_list[1]), rel(root, HAS_TYPE_RELATION, type_list[2]), - rel(root, HAS_TYPE_RELATION, type_list[3]) + rel(root, HAS_TYPE_RELATION, type_list[3]), + rel(root, HAS_TYPE_RELATION, type_list[4]) ) print 'Graph populated successfully.' - print 'NOTE: Remember to restart Lionfish ODM server.' return type_list diff --git a/scripts/ocean_small_exemplary_data.py b/scripts/ocean_small_exemplary_data.py index 8b141b0..67c8b26 100755 --- a/scripts/ocean_small_exemplary_data.py +++ b/scripts/ocean_small_exemplary_data.py @@ -109,6 +109,7 @@ # Creates instance relations graph_db.create( + rel(type_list[4], HAS_INSTANCE_RELATION, feed_list[0]), rel(user_list[2], HAS_FEED_RELATION, feed_list[0]) ) @@ -267,5 +268,4 @@ batch.submit() print 'Graph populated successfully!' - print 'Remember to (RE)START Lionfish server!' diff --git a/scripts/ocean_small_exemplary_init.py b/scripts/ocean_small_exemplary_init.py deleted file mode 100755 index c37bcb3..0000000 --- a/scripts/ocean_small_exemplary_init.py +++ /dev/null @@ -1,164 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -""" - Exemplary data for neo4j database - NOTE: wipes database ! - Connection done using RESTApi and wrapper for python py2neo -""" - -import time -import sys -import uuid -import os - -from py2neo import neo4j -from py2neo import node, rel - -sys.path.append(os.path.join(os.path.dirname(__file__), '../don_corleone/')) - -from don_utils import get_configuration - -sys.path.append('../graph_workers/') -lib_path = os.path.abspath('./graph_workers') -sys.path.append(lib_path) -from graph_workers.graph_defines import * - - -if __name__ == '__main__': - - print 'Running', __file__ - print 'This script will *ERASE ALL NODES AND RELATIONS IN NEO4J DATABASE*' - print 'NOTE: This script *already executes* ocean_init_graph.py.' - print 'Press enter to proceed...' - enter = raw_input() - - os.system('python2 ocean_init_graph.py') - - # Create connection - graph_db = neo4j.GraphDatabaseService( - 'http://{0}:{1}/db/data/'.format( - get_configuration("neo4j", "host"), - get_configuration("neo4j", "port") - ) - ) - - read_batch = neo4j.ReadBatch(graph_db) - write_batch = neo4j.WriteBatch(graph_db) - - ### Add websites ### - websites = [ - node( - uuid='97678546-a07d-11e3-9f3a-2cd05ae1c39b', - link='http://www.gry-online.pl/', - title='GRY-OnLine', - language='pl', - ), - node( - uuid='976787bc-a07d-11e3-9f3a-2cd05ae1c39b', - link='http://www.wp.pl/', - title='Wirtualna Polska', - language='pl', - ), - node( - uuid='97678938-a07d-11e3-9f3a-2cd05ae1c39b', - link='http://www.tvn24.pl/', - title='TVN24.pl - Wiadomosci z kraju i ze swiata', - language='pl', - ), - ] - websites = graph_db.create(*websites) - for item in websites: - item.add_labels('Node', 'Website') - - # Create instance relations - query = """ - MATCH (m:Model {model_name: '%s'}), (w:%s) - CREATE (m)-[:`%s`]->(w) - RETURN m,w - """ - query %= ( - WEBSITE_TYPE_MODEL_NAME, - WEBSITE_TYPE_MODEL_NAME, - HAS_INSTANCE_RELATION, - ) - write_batch.append_cypher(query) - write_batch.submit() - - # Create nodes - content_sources_list = [ - node( - uuid='977466da-a07d-11e3-9f3a-2cd05ae1c39b', - link='http://www.gry-online.pl/rss/news.xml', - title='GRY-OnLine Wiadomosci', - description='Najnowsze Wiadomosci', - image_width='144', - image_height='18', - image_link='http://www.gry-online.pl/S012.asp', - image_url='http://www.gry-online.pl/rss/rss_logo.gif', - language='pl', - last_updated=int(time.time() - 100000), - source_type='rss' - ), - node( - uuid='97746a22-a07d-11e3-9f3a-2cd05ae1c39b', - link='http://wiadomosci.wp.pl/kat,1329,ver,rss,rss.xml', - title='Wiadomosci WP - Wiadomosci - Wirtualna Polska', - description='Wiadomosci.wp.pl to serwis, dzieki ktoremu mozna \ -zapoznac sie z biezaca sytuacja w kraju i na swiecie.', - image_width='70', - image_height='28', - image_link='http://wiadomosci.wp.pl', - image_url='http://i.wp.pl/a/i/finanse/logozr/WP.gif', - language='pl', - last_updated=int(time.time() - 1000000), - source_type='rss' - ), - node( - uuid='97746bf8-a07d-11e3-9f3a-2cd05ae1c39b', - link='http://www.tvn24.pl/najwazniejsze.xml', - title='TVN24.pl - Wiadomosci z kraju i ze swiata - najnowsze \ -informacje w TVN24', - description='Czytaj najnowsze informacje i ogladaj wideo w portalu \ -informacyjnym TVN24! U nas zawsze aktualne wiadomosci z kraju, ze swiata, \ -relacje na zywo i wiele wiecej.', - language='pl', - last_updated=int(time.time() - 100000), - source_type='rss' - ) - ] - - # Create content sources - content_sources = graph_db.create(*content_sources_list) - for item in content_sources: - item.add_labels('Node', 'ContentSource') - - # Create ContentSources instance relations - query = """ - MATCH (m:Model {model_name: '%s'}), (w:%s) - CREATE (m)-[:`%s`]->(w) - RETURN m,w - """ - query %= ( - CONTENT_SOURCE_TYPE_MODEL_NAME, - CONTENT_SOURCE_TYPE_MODEL_NAME, - HAS_INSTANCE_RELATION, - ) - write_batch.append_cypher(query) - write_batch.submit() - - # Create Website __has__ ContentSource relations - graph_db.create( - rel(websites[0], HAS_RELATION, content_sources[0]), - rel(websites[1], HAS_RELATION, content_sources[1]), - rel(websites[2], HAS_RELATION, content_sources[2]) - ) - - my_batch = neo4j.WriteBatch(graph_db) - my_batch.append_cypher('create index on :Node(uuid)') - my_batch.append_cypher('create index on :ContentSource(link)') - my_batch.submit() - - print 'Graph populated successfully!' - print 'Remember to (RE)START Lionfish ODM Server!' - diff --git a/tests/spidercrab_integrity_test.py b/tests/spidercrab_integrity_test.py index c7a4dfe..6923ac3 100755 --- a/tests/spidercrab_integrity_test.py +++ b/tests/spidercrab_integrity_test.py @@ -41,9 +41,12 @@ def check_lionfish_communication(): """ Returns true if lionfish works OK """ + lionfish_host = du.get_configuration('lionfish', 'host') + lionfish_port = du.get_configuration('lionfish', 'port') + lionfish_client = Client(lionfish_host, lionfish_port) lionfish_client.connect() - lionfish_client.create_model('spidercrab_integrity_test') + lionfish_client.create_model_node('spidercrab_integrity_test') found_instances = lionfish_client.get_model_nodes() model_instance = None model_uuid = ''