... | @@ -20,3 +20,89 @@ In iterative execution, a chunk of input is processed in one operation and then |
... | @@ -20,3 +20,89 @@ In iterative execution, a chunk of input is processed in one operation and then |
|
* BFS
|
|
* BFS
|
|
* Topological sorting
|
|
* Topological sorting
|
|
* DFS ( in-addition with data parallel execution)
|
|
* DFS ( in-addition with data parallel execution)
|
|
|
|
|
|
|
|
|
|
|
|
##Parallelism in cross-device chunked execution
|
|
|
|
|
|
|
|
which chunking of input, we can easily perform both task as well as data-parallel execution across devices. To enable this parallelism we have defined the metadata required for execution given a operator dependency graph
|
|
|
|
|
|
|
|
The engine provides a single lock-free task queue where all the tasks to be executed are available in their order of precedence. We use a single task queue, in order to see the raw performance of the system without any intelligent scheduling. However, any scheduling algorithm can be used to split the task queue for individual devices with the exception that the queues are to be transparent to each others.
|
|
|
|
|
|
|
|
Once the queue is populated, the task within the queue are accessed by a dedicated handler for each of the devices. During initiation, the engine sets up independent handler for the devices present in the system. These handler are not aware of other handlers and are independent of the execution of each other. Before, explain the mechanism of the execution engine, we detail the data structure that enables the handlers to share information for cross-device processing.
|
|
|
|
|
|
|
|
###Data sharing
|
|
|
|
|
|
|
|
For each of the tasks, we have a set of incoming and outgoing datasets. Each of these dataset is given some additional meta-information:
|
|
|
|
|
|
|
|
* source location
|
|
|
|
* target location
|
|
|
|
* EOD flag
|
|
|
|
* processed until index
|
|
|
|
* fetched until index
|
|
|
|
* estimated data size
|
|
|
|
|
|
|
|
other than estimated data size, other information are required for data handling. Given the data size, we can decide on allocating the memory space within the device memory accordingly. In case data size is not known in advance, we cannot exploit concurrent execution across devices when a task requires the result size from the parent task for execution.
|
|
|
|
|
|
|
|
Each of the device handlers have access to these meta data in the dataset and accordingly share their workload.
|
|
|
|
|
|
|
|
###Execution Steps
|
|
|
|
|
|
|
|
As shown earlier, the tasks are queued for the devices and each of the handler takes one task for themselves to process. Each of the handlers in-turn have two concurrent threads running, one for data transfer and another for execution. The work of data transfer thread is to provide data for execution thread.
|
|
|
|
|
|
|
|
At the start, the transfer thread reads the metadata of incoming data and finds a continuous memory space for storing the data. Also, since estimates are only required, the thread also estimates the size of the outgoing data and prepares memory space for them as well. The thread then reads metadata values of incoming dataset \texttt{processed until index} and compares it against \texttt{fetched until index}. If there are some values available, the thread fetches these new data and place them in their corresponding location in the target device. Once data is transferred, the fetched until index is updated. Finally, data transfer stops when the EOD flag is set.
|
|
|
|
|
|
|
|
|
|
|
|
Since the data transfer thread is ad-hoc in data transfer, the execution thread performs execution whenever the fetched until index of input set is greater than consumed until index. Whenever there is new data to be processed, the result are computed and the \texttt{processed until index} for the result data is updated accordingly. The complete workflow of the execution is given in Figure~\ref{}. The figure uses the OpenCL terminology to detail the workflow in OpenCL enabled heterogeneous processing system.
|
|
|
|
|
|
|
|
###Variability and flexibility in execution
|
|
|
|
With this execution, we have multiple points of flexibility present in execution. The main idea of having such an execution model is to provide runtime flexibility of query execution. This model can transparently support execution for both response time as well as throughput. In this section, we detail all the key characteristics of the model followed by the scenarios where we can use this execution model for exploiting flexibility.
|
|
|
|
|
|
|
|
####Parallelism
|
|
|
|
|
|
|
|
The engine enables cross-device concurrent execution in two models: data transfer and data execution. by decoupling these two, we are able to superimpose, transfer with execution. Further, we can also have concurrent execution across multiple devices in parallel. A SQL provides opportunities for three kinds of parallelism,namely - 1) horizontal, 2) vertical, 3) sub-tree level. In case of the latter, sub-tree parallelism can be realized out of the box in heterogeneous processing system without any additional steps.
|
|
|
|
|
|
|
|
For data parallelism, we have followed an lock free approach, where the data for the input is shared by multiple devices without any means for synchronization. The input data is split by handlers passing from either right to left from initial position or from left to right from final position based on the previously available pointers. In case of three pointers, the third starts from the middle and alternates between left and right. We ignored the use of branching equally from the middle for the third as this might lead to in-consistencies in memory access locations. This method is extended for arbitrary number of handlers present.
|
|
|
|
|
|
|
|
For functional parallelism, the threads must invariably wait for data to be processed from the parent node. Hence, here the thread requesting data can either wait for data to be sent for processing or using the above mentioned data parallel execution, can preemptively produce results by sharing the data with other devices. In worst case during preemptive execution, the data processed by slower system is blocked until the results are produced and the faster system can still provide partial results until the pipeline breaker.
|
|
|
|
|
|
|
|
##Load balancing
|
|
|
|
By enabling data as well as functional sharing while processing, we are able to clearly support load balancing. In case, a device is slower in processing its data due to selection of worst performance algorithm, wrong scheduling, down-clocking of the device and so on, the execution might be skewed as the data throughput is reduced. This exasperates the performance degradation if the degradation happens in the lower part of the query processing pipeline. Hence, in this case the additional co-processor must share the workload to balance in performance. Using these parallelism capabilities we can handle this.
|
|
|
|
|
|
|
|
##Robustness
|
|
|
|
Sebastian Breß has shown in his paper that a co-processor efficiency also depends on the memory consumption and operation allocation. In case the execution is not scheduled properly, the system might get affected by errors in the co-processor. This also requires that the system must be able to recover from failure on co-processors. The duration between co-processor failure and re-start is though very small, this might impact largely in the performance. Also, the scheduler is not able to predict such failure in advance that it cannot schedule the operations to avoid such disaster. With a runtime for load balancing, we can support such conditions.
|
|
|
|
%yet to write from here
|
|
|
|
|
|
|
|
|
|
|
|
#cross-device Parallelism
|
|
|
|
For cross device parallelism, we again have to look back into functional and data parallel execution. For functional parallel execution, due to the availability of primitives, we can fairly perform out-of-the-box. By, simply scheduling the operations on individual device queues, we can perform function parallel execution. There are two options available for functional parallel execution: separate queue for individual functions or one single shared queue for multiple devices~\footnote{a similar approach is followed in morsel driven parallelism} . A shared queue is useful for load balancing and work stealing setups. To keep things simple, we have used the latter method so that we can see the overall execution split-up across multiple devices.
|
|
|
|
|
|
|
|
Structure of Edge
|
|
|
|
Since function is a isolated component, that is executed only on a single device, we have a shared data structure for sharing data across multiple nodes. The structure is given in Figure
|
|
|
|
|
|
|
|
|
|
|
|
First, cardinality estimates are used to determine the number of total data to be processed. This is necessary for us to prepare space for the input dataset in the device memory. A cardinality-less method could also work the same, however, the data transfer thread is given an additional step to create space for all the data fetched within its loop. We also have a EOD flag, that is set when all the data are processed. This is helpful in data parallel execution. Also, the data is mapped with a data source from where the data is being fetched. finally, since multiple device can process a single dataset, the information about these indexes are stored as two values processed until and fetched until. The processed until is useful for other handlers to know the available data for next processing and fetched until is useful in determining the available data or processing the given function.
|
|
|
|
|
|
|
|
#Workflow of functional parallel execution
|
|
|
|
For purely functional parallel execution, the execution starts with flattening~\footnote{any directed graph traversal mechanism suits here} of the SQL graph. Here, the flattening mechanism has to take care of the way to pipeline operations. Currently we support two flattening graph traversals: BFS and topological sorting. The former is useful for providing good overall throughput, the latter is good for providing high response time. We will show in later, how switching between response time and throughput can be done easily.
|
|
|
|
|
|
|
|
Once the pipeline is ready, the tasks are fed into a lock-free queue that is shared among all the device handlers. These handlers de-queue these tasks one after another and process them. The data must be available for, in the worst case, at least one of the task must be having data. The data is completely pushed into the device for processing and we queue in the handlers in such a way that the handler with the data pre-loaded are starting to process the corresponding task. In case the data is not available in the device, the data transfer will be still hidden with execution of another independent operation on CPU at-least. Since, each of the device is allocated with two threads, one for data transfer and one for execution, these threads are coalesced across devices.
|
|
|
|
|
|
|
|
For dependent execution, since we process data in chunks, the data can be pipelined for processing in another device. Though we know that data transfer is a bottleneck in heterogeneous DBMS, we have to bare this to have full utilization. For current processing task, the handler checks its incoming edges and the data transfer thread is used to fetch the data. The data transfer thread checks the data that is processed and fetches them either as chunks of optimal processing size or as much data as possible. After fetching data the data is atomically incremented. The execution of functional parallelism is enabled as the data processed partially can be pre-loaded into another device and can be processed. With this model of execution, the data is processed partially by each of the devices providing pipelining of results faster. However, the bottleneck in this case would be the stall in data processing by a single device. In this case, though the task pipeline provides opportunities for executing other independent tasks, all the dependent tasks are pending to be executed and this is propagated severely with increase in level of dependency from the stalled task. Here, the data transfer thread loops around periodically checking the source device for availability of any data to be processed.
|
|
|
|
|
|
|
|
\subsubsection{Workflow of data parallel execution}
|
|
|
|
To overcome the data dependency problem from above, we use data parallelism as a runtime load balancing option. The major bottleneck, again is that no data is available for the device to process as the data is being generated by the source. In this case we propose that the handler preemptively starts processing the stalling task as well. In this scenario, the data is shared across multiple handlers. we use a lock free mechanism for processing data in parallel.
|
|
|
|
|
|
|
|
We use this lock-free mechanism for two reasons:
|
|
|
|
|
|
|
|
\item locking leads to contention
|
|
|
|
\item offsetting of data leads to gaps in data processing
|
|
|
|
|
|
|
|
Since bulk transfer of data is easy when we don't have an additional staging step where multiple chunks of data is combined together into a single bulk before transfer, we require a mechanism to make sure data is processed in bulks. For this we have the below mentioned progression for data fetch.
|
|
|
|
|
|
|
|
the device handler are provided a unique value from 0…n based on their value, a position for the shared data is given. Also the handler is provided with direction of processing data. the direction is either left, right or left-right. for example, 0 starts from 0th index and goes to the right, 1 starts from nth index and fetches in left. However the handler 3 is set in the middle and it alternates between left and right. Spreading on both sides, thigh looks beneficial at first, after very first data transfer we cannot have data bulk anymore as next is two chunks from left most and right most.
|
|
|
|
|
|
|
|
Execution of data parallel execution is done using shared values only for data values consumed, processed and fetched. Here, whenever a dataset is read, the edge of outgoing is internally added another dynamic information of the result chunk. With this information the data can be easily split across multiple devices. Though this ignores data locality and provides a worse execution plan, we have a system that is highly available for query processing and no two devices are idle while processing data.
|
|
|
|
|
|
|
|
|
|
|
|
##Data parallel execution with multiple devices
|
|
|
|
In case of more than two devices, we set the devices in the middle of two device pointers. For example, the input for device 3 will start from the middle and the device will alternate between left and right sweeps. In this way the data is processed in block and is easy to collect them. |