Browse Source

feat: add Qdrant integration to WooCommerce sync #74

- Updated WooCommerce sync with Qdrant vector database support
- Added change detection for deleted products
- Respects store data access permissions
- Collects all paginated data and syncs to Qdrant after SQL cache
- Comprehensive logging to qdrant_sync_logs table
Claude 5 months ago
parent
commit
8c2340e203
1 changed files with 443 additions and 19 deletions
  1. 443 19
      supabase/functions/woocommerce-sync/index.ts

+ 443 - 19
supabase/functions/woocommerce-sync/index.ts

@@ -10,6 +10,20 @@ import {
   WooCommerceCustomer
 } from '../_shared/woocommerce-client.ts'
 import { formatFirstValidPhone, detectCountryCode } from '../_shared/phone-formatter.ts'
+import {
+  collectionExists,
+  createCollection,
+  upsertPoints,
+  deletePointsByFilter,
+  scrollPoints,
+  getCollectionName,
+  initializeStoreCollections,
+  generateSimpleEmbedding,
+  createProductText,
+  createOrderText,
+  createCustomerText,
+  QdrantPoint
+} from '../_shared/qdrant-client.ts'
 
 const corsHeaders = {
   'Access-Control-Allow-Origin': '*',
@@ -74,17 +88,355 @@ async function fetchWithRetry<T>(
   throw new Error('Max retries exceeded')
 }
 
+// Log Qdrant sync operation
+async function logQdrantSync(
+  supabaseAdmin: any,
+  storeId: string,
+  syncType: string,
+  collectionName: string,
+  operation: string,
+  itemsProcessed: number,
+  itemsSucceeded: number,
+  itemsFailed: number,
+  startedAt: Date,
+  errorMessage?: string
+): Promise<void> {
+  try {
+    await supabaseAdmin
+      .from('qdrant_sync_logs')
+      .insert({
+        store_id: storeId,
+        sync_type: syncType,
+        collection_name: collectionName,
+        operation,
+        items_processed: itemsProcessed,
+        items_succeeded: itemsSucceeded,
+        items_failed: itemsFailed,
+        error_message: errorMessage,
+        started_at: startedAt.toISOString(),
+        completed_at: new Date().toISOString(),
+      })
+  } catch (error) {
+    console.error('[Qdrant] Failed to log sync operation:', error)
+  }
+}
+
+// Sync products to Qdrant
+async function syncProductsToQdrant(
+  storeId: string,
+  storeName: string,
+  products: WooCommerceProduct[],
+  supabaseAdmin: any
+): Promise<{ synced: number; errors: number }> {
+  const startTime = new Date()
+  const collectionName = getCollectionName(storeName, 'products')
+
+  console.log(`[Qdrant] Syncing ${products.length} products to ${collectionName}`)
+
+  let synced = 0
+  let errors = 0
+
+  try {
+    if (!(await collectionExists(collectionName))) {
+      await createCollection(collectionName, [
+        { field: 'store_id', type: 'keyword' },
+        { field: 'product_id', type: 'keyword' },
+        { field: 'platform', type: 'keyword' },
+        { field: 'stock_status', type: 'keyword' },
+        { field: 'price', type: 'float' },
+        { field: 'sku', type: 'keyword' },
+      ])
+    }
+
+    // Get existing product IDs from Qdrant to detect deletions
+    const existingPoints = await scrollPoints(collectionName, {
+      must: [{ key: 'store_id', match: { value: storeId } }]
+    }, 1000)
+
+    const existingProductIds = new Set(
+      existingPoints.points.map((p: any) => p.payload?.product_id).filter(Boolean)
+    )
+
+    const currentProductIds = new Set(products.map(p => p.id.toString()))
+
+    // Find deleted products
+    const deletedProductIds = Array.from(existingProductIds).filter(
+      id => !currentProductIds.has(id)
+    )
+
+    // Delete removed products from Qdrant
+    if (deletedProductIds.length > 0) {
+      console.log(`[Qdrant] Deleting ${deletedProductIds.length} removed products`)
+      await deletePointsByFilter(collectionName, {
+        must: [
+          { key: 'store_id', match: { value: storeId } },
+          { key: 'product_id', match: { any: deletedProductIds } }
+        ]
+      })
+    }
+
+    // Convert products to Qdrant points
+    const points: QdrantPoint[] = products.map((product) => {
+      const productText = createProductText({
+        name: product.name,
+        description: product.description,
+        sku: product.sku,
+        tags: product.tags?.map((t: any) => t.name) || [],
+      })
+
+      return {
+        id: `woocommerce-${storeId}-${product.id}`,
+        vector: generateSimpleEmbedding(productText),
+        payload: {
+          store_id: storeId,
+          product_id: product.id.toString(),
+          platform: 'woocommerce',
+          name: product.name,
+          sku: product.sku || null,
+          price: parseFloat(product.price) || 0,
+          stock_status: product.stock_status,
+          stock_quantity: product.stock_quantity,
+          type: product.type || 'simple',
+          synced_at: new Date().toISOString(),
+        }
+      }
+    })
+
+    await upsertPoints(collectionName, points)
+    synced = points.length
+
+    await logQdrantSync(
+      supabaseAdmin,
+      storeId,
+      'products',
+      collectionName,
+      'upsert',
+      products.length,
+      synced,
+      errors,
+      startTime
+    )
+
+    console.log(`[Qdrant] Products sync complete: ${synced} synced, ${deletedProductIds.length} deleted`)
+  } catch (error: any) {
+    console.error('[Qdrant] Product sync error:', error)
+    errors = products.length
+    await logQdrantSync(
+      supabaseAdmin,
+      storeId,
+      'products',
+      collectionName,
+      'upsert',
+      products.length,
+      synced,
+      errors,
+      startTime,
+      error.message
+    )
+  }
+
+  return { synced, errors }
+}
+
+// Sync orders to Qdrant
+async function syncOrdersToQdrant(
+  storeId: string,
+  storeName: string,
+  orders: WooCommerceOrder[],
+  supabaseAdmin: any
+): Promise<{ synced: number; errors: number }> {
+  const startTime = new Date()
+  const collectionName = getCollectionName(storeName, 'orders')
+
+  console.log(`[Qdrant] Syncing ${orders.length} orders to ${collectionName}`)
+
+  let synced = 0
+  let errors = 0
+
+  try {
+    if (!(await collectionExists(collectionName))) {
+      await createCollection(collectionName, [
+        { field: 'store_id', type: 'keyword' },
+        { field: 'order_id', type: 'keyword' },
+        { field: 'platform', type: 'keyword' },
+        { field: 'status', type: 'keyword' },
+        { field: 'total', type: 'float' },
+        { field: 'customer_email', type: 'keyword' },
+      ])
+    }
+
+    const points: QdrantPoint[] = orders.map((order) => {
+      const orderText = createOrderText({
+        order_number: order.number || order.id,
+        customer_name: `${order.billing?.first_name || ''} ${order.billing?.last_name || ''}`.trim(),
+        total_price: order.total,
+        currency: order.currency,
+        financial_status: order.status,
+        line_items: order.line_items,
+      })
+
+      return {
+        id: `woocommerce-${storeId}-${order.id}`,
+        vector: generateSimpleEmbedding(orderText),
+        payload: {
+          store_id: storeId,
+          order_id: order.id.toString(),
+          platform: 'woocommerce',
+          order_number: order.number || order.id.toString(),
+          status: order.status,
+          total: parseFloat(order.total) || 0,
+          currency: order.currency || 'USD',
+          customer_name: `${order.billing?.first_name || ''} ${order.billing?.last_name || ''}`.trim(),
+          customer_email: order.billing?.email || null,
+          synced_at: new Date().toISOString(),
+        }
+      }
+    })
+
+    await upsertPoints(collectionName, points)
+    synced = points.length
+
+    await logQdrantSync(
+      supabaseAdmin,
+      storeId,
+      'orders',
+      collectionName,
+      'upsert',
+      orders.length,
+      synced,
+      errors,
+      startTime
+    )
+
+    console.log(`[Qdrant] Orders sync complete: ${synced} synced`)
+  } catch (error: any) {
+    console.error('[Qdrant] Order sync error:', error)
+    errors = orders.length
+    await logQdrantSync(
+      supabaseAdmin,
+      storeId,
+      'orders',
+      collectionName,
+      'upsert',
+      orders.length,
+      synced,
+      errors,
+      startTime,
+      error.message
+    )
+  }
+
+  return { synced, errors }
+}
+
+// Sync customers to Qdrant
+async function syncCustomersToQdrant(
+  storeId: string,
+  storeName: string,
+  customers: WooCommerceCustomer[],
+  supabaseAdmin: any
+): Promise<{ synced: number; errors: number }> {
+  const startTime = new Date()
+  const collectionName = getCollectionName(storeName, 'customers')
+
+  console.log(`[Qdrant] Syncing ${customers.length} customers to ${collectionName}`)
+
+  let synced = 0
+  let errors = 0
+
+  try {
+    if (!(await collectionExists(collectionName))) {
+      await createCollection(collectionName, [
+        { field: 'store_id', type: 'keyword' },
+        { field: 'customer_id', type: 'keyword' },
+        { field: 'platform', type: 'keyword' },
+        { field: 'email', type: 'keyword' },
+      ])
+    }
+
+    const points: QdrantPoint[] = customers.map((customer) => {
+      const customerText = createCustomerText({
+        first_name: customer.first_name,
+        last_name: customer.last_name,
+        email: customer.email,
+        orders_count: customer.orders_count,
+        total_spent: customer.total_spent,
+      })
+
+      return {
+        id: `woocommerce-${storeId}-${customer.id}`,
+        vector: generateSimpleEmbedding(customerText),
+        payload: {
+          store_id: storeId,
+          customer_id: customer.id.toString(),
+          platform: 'woocommerce',
+          email: customer.email || null,
+          first_name: customer.first_name || null,
+          last_name: customer.last_name || null,
+          username: customer.username || null,
+          orders_count: customer.orders_count || 0,
+          total_spent: parseFloat(customer.total_spent || '0'),
+          synced_at: new Date().toISOString(),
+        }
+      }
+    })
+
+    await upsertPoints(collectionName, points)
+    synced = points.length
+
+    await logQdrantSync(
+      supabaseAdmin,
+      storeId,
+      'customers',
+      collectionName,
+      'upsert',
+      customers.length,
+      synced,
+      errors,
+      startTime
+    )
+
+    console.log(`[Qdrant] Customers sync complete: ${synced} synced`)
+  } catch (error: any) {
+    console.error('[Qdrant] Customer sync error:', error)
+    errors = customers.length
+    await logQdrantSync(
+      supabaseAdmin,
+      storeId,
+      'customers',
+      collectionName,
+      'upsert',
+      customers.length,
+      synced,
+      errors,
+      startTime,
+      error.message
+    )
+  }
+
+  return { synced, errors }
+}
+
 // Sync products from WooCommerce
 async function syncProducts(
   storeId: string,
+  storeName: string,
   supabaseAdmin: any,
-  rateLimiter: RateLimiter
-): Promise<{ synced: number; errors: number; errorMessage?: string }> {
+  rateLimiter: RateLimiter,
+  qdrantEnabled: boolean,
+  canSyncProducts: boolean
+): Promise<{ synced: number; errors: number; errorMessage?: string; qdrant?: { synced: number; errors: number } }> {
   console.log('[WooCommerce] Syncing products...')
   let synced = 0
   let errors = 0
   let page = 1
   const perPage = 25
+  const allProducts: WooCommerceProduct[] = []
+
+  if (!canSyncProducts) {
+    console.log('[WooCommerce] Product sync disabled by store permissions')
+    return { synced: 0, errors: 0 }
+  }
 
   try {
     while (true) {
@@ -96,7 +448,10 @@ async function syncProducts(
         break
       }
 
-      // Map and upsert products
+      // Collect all products for Qdrant sync
+      allProducts.push(...products)
+
+      // Map and upsert products to SQL cache
       const productsToCache = products.map((product: WooCommerceProduct) => ({
         store_id: storeId,
         wc_product_id: product.id.toString(),
@@ -138,28 +493,43 @@ async function syncProducts(
     }
 
     console.log(`[WooCommerce] Products sync complete: ${synced} synced, ${errors} errors`)
+
+    // Sync to Qdrant if enabled
+    let qdrantResult
+    if (qdrantEnabled && allProducts.length > 0) {
+      qdrantResult = await syncProductsToQdrant(storeId, storeName, allProducts, supabaseAdmin)
+    }
+
+    return { synced, errors, qdrant: qdrantResult }
   } catch (error) {
     const errorMessage = error instanceof Error ? error.message : 'Unknown error'
     console.error('[WooCommerce] Product sync error:', errorMessage, error)
     errors++
     return { synced, errors, errorMessage }
   }
-
-  return { synced, errors }
 }
 
 // Sync orders from WooCommerce
 async function syncOrders(
   storeId: string,
+  storeName: string,
   supabaseAdmin: any,
   rateLimiter: RateLimiter,
-  countryCode: string
-): Promise<{ synced: number; errors: number; errorMessage?: string }> {
+  countryCode: string,
+  qdrantEnabled: boolean,
+  canSyncOrders: boolean
+): Promise<{ synced: number; errors: number; errorMessage?: string; qdrant?: { synced: number; errors: number } }> {
   console.log('[WooCommerce] Syncing orders...')
   let synced = 0
   let errors = 0
   let page = 1
   const perPage = 25
+  const allOrders: WooCommerceOrder[] = []
+
+  if (!canSyncOrders) {
+    console.log('[WooCommerce] Order sync disabled by store permissions')
+    return { synced: 0, errors: 0 }
+  }
 
   try {
     while (true) {
@@ -171,7 +541,10 @@ async function syncOrders(
         break
       }
 
-      // Map and upsert orders
+      // Collect all orders for Qdrant sync
+      allOrders.push(...orders)
+
+      // Map and upsert orders to SQL cache
       const ordersToCache = orders.map((order: WooCommerceOrder) => ({
         store_id: storeId,
         wc_order_id: order.id.toString(),
@@ -216,28 +589,43 @@ async function syncOrders(
     }
 
     console.log(`[WooCommerce] Orders sync complete: ${synced} synced, ${errors} errors`)
+
+    // Sync to Qdrant if enabled
+    let qdrantResult
+    if (qdrantEnabled && allOrders.length > 0) {
+      qdrantResult = await syncOrdersToQdrant(storeId, storeName, allOrders, supabaseAdmin)
+    }
+
+    return { synced, errors, qdrant: qdrantResult }
   } catch (error) {
     const errorMessage = error instanceof Error ? error.message : 'Unknown error'
     console.error('[WooCommerce] Order sync error:', errorMessage, error)
     errors++
     return { synced, errors, errorMessage }
   }
-
-  return { synced, errors }
 }
 
 // Sync customers from WooCommerce
 async function syncCustomers(
   storeId: string,
+  storeName: string,
   supabaseAdmin: any,
   rateLimiter: RateLimiter,
-  countryCode: string
-): Promise<{ synced: number; errors: number; errorMessage?: string }> {
+  countryCode: string,
+  qdrantEnabled: boolean,
+  canSyncCustomers: boolean
+): Promise<{ synced: number; errors: number; errorMessage?: string; qdrant?: { synced: number; errors: number } }> {
   console.log('[WooCommerce] Syncing customers...')
   let synced = 0
   let errors = 0
   let page = 1
   const perPage = 25
+  const allCustomers: WooCommerceCustomer[] = []
+
+  if (!canSyncCustomers) {
+    console.log('[WooCommerce] Customer sync disabled by store permissions')
+    return { synced: 0, errors: 0 }
+  }
 
   try {
     while (true) {
@@ -249,7 +637,10 @@ async function syncCustomers(
         break
       }
 
-      // Map and upsert customers
+      // Collect all customers for Qdrant sync
+      allCustomers.push(...customers)
+
+      // Map and upsert customers to SQL cache
       const customersToCache = customers.map((customer: WooCommerceCustomer) => ({
         store_id: storeId,
         wc_customer_id: customer.id.toString(),
@@ -292,14 +683,20 @@ async function syncCustomers(
     }
 
     console.log(`[WooCommerce] Customers sync complete: ${synced} synced, ${errors} errors`)
+
+    // Sync to Qdrant if enabled
+    let qdrantResult
+    if (qdrantEnabled && allCustomers.length > 0) {
+      qdrantResult = await syncCustomersToQdrant(storeId, storeName, allCustomers, supabaseAdmin)
+    }
+
+    return { synced, errors, qdrant: qdrantResult }
   } catch (error) {
     const errorMessage = error instanceof Error ? error.message : 'Unknown error'
     console.error('[WooCommerce] Customer sync error:', errorMessage, error)
     errors++
     return { synced, errors, errorMessage }
   }
-
-  return { synced, errors }
 }
 
 serve(wrapHandler('woocommerce-sync', async (req) => {
@@ -443,7 +840,7 @@ serve(wrapHandler('woocommerce-sync', async (req) => {
       // Verify store exists and optionally belongs to user (for non-internal calls)
       let storeQuery = supabaseAdmin
         .from('stores')
-        .select('id, store_name, store_url')
+        .select('id, store_name, store_url, qdrant_sync_enabled, data_access_permissions')
         .eq('id', store_id)
         .eq('platform_name', 'woocommerce')
 
@@ -462,6 +859,33 @@ serve(wrapHandler('woocommerce-sync', async (req) => {
         )
       }
 
+      // Check data access permissions
+      const permissions = store.data_access_permissions || {}
+      const canSyncProducts = permissions.allow_product_access !== false
+      const canSyncOrders = permissions.allow_order_access !== false
+      const canSyncCustomers = permissions.allow_customer_access !== false
+      const qdrantEnabled = store.qdrant_sync_enabled !== false
+
+      console.log('[WooCommerce] Sync permissions:', {
+        products: canSyncProducts,
+        orders: canSyncOrders,
+        customers: canSyncCustomers,
+        qdrant: qdrantEnabled
+      })
+
+      // Initialize Qdrant collections if enabled
+      if (qdrantEnabled) {
+        try {
+          await initializeStoreCollections(
+            store.store_name,
+            canSyncOrders,
+            canSyncCustomers
+          )
+        } catch (error) {
+          console.error('[Qdrant] Failed to initialize collections:', error)
+        }
+      }
+
       // Detect country code from store URL for phone formatting
       const countryCode = detectCountryCode(store.store_url)
       console.log(`[WooCommerce] Detected country code: ${countryCode} for store: ${store.store_name}`)
@@ -478,15 +902,15 @@ serve(wrapHandler('woocommerce-sync', async (req) => {
 
       // Sync based on type
       if (sync_type === 'products' || sync_type === 'all') {
-        syncStats.products = await syncProducts(store_id, supabaseAdmin, rateLimiter)
+        syncStats.products = await syncProducts(store_id, store.store_name, supabaseAdmin, rateLimiter, qdrantEnabled, canSyncProducts)
       }
 
       if (sync_type === 'orders' || sync_type === 'all') {
-        syncStats.orders = await syncOrders(store_id, supabaseAdmin, rateLimiter, countryCode)
+        syncStats.orders = await syncOrders(store_id, store.store_name, supabaseAdmin, rateLimiter, countryCode, qdrantEnabled, canSyncOrders)
       }
 
       if (sync_type === 'customers' || sync_type === 'all') {
-        syncStats.customers = await syncCustomers(store_id, supabaseAdmin, rateLimiter, countryCode)
+        syncStats.customers = await syncCustomers(store_id, store.store_name, supabaseAdmin, rateLimiter, countryCode, qdrantEnabled, canSyncCustomers)
       }
 
       // Collect error messages for better diagnostics