Developers frequently want to submit data to the Bloomreach Intelligent Index programmatically. This post explores communicating with the Bloomreach Data Ingestion APIs using a Node.js sample application. It is worth noting that the more traditional tab- and comma-separated data formats are still supported.
Getting started with API based Data Ingestion
tl;dr, you can watch this in video format here.
tl;dr, repo here.
High Level Overview
Data is ingested into the Bloomreach Intelligent Index in a two-phase approach, where each phase is known as a ‘job’:
► ingesting the data
► updating the index
These jobs are decoupled as there could be different departments asking the platform to ingest data simultaneously.
When you submit a job, the platform responds with a jobId, which you can use to check the status of the job.
Data is sent to the API endpoint as a request where the body is a JSON ‘Patch’. The API endpoint supports two HTTP verbs: ‘PUT’ and ‘PATCH’. A ‘PUT’ will replace the index entirely, whilst a ‘PATCH’ will modify what is there (use with caution!).
The JSON patch requests will have an operation which is either ‘add’ or ‘remove’ (Note: This add / remove could apply at the product, variants, or view level).
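To make the two operations concrete, here is a minimal sketch of helpers that build them. The helper names (`addProductOp`, `removeProductOp`) and the product id `pid-456` are hypothetical, and the `remove` path simply mirrors the `/products/<pid>` path used by the `add` sample in this post. Variant- and view-level paths are not shown, so check the API reference for those.

```javascript
// Hypothetical helpers (not part of the sample repo) that build JSON Patch operations.

// Build an 'add' operation for one product.
const addProductOp = (pid, attributes) => ({
  op: "add",
  path: "/products/" + pid,
  value: { attributes },
})

// Build a 'remove' operation for one product. A JSON Patch 'remove' carries no value.
const removeProductOp = (pid) => ({
  op: "remove",
  path: "/products/" + pid,
})

// A patch is simply an array of operations.
const patch = [
  addProductOp("pid-123", { title: "Self stirring mug", price: 10 }),
  removeProductOp("pid-456"), // illustrative id only
]

console.log(JSON.stringify(patch, null, 2))
```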
Sample Application Workflow
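The sample application will follow a simple workflow: load the patch data, submit it, wait for the ingestion job to finish, request an index update, and wait for that job to finish.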
Loading Data
As the Bloomreach API expects a JSON Patch, a great place to start is just to store these as JSON files on disk:
[
  {
    "op": "add",
    "path": "/products/pid-123",
    "value": {
      "attributes": {
        "url": "https://wonderfulengineering.com/wp-content/uploads/2013/11/self-stirring-mug.jpg",
        "price": 10,
        "title": "Self stirring mug",
        "thumb_image": "https://wonderfulengineering.com/wp-content/uploads/2013/11/self-stirring-mug.jpg",
        "description": "The most amazing product that you never knew you needed...",
        "tags": ["", ""],
        "language": "en"
      }
    }
  }
]
As a side note, JSON Lines was picked as it lends itself to large data volume applications, where it makes sense to write each line out to a file rather than hold the whole JSON object in memory before dumping it. It is also much easier to split across files. More on this in a future article.
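If you do keep one operation per line, the lines need to be combined into a single JSON array before they are sent as a patch. The following is a minimal sketch of that step. `jsonLinesToPatch` is a hypothetical helper, and it assumes each non-empty line holds exactly one complete patch operation.

```javascript
// Hypothetical helper: turn JSON Lines text (one patch operation per line)
// into the array of operations that a patch request body needs.
const jsonLinesToPatch = (text) =>
  text
    .split(/\r?\n/)
    // Remember the original line number so errors point at the right line.
    .map((line, index) => ({ line, number: index + 1 }))
    // Skip blank lines, such as a trailing newline at the end of the file.
    .filter(({ line }) => line.trim() !== "")
    .map(({ line, number }) => {
      try {
        return JSON.parse(line)
      } catch (err) {
        throw new Error("Invalid JSON on line " + number + ": " + err.message)
      }
    })

// Illustrative input: two operations, one per line.
const sampleLines = [
  '{"op":"add","path":"/products/pid-123","value":{"attributes":{"title":"Self stirring mug"}}}',
  '{"op":"remove","path":"/products/pid-456"}',
  "",
].join("\n")

console.log(JSON.stringify(jsonLinesToPatch(sampleLines)))
```

This version reads the whole text at once. For files too large to hold in memory, Node's `readline` module can read them line by line instead.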
Our Load Data Function
This function takes the third element of process.argv (the first argument after the script name in the command which launched the app) and uses it as the path to the data we want to load. In this way, we get a nice, flexible piece of code which can be used for many experiments without having to modify it:
const fs = require("fs")

const loadData = () => {
  return new Promise(function (resolve, reject) {
    try {
      // process.argv[2] is the first argument after the script path
      let processArguments = process.argv
      let targetFile = processArguments[2]
      logger.log("loading data from: " + targetFile)
      let patchData = fs.readFileSync(targetFile)
      resolve(patchData)
    } catch (err) {
      reject("Data load error. This could be because the path to your patch file is invalid: " + err)
    }
  })
}
Note: You’ll notice it calls to logger, which is a simple logging module I wrote so we can see what’s going on during data ingestion.
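Since poorly formed JSON only shows up later as a failed job, it can help to check the file locally before submitting it. This is a minimal sketch, not part of the sample app: `validatePatch` is a hypothetical helper, and the rules it checks (an array of operations whose `op` is 'add' or 'remove', each with a string `path`) are taken only from the description in this post, not from the full API specification.

```javascript
// Hypothetical pre-flight check for a patch file's contents.
// Accepts a Buffer or string and returns the parsed operations, or throws.
const validatePatch = (raw) => {
  // JSON.parse throws a SyntaxError on malformed JSON.
  const operations = JSON.parse(raw.toString("utf8"))
  if (!Array.isArray(operations)) {
    throw new Error("Patch must be a JSON array of operations")
  }
  operations.forEach((operation, index) => {
    if (operation === null || typeof operation !== "object") {
      throw new Error("Operation " + index + " is not an object")
    }
    if (operation.op !== "add" && operation.op !== "remove") {
      throw new Error("Operation " + index + " has unsupported op: " + operation.op)
    }
    if (typeof operation.path !== "string" || !operation.path.startsWith("/")) {
      throw new Error("Operation " + index + " needs a path starting with '/'")
    }
  })
  return operations
}

// Example: check the contents that loadData would hand to the request.
const checked = validatePatch(Buffer.from('[{"op":"add","path":"/products/pid-123","value":{}}]'))
console.log("operations in patch: " + checked.length)
```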
Submitting a Patch Data Request
Our function which submits a patch data request is relatively simple. It accepts the patch data as an argument and makes an HTTPS call to the data ingestion API using the ‘PATCH’ method. It waits for the response to be received completely before resolving itself.
let options = {
  hostname: "api-staging.connect.bloomreach.com",
  method: "PATCH",
  path:
    "/dataconnect/api/v1/accounts/" +
    settings.BRSM_ACCOUNT_ID +
    "/catalogs/" +
    settings.BRSM_CATALOG_NAME +
    "/products",
  port: 443,
  headers: {
    "Content-Type": "application/json-patch+json",
    Authorization: settings.BEARER_API_KEY,
  },
}
► The path is /dataconnect/api/v1/accounts/<BRSM_ACCOUNT_ID>/catalogs/<BRSM_CATALOG_NAME>/products - which identifies the account and the catalog, and shows that we are going to put data into products.
► The method used here is ‘PATCH’ - we aren’t going to replace the whole index, simply update whatever is there.
► In the headers, you will see that the content type is application/json-patch+json - which indicates that we are sending the patch file payload directly in this API request.
► Also in the headers you will see Authorization, whose value (an API key) will be issued to you during implementation.
The rest of the code block simply makes the request and handles the response coming back:
const req = https
  .request(options, (resp) => {
    let data = ""
    // A chunk of data has been received.
    resp.on("data", (chunk) => {
      data += chunk
    })
    // The whole response has been received.
    resp.on("end", () => {
      resolve(JSON.parse(data))
    })
  })
  .on("error", (err) => {
    reject(err.message)
  })
// Send the patch payload as the request body, then finish the request.
req.write(patchData)
req.end()
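Putting the options and the request handling together, the whole function might look like the sketch below. It is an illustration rather than the code from the repo: the `settings` object is passed in explicitly, `buildProductsOptions` is a hypothetical helper, and rejecting on a non-2xx status code, URL-encoding the path segments, and setting `Content-Length` are assumptions rather than documented requirements.

```javascript
const https = require("https")

// Hypothetical helper: build request options for the products endpoint.
// `settings` is assumed to hold BRSM_ACCOUNT_ID, BRSM_CATALOG_NAME and BEARER_API_KEY.
const buildProductsOptions = (settings, method, bodyLength) => ({
  hostname: "api-staging.connect.bloomreach.com",
  method: method, // "PATCH" modifies what is there, "PUT" replaces it
  path:
    "/dataconnect/api/v1/accounts/" +
    encodeURIComponent(settings.BRSM_ACCOUNT_ID) +
    "/catalogs/" +
    encodeURIComponent(settings.BRSM_CATALOG_NAME) +
    "/products",
  port: 443,
  headers: {
    "Content-Type": "application/json-patch+json",
    "Content-Length": bodyLength,
    Authorization: settings.BEARER_API_KEY,
  },
})

// Sketch of the full request: resolves with the parsed JSON response,
// rejects on network errors, non-2xx responses, or unparseable bodies.
const submitPatchDataRequest = (settings, patchData) =>
  new Promise((resolve, reject) => {
    const body = Buffer.from(patchData)
    const options = buildProductsOptions(settings, "PATCH", body.length)
    const req = https.request(options, (resp) => {
      let data = ""
      resp.setEncoding("utf8")
      resp.on("data", (chunk) => {
        data += chunk
      })
      resp.on("end", () => {
        if (resp.statusCode < 200 || resp.statusCode >= 300) {
          reject(new Error("HTTP " + resp.statusCode + ": " + data))
          return
        }
        try {
          resolve(JSON.parse(data))
        } catch (err) {
          reject(err)
        }
      })
    })
    req.on("error", reject)
    req.write(body)
    req.end()
  })
```

Keeping the options in their own function means the same code can issue a ‘PUT’ by changing one argument, and the path building can be checked without touching the network.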
Requesting the Index to Update
Requesting the index to update is done via a POST request to the /indexes endpoint:
let options = {
  hostname: "api-staging.connect.bloomreach.com",
  method: "POST",
  path:
    "/dataconnect/api/v1/accounts/" +
    settings.BRSM_ACCOUNT_ID +
    "/catalogs/" +
    settings.BRSM_CATALOG_NAME +
    "/indexes",
  port: 443,
  headers: {
    Authorization: settings.BEARER_API_KEY,
  },
}
► Path is to /indexes rather than /products
► Method is ‘POST’
The rest of the function is essentially a repeat of the above so I have omitted it for the sake of brevity.
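Because the request handling is the same for every endpoint, one option is to factor it into a shared helper so that each API call only supplies its options. The sketch below shows that idea. `buildIndexOptions` and `requestJson` are hypothetical names, and `settings` is assumed to carry the same fields used in the options above.

```javascript
const https = require("https")

// Hypothetical helper: options for the index update endpoint.
const buildIndexOptions = (settings) => ({
  hostname: "api-staging.connect.bloomreach.com",
  method: "POST",
  path:
    "/dataconnect/api/v1/accounts/" +
    settings.BRSM_ACCOUNT_ID +
    "/catalogs/" +
    settings.BRSM_CATALOG_NAME +
    "/indexes",
  port: 443,
  headers: {
    Authorization: settings.BEARER_API_KEY,
  },
})

// Hypothetical shared helper: send a request, collect the response body,
// and resolve with it parsed as JSON. `body` is optional.
const requestJson = (options, body) =>
  new Promise((resolve, reject) => {
    const req = https.request(options, (resp) => {
      let data = ""
      resp.on("data", (chunk) => {
        data += chunk
      })
      resp.on("end", () => {
        try {
          resolve(JSON.parse(data))
        } catch (err) {
          reject(err)
        }
      })
    })
    req.on("error", reject)
    if (body !== undefined) {
      req.write(body)
    }
    req.end()
  })

// With the helper in place, the index update is a one-liner.
const requestIndexUpdate = (settings) => requestJson(buildIndexOptions(settings))
```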
Get Job Status
Our function to get the status of a job simply accepts a jobId, makes a ‘GET’ request to the /dataconnect/api/v1/jobs/<jobId> endpoint, and returns the response:
const https = require("https")

const getJobStatus = (jobId) => {
  return new Promise(function (resolve, reject) {
    let options = {
      hostname: "api-staging.connect.bloomreach.com",
      method: "GET",
      path: "/dataconnect/api/v1/jobs/" + jobId,
      port: 443,
      headers: {
        Authorization: settings.BEARER_API_KEY,
      },
    }
    const req = https
      .request(options, (resp) => {
        let data = ""
        // A chunk of data has been received.
        resp.on("data", (chunk) => {
          data += chunk
        })
        // The whole response has been received.
        resp.on("end", () => {
          resolve(JSON.parse(data))
        })
      })
      .on("error", (err) => {
        reject(err.message)
      })
    req.end()
  })
}
Get Job Status Until We Have an Outcome
As jobs can go through a number of statuses (creating, queued, running, success, failed, skipped, killed), our code needs to be able to react to them before proceeding. Typically, you will see running, success or, if you have poorly formed JSON, failed.
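One way to make the handling of each status explicit is a small lookup, sketched below. The grouping is an assumption: this post only treats success and failed as final outcomes, so treating skipped and killed as terminal too is a guess that should be checked against the API documentation. `classifyJobStatus` is a hypothetical name.

```javascript
// Statuses that mean the job is still in progress.
const PENDING_STATUSES = new Set(["creating", "queued", "running"])

// Statuses treated here as unsuccessful endings.
// Assumption: "skipped" and "killed" never turn into "success" later.
const FAILED_STATUSES = new Set(["failed", "skipped", "killed"])

// Hypothetical helper: map an API status to what the caller should do next.
const classifyJobStatus = (status) => {
  if (status === "success") return "succeeded"
  if (FAILED_STATUSES.has(status)) return "failed"
  if (PENDING_STATUSES.has(status)) return "pending"
  return "unknown" // a status this sketch does not know about
}

for (const status of ["queued", "running", "success", "killed"]) {
  console.log(status + " -> " + classifyJobStatus(status))
}
```

A polling loop can then keep waiting only while the result is "pending", which avoids polling forever on a job that was killed or skipped.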
In the sample app, I wrote an iterator which recursively calls the status endpoint, waiting 10 seconds between calls, until we reach a conclusion:
const checkJobStatusUntilComplete = (jobId) => {
  return new Promise(function (resolve, reject) {
    checkJobStatusUntilCompleteIterator(
      jobId,
      function () {
        resolve()
      },
      function (err) {
        reject(err)
      }
    )
  })
}
const checkJobStatusUntilCompleteIterator = (jobId, callback, errCB) => {
  getJobStatus(jobId)
    .then((statusMessage) => {
      if (statusMessage.status === "failed") {
        errCB(statusMessage)
      } else if (statusMessage.status === "success") {
        logger.log("\u2705 success")
        callback()
      } else {
        // Any other status: log it and check again in 10 seconds.
        logger.log("status: " + statusMessage.status, {sameLineLogging: true})
        setTimeout(function () {
          checkJobStatusUntilCompleteIterator(jobId, callback, errCB)
        }, 10000)
      }
    })
    .catch((err) => {
      // Request errors go to the failure callback, like failed jobs.
      errCB(err)
    })
}
Note: You’ll notice I use a callback pattern for the iterator with a success and failure callback which just helps me to wrap the iterator into a promise whilst still maintaining the readability of the code. It’s a stylistic choice rather than anything important. I hope it doesn’t offend.
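The same polling can also be written as a loop with async/await, which avoids the callback pair. This is an alternative sketch rather than the sample app's code: `pollJobUntilComplete`, `intervalMs` and `maxAttempts` are hypothetical, and the status function is passed in as an argument so the loop can be exercised without calling the API.

```javascript
// Resolve after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

// Hypothetical alternative to the recursive iterator.
// `getStatus(jobId)` must return a promise of an object with a `status` field.
const pollJobUntilComplete = async (
  getStatus,
  jobId,
  { intervalMs = 10000, maxAttempts = 60 } = {}
) => {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const statusMessage = await getStatus(jobId)
    if (statusMessage.status === "success") {
      return statusMessage
    }
    if (statusMessage.status === "failed") {
      throw new Error("Job " + jobId + " failed")
    }
    // Any other status: wait, then check again.
    await sleep(intervalMs)
  }
  // Give up rather than polling forever.
  throw new Error("Job " + jobId + " did not finish after " + maxAttempts + " checks")
}

// Usage with a stand-in status function that succeeds on the third check.
const fakeStatuses = ["queued", "running", "success"]
const fakeGetStatus = async () => ({ status: fakeStatuses.shift() })

pollJobUntilComplete(fakeGetStatus, "job-1", { intervalMs: 0 })
  .then((result) => console.log("final status: " + result.status))
  .catch((err) => console.error(err.message))
```

In the real application you would pass `getJobStatus` as the first argument and keep the default 10 second interval. The attempt limit is an addition that stops the loop if a job never reaches success or failed.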
Building the Promise Chain
As I’ve built all of the functions out as promises, it’s easy to chain them together and to handle any errors:
const submitJobAndMonitorStatus = () => {
  return new Promise(function (resolve, reject) {
    logger.log("Running patching process.")
    loadData()
      .then((patchData) => {
        logger.log("Patch data loaded.")
        return submitPatchDataRequest(patchData)
      })
      .then((response) => {
        logger.log("Patch job submitted with id: " + response.jobId)
        return checkJobStatusUntilComplete(response.jobId)
      })
      .then(() => {
        logger.log("About to update the index")
        return requestIndexUpdate()
      })
      .then((response) => {
        logger.log("Index update job submitted with id: " + response.jobId)
        return checkJobStatusUntilComplete(response.jobId)
      })
      .then(() => {
        resolve()
      })
      .catch((err) => {
        reject(err)
      })
  })
}
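Because every step already returns a promise, the same sequence can also be written with async/await, where a rejection in any step simply propagates to the caller. The sketch below shows that form with the steps passed in as an object so that it can run on its own. `runIngestion` and the shape of `steps` are hypothetical, while the step names match the functions in this post.

```javascript
// Hypothetical async/await version of the promise chain.
// `steps` supplies the four functions; `log` defaults to console.log.
const runIngestion = async (steps, log = console.log) => {
  const patchData = await steps.loadData()
  log("Patch data loaded.")

  const patchJob = await steps.submitPatchDataRequest(patchData)
  log("Patch job submitted with id: " + patchJob.jobId)
  await steps.checkJobStatusUntilComplete(patchJob.jobId)

  log("About to update the index")
  const indexJob = await steps.requestIndexUpdate()
  log("Index update job submitted with id: " + indexJob.jobId)
  await steps.checkJobStatusUntilComplete(indexJob.jobId)
}

// Stand-in steps so the sketch runs without the API; the job ids are made up.
const fakeSteps = {
  loadData: async () => "[]",
  submitPatchDataRequest: async () => ({ jobId: "patch-job" }),
  requestIndexUpdate: async () => ({ jobId: "index-job" }),
  checkJobStatusUntilComplete: async () => {},
}

runIngestion(fakeSteps)
  .then(() => console.log("all done"))
  .catch((err) => console.error("error submitting job: " + err))
```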
And to launch the whole process:
logger.log("", { noTime: true })
submitJobAndMonitorStatus()
  .then(() => {
    logger.log("\u2705 all done ")
  })
  .catch((err) => {
    logger.log("\u274c error submitting job: " + err + "\n")
  })
Summary and Closing Thoughts
The demo application above is good for payloads up to 5MB in size. Beyond that, the platform asks that you submit your patch request data via SFTP and then use the API endpoints above to ask the platform to ingest the data and rebuild the index. The SLA for these APIs is up to 1 ingestion API request per minute and up to 1 index update request per hour.
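A quick size check before submitting can tell you when a payload needs to go via SFTP instead. This is a minimal sketch: `fitsInApiRequest` is a hypothetical helper, and interpreting 5MB as 5 * 1024 * 1024 bytes is an assumption, so confirm the exact limit before relying on it.

```javascript
// Assumption: "5MB" means 5 * 1024 * 1024 bytes.
const MAX_API_PAYLOAD_BYTES = 5 * 1024 * 1024

// Hypothetical helper: true if the patch data (Buffer or string) is small
// enough to send directly in an API request.
const fitsInApiRequest = (patchData) =>
  Buffer.byteLength(patchData) <= MAX_API_PAYLOAD_BYTES

const smallPatch = '[{"op":"remove","path":"/products/pid-123"}]'
console.log("send via API: " + fitsInApiRequest(smallPatch))
```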
As stated in the opening paragraph, you can find a video of this code in action here and the code-base itself is available here to help you iterate in your own projects. It took approximately two hours for me to go from an empty IDE to something which was indexing data in the Bloomreach Intelligent Index. With this fast-start, you can do it more quickly.
Blog written by: Paul Edwards from Bloomreach, 2021