trajectories.py


Perform GPU-based streamline integration using OpenCL.

Provides PyOpenCL-accelerated functions to integrate streamlines using 2nd-order Runge-Kutta and (streamline tail-step only) Euler methods. Basins of interest can be delimited by masking.

Requires PyOpenCL.

Imports streamlines module pocl.py.

Imports functions from streamlines module useful.py.


Code

"""
---------------------------------------------------------------------

Perform GPU-based streamline integration using `OpenCL`_.

Provides PyOpenCL-accelerated functions to integrate streamlines using
2nd-order Runge-Kutta and (streamline tail-step only) Euler methods.
Basins of interest can be delimited by masking. 

Requires `PyOpenCL`_.

Imports streamlines module :doc:`pocl`.

Imports functions from streamlines module :doc:`useful`.


---------------------------------------------------------------------

.. _OpenCL: https://www.khronos.org/opencl
.. _PyOpenCL: https://documen.tician.de/pyopencl/index.html


"""

import pyopencl as cl
import numpy as np
import os
os.environ['PYTHONUNBUFFERED']='True'
os.environ['PYOPENCL_NO_CACHE']='True'
os.environ['PYOPENCL_COMPILER_OUTPUT']='0'

from streamlines import pocl
from streamlines.useful import neatly, vprint, create_seeds, compute_stats

__all__ = ['Trajectories']

import warnings
pdebug = print

class Trajectories():
    def __init__(   self,
                    which_cl_platform,
                    which_cl_device,
                    cl_src_path         = None,
                    info                = None,
                    data                = None,
                    do_trace_downstream = True,
                    do_trace_upstream   = True,
                    verbose             = False,
                    gpu_verbose         = False ):
        """
        Initialize.
        
        Args:
            which_cl_platform (int):
            which_cl_device (int):
            cl_src_path (str):
            info (obj):
            data (obj):
            do_trace_downstream (bool):
            do_trace_upstream (bool):
            verbose (bool):
            gpu_verbose (bool):
        """
        self.platform, self.device, self.context \
            = pocl.prepare_cl_context(which_cl_platform, which_cl_device)
        self.queue = cl.CommandQueue(self.context,
                                properties=cl.command_queue_properties.PROFILING_ENABLE)
        self.cl_src_path         = cl_src_path
        self.info                = info
        self.data                = data
        self.do_trace_downstream = do_trace_downstream
        self.do_trace_upstream   = do_trace_upstream
        self.verbose             = verbose
        self.gpu_verbose         = gpu_verbose
        self.vprogress           = verbose
        
    def integrate(self):
        """
        Trace each streamline from its corresponding seed point using 2nd-order 
        Runge-Kutta integration of the topographic gradient vector field.
        
        This function is a master wrapper connecting the streamlines object and its 
        trace() method to the GPU/OpenCL wrapper function gpu_compute_trajectories(). 
        As such it acts on a set of parameters passed as arguments here rather than by 
        accessing object variables. 
        
        Workflow parameters are transferred here bundled in the info object,
        which is parsed as well as passed on to gpu_compute_trajectories.
        
        The tasks undertaken by this function are to:
           1) prepare the OpenCL context, device and kernel source string
           2) calculate how to split the streamline tracing into chunks
           3) invoke the GPU/OpenCL device computation
           4) post-process the streamline total length (slt) array (scale, sqrt)
              and compute streamline trajectories statistics
        """
        vprint(self.verbose,'Integrating trajectories...')
        
        # Shorthand
        cl_src_path         = self.cl_src_path
        info                = self.info
        mask_array          = self.data.mask_array
        uv_array            = self.data.uv_array
        mapping_array       = self.data.mapping_array

        # Prepare CL essentials
        device        = self.device
        context       = self.context
        queue         = self.queue
        cl_kernel_source \
            = pocl.read_kernel_source(cl_src_path,['rng.cl','essentials.cl',
                    'writearray.cl','updatetraj.cl','computestep.cl',
                    'rungekutta.cl','trajectory.cl','integratetraj.cl'])
                
        # Seed point selection, padding and shuffling
        n_trajectory_seed_points = info.n_trajectory_seed_points
        pad_width                = info.pad_width
        n_work_items             = info.n_work_items
        do_shuffle               = info.do_shuffle
        shuffle_rng_seed         = info.shuffle_rng_seed
        self.data.seed_point_array, n_seed_points, n_padded_seed_points \
            = create_seeds(mask_array, pad_width, n_work_items, 
                           n_seed_points=n_trajectory_seed_points, 
                           do_shuffle=do_shuffle, rng_seed=shuffle_rng_seed,
                           verbose=self.verbose)
        info.n_seed_points        = n_seed_points
        info.n_padded_seed_points = n_padded_seed_points
        
        # Mapping flag array - not likely already defined, but just in case...
        if mapping_array is None:
            mapping_array = np.zeros_like(mask_array, dtype=np.uint32)
    
        # Chunkification
        gpu_traj_memory_limit = (device.get_info(cl.device_info.GLOBAL_MEM_SIZE) 
                                 *info.gpu_memory_limit_pc)//100
        full_traj_memory_request = (n_seed_points*np.dtype(np.uint8).itemsize
                                    *info.max_n_steps*2)
        n_chunks_required = max(1,int(np.ceil(
                            full_traj_memory_request/gpu_traj_memory_limit)) )
                
        vprint(self.verbose,
               'GPU/OpenCL device global memory limit for streamline trajectories: {}' 
                  .format(neatly(gpu_traj_memory_limit)))
        vprint(self.verbose,
               'GPU/OpenCL device memory required for streamline trajectories: {}'
                  .format(neatly(full_traj_memory_request)), end='')
        vprint(self.verbose,' => {}'.format('no need to chunkify' if n_chunks_required==1
                        else 'need to split into {} chunks'.format(n_chunks_required) ))
        self.choose_chunks(n_chunks_required)
        
        # Do integrations on the GPU
        self.gpu_compute_trajectories( device, context, queue, cl_kernel_source )
            
        # Streamline stats
        pixel_size = info.pixel_size
        self.data.traj_stats_df = compute_stats(self.data.traj_length_array, 
                                                self.data.traj_nsteps_array,
                                                pixel_size, self.verbose)
        dds = self.data.traj_stats_df['ds']['downstream','mean']
        uds = self.data.traj_stats_df['ds']['upstream','mean']
            
        # Done
        vprint(self.verbose,'...done')
    
    def choose_chunks(self, n_chunks):
        """
        Compute lists of parameters needed to carry out GPU/OpenCL device computations
        in chunks.
        
        Args:
            n_chunks (int):
            
        """
        n_seed_points        = self.info.n_seed_points
        n_padded_seed_points = self.info.n_padded_seed_points
        n_work_items         = self.info.n_work_items
        
        self.chunk_size = int(np.round(n_padded_seed_points/n_chunks))
        n_global = n_padded_seed_points
        chunk_list = [[chunk_idx, chunk,
                       min(n_seed_points,chunk+self.chunk_size)-chunk, self.chunk_size] 
                       for chunk_idx,chunk 
                        in enumerate(range(0,n_global,self.chunk_size))]            
    
        trace_do_list = [[self.do_trace_downstream, 'Downstream:', 0, np.float32(+1.0)]]\
                      + [[self.do_trace_upstream,   'Upstream:  ', 1, np.float32(-1.0)]]
        self.trace_do_chunks = [td+chunk for td in trace_do_list for chunk in chunk_list]
            
        vprint(self.verbose,'Total number of kernel instances: {0:,}'
                    .format(n_global))
        vprint(self.verbose,'Number of chunks = seed point array divisor:', n_chunks)   
        vprint(self.verbose,'Chunk size = number of kernel instances per chunk: {0:,}'
                    .format(self.chunk_size))   
    
    def gpu_compute_trajectories( self, device, context, queue, cl_kernel_source ):
        """
        Carry out GPU/OpenCL device computations in chunks.
        This function is the basic wrapper interfacing Python with the GPU/OpenCL device.
        
        The tasks undertaken by this function are to:
            1) prepare Numpy data arrays and their corresponding PyOpenCL buffers
            2) iterate over each OpenCL computation chunk
            3) postprocess the streamline count and length arrays
           
        Chunk iteration involves:
            1) building the CL program
            2) preparing the CL kernel
            3) enqueueing kernel events (inc. passing data to CL device via buffers)
            4) reporting kernel status
            5) enqueueing data returns from CL device to CPU
            6) parsing returned streamline trajectories into master np array
            7) parsing returned streamline array (slt, slc) chunks into master np arrays
    
        Args:
            device (pyopencl.Device):
            context (pyopencl.Context):
            queue (pyopencl.CommandQueue):
            cl_kernel_source (str):
            

        """
        # Shorthand
        info                = self.info
        seed_point_array    = self.data.seed_point_array
        mask_array          = self.data.mask_array
        uv_array            = self.data.uv_array
        mapping_array       = self.data.mapping_array
        
        # Prepare memory, buffers 
        streamline_arrays_list = [[],[]]
        ns = info.n_seed_points
        nl = info.max_n_steps
        traj_nsteps_array  = np.zeros([ns,2], dtype=np.uint16)
        traj_length_array  = np.zeros([ns,2], dtype=np.float32)
        # Chunk-sized temporary arrays
        # Use "bag o' bytes" buffer for huge trajectories array. Write (by GPU) only.
        chunk_trajcs_array = np.zeros([self.chunk_size,nl,2], dtype=np.int8)
        chunk_nsteps_array = np.zeros([self.chunk_size], dtype=traj_nsteps_array.dtype)
        chunk_length_array = np.zeros([self.chunk_size], dtype=traj_length_array.dtype)
        array_dict = { 'seed_point':   {'array': seed_point_array,  'rwf': 'RO'},
                       'mask':         {'array': mask_array,        'rwf': 'RO'}, 
                       'uv':           {'array': uv_array,          'rwf': 'RO'}, 
                       'mapping':      {'array': mapping_array,     'rwf': 'RW'}, 
                       'chunk_trajcs': {'array': chunk_trajcs_array,'rwf': 'WO'},
                       'chunk_nsteps': {'array': chunk_nsteps_array,'rwf': 'WO'}, 
                       'chunk_length': {'array': chunk_length_array,'rwf': 'WO'}, 
                       'traj_nsteps':  {'array': traj_nsteps_array, 'rwf': 'NB'}, 
                       'traj_length':  {'array': traj_length_array, 'rwf': 'NB'} }
        buffer_dict = pocl.prepare_buffers(context, array_dict, self.verbose)
             
        # Downstream and upstream passes aka streamline integrations from
        #   chunks of seed points aka subsets of the total set
        n_work_items = info.n_work_items
        chunk_size_factor = info.chunk_size_factor
        max_time_per_kernel = info.max_time_per_kernel
        for downup_str, downup_idx, downup_sign, chunk_idx, \
            seeds_chunk_offset, n_chunk_seeds, n_chunk_ki in [td[1:] \
                for td in self.trace_do_chunks if td[0]]:
            vprint(self.verbose,
                   '{0} downup={1} sgn(uv)={2:+} chunk={3} seeds: {4}+{5} => {6:}'
                       .format(downup_str, downup_idx, downup_sign, chunk_idx,
                               seeds_chunk_offset, n_chunk_seeds, 
                               seeds_chunk_offset+n_chunk_seeds))
    
            # Specify this integration job's parameters
            global_size = [n_chunk_ki,1]
            vprint(self.verbose,
                   'Seed point buffer size = {}*8 bytes'
                   .format(buffer_dict['seed_point'].size/8))
            local_size = [info.n_work_items,1]
            info.downup_sign = downup_sign
            info.seeds_chunk_offset = seeds_chunk_offset
            
            ##################################
            
            # Compile the CL code
            compile_options = pocl.set_compile_options(info, 'INTEGRATE_TRAJECTORY', 
                                                       downup_sign=downup_sign)
            vprint(self.gpu_verbose,'Compile options:\n',compile_options)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                program = cl.Program(context,
                                     cl_kernel_source).build(options=compile_options)
            pocl.report_build_log(program, device, self.gpu_verbose)
            # Set the GPU kernel
            kernel = program.integrate_trajectory
            
            # Designate buffered arrays
            kernel.set_args(*list(buffer_dict.values()))
            kernel.set_scalar_arg_dtypes( [None]*len(buffer_dict) )
            
            # Trace the streamlines on the GPU    
            vprint(self.gpu_verbose,
                '#### GPU/OpenCL computation: {0} work items... ####'
                .format(global_size[0]))    
            pocl.report_kernel_info(device,kernel,self.gpu_verbose)
            elapsed_time \
                = pocl.adaptive_enqueue_nd_range_kernel(queue, kernel, global_size, 
                                               local_size, n_work_items,
                                               chunk_size_factor=chunk_size_factor,
                                               max_time_per_kernel=max_time_per_kernel,
                                               verbose=self.gpu_verbose,
                                                vprogress=self.vprogress )
            vprint(self.gpu_verbose,
                   '#### ...elapsed time for {1} work items: {0:.3f}s ####'
                   .format(elapsed_time,global_size[0]))
            queue.finish()   
            
            # Copy GPU-computed results back to CPU
            for array_info in array_dict.items():
                if 'W' in array_info[1]['rwf']:
                    cl.enqueue_copy(queue, array_info[1]['array'], 
                                    buffer_dict[array_info[0]])
                    queue.finish()
    
            ##################################
                            
            # This is part going to be slow...
            for traj_nsteps,traj_vec in \
                zip(chunk_nsteps_array[:n_chunk_seeds], 
                    chunk_trajcs_array[:n_chunk_seeds]): 
                # Pair traj point counts with traj vectors
                #   over the span of seed points in this chunk
                streamline_arrays_list[downup_idx] += [  traj_vec[:traj_nsteps].copy()  ]
                # Add this traj vector nparray to the list of streamline nparrays
            
            # Fetch number of steps (integration points) per trajectory, and the lengths,
            #   and transfer the streamline trajectories to a compact np array of arrays
            traj_nsteps_array[seeds_chunk_offset:(seeds_chunk_offset+n_chunk_seeds),
                              downup_idx] = chunk_nsteps_array[:n_chunk_seeds].copy()
            traj_length_array[seeds_chunk_offset:(seeds_chunk_offset+n_chunk_seeds),
                              downup_idx] = chunk_length_array[:n_chunk_seeds].copy()
        
        vprint(self.verbose,'Building streamlines compressed array')
        for downup_idx in [0,1]:
            streamline_arrays_list[downup_idx] \
                = np.array(streamline_arrays_list[downup_idx])
        vprint(self.verbose,
               'Streamlines actual array allocation:  size={}'.format(neatly(
           np.sum(traj_nsteps_array[:,:])*np.dtype(chunk_trajcs_array.dtype).itemsize)))
       
       # Do copy() to force array truncation rather than return of a truncated view
        self.data.streamline_arrays_list = streamline_arrays_list[0:ns].copy()
        self.data.traj_nsteps_array      = traj_nsteps_array[0:ns].copy()
        self.data.traj_length_array      = traj_length_array[0:ns].copy()
        
    
    #     vprint(self.verbose,'Array sizes:\n',
    #            'ROI-type =', mask_array.shape, '\n',
    #            'uv =',       uv_array.shape)
    #     vprint(self.verbose,'Streamlines virtual array allocation:  ',
    #                  '   dims={0}'.format(
    #                      (n_seed_points, chunk_trajcs_array.shape[1],2)), 
    #                  '  size={}'.format(neatly(
    #                      n_seed_points*chunk_trajcs_array.shape[1]*2
    #                         *np.dtype(chunk_trajcs_array.dtype).itemsize)))
    #     vprint(self.verbose,'Streamlines array allocation per chunk:',
    #                      '   dims={0}'.format(chunk_trajcs_array.shape), 
    #                      '  size={}'.format(neatly(
    #       chunk_trajcs_array.size*np.dtype(chunk_trajcs_array.dtype).itemsize)))