CREATE SOURCE: Webhook
CREATE SOURCE connects Materialize to an external system you want to read data from, and provides details about how to decode and interpret that data.
Webhook sources expose a public URL that allows your applications to push webhook events into Materialize.
Syntax
CREATE SOURCE [IF NOT EXISTS] <src_name>
[ IN CLUSTER <cluster_name> ]
FROM WEBHOOK
  BODY FORMAT <TEXT | JSON [ARRAY] | BYTES>
  [ INCLUDE HEADER <header_name> AS <column_alias> [BYTES]  |
    INCLUDE HEADERS [ ( [NOT] <header_name> [, [NOT] <header_name> ... ] ) ]
  ][...]
  [ CHECK (
      [ WITH ( <BODY|HEADERS|SECRET <secret_name>> [AS <alias>] [BYTES] [, ...])]
      <check_expression>
    )
  ]
webhook_check_option
| Field | Use | 
|---|---|
| src_name | The name for the source. | 
| IN CLUSTER cluster_name | The cluster to maintain this source. | 
| INCLUDE HEADER | Map a header value from a request into a column. | 
| INCLUDE HEADERS | Include a column named 'headers'of typemap[text => text]containing the headers of the request. | 
| CHECK | Specify a boolean expression that is used to validate each request received by the source. | 
CHECK WITH options
| Field | Type | Description | 
|---|---|---|
| BODY | textorbytea | Provide a bodycolumn to the check expression. The column can be renamed with the optional AS alias statement, and the data type can be changed tobyteawith the optional BYTES keyword. | 
| HEADERS | map[text=>text]ormap[text=>bytea] | Provide a column 'headers'to the check expression. The column can be renamed with the optional AS alias statement, and the data type can be changed tomap[text => bytea]with the optional BYTES keyword. | 
| SECRETsecret_name | textorbytea | Securely provide a SECRETto the check expression. Theconstant_time_eqvalidation function does not support fully qualified secret names: if the secret is in a different namespace to the source, the column can be renamed with the optional AS alias statement. The data type can also be changed tobyteausing the optional BYTES keyword. | 
Supported formats
| Body format | Type | Description | 
|---|---|---|
| BYTES | bytea | Does no parsing of the request, and stores the body of a request as it was received. | 
| JSON | jsonb | Parses the body of a request as JSON. Also accepts events batched as newline-delimited JSON ( NDJSON). If the body is not valid JSON, a response of400Bad Request will be returned. | 
| JSON ARRAY | jsonb | Parses the body of a request as a list of JSON objects, automatically expanding the list of objects to individual rows. Also accepts a single JSON object. If the body is not valid JSON, a response of 400Bad Request will be returned. | 
| TEXT | text | Parses the body of a request as UTF-8text. If the body is not validUTF-8, a response of400Bad Request will be returned. | 
Output
If source creation is successful, you’ll have a new source object with
name src_name and, based on what you defined with BODY FORMAT and INCLUDE HEADERS, the following columns:
| Column | Type | Optional? | 
|---|---|---|
| body | bytea,jsonb, ortext | |
| headers | map[text => text] | ✓ . Present if INCLUDE HEADERSis specified. | 
Webhook URL
After source creation, the unique URL that allows you to POST events to the
source can be looked up in the mz_internal.mz_webhook_sources
system catalog table. The URL will have the following format:
https://<HOST>/api/webhook/<database>/<schema>/<src_name>
A breakdown of each component is as follows:
- <HOST>: The Materialize instance URL, which can be found on the Materialize console.
- <database>: The name of the database where the source is created (default is- materialize).
- <schema>: The schema name where the source gets created (default is- public).
- <src_name>: The name you provided for your source at the time of creation.
Features
Exposing headers
In addition to the request body, Materialize can expose headers to SQL. If a
request header exists, you can map its fields to columns using the INCLUDE HEADER syntax.
CREATE SOURCE my_webhook_source FROM WEBHOOK
  BODY FORMAT JSON
  INCLUDE HEADER 'timestamp' as ts
  INCLUDE HEADER 'x-event-type' as event_type;
This example would have the following columns:
| Column | Type | Nullable? | 
|---|---|---|
| body | jsonb | No | 
| ts | text | Yes | 
| event_type | text | Yes | 
All of the header columns are nullable. If the header of a request does not
contain a specified field, the NULL value will be used as a default.
Excluding header fields
To exclude specific header fields from the mapping, use the INCLUDER HEADERS
syntax in combination with the NOT option. This can be useful if, for
example, you need to accept a dynamic list of fields but want to exclude
sensitive information like authorization.
CREATE SOURCE my_webhook_source FROM WEBHOOK
  BODY FORMAT JSON
  INCLUDE HEADERS ( NOT 'authorization', NOT 'x-api-key' );
This example would have the following columns:
| Column | Type | Nullable? | 
|---|---|---|
| body | jsonb | No | 
| headers | map[text => text] | No | 
All header fields but 'authorization' and 'x-api-key' will get included in
the headers map column.
Validating requests
CHECK statement, all requests will be accepted. To prevent bad
actors from injecting data into your source, it is strongly encouraged that
you define a CHECK statement with your webhook sources.
It’s common for applications using webhooks to provide a method for validating a
request is legitimate. You can specify an expression to do this validation for
your webhook source using the CHECK clause.
For example, the following source HMACs the request body using the sha256
hashing algorithm, and asserts the result is equal to the value provided in the
x-signature header, decoded with base64.
CREATE SOURCE my_webhook_source FROM WEBHOOK
  BODY FORMAT JSON
  CHECK (
    WITH (
      HEADERS, BODY AS request_body,
      SECRET my_webhook_shared_secret AS validation_secret
    )
    -- The constant_time_eq validation function **does not support** fully
    -- qualified secret names. We recommend always aliasing the secret name
    -- for ease of use.
    constant_time_eq(
        decode(headers->'x-signature', 'base64'),
        hmac(request_body, validation_secret, 'sha256')
    )
  );
The headers and body of the request are only subject to validation if WITH ( BODY, HEADERS, ... ) is specified as part of the CHECK statement. By
default, the type of body used for validation is text, regardless of the
BODY FORMAT you specified for the source. In the example above, the body
column for my_webhook_source has a type of jsonb, but request_body as
used in the validation expression has type text. Futher, the request headers
are not persisted as part of my_webhook_source, since INCLUDE HEADERS was
not specified — but they are provided to the validation expression.
Debugging validation
It can be difficult to get your CHECK statement correct, especially if your
application does not have a way to send test events. If you’re having trouble
with your CHECK statement, we recommend creating a temporary source without
CHECK and using that to iterate more quickly.
CREATE SOURCE my_webhook_temporary_debug FROM WEBHOOK
  -- Specify the BODY FORMAT as TEXT or BYTES,
  -- which is how it's provided to CHECK.
  BODY FORMAT TEXT
  INCLUDE HEADERS;
Once you have a few events in my_webhook_temporary_debug, you can query it with your would-be
CHECK statement.
SELECT
  -- Your would-be CHECK statement.
  constant_time_eq(
    decode(headers->'signature', 'base64'),
    hmac(headers->'timestamp' || body, 'my key', 'sha512')
  )
FROM my_webhook_temporary_debug
LIMIT 10;
SELECT statement, so you’ll need to
provide these values as raw text for debugging.
Handling duplicated and partial events
Given any number of conditions, e.g. a network hiccup, it’s possible for your application to send
an event more than once. If your event contains a unique identifier, you can de-duplicate these events
using a MATERIALIZED VIEW and the DISTINCT ON clause.
CREATE MATERIALIZED VIEW my_webhook_idempotent AS (
  SELECT DISTINCT ON (body->>'unique_id') *
  FROM my_webhook_source
  ORDER BY id
);
We can take this technique a bit further to handle partial events. Let’s pretend our application tracks the completion of build jobs, and it sends us JSON objects with following structure.
| Key | Value | Optional? | 
|---|---|---|
| id | text | No | 
| started_at | text | Yes | 
| finished_at | text | Yes | 
When a build job starts we receive an event containing id and the started_at timestamp. When a
build finished, we’ll receive a second event with the same id but now a finished_at timestamp.
To merge these events into a single row, we can again use the DISTINCT ON clause.
CREATE MATERIALIZED VIEW my_build_jobs_merged AS (
  SELECT DISTINCT ON (id) *
  FROM (
    SELECT
      body->>'id' as id,
      try_parse_monotonic_iso8601_timestamp(body->>'started_at') as started_at,
      try_parse_monotonic_iso8601_timestamp(body->>'finished_at') as finished_at
    FROM my_build_jobs_source
  )
  ORDER BY id, finished_at NULLS LAST, started_at NULLS LAST
);
text to timestamp you should prefer to use the try_parse_monotonic_iso8601_timestamp
function, which enables temporal filter pushdown.
Handling batch events
The application pushing events to your webhook source may batch multiple events into a single HTTP request. The webhook source supports parsing batched events in the following formats:
JSON arrays
You can automatically expand a batch of requests formatted as a JSON array into
separate rows using BODY FORMAT JSON ARRAY.
-- Webhook source that parses request bodies as a JSON array.
CREATE SOURCE webhook_source_json_array FROM WEBHOOK
  BODY FORMAT JSON ARRAY
  INCLUDE HEADERS;
If you POST a JSON array of three elements to webhook_source_json_array,
three rows will get appended to the source.
POST webhook_source_json_array
[
  { "event_type": "a" },
  { "event_type": "b" },
  { "event_type": "c" }
]
SELECT COUNT(body) FROM webhook_source_json_array;
----
3
You can also post a single object to the source, which will get appended as one row.
POST webhook_source_json_array
{ "event_type": "d" }
SELECT body FROM webhook_source_json_array;
----
{ "event_type": "a" }
{ "event_type": "b" }
{ "event_type": "c" }
{ "event_type": "d" }
Newline-delimited JSON (NDJSON)
You can automatically expand a batch of requests formatted as NDJSON into
separate rows using BODY FORMAT JSON.
-- Webhook source that parses request bodies as NDJSON.
CREATE SOURCE webhook_source_ndjson FROM WEBHOOK
BODY FORMAT JSON;
If you POST two elements delimited by newlines to webhook_source_ndjson, two
rows will get appended to the source.
POST 'webhook_source_ndjson'
  { 'event_type': 'foo' }
  { 'event_type': 'bar' }
SELECT COUNT(body) FROM webhook_source_ndjson;
----
2
Request limits
Webhook sources apply the following limits to received requests:
- The maximum size of the request body is 2MB. Requests larger than this will fail with413 Payload Too Large.
- The rate of concurrent requests/second across all webhook sources
is 500. Trying to connect when the server is at capacity will fail with
429 Too Many Requests.
- Requests that contain a header name specified more than once will be rejected
with 401 Unauthorized.
Examples
Using basic authentication
Basic authentication enables a simple and rudimentary way to grant authorization to your webhook source.
To store the sensitive credentials and make them reusable across multiple
CREATE SOURCE statements, use secrets.
CREATE SECRET basic_hook_auth AS 'Basic <base64_auth>';
Creating a source
After a successful secret creation, you can use the same secret to create different webhooks with the same basic authentication to check if a request is valid.
CREATE SOURCE webhook_with_basic_auth
FROM WEBHOOK
    BODY FORMAT JSON
    CHECK (
      WITH (
        HEADERS,
        BODY AS request_body,
        SECRET basic_hook_auth AS validation_secret
      )
      -- The constant_time_eq validation function **does not support** fully
      -- qualified secret names. We recommend always aliasing the secret name
      -- for ease of use.
      constant_time_eq(headers->'authorization', validation_secret)
    );
Your new webhook is now up and ready to accept requests using basic authentication.
JSON parsing
Webhook data is ingested as a JSON blob. We recommend creating a parsing view on top of your webhook source that maps the individual fields to columns with the required data types. To avoid doing this tedious task manually, you can use this JSON parsing widget!