Parallel processing made easy
Inspired by this blog by @Bruno Esperança, I thought I would share a useful, reusable class I developed to make parallel processing simple. It abstracts away and encapsulates all the technical details.
CLASS zcl_thread_handler DEFINITION
  PUBLIC
  FINAL
  CREATE PUBLIC .

  PUBLIC SECTION.
    TYPE-POOLS abap .

    CONSTANTS:
      c_default_group TYPE rzlli_apcl VALUE 'parallel_generators', "#EC NOTEXT
      c_task          TYPE char6 VALUE 'PARALL'. "#EC NOTEXT

    METHODS:
      all_threads_are_finished
        RETURNING
          VALUE(r_empty) TYPE abap_bool,
      clear_thread
        IMPORTING
          !i_task TYPE char8,
      constructor
        IMPORTING
          !i_task_prefix TYPE char6 DEFAULT c_task
          !i_threads     TYPE i
          !i_group       TYPE rzlli_apcl DEFAULT c_default_group,
      handle_resource_failure,
      get_free_thread
        RETURNING
          VALUE(r_thread) TYPE char8 .

  PROTECTED SECTION.
  PRIVATE SECTION.

    TYPES:
      BEGIN OF ty_thread,
        thread TYPE char8,
        used   TYPE abap_bool,
      END OF ty_thread .

    DATA:
      task_prefix  TYPE char6,
      threads_list TYPE TABLE OF ty_thread WITH DEFAULT KEY,
      threads      TYPE i,
      used_threads TYPE i,
      group        TYPE rzlli_apcl.

    METHODS get_free_threads
      RETURNING
        VALUE(r_free_threads) TYPE i .
ENDCLASS.

CLASS zcl_thread_handler IMPLEMENTATION.

  METHOD get_free_threads.
    " Get number of free threads
    CALL FUNCTION 'SPBT_INITIALIZE'
      EXPORTING
        group_name                     = me->group
      IMPORTING
        free_pbt_wps                   = r_free_threads
      EXCEPTIONS
        invalid_group_name             = 1
        internal_error                 = 2
        pbt_env_already_initialized    = 3
        currently_no_resources_avail   = 4
        no_pbt_resources_found         = 5
        cant_init_different_pbt_groups = 6
        OTHERS                         = 7.

    CASE sy-subrc.
      WHEN 0.
        " Do nothing
      WHEN 3.
        " Already initialised - get current number of free threads
        CALL FUNCTION 'SPBT_GET_CURR_RESOURCE_INFO'
          IMPORTING
            free_pbt_wps                = r_free_threads
          EXCEPTIONS
            internal_error              = 1
            pbt_env_not_initialized_yet = 2
            OTHERS                      = 3.
        IF sy-subrc IS NOT INITIAL.
          " Something has gone seriously wrong, so end it here.
          MESSAGE ID sy-msgid TYPE 'X' NUMBER sy-msgno
                  WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
        ENDIF.
      WHEN OTHERS.
        " Something has gone seriously wrong, so end it here.
        MESSAGE ID sy-msgid TYPE 'X' NUMBER sy-msgno
                WITH sy-msgv1 sy-msgv2 sy-msgv3 sy-msgv4.
    ENDCASE.
  ENDMETHOD.

  METHOD all_threads_are_finished.
    r_empty = xsdbool( used_threads EQ 0 ).
  ENDMETHOD.

  METHOD clear_thread.
    READ TABLE me->threads_list
         WITH KEY used = abap_true thread = i_task
         ASSIGNING FIELD-SYMBOL(<thread>).
    " Only release the slot if it is actually marked as used;
    " this avoids a dump on an unassigned field symbol
    IF sy-subrc = 0.
      <thread>-used = abap_false.
      SUBTRACT 1 FROM used_threads.
    ENDIF.
  ENDMETHOD.

  METHOD constructor.
    DATA threadn TYPE n LENGTH 2 VALUE '00'.

    me->group = i_group.
    me->task_prefix = i_task_prefix.

    " No more than 100 threads, and at least one
    IF i_threads GT 100.
      me->threads = 100.
    ELSEIF i_threads LE 0.
      me->threads = 1.
    ELSE.
      me->threads = i_threads.
    ENDIF.

    DATA(free_threads) = me->get_free_threads( ).

    " Ensure that no more than about half of the free threads (half plus one) are used
    free_threads = free_threads / 2 + 1.
    IF free_threads LT me->threads.
      me->threads = free_threads.
    ENDIF.

    " Initialise threads: task id = prefix + two-digit counter
    DO me->threads TIMES.
      INSERT VALUE #( thread = me->task_prefix && threadn used = abap_false )
             INTO TABLE me->threads_list.
      ADD 1 TO threadn.
    ENDDO.
  ENDMETHOD.

  METHOD handle_resource_failure.
    DATA(free_threads) = me->get_free_threads( ).
    IF free_threads LE 1 AND me->threads GT 1.
      SUBTRACT 1 FROM me->threads.
    ENDIF.

    WAIT UP TO 5 SECONDS. " Long enough for the system to update
    WAIT UNTIL me->used_threads LT me->threads. " Now there's an available thread
  ENDMETHOD.

  METHOD get_free_thread.
    " Wait for a free thread
    WAIT UNTIL me->used_threads LT me->threads.

    " Get number of first free thread
    READ TABLE me->threads_list
         WITH KEY used = abap_false
         ASSIGNING FIELD-SYMBOL(<thread>).

    ADD 1 TO used_threads.
    <thread>-used = abap_true.
    r_thread = <thread>-thread.
  ENDMETHOD.

ENDCLASS.
To use it, instantiate the class with a prefix to use for the task ID and the ideal number of threads you want to run in parallel. (The code automatically limits this to roughly half of the free threads on the application server, and to 100 at most.)
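As a minimal sketch of the instantiation, assuming the calling class keeps the handler in an attribute called handler (the name used in the snippets further down); the prefix 'EXTRCT' and the thread count of 10 are illustrative values only:

```abap
" In the definition of the calling class (attribute name is an assumption)
DATA handler TYPE REF TO zcl_thread_handler.

" In the implementation, before the loop starts.
" i_group is omitted, so c_default_group is used.
me->handler = NEW zcl_thread_handler( i_task_prefix = 'EXTRCT'
                                      i_threads     = 10 ).
```

The prefix is limited to six characters because the class appends a two-digit counter to build the eight-character task ID.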
Start your loop that contains the call to the logic that you want to run in parallel. E.g.
LOOP AT ...
  me->do_stuff_in_parallel( ).
ENDLOOP.
The first statement after the ENDLOOP should be
WAIT UNTIL me->handler->all_threads_are_finished( ).
In your do_stuff_in_parallel method, you have the call to the function module that does the work.
DATA(thread) = me->handler->get_free_thread( ).
DATA errmsg TYPE char255.

CALL FUNCTION 'Z_...'
  STARTING NEW TASK thread
  DESTINATION IN GROUP zcl_thread_handler=>c_default_group
  CALLING on_end_of_action ON END OF TASK
  EXPORTING
    ...
  EXCEPTIONS
    communication_failure = 1 MESSAGE errmsg
    system_failure        = 2 MESSAGE errmsg
    resource_failure      = 3.
Finally, in the method on_end_of_action of your main application (which has a single importing parameter p_task of type clike), you receive the results:
...
DATA errmsg TYPE c LENGTH 255.

RECEIVE RESULTS FROM FUNCTION 'Z_...'
  IMPORTING
    ...
  EXCEPTIONS
    communication_failure = 1 MESSAGE errmsg
    system_failure        = 2 MESSAGE errmsg.

IF sy-subrc IS NOT INITIAL.
  ...handle error
ENDIF.

" Free the thread for the next thread to run
me->handler->clear_thread( CONV char8( p_task ) ).

...handle receive logic
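For reference, the callback named in the CALLING addition could be declared like this in the calling class. This is only a sketch of the signature described above; ABAP requires the callback method to be public:

```abap
" PUBLIC SECTION of the calling class
METHODS on_end_of_action
  IMPORTING
    p_task TYPE clike.
```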
The task of writing ABAP Unit Tests is left to the reader!
After the CALL FUNCTION, you can use the handle_resource_failure method when the call fails with resource_failure. It waits until there are enough resources again. I used this when I'd written a BW extractor, and the person who set up the process chains was running the extractor about twenty times in parallel, with each of those runs requesting 20 slots.
As a result, although there were enough free processes initially, they rapidly ran out. This is an issue whenever many programs use parallel processing at the same time, because the programs don't communicate with each other at all: each one assumes it has the whole system to itself.
It wouldn't take much effort to address this design issue. However, I only encountered it once, and we solved it by cutting the parallelism built into the process chain and letting the extractor program handle all the parallelism.
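Returning to handle_resource_failure, here is a hedged sketch of how the call in do_stuff_in_parallel might react to a failed start. The function module Z_DO_WORK, its parameter i_key and the variable key are hypothetical placeholders. Releasing the slot with clear_thread before waiting is my assumption: get_free_thread has already marked the slot as used, and no callback will arrive for a task that never started.

```abap
DATA thread  TYPE char8.
DATA errmsg  TYPE char255.
DATA started TYPE abap_bool.

WHILE started = abap_false.
  thread = me->handler->get_free_thread( ).

  " Z_DO_WORK and i_key are hypothetical; substitute your own function module
  CALL FUNCTION 'Z_DO_WORK'
    STARTING NEW TASK thread
    DESTINATION IN GROUP zcl_thread_handler=>c_default_group
    CALLING on_end_of_action ON END OF TASK
    EXPORTING
      i_key                 = key
    EXCEPTIONS
      communication_failure = 1 MESSAGE errmsg
      system_failure        = 2 MESSAGE errmsg
      resource_failure      = 3.

  CASE sy-subrc.
    WHEN 0.
      started = abap_true.
    WHEN 3.
      " Nothing was started: release the slot, wait for resources, then retry
      me->handler->clear_thread( thread ).
      me->handler->handle_resource_failure( ).
    WHEN OTHERS.
      " errmsg holds the RFC error text; release the slot and give up on this item
      me->handler->clear_thread( thread ).
      EXIT.
  ENDCASE.
ENDWHILE.
```

The loop retries only on resource_failure. For the other exceptions it leaves the loop, and how errmsg is reported is up to the application.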