diff --git a/core/src/main/scala/org/renci/relationgraph/RelationGraph.scala b/core/src/main/scala/org/renci/relationgraph/RelationGraph.scala index b0a30df..f30009d 100644 --- a/core/src/main/scala/org/renci/relationgraph/RelationGraph.scala +++ b/core/src/main/scala/org/renci/relationgraph/RelationGraph.scala @@ -66,11 +66,11 @@ object RelationGraph extends StrictLogging { allClasses(ontology).map(c => ZIO.succeed(processSubclasses(c, indexedWhelk.state, outputConfig.reflexiveSubclasses, outputConfig.equivalenceAsSubclass, outputConfig.outputClasses, outputConfig.outputIndividuals))) } else ZStream.empty val streamZ = for { - queue <- Queue.unbounded[Restriction] + queue <- Queue.unbounded[Option[Restriction]] activeRestrictions <- Ref.make(0) seenRefs <- ZIO.foreach(allProperties)(p => Ref.make(Set.empty[AtomicConcept]).map(p -> _)).map(_.toMap) _ <- traverse(specifiedProperties, properties, classes, queue, activeRestrictions, seenRefs) - restrictionsStream = ZStream.fromQueue(queue).map(r => processRestrictionAndExtendQueue(r, properties, classes, indexedWhelk, outputConfig.mode, specifiedProperties.isEmpty, outputConfig.outputClasses, outputConfig.outputIndividuals, queue, activeRestrictions, seenRefs)) + restrictionsStream = ZStream.fromQueue(queue).collectWhile { case Some(r) => r }.map(r => processRestrictionAndExtendQueue(r, properties, classes, indexedWhelk, outputConfig.mode, specifiedProperties.isEmpty, outputConfig.outputClasses, outputConfig.outputIndividuals, queue, activeRestrictions, seenRefs)) allTasks = ontologyDeclarationStream ++ classesTasks ++ restrictionsStream } yield allTasks.mapZIOParUnordered(JRuntime.getRuntime.availableProcessors)(identity) ZStream.unwrap(streamZ) @@ -78,16 +78,16 @@ object RelationGraph extends StrictLogging { def allClasses(ont: OWLOntology): ZStream[Any, Nothing, OWLClass] = ZStream.fromIterable(ont.getClassesInSignature(Imports.INCLUDED).asScala.to(Set) - OWLThing - OWLNothing) - def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = { + def traverse(specifiedProperties: Set[AtomicConcept], properties: Hierarchy, classes: Hierarchy, queue: Queue[Option[Restriction]], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = { val descendProperties = specifiedProperties.isEmpty val queryProperties = if (descendProperties) properties.subclasses.getOrElse(Top, Set.empty) - Bottom else specifiedProperties if (queryProperties.nonEmpty) ZIO.foreachParDiscard(queryProperties) { subprop => traverseProperty(subprop, classes, queue, activeRestrictions, seenRefs) } - else queue.shutdown + else queue.offer(None).unit } - def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = { + def traverseProperty(property: AtomicConcept, classes: Hierarchy, queue: Queue[Option[Restriction]], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[Unit] = { val restrictions = (classes.subclasses.getOrElse(Top, Set.empty) - Bottom).map(filler => Restriction(Role(property.id), filler)) val propSeenRef = seenRefs(Role(property.id)) for { @@ -95,13 +95,13 @@ object RelationGraph extends StrictLogging { seenForThisProperty ++ restrictions.map(_.filler) } _ <- activeRestrictions.update(current => current + restrictions.size) - _ <- queue.offerAll(restrictions).unit + _ <- queue.offerAll(restrictions.map(Option(_))).unit active <- activeRestrictions.get - _ <- queue.shutdown.when(active < 1) + _ <- queue.offer(None).when(active < 1).unit } yield () } - def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: IndexedReasonerState, mode: Config.TriplesMode, descendProperties: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, queue: Queue[Restriction], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[TriplesGroup] = { + def processRestrictionAndExtendQueue(restriction: Restriction, properties: Hierarchy, classes: Hierarchy, whelk: IndexedReasonerState, mode: Config.TriplesMode, descendProperties: Boolean, outputClasses: Boolean, outputIndividuals: Boolean, queue: Queue[Option[Restriction]], activeRestrictions: Ref[Int], seenRefs: Map[Role, Ref[Set[AtomicConcept]]]): UIO[TriplesGroup] = { val triples = processRestriction(restriction, whelk, mode, outputClasses, outputIndividuals) val continue = triples.redundant.nonEmpty for { @@ -128,9 +128,9 @@ object RelationGraph extends StrictLogging { } else ZIO.succeed(Set.empty[Restriction]) newRestrictions = directFillerSubclassesRestrictions ++ directSubPropertyRestrictions _ <- activeRestrictions.update(current => current - 1 + newRestrictions.size) - _ <- queue.offerAll(newRestrictions) + _ <- queue.offerAll(newRestrictions.map(Option(_))) active <- activeRestrictions.get - _ <- queue.shutdown.when(active < 1) + _ <- queue.offer(None).when(active < 1).unit } yield triples }