Resume error suspended BPM processes via job implementation
Not every BPM process instance runs successfully every time on a productive NetWeaver system, especially when the BPM process is part of an integration scenario. The causes can be manifold, e.g. unavailable third-party systems, inconsistent master data, network issues, internal issues, or bugs. The most commonly observed effect is that an exception, e.g. in an automated activity, suspends the process with status "error". Execution is then paused and only continues when a business administrator resumes the process or the NetWeaver system is restarted. But you cannot restart a productive system every time in an end-user or business-critical environment.
In this article I will show you a simple job implementation with which you can resume such error-suspended processes via the NWA scheduler.
Solution
The solution is a job implementation that selects your error-suspended processes and triggers several JMS messages (for parallel execution). These messages are consumed by a message-driven bean (MDB) that resumes the process instances via the BPM API.
Hint: The job and the MDB should be executed with a user who has BPM administrator authorizations. I recommend a technical user for that.
Realization
- Create a job definition that can be scheduled via the NWA scheduler.
Hint: The job parameters let you, on the one hand, pass a list of specific process instances (if blank, all error-suspended processes are resumed) and, on the other hand, simulate the execution. Simulation means that the job determines the instances but does not resume them at the end of the execution. This flag lets you check that the implementation works correctly without the final resume step.
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<job-definitions>
    <job-definition description="Job to resume Error Suspended Processes" name="ErrorSuspendedProcessResumeJob" retention-period="14">
        <job-definition-parameter name="ErrorSuspendedProcessIds" data-type="String" data-default="" nullable="Y" description="List of root process ids from error suspended processes comma separated" direction="IN" />
        <job-definition-parameter name="Simulate" data-type="Boolean" data-default="true" nullable="N" description="This flag indicates a test run" direction="IN" />
    </job-definition>
</job-definitions>
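The handling of the ErrorSuspendedProcessIds parameter can be illustrated in isolation. The following is a minimal sketch using only the JDK; the class name ProcessIdParameterParser is hypothetical and not part of the original project. It assumes the same rules as the job: whitespace is stripped, the value is split at commas, and empty entries are dropped. An empty result stands for "no explicit ids given", in which case the job would select all error-suspended processes itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only (not part of the original project).
final class ProcessIdParameterParser {

    private ProcessIdParameterParser() {
    }

    /**
     * Turns the raw job parameter into a list of process ids.
     * Returns an empty list for null or blank input.
     */
    static List<String> parse(String rawParameter) {
        List<String> ids = new ArrayList<>();
        if (rawParameter == null) {
            return ids;
        }
        // remove all whitespace, e.g. " A1 , B2" -> "A1,B2"
        String compact = rawParameter.replaceAll("\\s+", "");
        if (compact.isEmpty()) {
            return ids;
        }
        ids.addAll(Arrays.asList(compact.split(",")));
        // drop empty entries caused by doubled commas
        ids.removeIf(String::isEmpty);
        return ids;
    }
}
```

Calling ProcessIdParameterParser.parse(" A1 , ,B2,") yields a list with the two entries A1 and B2.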
- Implement the job that is executed by the NWA scheduler and triggers the JMS messages.
package <Vendor>.posystem.scheduler;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.MessageListener;

import <Vendor>.common.facades.TechnicalDBProviderFacadeLocal;
import <Vendor>.common.utility.jms.JMSContext;
import <Vendor>.common.utility.jms.Parallelizer;
import <Vendor>.common.utility.jms.ResponseData;
import <Vendor>.posystem.properties.ApplicationPropertyReaderLocal;
import com.sap.consulting.abpm.common.utils.DurationFormatter;
import com.sap.consulting.abpm.common.utils.JndiUtils;
import com.sap.consulting.abpm.core.bl.common.ABPMCoreConstants;
import com.sap.scheduler.runtime.JobContext;
import com.sap.scheduler.runtime.mdb.MDBJobImplementation;
import com.sap.tc.logging.Location;
import com.sap.tc.logging.Severity;
import com.sap.tc.logging.SimpleLogger;

/**
 * Message-Driven Bean implementation class for: ErrorSuspendedProcessResumeJob
 */
@MessageDriven(activationConfig = {
        @ActivationConfigProperty(propertyName = "messageSelector", propertyValue = "JobDefinition='ErrorSuspendedProcessResumeJob'"),
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") }, messageListenerInterface = MessageListener.class)
public class ErrorSuspendedProcessResumeJob extends MDBJobImplementation {

    private static final long serialVersionUID = 6702613557375408791L;
    private static final Location LOGGER = Location.getLocation(ErrorSuspendedProcessResumeJob.class);
    private static final TechnicalDBProviderFacadeLocal technicalDBProviderFacade = JndiUtils.getEjbByInterface(TechnicalDBProviderFacadeLocal.class);

    // filters out empty entries of the comma separated id list
    private Predicate<String> emptyPredicate = new Predicate<String>() {
        @Override
        public boolean test(String content) {
            if (content.matches(Constants.EMPTY)) {
                return true;
            }
            return false;
        }
    };

    @EJB
    private ApplicationPropertyReaderLocal applicationPropertyReader;

    @Override
    public void onJob(JobContext jobContext) throws Exception {
        SimpleLogger.trace(Severity.DEBUG, LOGGER, MessageFormat.format(ABPMCoreConstants.TEXT_STARTED, this.getClass().getSimpleName()));
        Date start = new Date();
        short returnCode = 0;
        Integer processesSizeFirst = 0;
        Logger jobLogger = jobContext.getLogger();
        Boolean simulate = jobContext.getJobParameter(Constants.SIMULATE_PARAM).getBooleanValue();
        String errorSuspendedProcessIds = jobContext.getJobParameter(Constants.ERROR_SUSPENDED_PROCESS_IDS).getStringValue();
        List<String> processIdsToBeProcessed = new ArrayList<String>();
        String simulateInfo = MessageFormat.format("{0} runs with simulate = {1} ...", this.getClass().getSimpleName(), simulate);
        SimpleLogger.trace(Severity.DEBUG, LOGGER, simulateInfo);
        jobLogger.log(Level.INFO, simulateInfo);

        // collect processing items
        if (null == errorSuspendedProcessIds || (null != errorSuspendedProcessIds && errorSuspendedProcessIds.isEmpty())) {
            processIdsToBeProcessed = technicalDBProviderFacade.executeNativeSQLSelectOnDb(Constants.SELECT_ERROR_SUSPENDED_ROOT_PROCESSES, Constants.DB_COLUMN_ID);
        } else {
            errorSuspendedProcessIds = errorSuspendedProcessIds.replaceAll(Constants.REGEX_WHITESPACES, Constants.EMPTY);
            processIdsToBeProcessed.addAll(Arrays.asList(errorSuspendedProcessIds.split(Constants.COMMA)));
            processIdsToBeProcessed.removeIf(emptyPredicate);
        }
        for (int i = 0; i < processIdsToBeProcessed.size(); i++) {
            // change content from <rootProcessId>, e.g.
            // 6ADDF293D3CE11E9C3AE00002C02DFBA
            // to <rootProcessId>#<simulate>, e.g.
            // 6addf293d3ce11e9c3ae00002c02dfba#true
            processIdsToBeProcessed.set(i, processIdsToBeProcessed.get(i).toLowerCase().concat(Constants.HASH).concat(simulate.toString()));
        }
        jobLogger.log(Level.INFO, MessageFormat.format("Found {0} error suspended processes to be processed.", processIdsToBeProcessed.size()));
        processesSizeFirst = processIdsToBeProcessed.size();

        JMSContext jmsCtx = initializeJmsContext(processIdsToBeProcessed, jobContext.getJob().getId().toString());
        jobLogger.log(Level.INFO, MessageFormat.format("BlockSize: {0}, MaxWaitingTimeout: {1}", jmsCtx.getBlockSize(), jmsCtx.getMaxWaitingTimeout()));
        String parallelSummary = Parallelizer.doParallel(jmsCtx);
        jobLogger.log(Level.INFO, parallelSummary);

        // prepare job logger output
        StringBuffer successfullyBuffer = new StringBuffer();
        if (!jmsCtx.getSuccessfullyProcessedItems().isEmpty()) {
            SortedMap<String, ResponseData> sortedMap = new TreeMap<String, ResponseData>(jmsCtx.getSuccessfullyProcessedItems());
            int i = 0;
            for (String aEntry : sortedMap.keySet()) {
                i++;
                String[] idParts = aEntry.toLowerCase().split(Constants.HASH);
                successfullyBuffer.append(MessageFormat.format("{0}, ", (0 < idParts.length) ? idParts[0] : "Unknown error suspended process id"));
                if (i % 5 == 0) {
                    successfullyBuffer.append("\n");
                }
            }
            successfullyBuffer.deleteCharAt(successfullyBuffer.length() - 2);
        }
        StringBuffer errorBuffer = new StringBuffer();
        if (!jmsCtx.getErroneousProcessedItems().isEmpty()) {
            SortedMap<String, ResponseData> sortedMap = new TreeMap<String, ResponseData>(jmsCtx.getErroneousProcessedItems());
            for (Entry<String, ResponseData> aEntry : sortedMap.entrySet()) {
                // the highest return code of all items becomes the job return code
                if (returnCode < aEntry.getValue().getReturnCode()) {
                    returnCode = aEntry.getValue().getReturnCode();
                }
                String[] idParts = aEntry.getKey().toLowerCase().split(Constants.HASH);
                errorBuffer.append(MessageFormat.format("ErrorSuspendedProcessId: {0}, ReturnCode: {1}, Reason: {2},\n",
                        (0 < idParts.length) ? idParts[0] : "Unknown error suspended process id",
                        aEntry.getValue().getReturnCode(),
                        (null != aEntry.getValue().getReturnText()) ? aEntry.getValue().getReturnText()
                                : ((null != aEntry.getValue().getException()) ? aEntry.getValue().getException().getMessage() : "Unknown reason\n")));
            }
            errorBuffer.deleteCharAt(errorBuffer.length() - 2);
        }
        jobLogger.log(Level.INFO, MessageFormat.format(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_SUMMARY, jmsCtx.getSuccessfullyProcessedItems().size(), successfullyBuffer, jmsCtx.getErroneousProcessedItems().size(), errorBuffer));
        jobLogger.log(Level.INFO, MessageFormat.format("{0} error suspended processes were successfully resumed", jmsCtx.getSuccessfullyProcessedItems().size()));
        jobLogger.log(Level.INFO, MessageFormat.format(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_BACKLOG_OPEN, processesSizeFirst - jmsCtx.getSuccessfullyProcessedItems().size()));
        SimpleLogger.trace(Severity.DEBUG, LOGGER, MessageFormat.format("{0} finished.", this.getClass().getSimpleName()));
        Date end = new Date();
        long duration = end.getTime() - start.getTime();
        jobLogger.log(Level.INFO, MessageFormat.format(ABPMCoreConstants.TEXT_ENDED_AFTER, this.getClass().getSimpleName(), DurationFormatter.formatDuration(duration)));
        SimpleLogger.trace(Severity.DEBUG, LOGGER, MessageFormat.format(ABPMCoreConstants.TEXT_TERMINATED, this.getClass().getSimpleName()));

        // set job return code
        jobContext.setReturnCode(returnCode);
    }

    private JMSContext initializeJmsContext(List<String> idsToBeProcessed, String correlationId) {
        JMSContext jmsCtx = new JMSContext();
        jmsCtx.setItemsToBeProcessed(idsToBeProcessed);
        jmsCtx.setCorrelationId(correlationId);
        jmsCtx.setBlockSize(applicationPropertyReader.readErrorSuspendedProcessResumeBlockSize());
        jmsCtx.setMaxWaitingTimeout(applicationPropertyReader.readErrorSuspendedProcessResumeMaxWaitingTimeout());
        jmsCtx.setSuccessfullyProcessedItems(new HashMap<String, ResponseData>());
        jmsCtx.setErroneousProcessedItems(new HashMap<String, ResponseData>());
        jmsCtx.setTriggerQueueJndiName(Constants.ERROR_SUSPENDED_PROCESS_RESUME_TRIGGER_QUEUE);
        jmsCtx.setResponseQueueJndiName(Constants.ERROR_SUSPENDED_PROCESS_RESUME_RESPONSE_QUEUE);
        jmsCtx.setErrorQueueJndiName(Constants.ERROR_SUSPENDED_PROCESS_RESUME_ERROR_QUEUE);
        jmsCtx.setTriggerQueueText(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_TRIGGER_JMS_MSG);
        jmsCtx.setResponseQueueText(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_RESPONSE_JMS_MSG);
        jmsCtx.setErrorQueueText(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_ERROR_JMS_MSG);
        jmsCtx.setUnusedItemsText(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_UNUSED_JMS_MSG);
        jmsCtx.setSummaryText(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_SUMMARY);
        jmsCtx.setBacklogOpenText(Constants.TEXT_ERROR_SUSPENDED_PROCESS_RESUME_BACKLOG_OPEN);
        return jmsCtx;
    }
}
Constants:
public static final String SIMULATE_PARAM = "Simulate";
public static final String ERROR_SUSPENDED_PROCESS_IDS = "ErrorSuspendedProcessIds";
public static final String DB_COLUMN_ID = "ID";
public static final String REGEX_WHITESPACES = "\\s+";
public static final String EMPTY = "";
public static final String COMMA = ",";
public static final String HASH = "#";
public static final String SELECT_ERROR_SUSPENDED_ROOT_PROCESSES = "SELECT ID FROM BPEM_IDX_OBJ WHERE INDEX_TYPE = 10 AND IN_ERROR = 1 AND LIFECYCLE_STATUS = 60 AND REFERENCE_ID IS NULL";
public static final String ERROR_SUSPENDED_PROCESS_RESUME_TRIGGER_QUEUE = "jmsqueues/<Vendor>/ResumeErrorSuspendedProcessesTriggerQueue";
public static final String ERROR_SUSPENDED_PROCESS_RESUME_RESPONSE_QUEUE = "jmsqueues/<Vendor>/ResumeErrorSuspendedProcessesResponseQueue";
public static final String ERROR_SUSPENDED_PROCESS_RESUME_ERROR_QUEUE = "jmsqueues/<Vendor>/ResumeErrorSuspendedProcessesErrorQueue";
public static final String TEXT_ERROR_SUSPENDED_PROCESS_RESUME_TRIGGER_JMS_MSG = "Resume ErrorSuspendedProcesses Trigger JMS messages (amount:{1}): \n{0}\n";
public static final String TEXT_ERROR_SUSPENDED_PROCESS_RESUME_RESPONSE_JMS_MSG = "Resume ErrorSuspendedProcesses Response JMS messages (amount:{1}): \n{0}\n";
public static final String TEXT_ERROR_SUSPENDED_PROCESS_RESUME_ERROR_JMS_MSG = "Resume ErrorSuspendedProcesses Error JMS messages (amount:{1}): \n{0}\n";
public static final String TEXT_ERROR_SUSPENDED_PROCESS_RESUME_UNUSED_JMS_MSG = "Resume ErrorSuspendedProcesses Unused JMS messages (amount:{1}): \n{0}\n";
public static final String TEXT_ERROR_SUSPENDED_PROCESS_RESUME_SUMMARY = "{0} error suspended processes were resumed: \n{1}\n{2} ErrorSuspendedProcesses were not resumed: \n{3}\n";
public static final String TEXT_ERROR_SUSPENDED_PROCESS_RESUME_BACKLOG_OPEN = "{0} error suspended processes not resumed and in an open backlog state";
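The job packs each work item as <rootProcessId>#<simulate>, and the MDB later splits it at the hash again. The following is a minimal sketch of that round trip using only the JDK; the class WorkItemCodec and its method names are hypothetical. One deliberate deviation, stated as an assumption: the sketch treats a missing flag as a simulation (the safe choice, nothing is resumed), whereas reading the second part of the split unconditionally would fail for such an item.

```java
import java.util.Locale;

// Hypothetical helper, for illustration only (not part of the original project).
final class WorkItemCodec {

    static final String HASH = "#";

    private WorkItemCodec() {
    }

    /** Builds "<rootProcessId>#<simulate>" with a lower-cased process id. */
    static String encode(String rootProcessId, boolean simulate) {
        return rootProcessId.toLowerCase(Locale.ROOT) + HASH + simulate;
    }

    /** Returns the part in front of the hash. */
    static String processId(String workItem) {
        return workItem.split(HASH)[0];
    }

    /**
     * Returns false only if the flag is present and equals "false".
     * Assumption of this sketch: a missing or unreadable flag means "simulate".
     */
    static boolean simulate(String workItem) {
        String[] parts = workItem.split(HASH);
        return !(parts.length > 1 && "false".equalsIgnoreCase(parts[1]));
    }
}
```

For example, WorkItemCodec.encode("6ADDF293D3CE11E9C3AE00002C02DFBA", true) returns 6addf293d3ce11e9c3ae00002c02dfba#true.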
Parallelizer (this class handles the connection and the sending of the JMS messages):
package <Vendor>.common.utility.jms;

import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.xml.bind.JAXBException;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.collections4.Transformer;

import <Vendor>.common.utility.PropertyProviderLocal;
import com.sap.consulting.abpm.common.utils.JndiUtils;
import com.sap.consulting.abpm.core.bl.common.ABPMCoreConstants;
import com.sap.consulting.abpm.core.bl.utils.MarshallerUtils;
import com.sap.tc.logging.Location;
import com.sap.tc.logging.Severity;
import com.sap.tc.logging.SimpleLogger;

public class Parallelizer {

    private static final Location LOGGER = Location.getLocation(Parallelizer.class);
    private static PropertyProviderLocal commonPropertyProvider = JndiUtils.getEjbByInterface(PropertyProviderLocal.class);

    public static String doParallel(JMSContext jmsCtx) throws JAXBException {
        List<String> itemsToBeProcessed = jmsCtx.getItemsToBeProcessed();
        Map<String, ResponseData> successfullyProcessedItems = jmsCtx.getSuccessfullyProcessedItems();
        Map<String, ResponseData> erroneousProcessedItems = jmsCtx.getErroneousProcessedItems();
        List<List<String>> itemIdPartitions = ListUtils.partition(new ArrayList<String>(itemsToBeProcessed), jmsCtx.getBlockSize());
        List<String> triggerJMSMessages = new ArrayList<String>();
        List<String> responseJMSMessages = new ArrayList<String>();
        List<String> errorJMSMessages = new ArrayList<String>();
        List<ItemData> unsentItems = new ArrayList<ItemData>();
        Map<String, String> unsentJMSMessages = new HashMap<String, String>();
        String jmsPrincipal = commonPropertyProvider.readJMSPrincipal();
        String jmsCredentials = commonPropertyProvider.readJMSCredentials();
        SimpleLogger.trace(Severity.DEBUG, LOGGER, "itemIdPartitions.size():" + itemIdPartitions.size());

        // sending lists via JMS Message
        for (List<String> anItemIdPartition : itemIdPartitions) {
            List<ItemData> itemList = new ArrayList<ItemData>();
            for (String itemId : anItemIdPartition) {
                itemList.add(new ItemData(itemId));
            }
            try {
                ItemDataList jmsMessageItemDataList = (null != jmsCtx.getAction()) ? new ItemDataList(itemList, jmsCtx.getAction()) : new ItemDataList(itemList);
                triggerJMSMessages.add(JMSMessageHelper.sendJMSMessage(jmsCtx.getTriggerQueueJndiName(), jmsPrincipal, jmsCredentials, MarshallerUtils.marshallObject(jmsMessageItemDataList), jmsCtx.getCorrelationId()));
                SimpleLogger.trace(Severity.DEBUG, LOGGER, "triggerJMSMessages!");
            } catch (Exception e) {
                ResponseData responseData = new ResponseData(ABPMCoreConstants.RETURN_CODE_JMS_MESSAGE_NOT_SEND, e.getMessage() + ABPMCoreConstants.TEXT_LF);
                for (ItemData anItem : itemList) {
                    anItem.setResponseData(responseData);
                }
                unsentItems.addAll(itemList);
                SimpleLogger.trace(Severity.DEBUG, LOGGER, "triggerJMSMessages - unsent");
                itemsToBeProcessed.removeAll(CollectionUtils.collect(itemList, listTransformer));
            }
        }
        SimpleLogger.trace(Severity.DEBUG, LOGGER, "triggerJMSMessagesSize:" + triggerJMSMessages.size() + ", unsentItemsSize:" + unsentItems.size());

        // receive JMS message only in case inProgressIdList has
        // content
        if (!itemsToBeProcessed.isEmpty()) {
            GregorianCalendar inTheFuture = new GregorianCalendar();
            inTheFuture.add(Calendar.MINUTE, jmsCtx.getMaxWaitingTimeout());
            // collect response data for transfered items
            do {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // maybe but doesn't matter
                }
                // receive response messages
                try {
                    Map<String, String> responseMessages = JMSMessageHelper.receiveJMSMessage(jmsCtx.getResponseQueueJndiName(), jmsPrincipal, jmsCredentials, jmsCtx.getCorrelationId());
                    if (!responseMessages.isEmpty()) {
                        responseJMSMessages.addAll(responseMessages.keySet());
                        unsentJMSMessages.putAll(ResponseClassifier.groupingMessageContentIntoMaps(itemsToBeProcessed, successfullyProcessedItems, erroneousProcessedItems, responseMessages));
                    }
                } catch (Exception e) {
                    // maybe but doesn't matter
                    SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, MessageFormat.format("Exception receiveJMSMessage() from {0}", jmsCtx.getResponseQueueJndiName()), e);
                }
                // receive error messages
                try {
                    Map<String, String> errorMessages = JMSMessageHelper.receiveJMSMessage(jmsCtx.getErrorQueueJndiName(), jmsPrincipal, jmsCredentials, jmsCtx.getCorrelationId());
                    if (!errorMessages.isEmpty()) {
                        errorJMSMessages.addAll(errorMessages.keySet());
                        unsentJMSMessages.putAll(ResponseClassifier.groupingMessageContentIntoMaps(itemsToBeProcessed, successfullyProcessedItems, erroneousProcessedItems, errorMessages));
                    }
                } catch (Exception e) {
                    // maybe but doesn't matter
                    SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, MessageFormat.format("Exception receiveJMSMessage() from {0}", jmsCtx.getErrorQueueJndiName()), e);
                }
            } while (!(itemsToBeProcessed.isEmpty() || new GregorianCalendar().after(inTheFuture)));
        }

        // group unsent items into maps
        for (ItemData anUnsentItem : unsentItems) {
            ResponseClassifier.groupByReturnCode(successfullyProcessedItems, erroneousProcessedItems, anUnsentItem.getItemId(), anUnsentItem.getResponseData());
        }

        // for all remainder
        if (!itemsToBeProcessed.isEmpty()) {
            ResponseData responseData = new ResponseData(ABPMCoreConstants.RETURN_CODE_UNKNOWN, "Status unknown");
            for (String remaindProcessInstance : itemsToBeProcessed) {
                ResponseClassifier.groupByReturnCode(successfullyProcessedItems, erroneousProcessedItems, remaindProcessInstance, responseData);
            }
        }

        // prepare logging summary of parallel item processing
        StringBuffer outputBuffer = new StringBuffer();
        if (!triggerJMSMessages.isEmpty()) {
            StringBuffer triggerBuffer = new StringBuffer();
            for (String aTriggerJMSMessage : triggerJMSMessages) {
                triggerBuffer.append(aTriggerJMSMessage + ", ");
            }
            triggerBuffer.deleteCharAt(triggerBuffer.length() - 2);
            outputBuffer.append(MessageFormat.format(jmsCtx.getTriggerQueueText(), triggerBuffer.toString(), triggerJMSMessages.size()));
        }
        if (!responseJMSMessages.isEmpty()) {
            StringBuffer responseBuffer = new StringBuffer();
            for (String aResponseJMSMessage : responseJMSMessages) {
                responseBuffer.append(aResponseJMSMessage + ", ");
            }
            responseBuffer.deleteCharAt(responseBuffer.length() - 2);
            outputBuffer.append(MessageFormat.format(jmsCtx.getResponseQueueText(), responseBuffer.toString(), responseJMSMessages.size()));
        }
        if (!errorJMSMessages.isEmpty()) {
            StringBuffer errorBuffer = new StringBuffer();
            for (String aErrorJMSMessage : errorJMSMessages) {
                errorBuffer.append(aErrorJMSMessage + ", ");
            }
            errorBuffer.deleteCharAt(errorBuffer.length() - 2);
            outputBuffer.append(MessageFormat.format(jmsCtx.getErrorQueueText(), errorBuffer.toString(), errorJMSMessages.size()));
        }
        if (!unsentJMSMessages.isEmpty()) {
            StringBuffer unusedJMSMessageBuffer = new StringBuffer();
            for (Entry<String, String> anUnusedJMSMessageEntry : unsentJMSMessages.entrySet()) {
                unusedJMSMessageBuffer.append(MessageFormat.format("{0}: {1},\n", anUnusedJMSMessageEntry.getKey(), anUnusedJMSMessageEntry.getValue()));
            }
            unusedJMSMessageBuffer.deleteCharAt(unusedJMSMessageBuffer.length() - 2);
            outputBuffer.append(MessageFormat.format(jmsCtx.getUnusedItemsText(), unusedJMSMessageBuffer.toString(), unsentJMSMessages.size()));
        }
        return outputBuffer.toString();
    }

    private static Transformer<ItemData, String> listTransformer = new Transformer<ItemData, String>() {
        @Override
        public String transform(ItemData itemData) {
            return itemData.getItemId();
        }
    };
}
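Two ideas carry the Parallelizer: the ids are split into blocks of a configured size (one JMS message per block), and the job then polls for responses until every item is answered or a deadline has passed. The following is a minimal sketch of both ideas with JDK classes only, leaving out JMS entirely. The class ParallelSketch, its method names, and the Supplier that stands in for reading the response queue are assumptions for illustration; the original uses ListUtils.partition from Apache Commons Collections and its own JMSMessageHelper.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper, for illustration only (not part of the original project).
final class ParallelSketch {

    private ParallelSketch() {
    }

    /** Splits items into consecutive blocks of at most blockSize elements. */
    static <T> List<List<T>> partition(List<T> items, int blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<List<T>> blocks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += blockSize) {
            int to = Math.min(from + blockSize, items.size());
            blocks.add(new ArrayList<>(items.subList(from, to)));
        }
        return blocks;
    }

    /**
     * Polls until all pending ids were answered or the timeout elapsed.
     * Returns the ids that are still unanswered (status unknown).
     */
    static Set<String> awaitResponses(Collection<String> pending, Supplier<Collection<String>> poll,
            Duration timeout, Duration interval) {
        Set<String> open = new HashSet<>(pending);
        long deadline = System.nanoTime() + timeout.toNanos();
        while (!open.isEmpty()) {
            // every polled id counts as answered, successful or not
            open.removeAll(poll.get());
            if (open.isEmpty() || System.nanoTime() - deadline >= 0) {
                break;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // keep the interrupt flag and stop waiting
                Thread.currentThread().interrupt();
                break;
            }
        }
        return open;
    }
}
```

With five ids and a block size of two, partition produces three blocks, so three trigger messages would be sent. Whatever awaitResponses returns corresponds to the remainder that the Parallelizer reports with "Status unknown".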
- Implement an MDB that consumes these JMS messages and resumes the process instances.
package <Vendor>.common.process;

import java.net.URI;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.annotation.security.DeclareRoles;
import javax.annotation.security.RunAs;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.EJB;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.xml.bind.JAXBException;

import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;

import <Vendor>.common.utility.PropertyProviderLocal;
import <Vendor>.common.utility.jms.ItemData;
import <Vendor>.common.utility.jms.ItemDataList;
import <Vendor>.common.utility.jms.JMSMessageHelper;
import <Vendor>.common.utility.jms.ResponseData;
import <Vendor>.posystem.scheduler.Constants;
import com.sap.bpm.api.BPMFactory;
import com.sap.bpm.helper.api.BPMIdHelper;
import com.sap.bpm.helper.api.BPMIdType;
import com.sap.bpm.pm.api.ProcessInstanceManager;
import com.sap.consulting.abpm.core.bl.common.ABPMCoreConstants;
import com.sap.consulting.abpm.core.bl.utils.MarshallerUtils;
import com.sap.tc.logging.Location;
import com.sap.tc.logging.Severity;
import com.sap.tc.logging.SimpleLogger;

/**
 * Message-Driven Bean implementation class for: ErrorSuspendedProcessResumeMDB
 */
@DeclareRoles(value = { "SUBSTITUTION_PROFILE_ACCESS" })
@RunAs("SUBSTITUTION_PROFILE_ACCESS")
@MessageDriven(mappedName = "ResumeErrorSuspendedProcessesTriggerQueue", activationConfig = {
        @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue") })
public class ErrorSuspendedProcessResumeMDB implements MessageListener {

    private static final Location LOGGER = Location.getLocation(ErrorSuspendedProcessResumeMDB.class);
    private static final ProcessInstanceManager pim = BPMFactory.getProcessInstanceManager();
    private static final BPMIdHelper idHelper = BPMFactory.getBPMIdHelper();

    @EJB
    private PropertyProviderLocal commonPropertyProvider;

    public void onMessage(Message msg) {
        try {
            SimpleLogger.trace(Severity.DEBUG, LOGGER, MessageFormat.format(ABPMCoreConstants.TEXT_STARTED, this.getClass().getSimpleName()));
            SimpleLogger.trace(Severity.DEBUG, LOGGER, MessageFormat.format(ABPMCoreConstants.TEXT_JMS_MESSAGE_RECEIVED, msg.getJMSMessageID()));
            if (msg instanceof TextMessage) {
                TextMessage textMsg = (TextMessage) msg;
                String correlationId = textMsg.getJMSCorrelationID();
                ItemDataList itemDataList = new ItemDataList();
                try {
                    itemDataList = MarshallerUtils.unmarshallIntoObject(textMsg.getText(), itemDataList);
                    List<ItemData> itemDatas = itemDataList.getItemDataList();
                    Map<String, Pair<Boolean, ItemData>> errorSuspendedRootProcessIdToPairSI = new HashMap<String, Pair<Boolean, ItemData>>();
                    for (ItemData aItemData : itemDatas) {
                        // <rootProcessId>#<simulate>, e.g.
                        // 6addf293d3ce11e9c3ae00002c02dfba#true
                        String[] idParts = aItemData.getItemId().split(Constants.HASH);
                        errorSuspendedRootProcessIdToPairSI.put(idParts[0], new ImmutablePair<Boolean, ItemData>(Boolean.parseBoolean(idParts[1]), aItemData));
                    }
                    for (Entry<String, Pair<Boolean, ItemData>> aErrorSuspendedRootProcessIdToPairSIEntry : errorSuspendedRootProcessIdToPairSI.entrySet()) {
                        resumeErrorSuspendedProcess(aErrorSuspendedRootProcessIdToPairSIEntry);
                    }
                } catch (JAXBException e) {
                    SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, "JAXBException occured inside onMessage()", e);
                    throw new RuntimeException(e);
                }

                String messageId = null;
                int retryCounter = 0;
                String jmsPrincipal = commonPropertyProvider.readJMSPrincipal();
                String jmsCredentials = commonPropertyProvider.readJMSCredentials();
                do {
                    try {
                        messageId = JMSMessageHelper.sendJMSMessage(Constants.ERROR_SUSPENDED_PROCESS_RESUME_RESPONSE_QUEUE, jmsPrincipal, jmsCredentials, MarshallerUtils.marshallObject(itemDataList), correlationId);
                    } catch (Exception e) {
                        // maybe but doesn't matter -> retry send
                        SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, MessageFormat.format("Exception occured when sending JMS message into {0}", Constants.ERROR_SUSPENDED_PROCESS_RESUME_RESPONSE_QUEUE), e);
                        retryCounter++;
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e1) {
                            // maybe but doesn't matter
                            SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, "InterruptedException occured inside onMessage()", e1);
                        }
                        // if message could not be send ten times send message
                        // into error queue
                        if (10 <= retryCounter) {
                            int errorRetryCounter = 0;
                            do {
                                try {
                                    messageId = JMSMessageHelper.sendJMSMessage(Constants.ERROR_SUSPENDED_PROCESS_RESUME_ERROR_QUEUE, jmsPrincipal, jmsCredentials, MarshallerUtils.marshallObject(itemDataList), correlationId);
                                } catch (Exception e1) {
                                    SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, MessageFormat.format("Exception occured when sending JMS message into {0}", Constants.ERROR_SUSPENDED_PROCESS_RESUME_ERROR_QUEUE), e1);
                                    errorRetryCounter++;
                                    try {
                                        Thread.sleep(1000);
                                    } catch (InterruptedException e2) {
                                        // maybe but doesn't matter
                                        SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, "InterruptedException occured inside onMessage()", e2);
                                    }
                                }
                            } while (!(null != messageId || errorRetryCounter == 10));
                        }
                    }
                } while (!(null != messageId || retryCounter == 10));
            } else {
                SimpleLogger.trace(Severity.DEBUG, LOGGER, ABPMCoreConstants.TEXT_UNSUPPORTED_JMS_MESSAGE_TYPE);
                throw new RuntimeException(ABPMCoreConstants.TEXT_UNSUPPORTED_JMS_MESSAGE_TYPE);
            }
        } catch (JMSException e) {
            SimpleLogger.traceThrowable(Severity.DEBUG, LOGGER, "JMSException occured inside onMessage()", e);
            throw new RuntimeException(e);
        } finally {
            SimpleLogger.trace(Severity.DEBUG, LOGGER, MessageFormat.format(ABPMCoreConstants.TEXT_TERMINATED, this.getClass().getSimpleName()));
        }
    }

    private void resumeErrorSuspendedProcess(Entry<String, Pair<Boolean, ItemData>> aErrorSuspendedRootProcessIdToPairSIEntry) {
        try {
            URI errorSuspendedRootProcessId = idHelper.convertToUri(aErrorSuspendedRootProcessIdToPairSIEntry.getKey(), BPMIdType.ProcessInstanceId);
            // resume only if this is not a simulation run
            if (!aErrorSuspendedRootProcessIdToPairSIEntry.getValue().getLeft()) {
                pim.resume(errorSuspendedRootProcessId);
            }
            aErrorSuspendedRootProcessIdToPairSIEntry.getValue().getRight().setResponseData(new ResponseData(ABPMCoreConstants.RETURN_CODE_EXECUTED_SUCCESSFULLY,
                    MessageFormat.format("Resume of error suspended process id {0} was executed successfully!", aErrorSuspendedRootProcessIdToPairSIEntry.getKey())));
        } catch (Exception e) {
            aErrorSuspendedRootProcessIdToPairSIEntry.getValue().getRight().setResponseData(
                    new ResponseData(ABPMCoreConstants.RETURN_CODE_EXECUTED_WITH_ERROR, MessageFormat.format("Exception for resumeErrorSuspendedProcess(): {0}", e.getMessage()), e));
        }
    }
}
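The response handling of the MDB boils down to a bounded retry with a fallback: try the response queue a fixed number of times and, if that fails, try the error queue the same number of times. The following is a minimal sketch of that pattern with JDK classes only. The class BoundedRetry, the Sender interface, and the parameters are hypothetical stand-ins; in the MDB the two senders would wrap the JMSMessageHelper.sendJMSMessage calls for the response and the error queue.

```java
// Hypothetical helper, for illustration only (not part of the original project).
final class BoundedRetry {

    /** Stand-in for a send call that returns the message id. */
    interface Sender {
        String send() throws Exception;
    }

    private BoundedRetry() {
    }

    /** Tries primary first, then fallback; returns null if both give up. */
    static String sendWithFallback(Sender primary, Sender fallback, int maxAttempts, long pauseMillis) {
        String messageId = attempt(primary, maxAttempts, pauseMillis);
        return (messageId != null) ? messageId : attempt(fallback, maxAttempts, pauseMillis);
    }

    private static String attempt(Sender sender, int maxAttempts, long pauseMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                String messageId = sender.send();
                if (messageId != null) {
                    return messageId;
                }
            } catch (Exception e) {
                // a failed send is expected here; fall through to the next attempt
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pauseMillis);
                } catch (InterruptedException e) {
                    // keep the interrupt flag and give up on this sender
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }
}
```

Separating the two loops this way keeps the fallback out of the catch block of the primary loop, which makes the number of attempts per queue easy to see.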
Add the following permission definition to ejb-j2ee-engine.xml and, after deployment, assign the server-role-name to a technical user (the MDB will then be executed with this user):
<security-permission>
    <security-role-map>
        <role-name>SUBSTITUTION_PROFILE_ACCESS</role-name>
        <server-role-name>INTERNAL_PROCESS_ORCHESTRATION</server-role-name>
    </security-role-map>
</security-permission>
- Schedule the job via the NWA scheduler.
Conclusion
This job implementation resumes error-suspended processes in a truly parallel JEE approach via JMS. This reduces the business administration effort for tedious cases in which processes only need to be resumed. Of course, if an issue occurs again and again for the same process instance, the business and/or technical administrator must investigate the root cause in detail, but for short-term failures this resume approach is very helpful.
Tip: Once the NWA scheduler has executed the job, you can check the results in the job log.
Hi Oliver,
you didn't upload the source of all Java classes (e.g. <Vendor>.common.utility.jms.JMSContext). Is it possible to get the complete source?
Best regards!
Tomislav
Hello Oliver,
in your solution, you use a few aBPM libraries:
Does this mean that the solution only works with the aBPM solution extensions?
Thanks and best regards
Jan