What's the status of your inventory?
WHAT IS THE PURPOSE OF THIS ARTICLE?
This document describes the complete working flow of the Dashboard widget “WHAT'S THE STATUS OF YOUR INVENTORY”. It covers :
The entire flow of building the new view for the widget, including its UPSERT mechanism.
A detailed summary of what happens under the hood, from fetching the data from BigQuery to showing it in the workspace.
PREVIOUS FLOW :
Previously, we fetched data from several sources, such as SQL, Mongo and Algolia, and then showed it in the workspace.
CURRENT FLOW :
Now we rely on a single source. All the data from the different sources (SQL, Mongo, Algolia) is synced to BigQuery, and we fetch it from BigQuery to show it in the workspace.
HOW IS DATA POPULATED IN BIGQUERY?
The original SQL data and the original Mongo data are synced to BigQuery via Airbyte.
The new view will read its data from the BigQuery tables that are synced via Airbyte.
NEW VIEW DETAILS :
DATASET NAME : DashboardView
NAME OF THE NEW VIEW FOR THIS WIDGET : report_warehouse_inventory_status
TABLES REQUIRED IN THIS VIEW FROM BIGQUERY :
→ InventoryProd.warehouse_inventories
→ MongoProduct.product
SEQUENCE DIAGRAM OF UPSERT CRON IMPLEMENTATION :
HOW TO IMPLEMENT THE UPSERT (Update & Insert) MECHANISM FOR THIS WIDGET FROM BIGQUERY DATA?
STEP 1 : A cron will be created that runs every 10 minutes.
API(GET) : _ah/api/esv/v1/view/inventory_status/sync
CASE 1 : For the first entry in the new view table, we fetch the MIN updatedAt timestamp (the _airbyte_emitted_at column) of the source table.
Note : This query runs one time only.
select MIN(_airbyte_emitted_at) from `InventoryProd.warehouse_inventories`
CASE 2 : To know which rows to pick from InventoryProd.warehouse_inventories & MongoProduct.product, we fetch the MAX updatedAt timestamp of the new view table, i.e. the latest timestamp of the last sync.
select MAX(_airbyte_emitted_at) from `DashboardView.report_warehouse_inventory_status`
where _airbyte_emitted_at >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
The query to execute is chosen by the following condition :
If MAX updatedAt is null, we fetch MIN updatedAt and use it. Otherwise we use MAX updatedAt.
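The selection rule above can be sketched in Python as follows. This is a minimal illustration, not the actual cron code: the function name `pick_sync_watermark` and its arguments are hypothetical, and both timestamps are assumed to have been fetched already with the CASE 1 and CASE 2 queries.

```python
from datetime import datetime, timezone
from typing import Optional


def pick_sync_watermark(view_max: Optional[datetime],
                        source_min: Optional[datetime]) -> Optional[datetime]:
    """Return the timestamp the sync should start from.

    view_max   -- result of the CASE 2 query (MAX on the new view); None when
                  the query returned no value
    source_min -- result of the CASE 1 query (MIN on warehouse_inventories)
    """
    if view_max is not None:
        # The view already holds synced rows: continue from the last sync.
        return view_max
    # Nothing usable in the view yet: start from the oldest source row.
    return source_min


if __name__ == "__main__":
    oldest = datetime(2022, 7, 1, tzinfo=timezone.utc)
    last_sync = datetime(2022, 7, 25, 14, 54, 44, tzinfo=timezone.utc)
    print(pick_sync_watermark(None, oldest))       # falls back to MIN
    print(pick_sync_watermark(last_sync, oldest))  # uses MAX
```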
STEP 2 : Three parameters are prepared : MAX updatedAt (fetched by the query above), OFFSET (set to 0 when the cron starts) and LIMIT (500, read from config, where it is set separately for staging and prod).
All three parameters are sent to a taskqueue.
TaskQueue Name : bq_view_warehouse_status
API(POST) : _ah/api/esb/v1/view/inventory_status/sync/init
STEP 3 : In the TaskQueue listener, we read the parameters from the request :
PARAMETERS : MAX updatedAt, OFFSET & LIMIT
and then pass them to the query below :
select
bq_warehouse_inventories.warehouse, bq_warehouse_inventories.GOOD_INVENTORY, bq_warehouse_inventories.BAD_INVENTORY, bq_warehouse_inventories.REJECTED,
bq_warehouse_inventories.CONSIGNMENT_RECEIVED, bq_warehouse_inventories.CONSIGNMENT_EXPECTED, bq_warehouse_inventories.CONSIGNMENT_AWAITED,
bq_warehouse_inventories.QC_PENDING, bq_warehouse_inventories.QC_REJECTED, bq_warehouse_inventories.QC_ACCEPTED, bq_warehouse_inventories.QC_COMPLETE,
bq_warehouse_inventories.QC_REJECTED_PUTAWAY_PENDING, bq_warehouse_inventories.QC_ACCEPTED_PUTAWAY_PENDING,
bq_warehouse_inventories.INTRANSIT_GATEPASS_CREATED, bq_warehouse_inventories.COMMITTED, bq_warehouse_inventories.PUTBACK_PENDING,
bq_warehouse_inventories.CANCELLED_ITEM_PUTAWAY_CREATED, bq_warehouse_inventories.RECEIVED_RETURN_PUTAWAY_PENDING,
bq_warehouse_inventories.SHELF_TRANSFER_PUTAWAY_PENDING, bq_warehouse_inventories.INSPECTED_NOT_BAD_ITEM_PUTAWAY_PENDING,
bq_warehouse_inventories.GATEPASS_PUTAWAY_PENDING, bq_warehouse_inventories.REVERSE_PICKUP_PUTAWAY_PENDING,
bq_warehouse_inventories.COURIER_RETURNED_PUTAWAY_PENDING, bq_warehouse_inventories.PICKLIST_ITEM_PUTAWAY_PENDING,
bq_warehouse_inventories.PUTAWAY_PENDING, bq_warehouse_inventories.INTRANSIT, bq_warehouse_inventories.BAD_INVENTORY_WAREHOUSE_DAMAGE,
bq_warehouse_inventories.BAD_INVENTORY_CARRIER_DAMAGE, bq_warehouse_inventories.BAD_INVENTORY_SHIPPER_DAMAGE,
bq_warehouse_inventories.QC_REJECTED_DEFECTIVE, bq_warehouse_inventories.BAD_INVENTORY_EXPIRED, bq_warehouse_inventories.CUSTOMER_RETURN_EXPECTED,
bq_warehouse_inventories.BAD_INVENTORY_CUSTOMER_DAMAGED, bq_warehouse_inventories.CANCELLED_ITEM_PUTAWAY_PENDING,
bq_warehouse_inventories.OPEN_INVENTORY_RECALL_REQUEST, bq_warehouse_inventories.RTV_GATEPASS_CREATED, bq_warehouse_inventories.RTV_GATEPASS_PACKED,
bq_warehouse_inventories.RTV_COMMITTED_INVENTORY, bq_warehouse_inventories.RESERVE, bq_warehouse_inventories.RECALL_BLOCKED,
bq_warehouse_inventories.NOT_FOUND, bq_warehouse_inventories.externalCreatedAt,
bq_mongo_product.mrp, bq_mongo_product.sku, bq_mongo_product.tax, bq_mongo_product.esin, bq_mongo_product.type, bq_mongo_product.brand,
bq_mongo_product.status, bq_mongo_product.weight, bq_mongo_product.brandId, bq_mongo_product.hsnCode, bq_mongo_product.taxCode,
bq_mongo_product.wmsSync, bq_mongo_product.imageUrl, bq_mongo_product.vertical, bq_mongo_product.createdAt, bq_mongo_product.groupCode,
bq_mongo_product.ucTaxCode, bq_mongo_product.unitPrice, bq_mongo_product.updatedAt, bq_mongo_product.components, bq_mongo_product.parentEsin,
bq_mongo_product.properties, bq_mongo_product.verticalId, bq_mongo_product.weightUnit, bq_mongo_product.accountSlug,
bq_mongo_product.availableOn, bq_mongo_product.description, bq_mongo_product.verticalName, bq_mongo_product.dimensionUnit,
bq_mongo_product.isCreatedOnUc, bq_mongo_product.specification, bq_mongo_product.unavailableOn, bq_mongo_product.dimensionWidth,
bq_mongo_product.additionalNames, bq_mongo_product.dimensionHeight, bq_mongo_product.dimensionLenght, bq_mongo_product.dimensionLength,
bq_mongo_product.failureReasonForUC, bq_mongo_product.searchableProperties, bq_mongo_product.inventoryUpdateFailures
from `InventoryProd.warehouse_inventories` bq_warehouse_inventories
left join MongoProduct.products bq_mongo_product on bq_mongo_product.esin = bq_warehouse_inventories.sku
-- the timestamp below is an example value of the MAX updatedAt parameter
where bq_warehouse_inventories._airbyte_emitted_at > '2022-07-25 14:54:44.318000 UTC'
order by bq_warehouse_inventories._airbyte_emitted_at
-- 500 is the LIMIT parameter and 0 is the OFFSET parameter of the first batch
LIMIT 500 OFFSET 0
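As a rough sketch of how the listener might turn its three parameters into this query, the Python below builds the query text for one batch. Everything in it is an assumption made for illustration: the helper name `build_page_query`, the shortened column list, and the choice to pass MAX updatedAt as a BigQuery named query parameter (`@max_updated_at`) that the client library would bind. LIMIT and OFFSET are validated as integers and written inline.

```python
# Shortened for readability; the real query selects the full column list.
EXAMPLE_COLUMNS = (
    "bq_warehouse_inventories.warehouse, "
    "bq_warehouse_inventories.GOOD_INVENTORY, "
    "bq_mongo_product.esin"
)


def build_page_query(limit: int, offset: int) -> str:
    """Build the SQL text for one batch of the sync (hypothetical helper)."""
    if not isinstance(limit, int) or not isinstance(offset, int):
        raise TypeError("limit and offset must be integers")
    if limit <= 0 or offset < 0:
        raise ValueError("limit must be positive and offset non-negative")
    return (
        f"select {EXAMPLE_COLUMNS} "
        "from `InventoryProd.warehouse_inventories` bq_warehouse_inventories "
        "left join MongoProduct.products bq_mongo_product "
        "on bq_mongo_product.esin = bq_warehouse_inventories.sku "
        # Bound by the caller to the MAX updatedAt value.
        "where bq_warehouse_inventories._airbyte_emitted_at > @max_updated_at "
        "order by bq_warehouse_inventories._airbyte_emitted_at "
        f"LIMIT {limit} OFFSET {offset}"
    )


if __name__ == "__main__":
    print(build_page_query(limit=500, offset=0))
```

Binding the timestamp as a parameter instead of formatting it into the string avoids quoting mistakes. Whether the real listener does this is not stated in this document.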
From the rows returned by the query above :
In code, we build a composite unique key for each row received in the listener. The key is a combination of esin and warehouse.
Based on this key :
→ If the composite key already exists in the new view, we UPDATE the record.
→ If the composite key does not exist in the new view, we INSERT the record.
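The update-or-insert decision can be illustrated with an in-memory sketch in Python. It is only a model of the logic: the real target is the BigQuery table, whereas here the view is a plain dictionary keyed by the composite key, and the names `composite_key` and `upsert_rows` are hypothetical.

```python
from typing import Dict, Iterable, Tuple

Row = Dict[str, object]
Key = Tuple[str, str]


def composite_key(row: Row) -> Key:
    """Combine esin and warehouse into one unique key."""
    return (str(row["esin"]), str(row["warehouse"]))


def upsert_rows(view: Dict[Key, Row], rows: Iterable[Row]) -> Tuple[int, int]:
    """Apply rows to the view; return (inserted, updated) counts."""
    inserted = updated = 0
    for row in rows:
        key = composite_key(row)
        if key in view:
            view[key].update(row)   # key found: UPDATE the record
            updated += 1
        else:
            view[key] = dict(row)   # key not found: INSERT the record
            inserted += 1
    return inserted, updated


if __name__ == "__main__":
    view: Dict[Key, Row] = {}
    print(upsert_rows(view, [{"esin": "E1", "warehouse": "W1", "GOOD_INVENTORY": 5}]))
    print(upsert_rows(view, [{"esin": "E1", "warehouse": "W1", "GOOD_INVENTORY": 7},
                             {"esin": "E1", "warehouse": "W2", "GOOD_INVENTORY": 1}]))
    print(view[("E1", "W1")]["GOOD_INVENTORY"])
```

In the example, the same esin in two different warehouses produces two separate records, because the warehouse is part of the key.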
One condition needs to be checked after the data has been sent to taskqueue 2 :
recordCount : the number of records fetched by the query
IF (recordCount == LIMIT) {
// a full batch came back, so more rows may remain : call the above taskqueue again with (MAX updatedAt, OFFSET, LIMIT)
// the parameters are set as :
// OFFSET : advanced from its current value (0 on the first call) to OFFSET + LIMIT
// LIMIT : picked from config itself (i.e. 500)
// MAX updatedAt : unchanged, the value picked by the first query in the code
} else {
// exit
}
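The pseudocode above could be expressed in Python roughly as follows. The function name `next_task_payload` and the payload field names are assumptions made for this sketch. Actually enqueueing the payload on the taskqueue is left out.

```python
from typing import Dict, Optional, Union

Payload = Dict[str, Union[str, int]]


def next_task_payload(record_count: int, max_updated_at: str,
                      offset: int, limit: int) -> Optional[Payload]:
    """Return the payload for the next taskqueue call, or None to stop."""
    if record_count == limit:
        # A full batch came back, so more rows may remain: advance the offset.
        return {
            "maxUpdatedAt": max_updated_at,  # unchanged for the whole run
            "offset": offset + limit,
            "limit": limit,
        }
    # A short batch means the last page was reached: exit.
    return None


if __name__ == "__main__":
    ts = "2022-07-25 14:54:44.318000 UTC"
    print(next_task_payload(500, ts, offset=0, limit=500))
    print(next_task_payload(120, ts, offset=500, limit=500))
```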
WHY IS A RECURSIVE CALL REQUIRED HERE?
Our batch size limit is 500. Let's assume the data has 10 lakh (1,000,000) records.
According to the flow, that makes 1,000,000 / 500 = 2000 batches.
The standard execution time limit of an API call is 60s. Processing all 2000 batches in a single call would exceed that limit and the API would start throwing errors. To avoid this, each batch is handled in its own taskqueue call, which then triggers the call for the next batch.
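The batch arithmetic can be checked with a few lines of Python. The helper name `batch_count` is hypothetical. The figures are the ones assumed in the text (10 lakh records, batch size 500).

```python
def batch_count(total_records: int, batch_size: int) -> int:
    """Number of batches needed to cover total_records (ceiling division)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return (total_records + batch_size - 1) // batch_size


if __name__ == "__main__":
    print(batch_count(1_000_000, 500))  # 2000
```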
NEW VIEW TABLE COLUMNS TO BE USED :
TABLE NAME : DashboardView.report_warehouse_inventory_status
TABLE COLUMNS :
warehouseId
sku
warehouse
GOOD_INVENTORY
BAD_INVENTORY
REJECTED
CONSIGNMENT_RECEIVED
CONSIGNMENT_EXPECTED
CONSIGNMENT_AWAITED
QC_PENDING
QC_REJECTED
QC_ACCEPTED
QC_COMPLETE
QC_REJECTED_PUTAWAY_PENDING
QC_ACCEPTED_PUTAWAY_PENDING
INTRANSIT_GATEPASS_CREATED
COMMITTED
PUTBACK_PENDING
CANCELLED_ITEM_PUTAWAY_CREATED
RECEIVED_RETURN_PUTAWAY_PENDING
SHELF_TRANSFER_PUTAWAY_PENDING
INSPECTED_NOT_BAD_ITEM_PUTAWAY_PENDING
GATEPASS_PUTAWAY_PENDING
REVERSE_PICKUP_PUTAWAY_PENDING
COURIER_RETURNED_PUTAWAY_PENDING
PICKLIST_ITEM_PUTAWAY_PENDING
PUTAWAY_PENDING
INTRANSIT
BAD_INVENTORY_WAREHOUSE_DAMAGE
BAD_INVENTORY_CARRIER_DAMAGE
BAD_INVENTORY_SHIPPER_DAMAGE
QC_REJECTED_DEFECTIVE
BAD_INVENTORY_EXPIRED
CUSTOMER_RETURN_EXPECTED
BAD_INVENTORY_CUSTOMER_DAMAGED
CANCELLED_ITEM_PUTAWAY_PENDING
OPEN_INVENTORY_RECALL_REQUEST
RTV_GATEPASS_CREATED
RTV_GATEPASS_PACKED
RTV_COMMITTED_INVENTORY
RESERVE
RECALL_BLOCKED
NOT_FOUND
externalCreatedAt
warehouseCreatedAt
warehouseUpdatedAt
_id
mrp
tax
esin
type
brand
status
weight
brandId
hsnCode
taxCode
wmsSync
imageUrl
vertical
mongoCreatedAt
groupCode
ucTaxCode
unitPrice
mongoUpdatedAt
components
parentEsin
properties
verticalId
weightUnit
accountSlug
availableOn
description
verticalName
dimensionUnit
isCreatedOnUc
specification
unavailableOn
dimensionWidth
additionalNames
dimensionHeight
dimensionLenght
dimensionLength
failureReasonForUC
searchableProperties
inventoryUpdateFailures
viewCreatedAt
viewUpdatedAt