CRM and CX Blogs by SAP
Stay up-to-date on the latest developments and product news about intelligent customer experience and CRM technologies through blog posts from SAP experts.
The catalog synchronization job frequently publishes updates about its progress and synchronization speed. The Solr indexing job, however, offers no such feature, although it would be very helpful for understanding indexing performance. I recently faced a situation where the indexing job was not scaling: the more items were indexed, the longer the job took to index each item. Without insight into the indexing speed, it was hard to know where to start investigating. Moreover, it is generally hard to estimate when a Solr indexing job will finish. To address these issues, I enhanced SAP Commerce so that the indexing job regularly updates the statusLine attribute of the active CronJobHistory item with a message similar to the following one:
9,456,049 item(s) indexed within 14m 23s (10,957 items/s), 0 error(s)

As with the catalog synchronization job, this makes it possible to track the progress of the Solr indexing job live in Backoffice.
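To make the numbers in the sample message concrete, here is a minimal standalone sketch of the arithmetic behind it. The class name IndexingStatusLine and its format method are hypothetical and only illustrate the calculation; they are not part of SAP Commerce.

```java
import java.text.NumberFormat;
import java.util.Locale;

// Hypothetical helper, for illustration only: rebuilds the sample status line
// from an item count, an elapsed time and an error count.
final class IndexingStatusLine {

    static String format(final long items, final long elapsedMillis, final int errors) {
        final NumberFormat nf = NumberFormat.getIntegerInstance(Locale.ENGLISH);
        final long totalSeconds = elapsedMillis / 1000;
        final String duration = (totalSeconds / 60) + "m " + (totalSeconds % 60) + "s";
        // Items per second, computed from milliseconds so that short runs do not divide by zero
        final String speed = elapsedMillis > 0 ? nf.format(items * 1000.0 / elapsedMillis) : "-";
        return String.format("%s item(s) indexed within %s (%s items/s), %s error(s)",
                nf.format(items), duration, speed, nf.format(errors));
    }

    public static void main(final String[] args) {
        // 14m 23s = 863 seconds; 9,456,049 / 863 is roughly 10,957 items per second
        System.out.println(format(9_456_049L, 863_000L, 0));
    }
}
```

Running it with the values of the sample message (9,456,049 items in 863 seconds) reproduces the line shown above.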

Approach


In a nutshell, the Solr indexing job fetches the primary keys of the items to index, splits them into batches and indexes the batches in parallel. The approach is to implement the IndexerListener and IndexerBatchListener interfaces to intercept the batch end events and report the number of items indexed over a given period. The hard part is that the batches are not connected to the indexing job: they run in their own session and in a different thread, and the job is not exposed to them. The only information connecting the indexing job to its batches is the indexing operation identifier. The logic to implement is consequently the following:

  • Intercept the indexing start event to store the job with its indexing operation identifier. Batches can then get access to the indexing job via the indexing operation identifier.

  • Intercept the batch end event to count the number of indexed items and refresh the indexing speed.

  • Intercept the indexing end event to publish the final indexing speed and remove any allocated resources like the mapping between the job and its indexing operation identifier.


Refreshing the indexing speed each time a batch finishes would add overhead and could slow down the system. Therefore, the proposed approach is to refresh it only after a certain time interval has elapsed (e.g. 1 minute, like the catalog synchronization job).
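The three steps and the time-based throttling can be sketched independently of SAP Commerce. The following is a simplified illustration, not the actual implementation: the class IndexingProgressTracker and its method names are made up for this example, and the current time is passed in explicitly so that the behavior is easy to follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, framework-free sketch of the approach described above.
final class IndexingProgressTracker {

    private static final class Progress {
        final AtomicLong indexedItems = new AtomicLong();
        volatile long lastPublishMillis;

        Progress(final long startMillis) {
            this.lastPublishMillis = startMillis;
        }
    }

    // The indexing operation identifier is the only link between a job and its batches
    private final Map<Long, Progress> progressByOperationId = new ConcurrentHashMap<>();
    private final long publishIntervalMillis;

    IndexingProgressTracker(final long publishIntervalMillis) {
        this.publishIntervalMillis = publishIntervalMillis;
    }

    // Step 1: indexing starts, register the operation
    void onIndexStart(final long operationId, final long nowMillis) {
        progressByOperationId.put(operationId, new Progress(nowMillis));
    }

    // Step 2: a batch ends, count its items; returns true when the status should be refreshed
    boolean onBatchEnd(final long operationId, final int batchSize, final long nowMillis) {
        final Progress progress = progressByOperationId.get(operationId);
        if (progress == null) {
            return false; // unknown operation, nothing to track
        }
        progress.indexedItems.addAndGet(batchSize);
        if (nowMillis - progress.lastPublishMillis < publishIntervalMillis) {
            return false; // interval not elapsed yet
        }
        // Deliberately unsynchronized: two batches ending together may both refresh the status
        progress.lastPublishMillis = nowMillis;
        return true;
    }

    // Step 3: indexing ends, release the mapping and return the final count (-1 if unknown)
    long onIndexEnd(final long operationId) {
        final Progress progress = progressByOperationId.remove(operationId);
        return progress == null ? -1 : progress.indexedItems.get();
    }
}
```

With an interval of 60,000 ms, a batch ending 1 second after the start only increments the counter, while the first batch ending at or after the 60 second mark also triggers a status refresh.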

Implementation


The most important aspect to focus on during the implementation was to measure as accurately as possible without slowing down the operations. In other words, precision can be sacrificed for performance. I therefore avoided synchronization in the implementation and relied on the AtomicInteger and AtomicLong classes to prevent issues with concurrent updates. First, I needed a container holding the indexing statistics for an indexing job.
import java.util.Date;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import de.hybris.platform.solrfacetsearch.model.indexer.cron.SolrIndexerCronJobModel;

/**
 * Holds the indexing statistics of one indexing job. The counters are atomic
 * because batches running in different threads update them concurrently.
 */
public class SolrIndexerCronJobStatistics {
    private final SolrIndexerCronJobModel cronJob;
    private final AtomicLong totalIndexedItems = new AtomicLong(0);
    private final AtomicInteger totalErrors = new AtomicInteger(0);
    // volatile: written by the job thread, read by the batch threads
    private volatile Date indexStartTime;
    private volatile Date indexEndTime;

    SolrIndexerCronJobStatistics(final SolrIndexerCronJobModel cronJob) {
        this.cronJob = cronJob;
        setIndexStartTime(new Date());
    }

    public SolrIndexerCronJobModel getCronJob() {
        return cronJob;
    }

    public long addIndexedItems(final int additionalIndexedItems) {
        return this.totalIndexedItems.addAndGet(additionalIndexedItems);
    }

    public int addErrors(final int additionalErrors) {
        return this.totalErrors.addAndGet(additionalErrors);
    }

    public long getTotalIndexedItems() {
        return this.totalIndexedItems.get();
    }

    public int getTotalErrors() {
        return this.totalErrors.get();
    }

    public Date getIndexStartTime() {
        return indexStartTime;
    }

    public void setIndexStartTime(final Date indexStartTime) {
        this.indexStartTime = indexStartTime;
    }

    public Date getIndexEndTime() {
        return indexEndTime;
    }

    public void setIndexEndTime(final Date indexEndTime) {
        this.indexEndTime = indexEndTime;
    }
}
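To illustrate why atomic counters are sufficient here, the following standalone sketch lets several worker threads add batch sizes to shared counters without any lock. It uses a simplified stand-in for the statistics container; the class BatchCountersDemo, its nested Counters class and the chosen thread and batch numbers are assumptions made for this example only.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo: concurrent workers update shared counters without synchronization.
final class BatchCountersDemo {

    // Simplified stand-in for the statistics container
    static final class Counters {
        final AtomicLong indexedItems = new AtomicLong();
        final AtomicInteger errors = new AtomicInteger();
    }

    static Counters run(final int threads, final int batchesPerThread, final int batchSize) {
        final Counters counters = new Counters();
        final ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int b = 0; b < batchesPerThread; b++) {
                    counters.indexedItems.addAndGet(batchSize); // one finished batch
                }
                counters.errors.incrementAndGet(); // pretend one failed batch per worker
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Workers did not finish in time");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return counters;
    }

    public static void main(final String[] args) {
        final Counters counters = run(8, 1_000, 100);
        System.out.println(counters.indexedItems.get() + " items, " + counters.errors.get() + " errors");
    }
}
```

No update is lost: 8 workers each adding 1,000 batches of 100 items always end at 800,000 items and 8 errors, whatever the thread interleaving.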

Next, the listener implements the logic described in the previous section. When a batch ends, the indexing job statistics are updated and, if the time interval since the last refresh has elapsed, the listener updates the statusLine attribute of the active CronJobHistory item. Note that the modifiedtime attribute is set explicitly, which is normally useless since SAP Commerce overwrites it when modelService.save() is executed. The reason for this odd line is to limit parallel updates of the CronJobHistory item. Under high parallelism, batches will very likely end at the same time, and it can take a couple of milliseconds to save the CronJobHistory item in the database and update its modifiedtime attribute.
// Imports as I know them from a standard SAP Commerce installation; adjust them
// if your version differs (e.g. jakarta.annotation.Resource on newer stacks).
import java.text.NumberFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.Resource;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import de.hybris.platform.cronjob.model.CronJobHistoryModel;
import de.hybris.platform.servicelayer.model.ModelService;
import de.hybris.platform.servicelayer.session.SessionService;
import de.hybris.platform.solrfacetsearch.indexer.IndexerBatchContext;
import de.hybris.platform.solrfacetsearch.indexer.IndexerBatchListener;
import de.hybris.platform.solrfacetsearch.indexer.IndexerContext;
import de.hybris.platform.solrfacetsearch.indexer.IndexerListener;
import de.hybris.platform.solrfacetsearch.indexer.exceptions.IndexerException;
import de.hybris.platform.solrfacetsearch.model.indexer.cron.SolrIndexerCronJobModel;

public class SolrIndexerCronJobListener implements IndexerListener, IndexerBatchListener {
    private static final Logger LOG = LogManager.getLogger(SolrIndexerCronJobListener.class);

    private ModelService modelService;
    private SessionService sessionService;

    // Maps the indexing operation identifier to the statistics of its job
    private final Map<Long, SolrIndexerCronJobStatistics> solrIndexerCronJobStatistics = new ConcurrentHashMap<>();
    // Minimum delay between two status updates, in milliseconds (1 minute by default)
    private long updateInterval = 1000 * 60 * 1;

    @Override
    public void beforeBatch(final IndexerBatchContext ctx) throws IndexerException {
        // Nothing to do
    }

    @Override
    public void afterBatch(final IndexerBatchContext ctx) throws IndexerException {
        final int countIndexedItems = ctx.getItems().size();
        updateSolrIndexerCronJobStatistics(ctx, countIndexedItems, 0);
    }

    protected void updateSolrIndexerCronJobStatistics(final IndexerBatchContext ctx, final int additionalIndexedItems, final int additionalErrors) {
        final long indexOperationId = ctx.getIndexOperationId();
        final SolrIndexerCronJobStatistics stats = solrIndexerCronJobStatistics.get(indexOperationId);
        if (stats != null) {
            stats.addIndexedItems(additionalIndexedItems);
            stats.addErrors(additionalErrors);
            updateSolrIndexerCronJobHistory(stats, false);
        }
    }

    protected void updateSolrIndexerCronJobHistory(final SolrIndexerCronJobStatistics stats, final boolean force) {
        final CronJobHistoryModel cronJobHistory = stats.getCronJob().getActiveCronJobHistory();
        if (cronJobHistory != null && (force || needsUpdate(cronJobHistory))) {
            // Set right away to limit parallel updates; save() overwrites it anyway
            cronJobHistory.setModifiedtime(new Date());
            final String statusLine = generateStatusLine(stats);
            if (LOG.isInfoEnabled()) {
                LOG.info("(" + stats.getCronJob().getCode() + ") " + statusLine);
            }
            cronJobHistory.setStatusLine(statusLine);
            getModelService().save(cronJobHistory);
        }
    }

    protected String generateStatusLine(final SolrIndexerCronJobStatistics stats) {
        final long totalIndexedItems = stats.getTotalIndexedItems();
        final int totalIndexErrors = stats.getTotalErrors();
        final long endMillis = stats.getIndexEndTime() == null ? System.currentTimeMillis() : stats.getIndexEndTime().getTime();
        final Date startTime = stats.getIndexStartTime() == null ? stats.getCronJob().getStartTime() : stats.getIndexStartTime();
        final long totalIndexDuration = endMillis - startTime.getTime();
        final NumberFormat nf = NumberFormat.getIntegerInstance(Locale.ENGLISH);
        // Duration truncated to whole seconds, e.g. "PT14M23S" becomes "14m23s"
        final String duration = totalIndexDuration > 0
                ? Duration.ofMillis(totalIndexDuration - (totalIndexDuration % 1000)).toString().substring(2).toLowerCase(Locale.ROOT)
                : "-";
        // Computed from milliseconds to avoid a division by zero during the first second
        final String speed = totalIndexDuration > 0 ? nf.format(totalIndexedItems * 1000.0 / totalIndexDuration) : "-";
        return String.format("%s item(s) indexed within %s (%s items/s), %s error(s)",
                nf.format(totalIndexedItems), duration, speed, nf.format(totalIndexErrors));
    }

    protected boolean needsUpdate(final CronJobHistoryModel cronJobHistory) {
        return cronJobHistory.getModifiedtime().before(new Date(System.currentTimeMillis() - getUpdateInterval()));
    }

    @Override
    public void afterBatchError(final IndexerBatchContext ctx) throws IndexerException {
        updateSolrIndexerCronJobStatistics(ctx, 0, 1);
    }

    @Override
    public void beforeIndex(final IndexerContext ctx) throws IndexerException {
        hookSolrIndexerCronJob(ctx);
    }

    @Override
    public void afterIndex(final IndexerContext ctx) throws IndexerException {
        unhookSolrIndexerCronJob(ctx);
    }

    @Override
    public void afterIndexError(final IndexerContext ctx) throws IndexerException {
        unhookSolrIndexerCronJob(ctx);
    }

    // Stores the running cron job under its indexing operation identifier
    protected void hookSolrIndexerCronJob(final IndexerContext ctx) {
        final Object currentCronJob = getSessionService().getAttribute("currentCronJob");
        if (currentCronJob instanceof SolrIndexerCronJobModel) {
            final long indexOperationId = ctx.getIndexOperationId();
            solrIndexerCronJobStatistics.put(indexOperationId, new SolrIndexerCronJobStatistics((SolrIndexerCronJobModel) currentCronJob));
        }
    }

    // Publishes the final indexing speed and releases the mapping
    protected void unhookSolrIndexerCronJob(final IndexerContext ctx) {
        final Date indexEndTime = new Date();
        final long indexOperationId = ctx.getIndexOperationId();
        final SolrIndexerCronJobStatistics stats = solrIndexerCronJobStatistics.remove(indexOperationId);
        if (stats != null) {
            stats.setIndexEndTime(indexEndTime);
            updateSolrIndexerCronJobHistory(stats, true);
        }
    }

    protected ModelService getModelService() {
        return modelService;
    }

    @Resource
    public void setModelService(final ModelService modelService) {
        this.modelService = modelService;
    }

    protected SessionService getSessionService() {
        return sessionService;
    }

    @Resource
    public void setSessionService(final SessionService sessionService) {
        this.sessionService = sessionService;
    }

    public long getUpdateInterval() {
        return updateInterval;
    }

    public void setUpdateInterval(final long updateInterval) {
        this.updateInterval = updateInterval;
    }
}

Finally, the listener should be registered, which is achieved with the following Spring configuration.
<bean id="solrIndexerCronJobListener" class="com.discounttire.core.search.solrfacetsearch.indexer.listeners.SolrIndexerCronJobListener">
    <property name="updateInterval" value="#{configurationService.configuration.getInt('solr.indexer.cronjob.status.updateInterval', 60000)}"/>
</bean>

<bean id="solrIndexerCronJobListenerDefinition" parent="solrListenerDefinition">
    <property name="priority" value="2000"/>
    <property name="listener" ref="solrIndexerCronJobListener"/>
</bean>

Limitations


First, this approach works only when the indexing is performed on a single node. This is the default in SAP Commerce today, but it could change in the future. Secondly, you might see several log messages published one after the other at about the same time. This is normal: updates to the CronJobHistory item are deliberately not synchronized, so as not to slow down operations. Even though modifiedtime is set manually, batches can still end at the same time and detect at the same time that the CronJobHistory item should be updated.
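If the duplicate messages are a concern, one possible variation, which I did not use in the implementation above, is to keep the time of the last refresh in an AtomicLong and let batches compete for the refresh with compareAndSet. The sketch below is only an illustration: the PublishThrottle class is hypothetical, and it only deduplicates refreshes within a single JVM, so the single-node limitation still applies.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical lock-free throttle: at most one caller wins each interval.
final class PublishThrottle {

    private final AtomicLong lastPublishMillis;
    private final long intervalMillis;

    PublishThrottle(final long intervalMillis, final long startMillis) {
        this.intervalMillis = intervalMillis;
        this.lastPublishMillis = new AtomicLong(startMillis);
    }

    // Returns true for exactly one of the threads that notice the interval has elapsed
    boolean tryAcquire(final long nowMillis) {
        final long last = lastPublishMillis.get();
        if (nowMillis - last < intervalMillis) {
            return false; // interval not elapsed yet
        }
        // Only the thread whose compare-and-set succeeds refreshes the status
        return lastPublishMillis.compareAndSet(last, nowMillis);
    }

    public static void main(final String[] args) {
        final PublishThrottle throttle = new PublishThrottle(60_000L, 0L);
        System.out.println(throttle.tryAcquire(59_999L)); // interval not elapsed
        System.out.println(throttle.tryAcquire(60_000L)); // first caller after the interval
        System.out.println(throttle.tryAcquire(60_000L)); // a second caller at the same time
    }
}
```

A batch would call tryAcquire(System.currentTimeMillis()) when it ends and refresh the CronJobHistory item only when the call returns true. This stays free of locks, in line with the goal of not slowing down the indexing.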

Conclusion


Tracking the speed of the Solr indexing job allowed me to discover that my indexing job was not scaling because the indexing speed decreased over time. Once the reasons were identified and addressed, it helped to verify that the job was indexing at a constant speed. It then made it easier to compare indexing speeds between environments and to address infrastructure issues. Finally, it helped to detect indexing performance issues early in pre-production and production environments by comparing the actual indexing speed to the expected one.

Last but not least, if you wonder why I do not set the progress attribute of the active CronJobHistory item, it is simply because the total count of items to be indexed is not easily available. The context objects passed with the events do not expose this information, and customizing the indexing framework to obtain it is not simple. If you have an idea, feel free to share it here or to post your own solution referencing this one.