Before we begin
For any JOIN to happen, Spark needs to have the same keys for the joining Column from both tables on the same executor. And if the keys are not present on the same executor, Spark uses Shuffle(Exchange) steps in order to achieve it.
Now, we all know that Shuffle leads to Disk I/O operations and Network data movement which is bad for Spark. Thus it's very important for us to optimize or remove this Step as much as possible.
Problem Explained
So, I was working on comparing each row with each other rows and one of the way could have been (generally not recommended). Number of rows in the file are typically more than 1M and columns are more than 21 on which I was working on, so that creates a pretty huge amount of data while comparing the each rows with each other rows i.e. 1M*1M ~=1000Billion of rows which is a pretty huge number with 42 number of columns.
Number of rows is not an issue, but the issue is comparing the rows present into each node with other nodes of the executor, as we wanted to compare each row with each other rows, this causes the huge data shuffling and leads to Disk IO operations and Network data movement.
For the above issue to solve, I get to know the strategy which spark uses/offers-
ShuffleHash Join, SortMerge Join and Broadcast Joins.
Shuffle Hash Join
Shuffle Hash join encompasses the following sequential steps:
Both datasets (tables) will be shuffled among the executors based on the key column.
The Smaller shuffled dataset (table) will be hashed in each executor in order to match with the hashed key of the bigger dataset.
Once the hashed key matches, the joining works.
Points to Note:
It majorly supports equi joins (=), except for Full outer join.
There is NO Sort step involved, thus keys are not sorted.
It is suitable for Joins where we have at least one Smaller dataset which can fit in memory.
To make sure Spark uses Shuffle Hash Join, we need to set: spark.sql.join.preferSortMergeJoin=false
Sort Merge Join
Sort Merge is another famous joining Strategies that follows the below steps:
Both datasets (tables) will be shuffled among the executors based on the key column.
The joining keys from both datasets will be sorted in same order.
Once the joining key are sorted the Merging happens (thus its Sort Merge).
Points to Note:
It supports all join types including full outer joins.
Since there is a Sort step, it can be an expensive join if not optimized properly.
Preferred when we have two Big dataset (tables) to join.
We can set — spark.sql.join.preferSortMergeJoin=true to use Sort Merge Join.
Broadcast Hash Join (Broadcast Join)
It's a famous joining technique that involves broadcasting (complete copy) the smaller dataset to all executors to avoid the Shuffle step. Steps are as follows:
Driver gets the Smaller dataset (table) from the executor and broadcasts it to all the executors.
Once the broadcasted dataset is present in all executors, the larger dataset partitions present are hashed and joined based on Key column.
Points to Note:
NO Sorting and NO Shuffle step is involved.
The smaller dataset which will be broadcasted, should not exceed 10MB (default size), but can be increased to 8G with spark.sql.autoBroadcastJoinThreshold configuration.
The Dataset to be broadcasted should fit in both executor and driver memory, else it will run out of memory errors.
Majorly supports equi joins, except full outer.
It is preferred, when we have one Big and one Small datasets to join.
How did I solve the problem?
First and most important thing was to re-partitioning the data across all the nodes, as we are using the inner join on the file itself it will increase the amount of data, which might create the executor to go out of memory, so we need to properly define the number of partitions according to the size of the file by using:-
File = file.repartition(16)
Now comparing all the columns of each row at once, what we came up with is dividing the columns into two or three parts, and rather than using the cross join, I used inner join for all the columns parts, which reduces the shuffling of the data across the nodes.
It uses shuffle hash strategy and each node matches the rows based on the join condition in the node itself.
For better understanding-
Considering a file has 14 columns, I divided 14 columns into three parts, i.e 5,5,4.
That will create three separated files, and now will do the inner join operation on them separately. After the completion of the inner join of three parts, Now we merge those three parts rows into one(same time if there any row repeated make it distinct). This will give me all the rows with all the columns of the file which have been matched.
コメント