Technical Deep-Dive · April 8, 2026 · 20 min read

Cross-Org Query Execution: A Distributed Query Planner for Salesforce

How BridgeQL turns a single query against multiple Salesforce orgs into an optimized execution plan with filter pushdown, hash joins, and automatic Bulk API switching.

Tyler Colby · Founder, Colby's Data Movers

The Problem

You have three Salesforce orgs: Sales in one, Support in another, Marketing in a third. A VP asks: "Show me all accounts that have open opportunities in Sales, open cases in Support, and active campaigns in Marketing." That is a three-way join across three Salesforce orgs. SOQL cannot do it. Salesforce Connect has a 2,000-row limit per external object query. Reports cannot span orgs. Data Loader cannot join.

BridgeQL's cross-org query engine solves this by treating each org as a remote data source and building a distributed query plan that minimizes data transfer, pushes filters to the source, and joins results locally.

The QueryPlan

Every cross-org query compiles into a QueryPlan: an ordered sequence of QueryStep operations. The plan is a directed acyclic graph of steps where each step either fetches data from an org or transforms intermediate results.

// From bridgeql-salesforce/src/query.rs

pub struct QueryPlan {
    pub steps: Vec<QueryStep>,
    pub intermediates: HashMap<String, Vec<String>>,
}

pub enum QueryStep {
    /// Execute a SOQL query against an org
    SoqlQuery {
        org: Option<String>,
        query: String,
        store_as: Option<String>,
    },
    /// Join results from two previous steps
    Join {
        left: String,
        right: String,
        left_key: String,
        right_key: String,
        join_type: JoinType,
        store_as: String,
    },
    /// Filter results
    Filter {
        source: String,
        condition: WhereCondition,
        store_as: String,
    },
    /// Project (select) specific fields
    Project {
        source: String,
        fields: Vec<String>,
        store_as: String,
    },
    /// Aggregate results
    Aggregate {
        source: String,
        group_by: Vec<String>,
        aggregates: Vec<(AggregateFunction, String, String)>,
        store_as: String,
    },
    /// Union results from multiple sources
    Union {
        sources: Vec<String>,
        store_as: String,
    },
    /// Sort results
    Sort {
        source: String,
        order_by: Vec<(String, SortDirection)>,
        store_as: String,
    },
    /// Limit/offset results
    Limit {
        source: String,
        limit: usize,
        offset: Option<usize>,
        store_as: String,
    },
}

Each step reads from named intermediate results and writes to a named output. The executor processes steps in order, maintaining a HashMap of intermediate result sets. This is fundamentally the same architecture as a traditional RDBMS query executor, except the leaf nodes are SOQL queries against remote Salesforce orgs instead of table scans against local storage.
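A stripped-down sketch of that executor loop, with a hypothetical `Step` enum standing in for the real `QueryStep` (only a leaf fetch and a `Limit` transform are shown, and the fetch returns canned rows instead of calling Salesforce):

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the real bridgeql-salesforce types.
struct IntermediateResult {
    rows: Vec<HashMap<String, String>>,
}

enum Step {
    // Leaf fetch; here simulated with canned rows.
    Fetch { rows: Vec<HashMap<String, String>>, store_as: String },
    // Transform reading a previously stored named result.
    Limit { source: String, limit: usize, store_as: String },
}

fn execute(steps: Vec<Step>) -> HashMap<String, IntermediateResult> {
    // Named intermediate results, as in the real executor.
    let mut results: HashMap<String, IntermediateResult> = HashMap::new();
    for step in steps {
        match step {
            Step::Fetch { rows, store_as } => {
                results.insert(store_as, IntermediateResult { rows });
            }
            Step::Limit { source, limit, store_as } => {
                let rows: Vec<_> = results[&source]
                    .rows
                    .iter()
                    .take(limit)
                    .cloned()
                    .collect();
                results.insert(store_as, IntermediateResult { rows });
            }
        }
    }
    results
}
```

The real executor dispatches on all eight variants, but every arm follows the same shape: read named inputs from the map, write one named output back.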

A Real Cross-Org Query

Here is the VP's query expressed in BridgeQL:

SELECT
    sales.Account.Name,
    sales.Account.Industry,
    COUNT(sales.Opportunity.Id) as open_opps,
    COUNT(support.Case.Id) as open_cases,
    COUNT(marketing.CampaignMember.Id) as active_campaigns
FROM sales.Account
JOIN sales.Opportunity
    ON sales.Opportunity.AccountId = sales.Account.Id
JOIN support.Case
    ON support.Case.Account_External_Id__c
        = sales.Account.External_Id__c
JOIN marketing.CampaignMember
    ON marketing.CampaignMember.Contact.Account.External_Id__c
        = sales.Account.External_Id__c
WHERE sales.Opportunity.IsClosed = false
AND support.Case.Status != 'Closed'
AND marketing.CampaignMember.Status = 'Responded'
GROUP BY sales.Account.Name, sales.Account.Industry
ORDER BY open_opps DESC

The org prefix (sales., support., marketing.) tells BridgeQL which org each table lives in. The join conditions use External_Id__c fields for cross-org matching. The planner compiles this into:

QueryPlan:
  Step 1: SoqlQuery {
    org: "sales",
    query: "SELECT Account.Id, Account.Name,
        Account.Industry, Account.External_Id__c,
        Id as OppId
        FROM Opportunity
        WHERE IsClosed = false",
    store_as: "sales_opps"
  }

  Step 2: SoqlQuery {
    org: "support",
    query: "SELECT Account_External_Id__c,
        Id as CaseId
        FROM Case
        WHERE Status != 'Closed'",
    store_as: "support_cases"
  }

  Step 3: SoqlQuery {
    org: "marketing",
    query: "SELECT Contact.Account.External_Id__c,
        Id as CampaignMemberId
        FROM CampaignMember
        WHERE Status = 'Responded'",
    store_as: "marketing_campaigns"
  }

  Step 4: Join {
    left: "sales_opps",
    right: "support_cases",
    left_key: "External_Id__c",
    right_key: "Account_External_Id__c",
    join_type: Inner,
    store_as: "sales_support"
  }

  Step 5: Join {
    left: "sales_support",
    right: "marketing_campaigns",
    left_key: "External_Id__c",
    right_key: "External_Id__c",
    join_type: Inner,
    store_as: "all_joined"
  }

  Step 6: Aggregate {
    source: "all_joined",
    group_by: ["Name", "Industry"],
    aggregates: [
      (Count, "OppId", "open_opps"),
      (Count, "CaseId", "open_cases"),
      (Count, "CampaignMemberId", "active_campaigns")
    ],
    store_as: "aggregated"
  }

  Step 7: Sort {
    source: "aggregated",
    order_by: [("open_opps", Desc)],
    store_as: "result"
  }

Steps 1-3 execute in parallel against three different orgs. Steps 4-7 execute sequentially on the local machine. The total latency is dominated by the slowest SOQL query (typically 2-5 seconds), not the sum of all queries.
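The latency claim is easy to see in miniature. The real executor spawns one tokio task per org; this thread-based sketch with invented latencies shows the same arithmetic — wall time tracks the slowest fetch, not the sum:

```rust
use std::thread;
use std::time::{Duration, Instant};

// Stand-in for a per-org SOQL fetch: sleep for that org's latency,
// then return that many dummy row ids. Latencies are invented.
fn fetch_org(latency_ms: u64, rows: usize) -> Vec<usize> {
    thread::sleep(Duration::from_millis(latency_ms));
    (0..rows).collect()
}

// Run the three leaf fetches concurrently and report wall time.
fn fetch_all() -> (usize, Duration) {
    let start = Instant::now();
    let handles = vec![
        thread::spawn(|| fetch_org(50, 100)),  // sales
        thread::spawn(|| fetch_org(200, 200)), // support (slowest)
        thread::spawn(|| fetch_org(50, 50)),   // marketing
    ];
    let total_rows: usize = handles
        .into_iter()
        .map(|h| h.join().unwrap().len())
        .sum();
    (total_rows, start.elapsed())
}
```

With sleeps of 50ms, 200ms, and 50ms, the concurrent run finishes near the 200ms maximum rather than the 300ms sum — the same effect that makes three cross-org SOQL queries cost one round trip, not three.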

Filter Pushdown

The most important optimization. Filters that apply to a single org get pushed down into the SOQL query for that org. This reduces the data transferred over the wire and lets Salesforce's query optimizer handle the filtering.

// QueryOptimizer: push filters into SOQL queries
pub struct QueryOptimizer {
    schemas: HashMap<String, HashMap<String, ObjectSchema>>,
    stats: QueryStats,
}

impl QueryOptimizer {
    pub fn optimize(&self, plan: QueryPlan) -> QueryPlan {
        let mut optimized = plan;
        optimized = self.push_down_filters(optimized);
        optimized = self.merge_consecutive_filters(optimized);
        optimized = self.reorder_joins(optimized);
        optimized
    }

    fn push_down_filters(&self, plan: QueryPlan) -> QueryPlan {
        // For each Filter step, check if the condition
        // references fields from only one org. If so,
        // append the condition to that org's SoqlQuery
        // and remove the Filter step.

        let mut optimized_steps = Vec::new();
        let mut pushed_filters: HashMap<String, Vec<String>> =
            HashMap::new();

        for step in &plan.steps {
            match step {
                QueryStep::Filter {
                    source, condition, ..
                } => {
                    if let Some(org) =
                        self.single_org_condition(condition) {
                        pushed_filters
                            .entry(org)
                            .or_default()
                            .push(condition.to_soql());
                    } else {
                        optimized_steps.push(step.clone());
                    }
                }
                QueryStep::SoqlQuery {
                    org, query, store_as
                } => {
                    let org_name = org.as_deref()
                        .unwrap_or("default");
                    let mut q = query.clone();
                    if let Some(filters) =
                        pushed_filters.get(org_name) {
                        // Append filters to WHERE clause
                        if q.contains(" WHERE ") {
                            q += " AND ";
                        } else {
                            q += " WHERE ";
                        }
                        q += &filters.join(" AND ");
                    }
                    optimized_steps.push(
                        QueryStep::SoqlQuery {
                            org: org.clone(),
                            query: q,
                            store_as: store_as.clone(),
                        }
                    );
                }
                _ => optimized_steps.push(step.clone()),
            }
        }

        QueryPlan {
            steps: optimized_steps,
            intermediates: plan.intermediates,
        }
    }
}

Without filter pushdown, you might fetch 500,000 Cases from the support org and then filter locally to the 2,000 open ones. With filter pushdown, the SOQL query includes WHERE Status != 'Closed' and only 2,000 records transfer. This is the difference between a 5-second query and a 5-minute query.
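The `single_org_condition` check referenced above is not shown. A plausible sketch, assuming the condition's field references are first flattened to org-prefixed strings like `support.Case.Status` (the real version would walk the `WhereCondition` tree):

```rust
use std::collections::HashSet;

// A condition can be pushed down only if every field it references
// belongs to exactly one org. Field names are assumed to carry
// their org prefix, e.g. "support.Case.Status".
fn single_org_condition(fields: &[&str]) -> Option<String> {
    let orgs: HashSet<&str> = fields
        .iter()
        .filter_map(|f| f.split('.').next())
        .collect();
    if orgs.len() == 1 {
        orgs.into_iter().next().map(str::to_string)
    } else {
        None // spans orgs: must remain a local Filter step
    }
}
```

A condition like `support.Case.Status != 'Closed'` resolves to `Some("support")` and gets pushed; a condition comparing a Sales field to a Support field resolves to `None` and stays local.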

Schema Caching via Describe API

The planner needs schema information to validate field names, determine join compatibility, and plan traversals. BridgeQL caches Describe API results per org:

// Per-org schema cache
// Schemas are fetched once and cached for the session

pub struct SchemaCache {
    cache: HashMap<String, HashMap<String, ObjectSchema>>,
    ttl: Duration,
    last_refresh: HashMap<String, Instant>,
}

impl SchemaCache {
    pub async fn get_schema(
        &mut self,
        org: &str,
        object: &str,
    ) -> Result<&ObjectSchema> {
        // Decide up front whether a refetch is needed. Checking with
        // contains_key, rather than returning the borrowed entry
        // early, sidesteps a borrow-checker conflict with the
        // insert below.
        let fresh = self
            .last_refresh
            .get(org)
            .is_some_and(|last| last.elapsed() < self.ttl);
        let cached = fresh
            && self
                .cache
                .get(org)
                .is_some_and(|schemas| schemas.contains_key(object));

        if !cached {
            // Cache miss or stale entry: fetch from Salesforce
            let session = OrgManager::connect(org)?;
            let client = MetadataClient::new(session);
            let schema = client.describe(object).await?;

            self.cache
                .entry(org.to_string())
                .or_default()
                .insert(object.to_string(), schema);
            self.last_refresh
                .insert(org.to_string(), Instant::now());
        }

        Ok(&self.cache[org][object])
    }
}

The schema cache serves double duty. It validates the query at planning time (does Account.Industry exist in the sales org?) and provides metadata for join optimization (what type is External_Id__c, and is it indexed?).
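The validation half of that double duty can be sketched directly against the cached map. `ObjectSchema` is reduced to a set of field names here; the real struct also carries types and index flags:

```rust
use std::collections::{HashMap, HashSet};

// Reduced ObjectSchema for illustration: just the field names.
struct ObjectSchema {
    fields: HashSet<String>,
}

// Planning-time check: does org.object.field exist in the cache?
fn validate_field(
    cache: &HashMap<String, HashMap<String, ObjectSchema>>,
    org: &str,
    object: &str,
    field: &str,
) -> Result<(), String> {
    let schema = cache
        .get(org)
        .and_then(|objects| objects.get(object))
        .ok_or_else(|| format!("unknown object {}.{}", org, object))?;
    if schema.fields.contains(field) {
        Ok(())
    } else {
        Err(format!("unknown field {}.{}.{}", org, object, field))
    }
}
```

Running this over every field reference in the plan turns a would-be MALFORMED_QUERY error from Salesforce into a planning-time error with the org, object, and field named.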

The RelationshipGraph

BridgeQL models each org's object relationships as a graph using the RelationshipGraph struct. Objects are nodes. Lookups are directed edges with metadata about the relationship type.

// From bridgeql-salesforce/src/graph.rs

pub struct RelationshipGraph {
    objects: HashMap<String, ObjectNode>,
    relationships: Vec<RelationshipEdge>,
    outgoing: HashMap<String, Vec<usize>>,
    incoming: HashMap<String, Vec<usize>>,
}

pub struct RelationshipEdge {
    pub from_object: String,
    pub to_object: String,
    pub field_name: String,         // e.g., "AccountId"
    pub relationship_name: Option<String>, // e.g., "Account"
    pub relationship_type: RelationshipType,
    pub direction: EdgeDirection,
    pub is_polymorphic: bool,
}

pub enum EdgeDirection {
    ParentLookup,   // Contact -> Account
    ChildLookup,    // Account -> Contacts
}

The graph enables two critical optimizations. First, it validates join paths. If a query joins Contact to Campaign, the graph tells the planner whether a direct path exists or whether intermediate joins are needed. Second, it determines the optimal SOQL traversal strategy. A parent lookup (Contact.Account.Name) compiles to dot notation. A child relationship (Account's Contacts) compiles to a subquery.
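A toy illustration of the two compiled shapes, using assumed relationship names rather than a real schema:

```rust
// Toy sketch of the two SOQL shapes the planner emits for a
// traversal. Object and relationship names are illustrative.
enum EdgeDirection {
    ParentLookup, // Contact -> Account
    ChildLookup,  // Account -> Contacts
}

fn compile_traversal(dir: &EdgeDirection) -> &'static str {
    match dir {
        // Parent lookup: dot notation on the relationship name
        EdgeDirection::ParentLookup =>
            "SELECT LastName, Account.Name FROM Contact",
        // Child relationship: nested subquery on the plural
        // child relationship name
        EdgeDirection::ChildLookup =>
            "SELECT Name, (SELECT LastName FROM Contacts) FROM Account",
    }
}
```

In real SOQL the parent form uses the lookup's relationship name and the child form uses the plural child relationship name, both of which come from the cached Describe result.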

// Find the shortest path between two objects
pub fn find_path(
    &self, from: &str, to: &str
) -> Option<Vec<&RelationshipEdge>> {
    // BFS over outgoing edges; VecDeque gives O(1) pops
    let mut visited: HashSet<&str> = HashSet::new();
    let mut queue: VecDeque<(&str, Vec<usize>)> = VecDeque::new();
    queue.push_back((from, Vec::new()));

    while let Some((current, path)) = queue.pop_front() {
        if !visited.insert(current) { continue; }

        if let Some(indices) = self.outgoing.get(current) {
            for &idx in indices {
                let edge = &self.relationships[idx];
                let mut new_path = path.clone();
                new_path.push(idx);

                if edge.to_object == to {
                    return Some(
                        new_path.iter()
                            .map(|&i| &self.relationships[i])
                            .collect()
                    );
                }

                if !visited.contains(edge.to_object.as_str()) {
                    queue.push_back((&edge.to_object, new_path));
                }
            }
        }
    }
    None
}

Bulk API 2.0 Auto-Switch

SOQL via the REST API is fast for small result sets but suffers pagination overhead and timeout risk on large ones. BridgeQL automatically switches to Bulk API 2.0 when the estimated result count exceeds 2,000 records.

// Execution strategy selection
async fn execute_soql_step(
    &self,
    org: &str,
    query: &str,
    store_as: &str,
) -> Result<IntermediateResult> {
    // Step 1: Estimate result count with COUNT() query
    let count_query = self.build_count_query(query)?;
    let estimated_count = self.execute_count(
        org, &count_query
    ).await?;

    // Step 2: Choose execution strategy
    if estimated_count > 2000 {
        // Bulk API 2.0: create job, upload query, poll, download
        self.execute_bulk(org, query, store_as).await
    } else {
        // REST API: direct SOQL query with pagination
        self.execute_rest(org, query, store_as).await
    }
}

async fn execute_bulk(
    &self,
    org: &str,
    query: &str,
    store_as: &str,
) -> Result<IntermediateResult> {
    let session = self.get_connection(org).await?;

    // Create Bulk API 2.0 query job
    let job = session.post(
        "/services/data/v62.0/jobs/query",
        json!({
            "operation": "query",
            "query": query,
            "columnDelimiter": "COMMA",
            "lineEnding": "LF"
        })
    ).await?;

    let job_id = job["id"].as_str().unwrap();

    // Poll for completion (exponential backoff)
    let mut delay = Duration::from_millis(500);
    loop {
        let status = session.get(
            &format!("/services/data/v62.0/jobs/query/{}",
                job_id)
        ).await?;

        match status["state"].as_str() {
            Some("JobComplete") => break,
            Some("Failed") => {
                anyhow::bail!(
                    "Bulk query job failed: {}",
                    status["errorMessage"]
                );
            }
            _ => {
                tokio::time::sleep(delay).await;
                delay = (delay * 2).min(
                    Duration::from_secs(10));
            }
        }
    }

    // Download results (paginated by locator)
    let mut all_records = Vec::new();
    let mut locator: Option<String> = None;

    loop {
        let url = match &locator {
            Some(loc) => format!(
                "/services/data/v62.0/jobs/query/{}/results\
                 ?locator={}", job_id, loc),
            None => format!(
                "/services/data/v62.0/jobs/query/{}/results",
                job_id),
        };

        let (headers, csv_body) =
            session.get_with_headers(&url).await?;
        let records = parse_csv(&csv_body)?;
        all_records.extend(records);

        locator = headers.get("Sforce-Locator")
            .and_then(|v| v.to_str().ok())
            .filter(|v| *v != "null")
            .map(|v| v.to_string());

        if locator.is_none() { break; }
    }

    Ok(IntermediateResult {
        name: store_as.to_string(),
        rows: all_records,
    })
}

The auto-switch is transparent to the user. They write one query. The planner decides whether each org-level SOQL query uses REST or Bulk based on the estimated row count. Small queries get sub-second REST responses. Large queries get the throughput of Bulk API without the user managing jobs.
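The `build_count_query` helper is not shown above. A naive sketch that assumes a flat, single-object SOQL string with an uppercase `FROM` (the real planner would rewrite the parsed AST rather than the raw string):

```rust
// Hypothetical sketch: swap the SELECT list for COUNT(), keeping
// the FROM and WHERE clauses intact. Assumes a flat SOQL string
// with no subqueries and an uppercase " FROM " keyword.
fn build_count_query(query: &str) -> Option<String> {
    let from_pos = query.find(" FROM ")?;
    Some(format!("SELECT COUNT(){}", &query[from_pos..]))
}
```

SOQL's `SELECT COUNT()` returns only a record count, so the estimate costs one cheap round trip before the real fetch begins.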

Hash Join Strategy

Cross-org joins happen locally in memory. BridgeQL uses hash joins because they handle the typical Salesforce join pattern well: one side is indexed by a unique key (External_Id__c, Id), the other side probes against it.

fn execute_hash_join(
    left: &IntermediateResult,
    right: &IntermediateResult,
    left_key: &str,
    right_key: &str,
    join_type: JoinType,
) -> IntermediateResult {
    // Build the hash table from the smaller side for inner joins.
    // Outer joins pin the sides instead: the preserved rows must be
    // on the probe side, so a LEFT join always builds from the
    // right. (A FULL join would additionally need to emit unmatched
    // build rows; that bookkeeping is elided here.)
    let (build_side, probe_side, build_key, probe_key) =
        match join_type {
            JoinType::Left => (right, left, right_key, left_key),
            _ if left.rows.len() <= right.rows.len() =>
                (left, right, left_key, right_key),
            _ => (right, left, right_key, left_key),
        };

    let mut hash_table: HashMap<String, Vec<&Row>> =
        HashMap::with_capacity(build_side.rows.len());

    for row in &build_side.rows {
        if let Some(key_value) = row.get(build_key) {
            hash_table
                .entry(key_value.to_string())
                .or_default()
                .push(row);
        }
    }

    // Probe phase
    let mut result_rows = Vec::new();

    for probe_row in &probe_side.rows {
        if let Some(key_value) = probe_row.get(probe_key) {
            if let Some(matches) =
                hash_table.get(&key_value.to_string())
            {
                for build_row in matches {
                    // Merge the two rows
                    let mut merged = probe_row.clone();
                    for (k, v) in build_row.fields() {
                        merged.insert(k.clone(), v.clone());
                    }
                    result_rows.push(merged);
                }
            } else if matches!(
                join_type,
                JoinType::Left | JoinType::Full
            ) {
                // Left/Full join: include probe row with nulls
                result_rows.push(probe_row.clone());
            }
        }
    }

    IntermediateResult {
        name: "joined".to_string(),
        rows: result_rows,
    }
}

For inner joins, the build side is the smaller result set. For a typical query joining 10,000 accounts from Sales with 50,000 cases from Support, the hash table has 10,000 entries and each of the 50,000 cases does a single hash lookup. Total join time: under 50ms. Negligible compared to the SOQL query latency.

Cardinality Estimation

The optimizer needs to know which result set is smaller to choose the build side and to order multi-way joins correctly. BridgeQL estimates cardinality using COUNT() queries and cached statistics:

#[derive(Debug, Clone, Default)]
pub struct QueryStats {
    /// Estimated row counts per object
    pub row_counts: HashMap<String, usize>,
    /// Selectivity estimates for common filters
    pub selectivity: HashMap<String, f64>,
}

impl QueryStats {
    /// Estimate result count for a filtered query
    pub fn estimate(
        &self,
        object: &str,
        filters: &[WhereCondition],
    ) -> usize {
        let base_count = self.row_counts
            .get(object)
            .copied()
            .unwrap_or(10_000); // Default assumption

        let mut estimated = base_count as f64;

        for filter in filters {
            let selectivity = match filter {
                WhereCondition::Simple { field, operator, .. } => {
                    // Equality on indexed field: very selective
                    if matches!(operator, Operator::Eq)
                        && self.is_indexed(object, field) {
                        0.01
                    }
                    // Equality on non-indexed: moderate
                    else if matches!(operator, Operator::Eq) {
                        0.1
                    }
                    // Range: less selective
                    else {
                        0.33
                    }
                }
                WhereCondition::In { values, .. } => {
                    // IN clause: selectivity proportional to
                    // number of values
                    (values.len() as f64 * 0.01).min(0.5)
                }
                _ => 0.5, // Conservative default
            };

            estimated *= selectivity;
        }

        estimated.ceil() as usize
    }
}

The optimizer uses these estimates to reorder joins. Given three intermediate results (A: 10K rows, B: 50K rows, C: 500 rows), the optimizer joins C with A first (small with medium) and then joins the result with B. This minimizes the size of intermediate results and the total number of hash lookups.
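The reordering itself can be as simple as a greedy smallest-first sort over the estimated cardinalities. A sketch mirroring the A/B/C example:

```rust
// Greedy sketch of cardinality-based join ordering: start from the
// smallest input and fold in the next-smallest each time. Inputs
// are (name, estimated_rows) pairs; a real optimizer would also
// re-estimate the intermediate result after each join.
fn join_order(mut inputs: Vec<(&str, usize)>) -> Vec<&str> {
    inputs.sort_by_key(|&(_, rows)| rows);
    inputs.into_iter().map(|(name, _)| name).collect()
}
```

For A: 10K, B: 50K, C: 500, this yields the order C, A, B — join the two small sides first, touch the 50K-row set last.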

Connection Pooling

Each Salesforce org connection is an authenticated HTTP session with an access token. BridgeQL maintains a pool of up to 5 connections per org to enable parallel query execution:

pub struct ConnectionPool {
    pools: HashMap<String, Vec<SalesforceSession>>,
    in_use: HashMap<String, usize>,  // checked-out sessions per org
    max_per_org: usize,              // default: 5
}

impl ConnectionPool {
    pub async fn get_connection(
        &mut self, org: &str
    ) -> Result<SalesforceSession> {
        let pool = self.pools
            .entry(org.to_string())
            .or_default();
        let in_use = self.in_use
            .entry(org.to_string())
            .or_default();

        // Try to reuse an idle, unexpired connection
        while let Some(session) = pool.pop() {
            if !session.is_expired() {
                *in_use += 1;
                return Ok(session);
            }
            // Token expired: drop it and keep looking
        }

        // Create a new connection if under the per-org cap.
        // Checked-out sessions count against the cap; the pool Vec
        // alone only tracks idle ones.
        if *in_use < self.max_per_org {
            let session = OrgManager::connect(org)?;
            *in_use += 1;
            return Ok(session);
        }

        // Pool exhausted. A production pool would block here with
        // a bounded wait instead of failing fast.
        anyhow::bail!(
            "Connection pool exhausted for org '{}'", org
        );
    }

    pub fn return_connection(
        &mut self, org: &str, session: SalesforceSession
    ) {
        if let Some(count) = self.in_use.get_mut(org) {
            *count = count.saturating_sub(1);
        }
        if !session.is_expired() {
            self.pools
                .entry(org.to_string())
                .or_default()
                .push(session);
        }
    }
}

Five connections per org is a deliberate ceiling. Salesforce throttles concurrent long-running API requests at roughly 25 per org, and that budget is shared with every other integration hitting the same org, so 5 parallel connections leaves comfortable headroom. More connections would risk API throttling. Fewer would serialize queries that could run in parallel.

Putting It Together

The full execution flow for a cross-org query:

1. Parse: BridgeQL query -> AST
2. Plan:  AST -> QueryPlan (sequence of QuerySteps)
3. Optimize:
   a. Push filters into SoqlQuery steps
   b. Estimate cardinality per step
   c. Reorder joins (smallest first)
   d. Choose REST vs Bulk per SoqlQuery
4. Execute:
   a. Run all SoqlQuery steps in parallel
      (one tokio task per org, pooled connections)
   b. Run Join/Filter/Aggregate/Sort sequentially
      on intermediate results
5. Return: Final IntermediateResult -> output format

For the VP's query, this means: parse in under 1ms, plan and optimize in under 5ms, execute three parallel SOQL queries in 2-5 seconds, join and aggregate locally in under 100ms. Total wall-clock time: 3-6 seconds for a query that would take a human analyst hours of manual data export and spreadsheet joining.

That is the power of a proper query planner. Not faster individual queries. Faster thinking about how to get the answer. The planner does in milliseconds what a developer would spend hours figuring out: which queries to run, in what order, how to join the results, where to filter, and when to switch from REST to Bulk.

BridgeQL makes multi-org Salesforce queryable as if it were a single database. The VP does not need to know or care that the data lives in three orgs. They write one query. The planner handles the rest.