Getting Started
@skyware/jetstream is a utility library for consuming data from a Jetstream instance.
Setup
import { class Jetstream<WantedCollections extends CollectionOrWildcard = CollectionOrWildcard, ResolvedCollections extends Collection = ResolveLexiconWildcard<WantedCollections>>The Jetstream client.Jetstream } from "@skyware/jetstream";
const const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream = new new Jetstream<CollectionOrWildcard, Collection>(options?: JetstreamOptions<CollectionOrWildcard> | undefined): Jetstream<...>The Jetstream client.Jetstream();
const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.start(): voidOpens a WebSocket connection to the server.start();
The Jetstream class takes an object parameter with the following properties:
wantedCollections: An array of collections to subscribe to events for. This can contain any collection name or wildcard strings such asapp.bsky.feed.*to receive events for all collections whose name starts withapp.bsky.feed.. If not provided or empty, you will receive events for all collections.wantedDids: An array of DIDs to subscribe to events for. If not provided or empty, you will receive events for all DIDs.cursor: The Unix timestamp in microseconds to start listening from. A cursor is included in every event emitted as thetime_usproperty. If you don’t provide a cursor, the class will start listening from the most recent event.endpoint: The subscription URL of the Jetstream instance to connect to. Defaults towss://jetstream1.us-east.bsky.network/subscribe.
To begin listening for events, call the start method.
Handling events
const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.onCreate<"app.bsky.feed.post">(collection: "app.bsky.feed.post", listener: (event: CommitCreateEvent<"app.bsky.feed.post">) => void): voidListen for records created in a specific collection.onCreate("app.bsky.feed.post", (event: CommitCreateEvent<"app.bsky.feed.post">event) => {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("New post:", event: CommitCreateEvent<"app.bsky.feed.post">event.CommitCreateEvent<"app.bsky.feed.post">.commit: CommitCreate<"app.bsky.feed.post">commit.CommitCreate<"app.bsky.feed.post">.record: {
$type: "app.bsky.feed.post";
createdAt: string;
text: string;
labels?: ({
values: {
val: string;
$type?: "com.atproto.label.defs#selfLabel" | undefined;
}[];
$type?: "com.atproto.label.defs#selfLabels" | undefined;
} & {
...;
}) | undefined;
... 5 more ...;
tags?: string[] | undefined;
}record.text: stringtext)
});
Jetstream allows you to subscribe to a filtered feed of events related to specific collections. The Jetstream class has three useful methods for listening for commits:
- onCreate: Listen for new records created in a collection.
- onUpdate: Listen for updated records in a collection.
- onDelete: Listen for deleted records in a collection.
The class also emits broader events.
import { type CommitType = "create" | "update" | "delete"
const CommitType: {
readonly Create: "create";
readonly Update: "update";
readonly Delete: "delete";
}The types of commits that can be received.CommitType } from "@skyware/jetstream";
// Listen for all commits, regardless of collection
const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.on(event: "commit", listener: (event: CommitEvent<Collection>) => void): Jetstream<CollectionOrWildcard, Collection> (+6 overloads)Emitted when any commit is received.on("commit", (event: CommitEvent<Collection>event) => {
if (event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: Commit<Collection>commit.operation: "create" | "update" | "delete"operation === const CommitType: {
readonly Create: "create";
readonly Update: "update";
readonly Delete: "delete";
}The types of commits that can be received.CommitType.type Create: "create"A record was created.Create) {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("create in ", event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitCreate<Collection>commit.CommitBase<Collection>.collection: Collectioncollection, event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitCreate<Collection>commit.CommitCreate<Collection>.record: {
lexicon: number;
$type: "com.atproto.lexicon.schema";
} | {
post: ResourceUri;
$type: "app.bsky.feed.postgate";
createdAt: string;
detachedEmbeddingUris?: ResourceUri[] | undefined;
embeddingRules?: ({
...;
} & {
...;
})[] | undefined;
} | ... 17 more ... | {
...;
}record);
} else if (event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitUpdate<Collection> | CommitDelete<Collection>commit.operation: "update" | "delete"operation === const CommitType: {
readonly Create: "create";
readonly Update: "update";
readonly Delete: "delete";
}The types of commits that can be received.CommitType.type Update: "update"A record was updated.Update) {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("update in", event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitUpdate<Collection>commit.CommitBase<Collection>.collection: Collectioncollection, event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitUpdate<Collection>commit.CommitBase<Collection>.rkey: stringrkey);
} else if (event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitDelete<Collection>commit.CommitDelete<Collection>.operation: "delete"operation === const CommitType: {
readonly Create: "create";
readonly Update: "update";
readonly Delete: "delete";
}The types of commits that can be received.CommitType.type Delete: "delete"A record was deleted.Delete) {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("delete in", event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitDelete<Collection>commit.CommitBase<Collection>.collection: Collectioncollection, event: CommitEvent<Collection>event.CommitEvent<Collection>.commit: CommitDelete<Collection>commit.CommitBase<Collection>.rkey: stringrkey);
}
});
// Listen for account status updates
const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.on(event: "account", listener: (event: AccountEvent) => void): Jetstream<CollectionOrWildcard, Collection> (+6 overloads)Emitted when an account is updated.on("account", (event: AccountEventevent) => {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("account update", event: AccountEventevent.AccountEvent.account: Accountaccount.status?: (string & {}) | "deactivated" | "suspended" | "takendown" | "throttled" | "deleted" | "desynchronized" | undefinedstatus)
});
// Listen for identity updates
const jetstream: Jetstream<CollectionOrWildcard, Collection>jetstream.Jetstream<CollectionOrWildcard, Collection>.on(event: "identity", listener: (event: IdentityEvent) => void): Jetstream<CollectionOrWildcard, Collection> (+6 overloads)Emitted when an identity event is received.on("identity", (event: IdentityEventevent) => {
var console: Consoleconsole.Console.log(...data: any[]): voidlog("identity update", event: IdentityEventevent.IdentityEvent.identity: Identityidentity.did: `did:${string}:${string}`did)
});
Using Jetstream over a direct Relay connection can help you save bandwidth and only receive the events you care about.
A Note on Types
When using method such as onCreate, the type of the event will be inferred from the collection name. However, you may encounter an error resulting in missing properties on the record object. Make sure that your tsconfig.json has the following:
{
"compilerOptions": {
"moduleResolution": "node16", // or nodenext
}
}
Event Reference
The Jetstream class may emit the following events:
Update events
| Event | Description |
|---|---|
commit | Represents a commit to a user’s repository. |
identity | Represents a change to an account’s identity. Could be an updated handle, signing key, or PDS hosting endpoint. |
account | Represents a change to an account’s status. The account may be deactivated, suspended, or deleted. |
System events
| Event | Description |
|---|---|
open | Emitted when the websocket connection is opened. |
close | Emitted when the websocket connection is closed. |
error | Emitted when an error occurs while handling a message. |