Class IRDAG

  • All Implemented Interfaces:
    java.io.Serializable, DAGInterface<IRVertex, IREdge>

    @NotThreadSafe
    public final class IRDAG
    extends java.lang.Object
    implements DAGInterface<IRVertex, IREdge>
    An IRDAG object captures a high-level data processing application (e.g., a Spark/Beam application).
    - IRVertex: a data-parallel operation (e.g., map).
    - IREdge: a data dependency between two operations (e.g., shuffle).

    Two broad types of IRDAG optimization (modification) methods are provided, and all of them preserve application semantics:
    - Annotation: setProperty() and getPropertyValue() on each IRVertex/IREdge.
    - Reshaping: insert() and delete() on the IRDAG.

    TODO #341: Rethink IRDAG insert() signatures

    See Also:
    Serialized Form
    • Constructor Detail

      • IRDAG

        public IRDAG(DAG<IRVertex, IREdge> originalUserApplicationDAG)
        Parameters:
        originalUserApplicationDAG - the initial DAG.
    • Method Detail

      • advanceDAGSnapshot

        public boolean advanceDAGSnapshot(java.util.function.BiFunction<IRDAG, IRDAG, java.lang.Boolean> checker)
        Used internally by Nemo to advance the DAG snapshot after applying each pass.
        Parameters:
        checker - compares the dagSnapshot and the modifiedDAG to determine whether the snapshot can be set to the current modifiedDAG.
        Returns:
        true if the checker passes, false otherwise.
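        A minimal sketch of the snapshot/checker contract described above, under stated assumptions: `SnapshotHolder`, `modify`, and `advanceSnapshot` are hypothetical names, the DAG is replaced by a generic value `T`, and a rejected check leaves the working state in place (the description does not say whether Nemo reverts it).

```java
import java.util.function.BiFunction;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for IRDAG's snapshot bookkeeping; not Nemo's actual code.
final class SnapshotHolder<T> {
  private T snapshot; // last state accepted by a checker
  private T modified; // working state after the passes applied since then

  SnapshotHolder(final T initial) {
    this.snapshot = initial;
    this.modified = initial;
  }

  // Applies a "pass" to the working state only; the snapshot is untouched.
  void modify(final UnaryOperator<T> pass) {
    modified = pass.apply(modified);
  }

  // Commits the working state if the checker accepts the (snapshot, modified) pair.
  // Assumption: on rejection the working state is kept as-is (not reverted).
  boolean advanceSnapshot(final BiFunction<T, T, Boolean> checker) {
    final boolean accepted = checker.apply(snapshot, modified);
    if (accepted) {
      snapshot = modified;
    }
    return accepted;
  }

  T getSnapshot() {
    return snapshot;
  }
}
```

        For example, if a pass grows the state from 3 to 5, a checker that rejects growth keeps the snapshot at 3, while a checker that accepts it commits 5.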
      • irDAGSummary

        public java.lang.String irDAGSummary()
        Returns:
        an IR DAG summary string, consisting only of the vertices generated from the frontend.
      • getInputSize

        public java.lang.Long getInputSize()
        Returns:
        the total input size of the IR DAG.
      • recordExecutorInfo

        public void recordExecutorInfo(java.util.List<Pair<java.lang.Integer, ResourceSpecification>> parsedExecutorInfo)
        Setter for the executor specifications information.
        Parameters:
        parsedExecutorInfo - executor information parsed for processing.
      • getExecutorInfo

        public java.util.List<Pair<java.lang.Integer, ResourceSpecification>> getExecutorInfo()
        Getter for the executor specifications information.
        Returns:
        the executor specifications information.
      • delete

        public void delete(IRVertex vertexToDelete)
        Deletes a previously inserted utility vertex (e.g., TriggerVertex, RelayVertex, SamplingVertex).

        Note that this call may delete more than one vertex: it rolls back all changes made by the insert() call that added the vertex, while preserving application semantics.

        Parameters:
        vertexToDelete - to delete.
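        The rollback behavior can be sketched on a toy graph of string ids, assuming each insert() call is recorded as one group of added vertices and edges. `UndoableDag` and its methods are hypothetical; inserts that replace original edges (such as the relay insertion) would also need to restore the removed edge, which this sketch does not model. Deleting either vertex of a generator/aggregator pair removes both, which is why more than one vertex can disappear.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical toy DAG; assumption: each insert() only adds vertices and edges.
final class UndoableDag {
  final Set<String> vertices = new LinkedHashSet<>();
  final Set<String> edges = new LinkedHashSet<>(); // each edge encoded as "src->dst"

  // Which insert() call (by index) created each utility vertex.
  private final Map<String, Integer> insertIdOf = new HashMap<>();
  private final List<Set<String>> insertedVertices = new ArrayList<>();
  private final List<Set<String>> insertedEdges = new ArrayList<>();

  // Records one insert() call that added the given vertices and edges together.
  void insert(final List<String> newVertices, final List<String> newEdges) {
    final int id = insertedVertices.size();
    insertedVertices.add(new LinkedHashSet<>(newVertices));
    insertedEdges.add(new LinkedHashSet<>(newEdges));
    for (final String v : newVertices) {
      vertices.add(v);
      insertIdOf.put(v, id);
    }
    edges.addAll(newEdges);
  }

  // Rolls back the whole insert() that created the vertex; returns how many vertices were removed.
  int delete(final String vertexToDelete) {
    final Integer id = insertIdOf.get(vertexToDelete);
    if (id == null) {
      throw new IllegalArgumentException(vertexToDelete + " is not an inserted utility vertex");
    }
    final Set<String> rolledBack = insertedVertices.get(id);
    vertices.removeAll(rolledBack);
    edges.removeAll(insertedEdges.get(id));
    for (final String v : rolledBack) {
      insertIdOf.remove(v);
    }
    return rolledBack.size();
  }
}
```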
      • insert

        public void insert(RelayVertex relayVertex,
                           IREdge edgeToStreamize)
        Inserts a new vertex that streams data.

        Before: src - edgeToStreamize - dst
        After:  src - edgeToStreamizeWithNewDestination - relayVertex - oneToOneEdge - dst (replaces the "Before" relationships)

        This preserves semantics as the relayVertex simply forwards data elements from the input edge to the output edge.

        Parameters:
        relayVertex - to insert.
        edgeToStreamize - to modify.
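        A sketch of the relay rewiring on a plain edge list; `RelayEdge`, `RelayInsertion`, and the string pattern labels are hypothetical stand-ins for IREdge and its communication-pattern property. As in the Before/After description, the edge keeps its pattern but now ends at the relay vertex, and a new one-to-one edge carries the forwarded elements to the old destination.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal edge: endpoints plus a communication-pattern label.
final class RelayEdge {
  final String src;
  final String dst;
  final String pattern;

  RelayEdge(final String src, final String dst, final String pattern) {
    this.src = src;
    this.dst = dst;
    this.pattern = pattern;
  }

  @Override
  public String toString() {
    return src + " -" + pattern + "-> " + dst;
  }
}

final class RelayInsertion {
  // Returns a new edge list in which edgeToStreamize is routed through relayVertex.
  static List<RelayEdge> insert(final List<RelayEdge> edges, final RelayEdge edgeToStreamize,
                                final String relayVertex) {
    if (!edges.contains(edgeToStreamize)) {
      throw new IllegalArgumentException("edge not in the DAG: " + edgeToStreamize);
    }
    final List<RelayEdge> result = new ArrayList<>();
    for (final RelayEdge e : edges) {
      if (e != edgeToStreamize) {
        result.add(e); // every other edge is kept unchanged
        continue;
      }
      // Same pattern as before, but the edge now ends at the relay vertex.
      result.add(new RelayEdge(e.src, relayVertex, e.pattern));
      // The relay only forwards elements, so a one-to-one edge reaches the old destination.
      result.add(new RelayEdge(relayVertex, e.dst, "one-to-one"));
    }
    return result;
  }
}
```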
      • insert

        public void insert(MessageGeneratorVertex messageGeneratorVertex,
                           MessageAggregatorVertex messageAggregatorVertex,
                           EncoderProperty triggerOutputEncoder,
                           DecoderProperty triggerOutputDecoder,
                           java.util.Set<IREdge> edgesToGetStatisticsOf,
                           java.util.Set<IREdge> edgesToOptimize)
        Inserts new vertices that analyze intermediate data and trigger a dynamic optimization.

        For each edge in edgesToGetStatisticsOf...

        Before: src - edge - dst
        After:  src - oneToOneEdge (a clone of edge) - triggerVertex - shuffleEdge - messageAggregatorVertex - broadcastEdge - dst (the "Before" relationships are unmodified)

        This preserves semantics as the results of the inserted message vertices are never consumed by the original IRDAG.

        TODO #345: Simplify insert(TriggerVertex)

        Parameters:
        messageGeneratorVertex - to insert.
        messageAggregatorVertex - to insert.
        triggerOutputEncoder - to use.
        triggerOutputDecoder - to use.
        edgesToGetStatisticsOf - to examine.
        edgesToOptimize - to optimize.
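        A sketch of the side branch added for a single examined edge, following the Before/After diagram. `MsgEdge`, `MessageInsertion`, and the pattern labels are hypothetical, and the roles of edgesToOptimize, triggerOutputEncoder, and triggerOutputDecoder are not modeled. The original edge list is copied unchanged, which is what keeps the application semantics intact.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal edge: endpoints plus a communication-pattern label.
final class MsgEdge {
  final String src;
  final String dst;
  final String pattern;

  MsgEdge(final String src, final String dst, final String pattern) {
    this.src = src;
    this.dst = dst;
    this.pattern = pattern;
  }

  @Override
  public String toString() {
    return src + " -" + pattern + "-> " + dst;
  }
}

final class MessageInsertion {
  // Adds the statistics branch for one examined edge; existing edges are left untouched.
  static List<MsgEdge> insert(final List<MsgEdge> edges, final MsgEdge examined,
                              final String generator, final String aggregator) {
    if (!edges.contains(examined)) {
      throw new IllegalArgumentException("edge not in the DAG: " + examined);
    }
    final List<MsgEdge> result = new ArrayList<>(edges); // "Before" relationships are kept
    result.add(new MsgEdge(examined.src, generator, "one-to-one")); // clone of the examined edge
    result.add(new MsgEdge(generator, aggregator, "shuffle"));      // gather per-task statistics
    result.add(new MsgEdge(aggregator, examined.dst, "broadcast")); // deliver the aggregate to dst
    return result;
  }
}
```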
      • insert

        public void insert(SignalVertex toInsert,
                           IREdge edgeToOptimize)
        Inserts a new vertex that triggers a runtime pass.

        For example, suppose we want to change vertex 2's property with a runtime pass, but the relevant data is not obtained directly from the incoming edge of vertex 2 (e.g., it is obtained from a simulation). In this case, there is no need to insert a message generator vertex and a message aggregator vertex to launch the runtime pass.

        Original case:
        (vertex 1) -- shuffle edge -- (vertex 2)

        After inserting the signal vertex:
        (vertex 1) -------------------- shuffle edge -------------------- (vertex 2)
                   -- control edge -- (signal vertex) -- control edge --

        Therefore, the shuffle edge to vertex 2 is executed after the signal vertex is executed. Since the signal vertex only 'signals' the launch of the runtime pass, a parallelism of 1 is sufficient for it.
        Parameters:
        toInsert - the signal vertex to insert.
        edgeToOptimize - the original edge to optimize (the shuffle edge in the example above).
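        The signal-vertex rewiring can be sketched on a plain edge list. `SigEdge` and `SignalInsertion` are hypothetical names, and "control" edges are assumed to impose execution order only, without carrying data. The shuffle edge stays where it is, and two control edges place the signal vertex between its endpoints.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal edge: endpoints plus a kind label ("shuffle", "control", ...).
final class SigEdge {
  final String src;
  final String dst;
  final String kind;

  SigEdge(final String src, final String dst, final String kind) {
    this.src = src;
    this.dst = dst;
    this.kind = kind;
  }

  @Override
  public String toString() {
    return src + " -" + kind + "-> " + dst;
  }
}

final class SignalInsertion {
  // Adds src -> signal -> dst control edges around edgeToOptimize, keeping that edge itself.
  static List<SigEdge> insert(final List<SigEdge> edges, final SigEdge edgeToOptimize,
                              final String signalVertex) {
    if (!edges.contains(edgeToOptimize)) {
      throw new IllegalArgumentException("edge not in the DAG: " + edgeToOptimize);
    }
    final List<SigEdge> result = new ArrayList<>(edges); // the shuffle edge stays in place
    // Assumption: control edges only order execution (src, then signal, then dst).
    result.add(new SigEdge(edgeToOptimize.src, signalVertex, "control"));
    result.add(new SigEdge(signalVertex, edgeToOptimize.dst, "control"));
    return result;
  }
}
```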
      • insert

        public void insert(java.util.Set<SamplingVertex> toInsert,
                           java.util.Set<IRVertex> executeAfter)
        Inserts a set of samplingVertices that process sampled data.

        This method automatically inserts the following three types of edges:
        (1) Edges between samplingVertices that reflect the original relationships
        (2) Edges from the original IRDAG to samplingVertices that clone the inEdges of the original vertices
        (3) Edges from the samplingVertices to the original IRDAG to respect executeAfter

        Suppose the caller supplies the following arguments to perform a "sampled run" of vertices {V1, V2} prior to executing them:
        - toInsert (the sampling vertices): {V1', V2'}
        - executeAfter: {V1}

        Before: V1 - oneToOneEdge - V2 - shuffleEdge - V3
        After:  V1' - oneToOneEdge - V2' - controlEdge - V1 - oneToOneEdge - V2 - shuffleEdge - V3

        This preserves semantics as the original IRDAG remains unchanged and unaffected.

        (Future calls to insert() can add new vertices that connect to sampling vertices. Such new vertices will also be wrapped with sampling vertices, because any new vertex that consumes outputs from sampling vertices processes only a subset of the data anyway, and no such new vertex reaches the original DAG except via control edges.)

        TODO #343: Extend SamplingVertex control edges

        Parameters:
        toInsert - sampling vertices.
        executeAfter - that must be executed after toInsert.
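        A sketch of the three edge types on the V1/V2/V3 example above, under two assumptions: sampling copies are named with a trailing apostrophe, and only sampling vertices with no sampled child receive the control edges of type (3). `SampleEdge`, `SamplingInsertion`, and `samplingIdOf` are hypothetical. Sampling {V1, V2} with executeAfter {V1} adds V1' -> V2' and the control edge V2' -> V1, matching the After line.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal edge with a kind label ("one-to-one", "shuffle", "control", ...).
final class SampleEdge {
  final String src;
  final String dst;
  final String kind;

  SampleEdge(final String src, final String dst, final String kind) {
    this.src = src;
    this.dst = dst;
    this.kind = kind;
  }

  @Override
  public String toString() {
    return src + " -" + kind + "-> " + dst;
  }
}

final class SamplingInsertion {
  // Hypothetical naming scheme: the sampling copy of V1 is called V1'.
  static String samplingIdOf(final String vertex) {
    return vertex + "'";
  }

  static List<SampleEdge> insert(final List<SampleEdge> edges, final Set<String> sampled,
                                 final Set<String> executeAfter) {
    final List<SampleEdge> result = new ArrayList<>(edges); // the original edges are unchanged
    final Set<String> feedsAnotherSample = new LinkedHashSet<>();
    for (final SampleEdge e : edges) {
      if (!sampled.contains(e.dst)) {
        continue; // only in-edges of sampled vertices are cloned
      }
      if (sampled.contains(e.src)) {
        // (1) mirror an edge between two sampled vertices
        result.add(new SampleEdge(samplingIdOf(e.src), samplingIdOf(e.dst), e.kind));
        feedsAnotherSample.add(e.src);
      } else {
        // (2) feed the sampling vertex from the original vertex's parent
        result.add(new SampleEdge(e.src, samplingIdOf(e.dst), e.kind));
      }
    }
    // (3) control edges from the last sampling vertices (assumption) to each executeAfter vertex
    for (final String v : sampled) {
      if (feedsAnotherSample.contains(v)) {
        continue;
      }
      for (final String waiting : executeAfter) {
        result.add(new SampleEdge(samplingIdOf(v), waiting, "control"));
      }
    }
    return result;
  }
}
```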
      • insert

        public void insert(TaskSizeSplitterVertex toInsert)
        Inserts a TaskSizeSplitterVertex into the DAG.
        Parameters:
        toInsert - TaskSizeSplitterVertex to insert.
      • reshapeUnsafely

        public void reshapeUnsafely(java.util.function.Function<DAG<IRVertex, IREdge>, DAG<IRVertex, IREdge>> unsafeReshapingFunction)
        Reshapes the DAG unsafely, without any guarantee that application semantics are preserved.

        TODO #330: Refactor Unsafe Reshaping Passes
        Parameters:
        unsafeReshapingFunction - takes as input the underlying DAG, and outputs a reshaped DAG.
      • topologicalDo

        public void topologicalDo(java.util.function.Consumer<IRVertex> function)
        Description copied from interface: DAGInterface
        Applies the function to each node in the DAG in topological order. This function produces consistent results.
        Specified by:
        topologicalDo in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        function - to apply.
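        One common way to obtain a deterministic topological order is Kahn's algorithm over insertion-ordered maps, sketched below. `TopoDag` is a hypothetical adjacency-list class, and Nemo's DAG may traverse differently. Because both the vertex map and the ready queue preserve insertion order, repeated calls visit vertices in the same order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical adjacency-list DAG; vertex order is the order in which vertices were added.
final class TopoDag {
  private final Map<String, List<String>> children = new LinkedHashMap<>();

  void addVertex(final String v) {
    children.putIfAbsent(v, new ArrayList<>());
  }

  void addEdge(final String src, final String dst) {
    addVertex(src);
    addVertex(dst);
    children.get(src).add(dst);
  }

  // Kahn's algorithm: repeatedly emit a vertex whose parents have all been emitted.
  List<String> topologicalSort() {
    final Map<String, Integer> inDegree = new LinkedHashMap<>();
    children.keySet().forEach(v -> inDegree.put(v, 0));
    children.values().forEach(cs -> cs.forEach(c -> inDegree.merge(c, 1, Integer::sum)));
    final Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((v, d) -> {
      if (d == 0) {
        ready.add(v);
      }
    });
    final List<String> order = new ArrayList<>();
    while (!ready.isEmpty()) {
      final String v = ready.poll();
      order.add(v);
      for (final String c : children.get(v)) {
        if (inDegree.merge(c, -1, Integer::sum) == 0) {
          ready.add(c);
        }
      }
    }
    if (order.size() != children.size()) {
      throw new IllegalStateException("graph has a cycle");
    }
    return order;
  }

  void topologicalDo(final Consumer<String> function) {
    topologicalSort().forEach(function);
  }
}
```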
      • pathExistsBetween

        public java.lang.Boolean pathExistsBetween(IRVertex v1,
                                                   IRVertex v2)
        Description copied from interface: DAGInterface
        Checks whether there is a path between two vertices.
        Specified by:
        pathExistsBetween in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        v1 - First vertex to check.
        v2 - Second vertex to check.
        Returns:
        Whether or not there is a path between two vertices.
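        A reachability sketch using an iterative depth-first search. `ReachDag` is hypothetical, and treating a path in either direction as a match is an assumption, since the description above does not state a direction.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical adjacency-list DAG used only to illustrate reachability.
final class ReachDag {
  private final Map<String, List<String>> children = new HashMap<>();

  void addEdge(final String src, final String dst) {
    children.computeIfAbsent(src, k -> new ArrayList<>()).add(dst);
  }

  // Iterative DFS: is `to` reachable from `from` by following edges forward?
  boolean reaches(final String from, final String to) {
    final Deque<String> stack = new ArrayDeque<>();
    final Set<String> visited = new HashSet<>();
    stack.push(from);
    while (!stack.isEmpty()) {
      final String v = stack.pop();
      if (v.equals(to)) {
        return true;
      }
      if (visited.add(v)) {
        for (final String child : children.getOrDefault(v, Collections.emptyList())) {
          stack.push(child);
        }
      }
    }
    return false;
  }

  // Assumption: a path in either direction counts.
  boolean pathExistsBetween(final String v1, final String v2) {
    return reaches(v1, v2) || reaches(v2, v1);
  }
}
```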
      • isCompositeVertex

        public java.lang.Boolean isCompositeVertex(IRVertex irVertex)
        Description copied from interface: DAGInterface
        Checks whether the given vertex is wrapped by a LoopVertex.
        Specified by:
        isCompositeVertex in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        irVertex - Vertex to check.
        Returns:
        whether or not it is wrapped by a LoopVertex
      • getLoopStackDepthOf

        public java.lang.Integer getLoopStackDepthOf(IRVertex irVertex)
        Description copied from interface: DAGInterface
        Retrieves the stack depth of the given vertex.
        Specified by:
        getLoopStackDepthOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        irVertex - Vertex to check.
        Returns:
        The depth of the stack of LoopVertices for the vertex.
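        A sketch of the bookkeeping behind isCompositeVertex() and getLoopStackDepthOf(), assuming each vertex is mapped to the list of LoopVertex ids that wrap it (outermost first). `LoopNesting` and `assign` are hypothetical. The depth is the length of that list, and a vertex is composite exactly when the depth is positive.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical bookkeeping: vertex id -> ids of wrapping LoopVertices, outermost first.
final class LoopNesting {
  private final Map<String, List<String>> loopStack = new HashMap<>();

  void assign(final String vertexId, final List<String> enclosingLoops) {
    loopStack.put(vertexId, new ArrayList<>(enclosingLoops));
  }

  // Vertices that were never assigned have an empty stack, i.e. depth 0.
  int getLoopStackDepthOf(final String vertexId) {
    return loopStack.getOrDefault(vertexId, Collections.emptyList()).size();
  }

  boolean isCompositeVertex(final String vertexId) {
    return getLoopStackDepthOf(vertexId) > 0;
  }
}
```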
      • asJsonNode

        public com.fasterxml.jackson.databind.node.ObjectNode asJsonNode()
        Specified by:
        asJsonNode in interface DAGInterface<IRVertex, IREdge>
        Returns:
        JsonNode for this DAG.
      • storeJSON

        public void storeJSON(java.lang.String directory,
                              java.lang.String name,
                              java.lang.String description)
        Description copied from interface: DAGInterface
        Stores the JSON representation of this DAG in a file.
        Specified by:
        storeJSON in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        directory - the directory to which the JSON representation is saved
        name - name of this DAG
        description - description of this DAG
      • getVertexById

        public IRVertex getVertexById(java.lang.String id)
        Description copied from interface: DAGInterface
        Retrieves the vertex given its ID.
        Specified by:
        getVertexById in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        id - of the vertex to retrieve.
        Returns:
        the vertex.
      • getEdgeById

        public IREdge getEdgeById(java.lang.String id)
        Description copied from interface: DAGInterface
        Retrieves the edge given its ID.
        Specified by:
        getEdgeById in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        id - of the edge to retrieve.
        Returns:
        the edge.
      • getIncomingEdgesOf

        public java.util.List<IREdge> getIncomingEdgesOf(IRVertex v)
        Description copied from interface: DAGInterface
        Retrieves the incoming edges of the given vertex.
        Specified by:
        getIncomingEdgesOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        v - the subject vertex.
        Returns:
        the list of incoming edges to the vertex. Note that the result is never null; DAGBuilder ensures this.
      • getIncomingEdgesOf

        public java.util.List<IREdge> getIncomingEdgesOf(java.lang.String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the incoming edges of the given vertex.
        Specified by:
        getIncomingEdgesOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of incoming edges to the vertex. Note that the result is never null; DAGBuilder ensures this.
      • getOutgoingEdgesOf

        public java.util.List<IREdge> getOutgoingEdgesOf(IRVertex v)
        Description copied from interface: DAGInterface
        Retrieves the outgoing edges of the given vertex.
        Specified by:
        getOutgoingEdgesOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        v - the subject vertex.
        Returns:
        the list of outgoing edges from the vertex. Note that the result is never null; DAGBuilder ensures this.
      • getOutgoingEdgesOf

        public java.util.List<IREdge> getOutgoingEdgesOf(java.lang.String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the outgoing edges of the given vertex.
        Specified by:
        getOutgoingEdgesOf in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of outgoing edges from the vertex. Note that the result is never null; DAGBuilder ensures this.
      • getParents

        public java.util.List<IRVertex> getParents(java.lang.String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the parent vertices of the given vertex.
        Specified by:
        getParents in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of parent vertices.
      • getChildren

        public java.util.List<IRVertex> getChildren(java.lang.String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the children vertices of the given vertex.
        Specified by:
        getChildren in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - the ID of the subject vertex.
        Returns:
        the list of children vertices.
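        The "never null" guarantee for incoming and outgoing edges, and the derivation of parents and children from those edges, can be sketched as follows. `IndexedDag` and its nested `Edge` are hypothetical, and throwing for an unknown vertex ID is an assumption. Every vertex receives empty edge lists when it is added, so a vertex without edges yields an empty list rather than null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical index mimicking the "never null" guarantee; not Nemo's DAGBuilder.
final class IndexedDag {
  static final class Edge {
    final String src;
    final String dst;

    Edge(final String src, final String dst) {
      this.src = src;
      this.dst = dst;
    }
  }

  private final Map<String, List<Edge>> incoming = new LinkedHashMap<>();
  private final Map<String, List<Edge>> outgoing = new LinkedHashMap<>();

  // Every vertex gets empty edge lists up front, so lookups for it never return null.
  void addVertex(final String vertexId) {
    incoming.putIfAbsent(vertexId, new ArrayList<>());
    outgoing.putIfAbsent(vertexId, new ArrayList<>());
  }

  void connect(final String srcId, final String dstId) {
    final List<Edge> out = lookup(outgoing, srcId); // validate both endpoints first
    final List<Edge> in = lookup(incoming, dstId);
    final Edge edge = new Edge(srcId, dstId);
    out.add(edge);
    in.add(edge);
  }

  List<Edge> getIncomingEdgesOf(final String vertexId) {
    return Collections.unmodifiableList(lookup(incoming, vertexId));
  }

  List<Edge> getOutgoingEdgesOf(final String vertexId) {
    return Collections.unmodifiableList(lookup(outgoing, vertexId));
  }

  // Parents and children are read off the edge endpoints.
  List<String> getParents(final String vertexId) {
    return getIncomingEdgesOf(vertexId).stream().map(e -> e.src).collect(Collectors.toList());
  }

  List<String> getChildren(final String vertexId) {
    return getOutgoingEdgesOf(vertexId).stream().map(e -> e.dst).collect(Collectors.toList());
  }

  // Assumption: an unknown vertex ID is an error rather than an empty result.
  private static List<Edge> lookup(final Map<String, List<Edge>> index, final String vertexId) {
    final List<Edge> edges = index.get(vertexId);
    if (edges == null) {
      throw new IllegalArgumentException("unknown vertex: " + vertexId);
    }
    return edges;
  }
}
```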
      • getEdgeBetween

        public IREdge getEdgeBetween(java.lang.String srcVertexId,
                                     java.lang.String dstVertexId)
        Description copied from interface: DAGInterface
        Retrieves the edge between two vertices.
        Specified by:
        getEdgeBetween in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        srcVertexId - the ID of the source vertex.
        dstVertexId - the ID of the destination vertex.
        Returns:
        the edge, if it exists.
      • getTopologicalSort

        public java.util.List<IRVertex> getTopologicalSort()
        Description copied from interface: DAGInterface
        Gets the DAG's vertices in topologically sorted order. This function produces consistent results.
        Specified by:
        getTopologicalSort in interface DAGInterface<IRVertex, IREdge>
        Returns:
        the sorted list of vertices in topological order.
      • getAncestors

        public java.util.List<IRVertex> getAncestors(java.lang.String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the ancestors of a vertex.
        Specified by:
        getAncestors in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - to find the ancestors for.
        Returns:
        the list of ancestors.
      • getDescendants

        public java.util.List<IRVertex> getDescendants(java.lang.String vertexId)
        Description copied from interface: DAGInterface
        Retrieves the descendants of a vertex.
        Specified by:
        getDescendants in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        vertexId - to find the descendants for.
        Returns:
        the list of descendants.
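        Ancestors and descendants are transitive closures over parents and children, respectively; a breadth-first sketch follows. `LineageDag` is hypothetical, and excluding the start vertex from its own result is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical DAG keeping both directions of every edge.
final class LineageDag {
  private final Map<String, List<String>> parents = new LinkedHashMap<>();
  private final Map<String, List<String>> children = new LinkedHashMap<>();

  void addEdge(final String src, final String dst) {
    children.computeIfAbsent(src, k -> new ArrayList<>()).add(dst);
    parents.computeIfAbsent(dst, k -> new ArrayList<>()).add(src);
  }

  List<String> getAncestors(final String vertexId) {
    return closure(vertexId, parents);
  }

  List<String> getDescendants(final String vertexId) {
    return closure(vertexId, children);
  }

  // BFS in one direction; each reachable vertex is listed once, the start vertex is excluded.
  private static List<String> closure(final String start, final Map<String, List<String>> next) {
    final Set<String> seen = new LinkedHashSet<>();
    final Deque<String> queue = new ArrayDeque<>(next.getOrDefault(start, Collections.emptyList()));
    while (!queue.isEmpty()) {
      final String v = queue.poll();
      if (seen.add(v)) {
        queue.addAll(next.getOrDefault(v, Collections.emptyList()));
      }
    }
    return new ArrayList<>(seen);
  }
}
```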
      • filterVertices

        public java.util.List<IRVertex> filterVertices(java.util.function.Predicate<IRVertex> condition)
        Description copied from interface: DAGInterface
        Filters the vertices according to the given condition.
        Specified by:
        filterVertices in interface DAGInterface<IRVertex, IREdge>
        Parameters:
        condition - that must be satisfied to be included in the filtered list.
        Returns:
        the list of vertices that meet the condition.
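        A minimal sketch of filterVertices() as a stream filter over a vertex list. `VertexFilter` is hypothetical, plain strings stand in for IRVertex, and preserving the input order is an assumption.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical helper; V stands in for IRVertex.
final class VertexFilter {
  // Keeps the vertices that satisfy the condition, in their original order (assumption).
  static <V> List<V> filterVertices(final List<V> vertices, final Predicate<V> condition) {
    return vertices.stream().filter(condition).collect(Collectors.toList());
  }
}
```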
      • toString

        public java.lang.String toString()
        Overrides:
        toString in class java.lang.Object