
CloudEvents as a Data Product

TL;DR

Treat CloudEvents as the envelope for versioned, traceable domain aggregate snapshots. Use the CloudEvents contract for transport and a versioned aggregate schema in the data payload to turn events into reliable event-based data products for analytics, ML, and agentic workloads.

This post expands on the Event as a Data Product pattern I introduced in my previous article. At its core: treat domain aggregates (your key entities and objects) as the canonical interface for your service, and expose them through APIs and event-based data products. This minimises impedance across channels and boosts data reusability.

I’m taking this further by applying the CloudEvents specification and proven best practices to craft high-quality event-based data products from events. Hence, the name “CloudEvents as a Data Product.”

Here I unpack why CloudEvents fits event-driven data products, offer practical guidance for implementing it, and show how this pattern helps with LLMs and agentic workflows. In particular, events provide semantically rich, traceable data that grounds LLMs, reduces hallucination, and enables agentic systems to reason over consistent schemas; see “How this helps LLMs and agentic workflows” for a deep dive.

Table of Contents

  1. Why Events
  2. CloudEvents in the Data Product Ecosystem
  3. Best practices and guidance
  4. CloudEvents as a queryable data product
  5. Implementation Guidance
  6. How this helps LLMs and agentic workflows
  7. Practical example: answering a business question
  8. Challenges and considerations
  9. Closing thoughts
  10. Glossary / Terminology

CloudEvents spec: https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md

Why Events

Let’s explore why events excel as data products and the advantages they offer over other approaches.

The problem with traditional data products

A common practice is to use database extracts (via Change Data Capture (CDC) or otherwise) as data products, often treating them as source-aligned (raw) data products and building target-aligned layers on top. This is problematic because database structures may not directly reflect the domain model: they could be event-sourced logs, normalized tables, or structures optimized for other concerns, which makes the domain logic hard to replicate in the SQL that builds target-aligned views. Even where it can be done, it duplicates domain logic across languages. Using operational databases directly as data products is also poor practice: it tightly couples consumers to internal schemas, leading to breakage when those schemas evolve, and it violates the “data on the outside” principle that external interfaces should be independent of internal representations.

Data on the Outside versus Data on the Inside

Back in 2005, Pat Helland wrote an excellent white paper called “Data on the Outside versus Data on the Inside”. It explores the concept of time (“then” versus “now”) as it applies to data. One of the key takeaways is that the way we reason about the real world also applies to data:

  • By the time you see a distant object, it may have changed!
  • By the time you see a message, the data may have changed!

I applied this lens to event-driven systems here: https://codesimple.blog/2021/03/14/events-on-the-outside-vs-events-on-the-inside/

Events capture not just state but the why, when and who of a change. When you apply the “data on the outside” principle, events become a stream of point-in-time, versioned snapshots of your aggregates. That makes them a natural input for data products: they are timely, auditable, and carry contextual metadata.

In my previous post on this topic, I covered in detail how your aggregate data structure acts as a canonical interface for your domain model across both channels, i.e. APIs and events.

In a nutshell, as a source-aligned data product:

  • An event carries a complete snapshot (data on the outside).
  • An event provides a version of that complete snapshot (data on the outside).
  • The event schema is shared with the API of the domain service: the aggregate data structure.

CloudEvents as the envelope

CloudEvents enables this event-driven data product paradigm by providing a standardized, versioned envelope for events, decoupling producers and consumers while enabling reliable, queryable streams of aggregate snapshots.

CloudEvents is a focused, well-adopted specification for describing event envelopes. It provides:

  • A small, consistent envelope (id, type, source, subject, time, datacontenttype, data, etc.).
  • SDKs and tooling across many languages, lowering friction for adoption.
  • Extensibility hooks so teams can add domain-specific metadata without breaking the common contract.

These properties make CloudEvents a solid basis for a data-product-grade event contract: predictable, discoverable, and toolable.

What CloudEvents are (short): CloudEvents is a vendor-neutral event envelope spec that standardises a small set of context attributes (the event “context”) and leaves the domain payload in data. It defines REQUIRED attributes (id, source, specversion, type), OPTIONAL attributes (datacontenttype, dataschema, subject, time) and allows extension attributes for domain or infra metadata. (See the spec: https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md)

Comparison: Traditional Data Extracts vs. CDC Data Streaming vs. CloudEvents as Data Products

Aspect | Traditional Data Extracts | CDC Data Streaming | CloudEvents as Data Products
Contract | Database schema (internal) | Database schema (internal) | CloudEvent envelope + schema (external)
Discovery | Manual/tribal knowledge | Manual/tribal knowledge | Type attribute + catalog/registry
Versioning | Breaking changes | Breaking changes | Semantic versioning in type
Real-time | Batch only | Real-time streaming | Both streaming & batch
Context | Lost | Lost | Preserved (why, when, who)
Ownership | Centralized IT | Centralized IT | Domain teams

Data Product Advantages

  • Product Quality: Traceability and auditability are built in because the product is an event, carrying domain data and lineage for trustworthy data products.
  • Consumer Independence: Schema-based contracts enable loose coupling, allowing product consumers to evolve independently.
  • Product Usability: Rich metadata (source, type, subject) improves discoverability and understandability.
  • Multi-channel Delivery: Same event-based data product serves real-time reactions, batch analytics, and AI/ML workflows.
  • Semantic Foundation: Events provide contextual, semantic-rich data that grounds LLMs, reduces hallucination, and enables agentic systems to reason over consistent schemas. Semantics (meaning, intent, relationships) are more appropriate for AI reasoning than syntax alone (structure without context), as they offer deeper insights for decision-making and understanding. For example, events explain “why” something happened (e.g., a customer order failed due to payment issues), not just “what” (e.g., order status changed).

It also reminds me of a brief discussion on X/Twitter with thought leaders Vaughn Vernon (author of Implementing Domain-Driven Design) and Greg Young (creator of CQRS & Event Sourcing), where I argued that certain events should capture the full context at the moment they occur, rather than only the data changes, to preserve temporal accuracy for correct business outcomes. This insight is even more relevant today with AI and LLM flows.

Context is king in the LLM space: Data products built from events will increasingly be consumed by LLMs and AI agents. Context has many aspects, and one crucial one is the path we took to reach the current state of an entity. Keeping only the current state without this historical context is lossy: it discards information that enables better reasoning and decision-making.

This pattern has been validated through industry partnerships. For instance, I partnered with Google to architect and present a KAPPA architecture implementation for building data products using Google’s eventing systems, following the “CloudEvents as a Data Product” pattern I introduced. I co-presented this at Google Cloud Next, one of the industry’s premier technical conferences, where I demonstrated a working implementation that could be deployed in under 5 minutes.

CloudEvents in the Data Product Ecosystem

This section explains how CloudEvents work as a data product, and the following picture summarizes it, followed by a detailed guide.

CloudEvents and Data Mesh Principles

CloudEvents naturally align with data mesh’s four principles, as articulated by Zhamak Dehghani:

  1. Domain ownership: Each service/domain team owns its event contracts and data products, treating them as products with clear responsibilities and SLAs.
  2. Data as a product: Event streams are treated as first-class data products with discoverable schemas, quality metrics, and consumer contracts.
  3. Self-serve data platform: CloudEvents SDKs, schema registries, and ingestion pipelines provide a self-serve infrastructure that enables domain teams to publish data products without centralized IT bottlenecks.
  4. Federated governance: Schema registries and validation rules enforce organisational standards while allowing domain autonomy in schema design and evolution.

By implementing CloudEvents as data products, organizations can achieve the decentralized, product-oriented data culture that data mesh advocates.

CloudEvents in the Data Product Architecture

  • Domain Teams (Product Owners): Each domain team owns and publishes CloudEvent streams as data products with defined contracts, SLAs, and quality metrics.
  • Event Streams (Data Products): CloudEvents envelopes wrap aggregate snapshots, making them discoverable, addressable, and reusable across the organization.
  • Schema Registry (Product Catalog): Centralized registry for dataschemas and event types, enabling discovery and validation.
  • Ingestion Pipeline (Data Platform): Self-serve pipelines that consume events, validate schemas, and persist to data warehouses.
  • Data Warehouse (Consumption Layer): Queryable event logs serving analytics, ML, and real-time consumers.
  • Governance Layer: Overlaid policies for security, compliance, and quality monitoring.

This architecture enables domain teams to operate autonomously while contributing to an organization-wide data ecosystem.

CloudEvents Meet Data Product Criteria

CloudEvents-based event streams qualify as data products because they exhibit the key characteristics defined by data product thinking:

Discoverable: The type attribute and schema registries enable catalog-based discovery, allowing data consumers to find and understand available event-based data products without tribal knowledge.

Understandable: CloudEvent metadata (source, type, subject) and dataschema provide semantic context, making it clear what each event represents and how to use it.

Addressable: Each event has a unique id and subject (entity identifier), enabling precise querying and referencing of specific data points.

Trustworthy: Versioning in type and dataschema, along with schema validation, ensures reliability and prevents breaking changes from affecting consumers unexpectedly.

Interoperable: The standard CloudEvents envelope format works across platforms and languages, enabling seamless integration between different systems and teams.

Secure: Binary binding enables encryption of the data payload while maintaining routing capability through envelope metadata, supporting least-privilege access controls.

Reusable: Complete aggregate snapshots with rich context allow the same data product to serve multiple use cases, from real-time processing to batch analytics to AI/ML training.

Shareable: Domain teams can share event-based data products across the organisation, reducing duplication and ensuring consistent data usage.

CloudEvents Support the Data Product Lifecycle

Data products follow a lifecycle of design, development, deployment, and ongoing management. CloudEvents support this full lifecycle for event-based data products:

  1. Discovery & Design: Identify high-value aggregates and define event schemas with domain teams. Establish product contracts including SLAs, quality metrics, and consumer personas.
  2. Build: Implement CloudEvent envelopes with versioned schemas. Set up schema registries and validation pipelines.
  3. Test: Perform schema validation in CI/CD, contract testing between producers and consumers, and load testing for scalability.
  4. Deploy: Publish event streams to topics with monitoring and alerting. Make data products discoverable through catalogs.
  5. Iterate: Gather consumer feedback, monitor usage metrics, and evolve schemas with proper versioning. Deprecate outdated products following governance policies.
  6. Measure: Track business impact (e.g., improved decision-making speed), technical metrics (delivery latency, schema compliance), and product adoption (number of consumers).
  7. Govern: Ensure ongoing compliance with organizational standards, security policies, and data quality requirements.
  8. Retire: When a data product is no longer needed, provide migration paths and eventually decommission with proper notice.

This lifecycle ensures event-based data products remain valuable, reliable, and aligned with business needs over time.

Best practices and guidance

Below are practical recommendations, based on my experience, for implementing CloudEvents as event-based data products and for making events more useful to LLMs and agentic workflows. The following diagram provides a quick visual reference to the key components.

Complete snapshot and data payload

Events contain full aggregate snapshots, eliminating the need for complex joins or stitching across multiple data sources. This keeps domain logic encapsulated within the service, preventing external consumers from needing to replicate intricate business rules and ensuring data integrity. Follow the aggregate event model discussed in the “fat vs thin” post. Prefer publishing an aggregate snapshot (versioned) in data when the aggregate is the source of truth for the change. If you choose thin events (deltas), make sure their schema and rehydration rules are well documented. Ensure the payload is versioned and schema-compliant for reliable querying and AI consumption.

Important: Avoid including data that your domain service does not master. If you publish data owned by other domains, consumers may assume you will publish updates for those fields (you won’t), which creates confusion and brittle integrations. Instead, use a reference key/id for that data, e.g. a customer id instead of a customer name when referencing the customer in a shopping cart.

Data Versioning

In line with the ‘Data on the Outside’ principle, data captures the ‘then’ state and must be versioned to enable consumers to account for the chronological order of changes on the aggregate. Although sequence is an optional extension attribute in CloudEvents, I recommend making it required (with exceptions) to fully support the “data on the outside” concept. The sequence attribute ensures events are ordered correctly within a partition, allowing consumers to reliably reconstruct the sequence of changes for a given subject/aggregate. Where using sequence is not feasible (e.g., in some messaging systems), fall back to using the time attribute for ordering. Consumers must always be able to determine the correct sequence of events to maintain data integrity and accurate state reconstruction.
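
To make this concrete, here is a minimal consumer-side sketch that orders one subject’s events by the sequence extension, falling back to the time attribute. The event shape (plain dicts keyed by CloudEvents attribute names) and the numeric sequence values are assumptions made for illustration.

# Ordering sketch: sort one subject's events by the `sequence` extension when
# present, falling back to the RFC 3339 `time` attribute. Assumes events are
# dicts keyed by CloudEvents attribute names and that producers use numeric
# sequence values; within one subject, producers should use a single ordering
# source consistently.
from datetime import datetime
from typing import Any


def order_key(event: dict[str, Any]):
    if event.get("sequence") is not None:
        return (0, int(event["sequence"]))
    # Fallback: parse the `time` attribute
    return (1, datetime.fromisoformat(event["time"].replace("Z", "+00:00")))


def latest_snapshot(events: list[dict[str, Any]]) -> dict[str, Any]:
    """Reconstruct the latest state of a single subject from its events."""
    ordered = sorted(events, key=order_key)
    return ordered[-1]["data"]["aggregate"]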

Schema Evolution

CloudEvents should provide a well-defined external schema with versioning for change management. Maintain versioning at the service level rather than solely at the event level. This ensures that both API and event contracts remain aligned for the aggregate, because versioning encompasses more than just the schema contract: it includes behavioral changes, API endpoints, and event semantics. By versioning the service as a whole, you avoid mismatches between how the aggregate is accessed via APIs versus how its changes are published as events, promoting consistency across channels. For example, if the service is versioned as v2 (due to a major API change), the event type would reflect this: com.example.service.v2.cart.CartUpdated. This indicates that the event is part of the v2 service contract, ensuring consumers understand the aligned versioning across interfaces.

Best Practices for Schema Evolution:

  • Semantic Versioning: Use major.minor.patch versioning. Increment major for breaking changes, minor for backward-compatible additions, patch for bug fixes.
  • Backward Compatibility: Ensure new schemas can be consumed by existing consumers. Avoid removing fields; instead, mark them as deprecated.
  • Deprecation Notices: Provide advance notice (e.g., 6 months) for breaking changes, with migration guides.
  • Forward Compatibility: Design schemas to allow older producers to work with newer consumers by ignoring unknown fields.
  • Schema Registries: Use tools like Confluent Schema Registry to manage versions and enforce compatibility rules.
  • Testing: Include schema evolution tests in CI to validate compatibility across versions (a minimal sketch follows this list).
  • Documentation: Maintain clear documentation of schema changes and their impact on consumers.
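
A compatibility test of this kind can stay small. The sketch below compares two JSON Schema documents and flags changes that commonly break consumers or producers; the rules and file paths are illustrative only, and schema registries such as Confluent’s apply much richer rule sets.

# Minimal schema-compatibility check (sketch). The two rules below are a
# simplification: removing properties breaks existing consumers (backward
# compatibility), while adding required fields breaks older producers
# (forward compatibility).
import json


def compatibility_issues(old_schema: dict, new_schema: dict) -> list[str]:
    issues = []
    removed = set(old_schema.get("properties", {})) - set(new_schema.get("properties", {}))
    if removed:
        issues.append(f"properties removed: {sorted(removed)}")
    added_required = set(new_schema.get("required", [])) - set(old_schema.get("required", []))
    if added_required:
        issues.append(f"newly required fields: {sorted(added_required)}")
    return issues


if __name__ == "__main__":
    # Hypothetical schema file locations, used for illustration only
    with open("schemas/cart/v1.0.json") as f_old, open("schemas/cart/v1.1.json") as f_new:
        problems = compatibility_issues(json.load(f_old), json.load(f_new))
    if problems:
        raise SystemExit("Incompatible schema change: " + "; ".join(problems))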

Lineage (source and type)

The source and type attributes provide context about where the event originated and what it represents, crucial for governance, compliance, and understanding data provenance across systems. This helps in tracking data flow, ensuring accountability, and meeting regulatory requirements.

Distributed tracing

Incorporating distributed tracing allows correlating events across services, building a cohesive view of end-to-end processes. This supports debugging, monitoring, and constructing a bigger picture of system interactions, which is vital for complex, distributed architectures. See the Extensions attribute below for details on including tracing fields in CloudEvents. Additionally, publishers and consumers should forward tracing context to maintain end-to-end observability. Follow the OpenTelemetry messaging spans specification (https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/), which provides standard attributes for messaging operations, such as messaging.system, messaging.operation, and messaging.destination, to instrument producers and consumers consistently.
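
As a sketch of the producer side, the snippet below copies the active W3C trace context into the traceparent/tracestate extension attributes using the OpenTelemetry propagation API and the Python CloudEvents SDK. The function name and payload shape are illustrative, and exact APIs can vary between SDK versions.

# Sketch: copy the active W3C trace context into the CloudEvents
# `traceparent`/`tracestate` extension attributes before publishing.
# Assumes the `opentelemetry-api` and `cloudevents` packages.
from cloudevents.http import CloudEvent
from opentelemetry.propagate import inject


def make_cart_updated_event(cart_snapshot: dict) -> CloudEvent:
    carrier: dict = {}
    inject(carrier)  # fills "traceparent" (and "tracestate" when present)

    attributes = {
        "type": "com.example.cart.CartUpdated.v1",
        "source": "/services/cart",
        "subject": cart_snapshot["id"],
        "datacontenttype": "application/json",
        # Tracing context travels as CloudEvents extension attributes
        **{k: v for k, v in carrier.items() if k in ("traceparent", "tracestate")},
    }
    return CloudEvent(attributes, {"aggregate": cart_snapshot})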

Binding mode

Prefer binary binding over structured binding. Binary binding keeps the data payload opaque (e.g., encrypted), making all metadata (context attributes) accessible for routing and filtering without unpacking. This aligns with the “need to know” principle and facilitates least privilege access, as intermediaries can route based on envelope fields without decrypting sensitive data.
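
For illustration, here is a small sketch using the Python CloudEvents SDK: binary mode yields ce-* headers plus an opaque body that intermediaries can route on without reading the payload, while structured mode wraps everything into one JSON document. The event values are made up, and import paths may differ slightly between SDK versions.

# Sketch: binary vs structured HTTP binding with the Python CloudEvents SDK.
# In binary mode the context attributes travel as ce-* headers and the body is
# just the (possibly encrypted) payload; in structured mode everything is one
# JSON document.
from cloudevents.http import CloudEvent, to_binary, to_structured

event = CloudEvent(
    {
        "type": "com.example.cart.CartUpdated.v1",
        "source": "/services/cart",
        "subject": "cart-123",
        "datacontenttype": "application/json",
    },
    {"aggregate": {"id": "cart-123", "total": 59.97}},
)

headers, body = to_binary(event)          # ce-id, ce-type, ce-subject, ... as headers
s_headers, s_body = to_structured(event)  # one JSON document carrying envelope + data

# Intermediaries can route or filter on headers["ce-type"] / headers["ce-subject"]
# without parsing (or decrypting) the body.
print(headers["ce-type"])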

Event semantics

An event should represent something that actually occurred in the domain and should map to a change at the aggregate level (the aggregate root). If you find yourself emitting events about loosely-related or derived entities frequently, consider whether the aggregate boundaries are correct or whether a curated/target-aligned product is a better interface.

Reliable event publishing

Ensure event publishing is reliable, as data quality is critical for data products. Consider these patterns:

Outbox Pattern

Store events in an outbox table within the same transaction as the business logic, then use a separate process to publish them. This ensures events are persisted reliably and prevents loss during failures.

For example, in a shopping cart service, the outbox table includes columns for all CloudEvent core attributes, with extensions stored as a JSON blob:

BEGIN TRANSACTION;

-- Business logic: Update the cart aggregate
UPDATE carts SET total = total + 10 WHERE id = 'cart-123';

-- Store the event in the outbox with CloudEvent attributes
INSERT INTO outbox (
  id, source, specversion, type, datacontenttype, dataschema, subject, time, data, extensions
) VALUES (
  'evt-456',
  'cart-service',
  '1.0',
  'cart.updated',
  'application/json',
  NULL,
  'cart-123',
  CURRENT_TIMESTAMP,
  '{"total": 50}',
  '{"traceparent": "00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01", "tracestate": "vendorname=opaque-value"}'
);

COMMIT;

A background process then reads from the outbox table, publishes the events to the message broker, and marks them as published.
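
A relay for the outbox above can stay small. The sketch below polls unpublished rows, republishes them with the CloudEvents attributes as the envelope, and marks them as published; the database connection, the published_at column, the %s parameter style, and the publish_to_broker callable are assumptions made for illustration.

# Sketch of the outbox relay: poll unpublished rows, publish them as
# CloudEvents, then mark them as published. Assumes a DB-API driver with
# %s placeholders (e.g. psycopg2) and a broker client of your choice.
import json
import time


def relay_once(conn, publish_to_broker) -> int:
    cur = conn.cursor()
    cur.execute(
        "SELECT id, source, specversion, type, datacontenttype, dataschema,"
        " subject, time, data, extensions"
        " FROM outbox WHERE published_at IS NULL ORDER BY time LIMIT 100"
    )
    rows = cur.fetchall()
    for row in rows:
        (event_id, source, specversion, etype, datacontenttype,
         dataschema, subject, etime, data, extensions) = row
        envelope = {
            "id": event_id, "source": source, "specversion": specversion,
            "type": etype, "datacontenttype": datacontenttype,
            "dataschema": dataschema, "subject": subject, "time": str(etime),
            **json.loads(extensions or "{}"),
        }
        publish_to_broker(headers=envelope, body=data)  # broker client is a placeholder
        cur.execute("UPDATE outbox SET published_at = CURRENT_TIMESTAMP WHERE id = %s", (event_id,))
    conn.commit()
    return len(rows)


def run(conn, publish_to_broker, poll_seconds: float = 1.0) -> None:
    while True:
        if relay_once(conn, publish_to_broker) == 0:
            time.sleep(poll_seconds)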

Durable execution engines:

Use frameworks like Temporal for event publishing workflows, providing built-in retries, timeouts, and state management to handle complex, reliable executions.

End-to-End Picture

The following diagram illustrates the end-to-end flow outlined in the guidance above.

Attributes

All the best practices discussed above are applied through the following CloudEvents attributes, which serve as the foundation for implementing comprehensive event data products.

  • id: Unique identity of the event, used for deduplication and non-repudiation. Ensures events can be processed idempotently and provides an immutable reference for auditing.
  • source: Specifies the implementing system. Per Conway’s Law, system boundaries often reflect team structures, so the solution space (systems) may not perfectly align with the problem space (domains). Use a consistent naming convention to map systems to domains where possible.
  • type: Provides the context of what changed and why.
    • Syntax: com.<organisation>.<sub-domain>.<domain service>.<version>.<aggregate>.<event>
    • Example: com.example.sales.shoppingcart.v1.cart.updated
    • Alternative pattern: org.domain[.sub-domain].domain-service.aggregate.event
    • Examples:
      • com.example.insurance.claims.claim.ClaimCreated.v1
      • org.acme.billing.customer.CustomerUpdated.v1
    • Guidelines: Follow a reverse-DNS prefix to avoid collisions and include the aggregate name and the domain event. Remember: events reflect occurrences, prefer naming that expresses the domain-level occurrence and ensure the event corresponds to an aggregate-level change.
    • Versioning: Include a major-version marker in the type for breaking changes (e.g., com.example.cart.cart.CartCreated.v1). Use dataschema for minor revisions (e.g., https://schemas.example.com/cart/v1.2.json). Prefer major-versioning in type and minor in dataschema.
  • subject: Use the entity id of the aggregate root in subject. The subject attribute is ideal for subscription filtering and fast routing when intermediaries cannot inspect data. Acts as the primary key for entity-level queries, enabling efficient partitioning and indexing in data warehouses.
  • time: Time when the event occurred. Critical for non-repudiation, auditability, and handling out-of-order events by consumers. For data products, it sequences events chronologically for accurate state reconstruction.
  • Extensions (e.g., traceparent, tracestate): Include distributed tracing context for end-to-end observability. traceparent and tracestate follow W3C standards, allowing correlation across services. Additional extensions like partitioningkey can optimize message routing, and sequence ensures ordered processing within a partition.

Governance for Event-Based Data Products

Effective governance ensures event-based data products remain trustworthy, secure, and valuable over time:

Schema Governance:

  • Required: All events must have a valid dataschema URI pointing to a registered schema.
  • Validation: CI checks enforce schema compliance before deployment.
  • Evolution: Breaking changes require new major version in type; provide migration guides.
  • Registry: Use schema registries (e.g., Confluent Schema Registry, AWS Glue) for centralized management.

Access Governance:

  • Binary binding enables field-level encryption while keeping envelope metadata accessible for routing.
  • Source/Type/Subject based filtering allows least-privilege access (e.g., consumers see only events for entities they own).
  • Authentication and authorization enforced at topic/stream level.

Quality Governance:

  • Monitoring: Track schema validation failures, event delivery rates, and consumer error rates.
  • SLAs: Define and measure delivery guarantees (e.g., 99.9% uptime, <5min latency).
  • Lineage: Maintain audit trails of data flow from production to consumption.
  • Deprecation: 6-month notice for major changes; provide migration paths.

Organizational Governance:

  • Product ownership: Domain teams are responsible for their data products’ quality and evolution.
  • Cross-domain coordination: Establish data product councils for shared standards.
  • Compliance: Ensure events meet regulatory requirements (GDPR, HIPAA) through encryption and retention policies.

CloudEvents as a queryable data product

CloudEvents can act as a primary event-based data product: a log of versioned, point-in-time snapshots of your aggregates. When you publish aggregate snapshots (or well-defined deltas with rehydration rules) into a topic and persist them into a data warehouse (e.g. BigQuery), you get a queryable event log that can answer state-at-a-time and reconstruction queries.

CloudEvents attributes play a key role in filtering and routing events efficiently, allowing consumers to subscribe to specific event types, sources, or subjects without needing to inspect the payload.

  • The event log is a time-ordered sequence of versioned snapshots. Each event contains subject (aggregate id), time, type, dataschema, and data (the snapshot).
  • To find the latest state of an aggregate, query by subject and take the latest event by time (or use offsets/sequences if available).
  • To compute a point-in-time snapshot, filter events by time <= T and pick the latest per subject as of T.

Shopping Cart Example

Assume you persist CloudEvents into a BigQuery table named events.cart_events. Here is a sample CloudEvent in binary binding (attributes in headers, data in the body):

Headers:

ce-specversion: 1.0
ce-id: cart-update-456
ce-source: /services/cart
ce-type: com.example.cart.CartUpdated.v1
ce-subject: cart-123
ce-time: 2025-11-01T10:00:00Z
ce-datacontenttype: application/json
ce-dataschema: https://schemas.example.com/cart/v1.0.json
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01
tracestate: vendorname=opaque-value

Body:

{
  "aggregate": {
    "id": "cart-123",
    "items": [ { "sku": "sku-1", "qty": 2 }, { "sku": "sku-2", "qty": 1 } ],
    "total": 59.97,
    "currency": "USD"
  }
}

Get the latest snapshot of every cart (BigQuery):

SELECT t.subject, t.data.aggregate as latest
FROM (
  SELECT *, ROW_NUMBER() OVER (PARTITION BY subject ORDER BY time DESC) as rn
  FROM `project.dataset.events_cart_events`
) t
WHERE t.rn = 1;

Get the snapshot of a single cart (cart-123) as of a point in time TIMESTAMP('2025-10-01T12:00:00Z'):

SELECT data.aggregate
FROM `project.dataset.events_cart_events`
WHERE subject = 'cart-123'
  AND time <= TIMESTAMP('2025-10-01T12:00:00Z')
ORDER BY time DESC
LIMIT 1;

Filter events by type (e.g., only ‘CartUpdated’ events) for targeted analysis:

SELECT * FROM `project.dataset.events_cart_events`
WHERE type = 'com.example.cart.CartUpdated.v1';

Note: The entity ID (e.g., cart ID) is stored in the subject column. Use subject in WHERE clauses instead of assuming columns like user_id or cart_id.

These queries assume events are stored in a single table. For large scale, consider partitioning by date and sharding by domain or source.

Login System Example

Consider a login system publishing CloudEvents for authentication flows. Multiple events in a single login session share the same trace ID, allowing correlation across services.

Sample CloudEvents in binary binding:

Login attempt event:

Headers:

ce-specversion: 1.0
ce-id: login-attempt-789
ce-source: /services/auth
ce-type: com.example.auth.LoginAttempt.v1
ce-subject: user-456
ce-time: 2025-11-01T10:00:05Z
ce-datacontenttype: application/json
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01

Body:

{
  "username": "john.doe",
  "ip": "192.168.1.1"
}

Authentication success event:

Headers:

ce-specversion: 1.0
ce-id: auth-success-790
ce-source: /services/auth
ce-type: com.example.auth.Authenticated.v1
ce-subject: user-456
ce-time: 2025-11-01T10:00:10Z
ce-datacontenttype: application/json
traceparent: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01

Body:

{
  "sessionId": "sess-123",
  "token": "jwt-token-here"
}

To associate authentication information with a login session, query events by trace ID and extract relevant data:

SELECT 
  type,
  JSON_EXTRACT(data, '$.username') as username,
  JSON_EXTRACT(data, '$.ip') as ip,
  JSON_EXTRACT(data, '$.sessionId') as session_id,
  JSON_EXTRACT(data, '$.token') as token
FROM `project.dataset.events_auth_events`
WHERE JSON_EXTRACT(extensions, '$.traceparent') = '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'
ORDER BY time ASC;

This query correlates all events in the authentication flow, associating the login attempt details with the successful authentication outcome.

To find cart updates associated with a login session, join cart events with authentication events using the shared trace ID:

SELECT 
  cart.subject as cart_id,
  cart.time as cart_update_time,
  JSON_EXTRACT(cart.data, '$.total') as cart_total,
  auth.type as auth_event_type,
  JSON_EXTRACT(auth.data, '$.username') as username,
  JSON_EXTRACT(auth.data, '$.ip') as login_ip,
  JSON_EXTRACT(auth.data, '$.sessionId') as session_id
FROM `project.dataset.events_cart_events` cart
JOIN `project.dataset.events_auth_events` auth
  ON JSON_EXTRACT(cart.extensions, '$.traceparent') = JSON_EXTRACT(auth.extensions, '$.traceparent')
WHERE cart.type = 'com.example.cart.CartUpdated.v1'
  AND auth.type = 'com.example.auth.Authenticated.v1'
ORDER BY cart.time ASC;

This join reveals which user (via username and IP) performed cart updates in a session, enabling audit trails and user behavior analysis across domains.

Note: The entity ID (e.g., user ID) is stored in the subject column. Use subject in WHERE clauses instead of assuming columns like user_id.

Implementation Guidance

Design notes

1) Versioned schemas and contracts

Keep your aggregate schema versioned (see the schemaVersion field in the SDK example below). Publish schemas to a registry or catalog so consumers can discover and validate payloads. Versioning lets producers evolve the shape while giving consumers time to migrate.

2) Idempotency and deduplication

Use the id and a durable offset/sequence in your consumer logic to ensure idempotent processing. CloudEvents provides the envelope fields required for reliable processing.
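
A minimal sketch of that consumer-side guard, keyed on the CloudEvents id (the in-memory set stands in for a durable dedup store):

# Sketch: idempotent consumption keyed on the CloudEvents `id`.
# The in-memory set is for illustration only; in practice the processed-id /
# offset record lives in a durable store and is updated in the same
# transaction as the handler's side effects.
from typing import Callable


class IdempotentHandler:
    def __init__(self, handle: Callable[[dict], None]) -> None:
        self._handle = handle
        self._processed: set[str] = set()  # stand-in for a durable dedup store

    def on_event(self, event: dict) -> None:
        event_id = event["id"]
        if event_id in self._processed:
            return                         # duplicate delivery: safely ignored
        self._handle(event)
        self._processed.add(event_id)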

3) Contract as first-class artifact

Treat the CloudEvent envelope + data schema as the contract for a data product. Publish the contract in documentation, automated tests, and CI checks. Provide client libraries or templates to generate events consistently where useful.
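
One way to automate this, sketched below with the jsonschema package, is a CI check that validates a sample event’s data payload against the JSON Schema its dataschema points to; the file paths are hypothetical.

# Sketch: contract check that could run in CI, validating a sample event's
# `data` payload against the JSON Schema referenced by its `dataschema`.
# Requires the `jsonschema` package; file locations are hypothetical.
import json

from jsonschema import Draft7Validator


def validate_sample(sample_event_path: str, schema_path: str) -> None:
    event = json.load(open(sample_event_path))
    schema = json.load(open(schema_path))
    errors = sorted(Draft7Validator(schema).iter_errors(event["data"]), key=str)
    if errors:
        details = "; ".join(e.message for e in errors)
        raise SystemExit(f"{sample_event_path} violates {event['dataschema']}: {details}")


if __name__ == "__main__":
    validate_sample("samples/cart_updated_v1.json", "schemas/cart/v1.0.json")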

4) Observability and lineage

Emit metadata (traces, producer and pipeline ids) so downstream systems can attribute, debug, and compute lineage. This is essential for quality scoring and governance.

5) Tooling and ergonomics

Use CloudEvents SDKs and platform integrations (Kafka, Pub/Sub, HTTP) to reduce boilerplate. A small upfront investment in tooling pays off in adoption.

Building Guiderails

To make CloudEvents a reliable data product across teams, provide guiderails: SDKs, conventions, and a small ingestion pipeline.

  • SDKs: recommend and publish official CloudEvents SDKs for key runtimes (e.g., .NET, Java, Node.js, Python) so teams publish consistent envelopes. Provide lightweight wrappers/templates for your standard fields (source, team, dataschema, trace ids). For example, producing a CloudEvent in .NET:

    using CloudNative.CloudEvents;

    var cloudEvent = new CloudEvent
    {
        Id = Guid.NewGuid().ToString(),
        Source = new Uri("/services/cart", UriKind.Relative),
        Type = "com.example.cart.CartUpdated.v1",
        Subject = "cart-123",
        Time = DateTimeOffset.UtcNow,
        DataContentType = "application/json",
        Data = new { schemaVersion = "v1", aggregate = cartSnapshot }
    };

  • Ingestion pipeline: have a common pipeline that consumes events from messaging topics and writes them to your data mesh (for example BigQuery datasets). Options:
    • Use a managed connector (e.g., Google Cloud Pub/Sub -> BigQuery Dataflow template) to stream events directly into BigQuery for quick onboarding.
    • Or run a custom Dataflow/Beam/Flink pipeline that consumes any topic, validates CloudEvent envelopes and dataschema, and writes to domain-specific datasets/tables.
    Example implementations on popular platforms:
    • Kafka –> Kafka Sink Connector –> Redshift
    • Kafka –> Confluent Platform –> Snowflake
    • Pub/Sub –> Dataflow Template –> BigQuery
    • Kafka –> Debezium –> PostgreSQL
    • Redpanda –> Redpanda Sink Connector –> Redshift
  • Conventions for dataset/table layout:
    • Use source (or a normalized domain identifier derived from source) as the dataset or schema name to group events by producer/domain.
    • Use subject as the logical entity id (aggregate root) and partition or cluster on it for efficient queries.
    • Use type and dataschema to manage schema evolution: type indicates major version semantics, dataschema points to the concrete schema (minor versions).
  • Routing rules: implement a small router that reads topic messages, validates CloudEvent context, and writes them to the appropriate dataset/table based on source and type. Emit logs and metrics for schema validation failures and unknown types so teams can onboard (a minimal sketch follows this list).
  • Tests and CI: include a small schema validation job in CI that checks sample events against the published dataschema (JSON Schema or Avro). Fail builds when breaking changes are introduced without a new major type.
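
A minimal sketch of such a router, with the warehouse writer and the invalid-event reporter left as placeholders; the dataset/table derivation is one illustrative reading of the conventions above.

# Sketch of the routing rule: validate the CloudEvent context, derive a
# dataset/table from `source` and `type`, and hand the row to a sink.
# `write_row` and `report_invalid` are placeholders for your warehouse client
# and metrics; the table derivation assumes the `...aggregate.Event.vN` type
# convention described earlier.
REQUIRED = ("id", "source", "specversion", "type")


def route(event: dict, write_row, report_invalid) -> None:
    missing = [a for a in REQUIRED if a not in event]
    if missing:
        report_invalid(event, reason=f"missing attributes: {missing}")
        return

    dataset = event["source"].strip("/").replace("/", "_")  # e.g. /services/cart -> services_cart
    table = event["type"].rsplit(".", 2)[-2].lower()        # e.g. ...cart.CartUpdated.v1 -> cartupdated
    write_row(dataset=dataset, table=table, row=event)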

Putting it together, your pipeline can consume any event topic and, using the CloudEvent envelope conventions (source, subject, type, dataschema), place events into domain datasets and make them queryable as first-class event-based data products.

How this helps LLMs and agentic workflows

LLMs and agentic systems perform better when they operate over grounded, traceable, and semantically rich data.

  • Grounding and context: Events provide the why/when/where for a fact, improving retrieval relevance and reducing hallucination risk.
  • RAG pipelines: instead of dumping everything into a vector store, expose curated data products (with embeddings, semantic indexes, and metadata) that RAG pipelines can consult with confidence.
  • Agents as producers and consumers: Agents will create derived artifacts (summaries, embeddings, datasets) and also query domain products. A data-mesh-like model (domain-owned products with contracts and policies) fits naturally.

Practical example: leveraging interfaces in a shopping cart workflow

In an agentic workflow, an LLM-powered shopping assistant could leverage all three interfaces (API calls, realtime events, and data products) for a comprehensive, intelligent user experience:

  • API Call: Synchronous queries for current state. For example, the assistant makes a REST API call (e.g., GET /carts/{user-id}) to retrieve the user’s current cart contents, ensuring recommendations are based on the latest snapshot without delay.
  • Realtime Events: Streaming updates for immediate reactions. The assistant subscribes to an event stream (e.g., via WebSocket or Kafka) for cart events like CartUpdated. This allows real-time responses to user actions, such as suggesting alternatives when an item is out of stock or offering discounts during cart abandonment.
  • Data Product: Batch analytics for historical insights. The assistant queries the BigQuery data product for past cart events, enabling personalized recommendations based on user history (e.g., “Users who bought these items also purchased this accessory”) or trend analysis (e.g., “This item is trending in your region”).

By integrating these interfaces, the LLM maintains up-to-date context, reacts dynamically to changes, and grounds its responses in rich, traceable data, delivering a seamless and context-aware shopping experience.

Practical example: answering a business question

When an agent asks, “What’s the average claim size for auto insurance this quarter?”, it should query a domain-owned data product (or a materialised view derived from CloudEvents) with lineage and a quality score, not an opaque aggregated dump.

Challenges and considerations

While CloudEvents as data products offer significant benefits, there are challenges to consider:

  • Adoption friction: Teams accustomed to traditional data products may need to shift mindsets toward event-driven approaches. Training and tooling can help.
  • Performance overhead: Persisting and querying large volumes of events requires robust infrastructure. Use partitioning, indexing, and data lifecycle management.
  • Schema evolution: Even with versioning, evolving schemas across producers and consumers can be complex. Invest in schema registries and automated testing.
  • Security and compliance: Events may contain sensitive data; ensure encryption, access controls, and compliance with regulations like GDPR.

Addressing these proactively ensures successful implementation.

Closing thoughts

CloudEvents is not a silver bullet, but it is a pragmatic, low-friction envelope that makes events easier to adopt as first-class event-based data products. Combine a simple envelope, a clear data contract, schema versioning, and a mindset of domain ownership, and your events become powerful, reusable building blocks for analytics, ML, and agentic systems.

Glossary / Terminology

  • Aggregate: A cluster of domain objects that can be treated as a single unit for data changes. The aggregate root is the single entry point for consistency and transactional boundaries.
  • Aggregate root: The entity within an aggregate that is responsible for maintaining invariants and is the authoritative identity for the aggregate.
  • Entity: An object with an identity that persists over time.
  • Value Object: An immutable object described by its attributes rather than an identity.
  • Domain Event: A statement that something of significance happened in the domain (expressed as an event). In DDD, domain events represent facts about the domain and are often the source for integration events.
  • Bounded Context: A boundary in which a particular domain model applies. Names and semantics inside a bounded context are consistent; different contexts may use the same terms differently.
  • Ubiquitous Language: The shared language used by domain experts and developers within a bounded context.
  • Data Product: A discoverable, documented, and governed data interface (in this case, an event stream or materialised view) provided by a domain team.
  • Schema Registry / Catalog: A place to publish and discover dataschema URIs and schema versions.
  • RAG: Retrieval-Augmented Generation, an approach for LLMs that combines retrieval of context with generative models.
  • Data Mesh: An organisational approach that treats datasets (or data products) as domain-owned products with contracts and governance.
  • LLM: Large Language Model, an AI model trained on vast amounts of text data to understand and generate human-like text.

