DEV Community

aavash1

Applying an action to a JavaPairRDD

How to correctly apply the JavaPairRDD.foreachPartition function?
I am new to Apache Spark and am trying to run a custom nearest-neighbor algorithm on an RDD that has been split into two partitions with a custom partitioner. The JavaPairRDD contains the graph details and the random objects generated on the graph.
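For context, the key-to-partition mapping that such a partitioner performs can be sketched in plain Java. This is only an illustration under assumptions: the class name `KeyIndexPartitioner` is hypothetical, the keys are assumed to be integer partition indices, and the class does not extend Spark's `org.apache.spark.Partitioner` (it only mirrors its `numPartitions()` and `getPartition(Object)` methods) so that it runs without a Spark dependency. The actual `CustomPartitioner` is not shown in this post and may work differently.

```java
public class KeyIndexPartitioner {
    private final int numPartitions;

    public KeyIndexPartitioner(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Same method name as in Spark's Partitioner: how many partitions exist.
    public int numPartitions() {
        return numPartitions;
    }

    // Same method name as in Spark's Partitioner: which partition a key belongs to.
    // Assumption: the key is (or prints as) an integer index.
    public int getPartition(Object key) {
        int index = Integer.parseInt(String.valueOf(key));
        // floorMod keeps the result in [0, numPartitions) even for negative keys.
        return Math.floorMod(index, numPartitions);
    }

    public static void main(String[] args) {
        KeyIndexPartitioner partitioner = new KeyIndexPartitioner(2);
        System.out.println(partitioner.getPartition(0));   // 0
        System.out.println(partitioner.getPartition(1));   // 1
        System.out.println(partitioner.getPartition("3")); // 1
    }
}
```

With a mapping like this and keys 0 and 1, every record with key 0 lands in one partition and every record with key 1 in the other.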

[Image: graph loading and object generation]

[Image: graph partition and subgraphs]

My plan is to build one subgraph per partition and run a custom algorithm on each subgraph. The code runs, but the results do not look right, and I am not sure whether this is the correct way to apply an action to each partition. My code and the results are included below. Comments and suggestions are highly appreciated.
[Image: NN results]
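The per-partition idea described above (one subgraph built from all records of one partition) can be sketched in plain Java without Spark. This is a simplified illustration, not the code from this post: the class `PartitionSubgraphSketch` and its helpers are hypothetical, the subgraph is a plain adjacency map instead of `CoreGraph`, and the road objects on each edge are left out. It assumes the function is invoked once per partition, so a single builder is enough and no per-key branches are needed.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionSubgraphSketch {

    // Builds one adjacency map (source -> destination -> edge length)
    // from all records of a single partition.
    public static Map<Integer, Map<Integer, Double>> buildSubgraph(
            Iterator<Map.Entry<Integer, Map<Integer, Map<Integer, Double>>>> partition) {
        Map<Integer, Map<Integer, Double>> subgraph = new LinkedHashMap<>();
        while (partition.hasNext()) {
            // Read the record once, then use its value.
            Map.Entry<Integer, Map<Integer, Map<Integer, Double>>> row = partition.next();
            for (Map.Entry<Integer, Map<Integer, Double>> src : row.getValue().entrySet()) {
                subgraph.computeIfAbsent(src.getKey(), k -> new LinkedHashMap<>())
                        .putAll(src.getValue());
            }
        }
        return subgraph;
    }

    // Counts the directed edges stored in the adjacency map.
    public static int edgeCount(Map<Integer, Map<Integer, Double>> subgraph) {
        int count = 0;
        for (Map<Integer, Double> out : subgraph.values()) {
            count += out.size();
        }
        return count;
    }

    // Hypothetical helper: an adjacency map holding a single edge.
    static Map<Integer, Map<Integer, Double>> edge(int src, int dst, double length) {
        Map<Integer, Double> out = new LinkedHashMap<>();
        out.put(dst, length);
        Map<Integer, Map<Integer, Double>> adjacency = new LinkedHashMap<>();
        adjacency.put(src, out);
        return adjacency;
    }

    // Hypothetical sample: three records that all belong to partition key 0.
    public static List<Map.Entry<Integer, Map<Integer, Map<Integer, Double>>>> sampleRows() {
        List<Map.Entry<Integer, Map<Integer, Map<Integer, Double>>>> rows = new ArrayList<>();
        rows.add(new SimpleEntry<>(0, edge(1, 2, 3.5)));
        rows.add(new SimpleEntry<>(0, edge(2, 3, 1.0)));
        rows.add(new SimpleEntry<>(0, edge(1, 4, 2.0)));
        return rows;
    }

    public static void main(String[] args) {
        Map<Integer, Map<Integer, Double>> subgraph = buildSubgraph(sampleRows().iterator());
        System.out.println(edgeCount(subgraph)); // 3
    }
}
```

In Spark the same builder would sit inside the function passed to `foreachPartition`, with the nearest-neighbor computation run on the finished subgraph at the end of the call.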



// <Partition_Index_Key, Map<Source_Vertex, Map<Destination_Vertex, Tuple2<Edge_Length, ArrayList of Random Objects>>>>
            JavaPairRDD<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> adjVertForSubgraphsRDD = jscontext
                    .parallelizePairs(adjacentVerticesForSubgraphs)
                    .partitionBy(new CustomPartitioner(CustomPartitionSize));

            // Apply the foreachPartition action to the JavaPairRDD
            adjVertForSubgraphsRDD.foreachPartition(
                    new VoidFunction<Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>>>() {

                        /**
                         * 
                         */
                        private static final long serialVersionUID = 1L;

                        @Override
                        public void call(
                                Iterator<Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>>> tupleRow)
                                throws Exception {
                            int sourceVertex;
                            int destVertex;
                            double edgeLength;

                            int roadObjectId;
                            boolean roadObjectType;
                            double distanceFromStart;

                            CoreGraph subgraph0 = new CoreGraph();
                            CoreGraph subgraph1 = new CoreGraph();

                            while (tupleRow.hasNext()) {


                                // Call next() once per record; every call advances the iterator.
                                Tuple2<Object, Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>>> row = tupleRow
                                        .next();
                                int partitionKey = Integer.parseInt(String.valueOf(row._1()));
                                Map<Object, Map<Object, Tuple2<Double, ArrayList<RoadObject>>>> newMap = row._2();

                                if (partitionKey == 0) {

                                    for (Object srcVertex : newMap.keySet()) {

                                        for (Object dstVertex : newMap.get(srcVertex).keySet()) {
                                            if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph0.addEdge(sourceVertex, destVertex, edgeLength);

                                                for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
                                                        .size(); i++) {
                                                    int currentEdgeId = subgraph0.getEdgeId(sourceVertex, destVertex);

                                                    roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getObjectId();
                                                    roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getType();
                                                    distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getDistanceFromStartNode();
                                                    RoadObject rn0 = new RoadObject();
                                                    rn0.setObjId(roadObjectId);
                                                    rn0.setType(roadObjectType);
                                                    rn0.setDistanceFromStartNode(distanceFromStart);

                                                    subgraph0.addObjectOnEdge(currentEdgeId, rn0);
                                                }
                                            } else {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph0.addEdge(sourceVertex, destVertex, edgeLength);
                                            }

                                        }
                                    }

                                } else if (partitionKey == 1) {

                                    for (Object srcVertex : newMap.keySet()) {
                                        for (Object dstVertex : newMap.get(srcVertex).keySet()) {
                                            if (newMap.get(srcVertex).get(dstVertex)._2() != null) {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph1.addEdge(sourceVertex, destVertex, edgeLength);

                                                for (int i = 0; i < newMap.get(srcVertex).get(dstVertex)._2()
                                                        .size(); i++) {
                                                    int currentEdgeId = subgraph1.getEdgeId(sourceVertex, destVertex);

                                                    roadObjectId = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getObjectId();
                                                    roadObjectType = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getType();
                                                    distanceFromStart = newMap.get(srcVertex).get(dstVertex)._2().get(i)
                                                            .getDistanceFromStartNode();
                                                    RoadObject rn1 = new RoadObject();
                                                    rn1.setObjId(roadObjectId);
                                                    rn1.setType(roadObjectType);
                                                    rn1.setDistanceFromStartNode(distanceFromStart);

                                                    subgraph1.addObjectOnEdge(currentEdgeId, rn1);
                                                }
                                            } else {
                                                sourceVertex = Integer.parseInt(String.valueOf(srcVertex));
                                                destVertex = Integer.parseInt(String.valueOf(dstVertex));
                                                edgeLength = newMap.get(srcVertex).get(dstVertex)._1();

                                                subgraph1.addEdge(sourceVertex, destVertex, edgeLength);
                                            }

                                        }
                                    }
                                }

                            }
                            // Straightforward nearest-neighbor search from each "true" object to a "false" object.
                            ANNNaive ann = new ANNNaive();
                            System.err.println("-------------------------------");
                            Map<Integer, Integer> nearestNeighorPairsSubg0 = ann.compute(subgraph0, true);
                            System.out.println("for subgraph0");
                            System.out.println(nearestNeighorPairsSubg0);
                            System.err.println("-------------------------------");

                            System.err.println("-------------------------------");
                            Map<Integer, Integer> nearestNeighorPairsSubg1 = ann.compute(subgraph1, true);
                            System.out.println("for subgraph1");
                            System.out.println(nearestNeighorPairsSubg1);
                            System.err.println("-------------------------------");

                        }
                    });
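When walking a partition iterator, every call to `next()` advances it, so the key and the value of one record have to come from a single `next()` call stored in a local variable. The plain-Java sketch below shows the difference without Spark. The class `IteratorPitfall`, its method names, and the sample data are hypothetical and only serve the illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class IteratorPitfall {

    // Pitfall: two next() calls per pass, so the value and the key
    // come from two different records and half the records are skipped.
    public static List<String> readTwicePerRecord(Iterator<Map.Entry<Integer, String>> rows) {
        List<String> seen = new ArrayList<>();
        while (rows.hasNext()) {
            String value = rows.next().getValue();
            if (!rows.hasNext()) {
                break; // a second next() here would throw NoSuchElementException
            }
            int key = rows.next().getKey();
            seen.add(key + "=" + value);
        }
        return seen;
    }

    // Safer: one next() call per pass, kept in a local variable.
    public static List<String> readOncePerRecord(Iterator<Map.Entry<Integer, String>> rows) {
        List<String> seen = new ArrayList<>();
        while (rows.hasNext()) {
            Map.Entry<Integer, String> row = rows.next();
            seen.add(row.getKey() + "=" + row.getValue());
        }
        return seen;
    }

    // Hypothetical sample records: (0, "a"), (1, "b"), (2, "c"), (3, "d").
    public static List<Map.Entry<Integer, String>> sample() {
        List<Map.Entry<Integer, String>> rows = new ArrayList<>();
        rows.add(new SimpleEntry<>(0, "a"));
        rows.add(new SimpleEntry<>(1, "b"));
        rows.add(new SimpleEntry<>(2, "c"));
        rows.add(new SimpleEntry<>(3, "d"));
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(readTwicePerRecord(sample().iterator())); // [1=a, 3=c]
        System.out.println(readOncePerRecord(sample().iterator()));  // [0=a, 1=b, 2=c, 3=d]
    }
}
```

The first method pairs each value with the key of the following record, which is the kind of mismatch that can make per-partition results look almost right but not quite.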


