Data Replication using Delta detection services.

 

To detect changes in items, use the delta detection services. Delta detection is provided as a standalone extension.

It is meant for anyone who needs to react to changes made to item data inside the Platform. Changes are tracked per stream (here, the C4C stream, identified by a streamID) using methods that hybris provides.

ChangeDetectionService: finds changes for a single item or for a whole type. A lookup by type returns every changed item of that type: new, modified, and deleted.

ItemVersionMarker: marks the changes that have already been processed.

Stream awareness: the API takes a stream ID, and changes are detected for that stream only. The ItemVersionMarker additionally holds the stream ID it belongs to.
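
To make the interplay of these three concepts concrete, here is a minimal, self-contained sketch. It is a toy model and not the hybris implementation: the class DeltaModel, its Change enum, and the in-memory map are all hypothetical stand-ins for ChangeDetectionService, ChangeType, and the persisted ItemVersionMarker items. It only assumes that a marker remembers, per stream, the last version of an item that was consumed.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of stream-aware delta detection. NOT the hybris implementation. */
class DeltaModel {
    enum Change { NEW, MODIFIED, DELETED, NONE }

    // Hypothetical marker store: streamId -> (item PK -> last consumed version)
    private final Map<String, Map<Long, Long>> markers = new HashMap<>();

    /**
     * Compares the item's current version with the marker stored for the stream.
     * A null currentVersion stands for an item that no longer exists.
     */
    Change detect(String streamId, long itemPk, Long currentVersion) {
        Map<Long, Long> perStream = markers.get(streamId);
        Long marked = (perStream == null) ? null : perStream.get(itemPk);
        if (currentVersion == null) {
            return (marked == null) ? Change.NONE : Change.DELETED;
        }
        if (marked == null) {
            return Change.NEW; // no marker yet for this stream
        }
        return marked.equals(currentVersion) ? Change.NONE : Change.MODIFIED;
    }

    /** Records the version as processed, for the given stream only. */
    void consume(String streamId, long itemPk, Long currentVersion) {
        Map<Long, Long> perStream = markers.computeIfAbsent(streamId, k -> new HashMap<>());
        if (currentVersion == null) {
            perStream.remove(itemPk); // item is gone, drop its marker
        } else {
            perStream.put(itemPk, currentVersion);
        }
    }
}
```

In this model an item is NEW until its change is consumed, shows no change afterwards, and becomes MODIFIED once its version moves on. A second stream keeps its own markers, so the same item is still NEW there.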

Detecting and Consuming Changes with Cronjob class:

First, create a new c4ccustomer cron job class that extends the AbstractJobPerformable class.

Use a FlexibleSearch query to fetch the customers from hybris. The query below selects the customers whose customer ID starts with 00.

protected static final String CUSTOMER_QUERY = "SELECT {item.PK} FROM {Customer AS item} WHERE {item.customerid} LIKE '00%'";
//Get all the customers
final FlexibleSearchQuery query = new FlexibleSearchQuery(CUSTOMER_QUERY);
final List<CustomerModel> allCustomersList = flexibleSearchService.<CustomerModel> search(query).getResult();
LOG.info("Total Customer Count: " + allCustomersList.size());

The allCustomersList object now holds the matching hybris customers.

DeltaDetection:

Use the change detection service to find the customers that have changed.

A for loop checks each individual customer against the C4C stream ID.

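for (final CustomerModel customer : allCustomersList)
{
	// May be null when no change is detected for this customer in the stream
	final ItemChangeDTO changeCustomer = getChangeDetectionService().getChangeForExistingItem(customer,
			"c4cCustomerStreamID");

	LOG.info("changeCustomer " + changeCustomer);

	// Holds the detected change so that it can be consumed
	final List<ItemChangeDTO> changeCustomerList = new ArrayList<ItemChangeDTO>();

	// The change is evaluated and consumed here, as shown in the next step
}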
  • While no ItemVersionMarker has been saved yet, a change should be detected for a new customer. Execute the lines below to find it.
  • The returned ItemChangeDTO object contains all the necessary information about the change.
  • If the customer is modified after its change was consumed, the change is detected again, this time with ChangeType = MODIFIED.
// Both change-type checks are grouped so that the null checks apply to NEW as well
if (changeCustomer != null && changeCustomer.getItemPK() != null
		&& (changeCustomer.getChangeType() == ChangeType.MODIFIED || changeCustomer.getChangeType() == ChangeType.NEW))
{
	isChanged = Boolean.TRUE;
	//Make Item version marker as up-to-date for the changes detected
	changeCustomerList.add(changeCustomer);
	changeDetectionService.consumeChanges(changeCustomerList);
	LOG.info("Change Customer PK:" + changeCustomer.getItemPK());
}
else
{
	isChanged = Boolean.FALSE;
}
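
When a null check is combined with two change-type checks, operator precedence matters: in Java, && binds more tightly than ||, so the two type checks need their own parentheses. The following self-contained sketch illustrates the difference. The class PrecedenceDemo and its Type enum are hypothetical stand-ins and not the hybris ChangeType.

```java
/** Illustrates && versus || precedence with a stand-in enum (not hybris ChangeType). */
class PrecedenceDemo {
    enum Type { NEW, MODIFIED, DELETED }

    // Parsed as (pkPresent && type == MODIFIED) || type == NEW,
    // so the PK check is skipped whenever the type is NEW.
    static boolean ungrouped(boolean pkPresent, Type type) {
        return pkPresent && type == Type.MODIFIED || type == Type.NEW;
    }

    // The PK check applies to both MODIFIED and NEW.
    static boolean grouped(boolean pkPresent, Type type) {
        return pkPresent && (type == Type.MODIFIED || type == Type.NEW);
    }
}
```

With a missing PK and type NEW, the ungrouped form evaluates to true while the grouped form evaluates to false. The two forms agree whenever the PK is present.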

Consume the changes and verify that they are no longer detected. Enable commit mode (so that the ItemVersionMarker is stored internally) and execute the changeDetectionService.consumeChanges(changeCustomerList); method.

For a new customer, the ItemVersionMarker for that customer is now stored in the database.

The complete c4ccustomer cron job class is shown below.

 

package com.cronjob;

import de.hybris.deltadetection.ChangeDetectionService;
import de.hybris.deltadetection.ItemChangeDTO;
import de.hybris.deltadetection.enums.ChangeType;
import de.hybris.platform.core.model.user.CustomerModel;
import de.hybris.platform.cronjob.enums.CronJobResult;
import de.hybris.platform.cronjob.enums.CronJobStatus;
import de.hybris.platform.cronjob.model.CronJobModel;
import de.hybris.platform.servicelayer.cronjob.AbstractJobPerformable;
import de.hybris.platform.servicelayer.cronjob.PerformResult;
import de.hybris.platform.servicelayer.search.FlexibleSearchQuery;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Required;

/**
 * @author brahma
 *
 */
public class c4ccustomer extends AbstractJobPerformable<CronJobModel>
{
	protected static final String CUSTOMER_QUERY = "SELECT {item.PK} FROM {Customer AS item} WHERE {item.customerid} LIKE '00%'";

	// Stream ID under which the ItemVersionMarkers of this job are stored
	protected static final String STREAM_ID = "c4cCustomerStreamID";

	private static final Logger LOG = Logger.getLogger(c4ccustomer.class);

	private ChangeDetectionService changeDetectionService;

	@Override
	public PerformResult perform(final CronJobModel cronJob)
	{
		try
		{
			//Get all the customers (flexibleSearchService is inherited from AbstractJobPerformable)
			final FlexibleSearchQuery query = new FlexibleSearchQuery(CUSTOMER_QUERY);
			final List<CustomerModel> allCustomersList = flexibleSearchService.<CustomerModel> search(query).getResult();
			LOG.info("Total Customer Count: " + allCustomersList.size());

			for (final CustomerModel customer : allCustomersList)
			{
				// May be null when no change is detected for this customer in the stream
				final ItemChangeDTO changeCustomer = getChangeDetectionService().getChangeForExistingItem(customer, STREAM_ID);
				LOG.info("changeCustomer " + changeCustomer);

				// The type checks are grouped so that the null checks apply to NEW as well
				final boolean isChanged = changeCustomer != null && changeCustomer.getItemPK() != null
						&& (changeCustomer.getChangeType() == ChangeType.MODIFIED
								|| changeCustomer.getChangeType() == ChangeType.NEW);

				if (isChanged)
				{
					//Make Item version marker as up-to-date for the changes detected.
					//The change must be added to the list before the list is consumed.
					final List<ItemChangeDTO> changeCustomerList = new ArrayList<ItemChangeDTO>();
					changeCustomerList.add(changeCustomer);
					getChangeDetectionService().consumeChanges(changeCustomerList);
					LOG.info("Change Customer PK:" + changeCustomer.getItemPK());
				}
				else
				{
					LOG.info("No Change detected for customer: " + customer.getUid());
				}
			}
			return new PerformResult(CronJobResult.SUCCESS, CronJobStatus.FINISHED);
		}
		catch (final Exception e)
		{
			LOG.error("Delta detection for customers failed", e);
			return new PerformResult(CronJobResult.ERROR, CronJobStatus.ABORTED);
		}
	}

	@Required
	public void setChangeDetectionService(final ChangeDetectionService changeDetectionService)
	{
		this.changeDetectionService = changeDetectionService;
	}

	/**
	 * @return change detection service.
	 */
	protected ChangeDetectionService getChangeDetectionService()
	{
		return changeDetectionService;
	}

}
