Setting up your own Bluesky Feed
Introduction
Bluesky is a decentralized alternative to Twitter. It is a platform that prides itself on being more open and on putting users at the center of the ecosystem. One of the features that derives from this mantra is the ability for anyone to create their own custom newsfeed. Other users can then discover those feeds and display them on their homepage.
This opens up the possibility for anyone to build their own algorithm and select content based on specific topics or based on popularity metrics such as reposts and likes.
However, making such a feed is a fairly technical task. The purpose of this blog post is to walk you through the steps required to set up your own custom newsfeed using the Go programming language. You don't need to be a Go expert to understand what I'm going to explain, but some knowledge of the language will make it easier to follow along. Note that the official documentation provides examples in TypeScript, and there are also community-made templates for Python and Ruby.
All the code in this post is hosted in this repository: https://gitlab.com/Dhawos/bluesky-custom-feeds. Don't hesitate to have a look at it to see the context in which each snippet of this post is used.
The different steps
To build your own newsfeed, you will have to complete several steps:
1. Register the newsfeed on your Bluesky account: this part simply consists of running a script that makes the correct API calls to Bluesky to register your newsfeed. You will need a Bluesky account, as the feed will appear under your profile. You will also need a domain that you own, which will also be used to host the custom feed server later.
2. Index the content created on Bluesky: before you can choose which posts users are going to see, you must first know which posts are available. To do so, we will record events that occur on the Bluesky network and save them in our database, so that we can later decide which posts a user should see in our feed.
3. Build the server: the last task is building a web server that listens for Bluesky-specific requests (as described in the protocol). The server's job is to decide which posts the user should see based on the request and send that list back to Bluesky, which takes care of fetching the post contents and hydrating the view.
Let's look at each step in more detail.
1. Register the newsfeed
This is the simplest task in the process. For our newsfeed to be usable, it must first be registered with Bluesky; it will then show up under our profile. To do so, we will write a small script using indigo, the Bluesky SDK for Go.
package main
import (
...
"github.com/bluesky-social/indigo/api/atproto"
"github.com/bluesky-social/indigo/api/bsky"
"github.com/bluesky-social/indigo/lex/util"
"github.com/bluesky-social/indigo/xrpc"
)
Here is what it looks like (I'm omitting part of the code for the sake of clarity). First, you need to declare the information that describes your feed:
recordName := feed.ShortName()
displayName := feed.DisplayName()
description := feed.Description()
avatarBytes := feed.Avatar() // []byte contents of an image file (nil if there is no avatar)
Then, we set up our client to talk to the Bluesky API. For that, you will need your Bluesky handle (your nickname + domain); in my case this is @dhawos.bsky.social. You will also need an "application password", which you can generate from your Bluesky account settings.
client := xrpc.Client{
Host: "https://bsky.social",
}
ctx := context.Background()
if _, err := atproto.ServerDescribeServer(ctx, &client); err != nil {
panic(err)
}
authInput := atproto.ServerCreateSession_Input{
Identifier: handle, // our handle
Password: appPassword, // our application password
}
authOutput, err := atproto.ServerCreateSession(ctx, &client, &authInput)
if err != nil {
panic(err)
}
client.Auth = &xrpc.AuthInfo{
AccessJwt: authOutput.AccessJwt,
RefreshJwt: authOutput.RefreshJwt,
Handle: authOutput.Handle,
Did: authOutput.Did,
}
// Here we build the decentralized identifier (DID) of our feed service.
// The hostname variable must match the domain you own and on which
// you will host your server.
feedDID := fmt.Sprintf("did:web:%s", hostname)
var avatar *util.LexBlob
if avatarBytes != nil {
uploadOutput, err := atproto.RepoUploadBlob(ctx, &client, bytes.NewReader(avatarBytes))
if err != nil {
panic(err)
}
avatar = uploadOutput.Blob
}
putInput := atproto.RepoPutRecord_Input{
Collection: "app.bsky.feed.generator",
Record: &util.LexiconTypeDecoder{Val: &bsky.FeedGenerator{
Avatar: avatar,
CreatedAt: time.Now().Format(time.RFC3339),
Description: &description,
Did: feedDID,
DisplayName: displayName,
}},
Repo: client.Auth.Did,
Rkey: recordName,
}
putOutput, err := atproto.RepoPutRecord(ctx, &client, &putInput)
if err != nil {
panic(err)
}
fmt.Printf("Feed %s published\n", recordName)
fmt.Println(putOutput)
}
If everything went well, the script prints the response from Bluesky, which includes the at:// URI of your new feed record. Save this output carefully, as you will need it later. You should now also be able to see your new feed on your profile.
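The at:// URI in that output follows the pattern at://<authority>/<collection>/<record key>. The authority is the DID of your account, which the server later checks incoming feed URIs against, and the record key is the recordName you chose. The server relies on indigo's syntax.ParseATURI for this; purely to illustrate the structure, here is a stdlib-only sketch that handles only this simple three-segment form (feedRecordRef and parseRecordURI are hypothetical names):

```go
package main

import (
	"errors"
	"strings"
)

// feedRecordRef holds the three parts of an at:// URI pointing to a record.
type feedRecordRef struct {
	Authority  string // DID of the repository (your account) that owns the record
	Collection string // e.g. "app.bsky.feed.generator"
	RecordKey  string // the rkey, i.e. the feed's short name
}

// parseRecordURI splits "at://<authority>/<collection>/<rkey>" into its parts.
// It only handles this simple three-segment form, not the full AT URI grammar.
func parseRecordURI(uri string) (feedRecordRef, error) {
	rest, ok := strings.CutPrefix(uri, "at://")
	if !ok {
		return feedRecordRef{}, errors.New("missing at:// prefix")
	}
	parts := strings.Split(rest, "/")
	if len(parts) != 3 || parts[0] == "" || parts[1] == "" || parts[2] == "" {
		return feedRecordRef{}, errors.New("expected at://<authority>/<collection>/<rkey>")
	}
	return feedRecordRef{Authority: parts[0], Collection: parts[1], RecordKey: parts[2]}, nil
}
```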
2. Indexing content from Bluesky
Now that your feed is created on Bluesky’s end, you need to set it up on your side. To suggest posts to your users, you first need to know which posts exist on the network.
We'll use Jetstream for this purpose, an open-source service provided by Bluesky that simplifies retrieving all the content produced on the network. Note, however, that this service is not part of Bluesky's official specifications and may change in the future. For now, it's more user-friendly than the official Firehose feed.
We'll also need a database; we'll use PostgreSQL, mainly for its full-text search capabilities.
To implement this, we’ll create a process that runs continuously to monitor activity on the network and save relevant information to our database. This involves defining a struct that listens for events and saves pertinent data.
package firehose
import (
...
)
type FirehoseListener struct {
db *repository.Queries
logger *slog.Logger
}
func NewFirehoseListener(db *repository.Queries) FirehoseListener {
return FirehoseListener{db: db, logger: slog.Default()}
}
Now we will instantiate the listener in our main function:
package main
import (
"context"
"fmt"
"log/slog"
"os"
"bluesky-custom-feeds/internal/firehose"
"bluesky-custom-feeds/internal/repository"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
ctx := context.Background()
dbHostname, dbHostnameSet := os.LookupEnv("FEED_DB_HOST")
if !dbHostnameSet {
slog.Error("FEED_DB_HOST env var not set")
os.Exit(1)
}
dbPassword, dbPasswordSet := os.LookupEnv("FEED_DB_PASSWORD")
if !dbPasswordSet {
slog.Error("FEED_DB_PASSWORD env var not set")
os.Exit(1)
}
connectionString := fmt.Sprintf("user=blueskyfeeds dbname=bluesky sslmode=disable host=%s password=%s", dbHostname, dbPassword)
pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
panic(err)
}
defer pool.Close()
queries := repository.New(pool)
subscriber := firehose.NewFirehoseListener(queries)
}
We have the basic structure for our program. Now we need to implement the function that listens for events. Before that, let's look at the foundation of the AT Protocol (atproto), the protocol Bluesky is built on. Each user has a "repository", similar to a Git repository. Everything you produce as a user results in a new commit that adds a record to a specific collection. That is what we did when we created our feed earlier: we created a new commit in the app.bsky.feed.generator collection with the relevant information.
For now, we’ll focus on listening to events related to the creation of commits. Other types of events, such as updates or deletions, are not relevant for our current purpose.
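Jetstream delivers each event as a JSON object. For a new post, it carries the author's DID, a time_us timestamp (Unix microseconds) and a commit object holding the operation, the collection, the record key, the CID and the record itself. The Jetstream client used below already decodes this into models.Event, so the following is only a stdlib sketch of that shape and of the filtering the scheduler callback performs; the struct and function names are hypothetical and only a subset of the fields is modelled:

```go
package main

import "encoding/json"

// Hypothetical, minimal mirror of the Jetstream event fields used by the indexer.
type jetstreamEvent struct {
	Did    string           `json:"did"`
	TimeUS int64            `json:"time_us"` // event time, in Unix microseconds
	Kind   string           `json:"kind"`    // "commit", "identity" or "account"
	Commit *jetstreamCommit `json:"commit,omitempty"`
}

type jetstreamCommit struct {
	Operation  string          `json:"operation"` // "create", "update" or "delete"
	Collection string          `json:"collection"`
	RKey       string          `json:"rkey"`
	Record     json.RawMessage `json:"record,omitempty"`
	CID        string          `json:"cid,omitempty"`
}

// postRecord keeps only the post fields the indexer looks at.
type postRecord struct {
	Text  string   `json:"text"`
	Langs []string `json:"langs"`
}

// decodeNewPost returns the decoded post when the raw event is the creation
// of an app.bsky.feed.post record, and ok=false for every other event.
func decodeNewPost(raw []byte) (ev jetstreamEvent, post postRecord, ok bool, err error) {
	if err = json.Unmarshal(raw, &ev); err != nil {
		return ev, post, false, err
	}
	if ev.Kind != "commit" || ev.Commit == nil || ev.Commit.Operation != "create" ||
		ev.Commit.Collection != "app.bsky.feed.post" {
		return ev, post, false, nil
	}
	if err = json.Unmarshal(ev.Commit.Record, &post); err != nil {
		return ev, post, false, err
	}
	return ev, post, true, nil
}
```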
// Each collection corresponds to a type of record.
// We only subscribe to post, like and repost records
const (
postCollection = "app.bsky.feed.post"
likeCollection = "app.bsky.feed.like"
repostCollection = "app.bsky.feed.repost"
)
func (fh *FirehoseListener) SubscribeJetstream() {
uri := "wss://jetstream2.us-east.bsky.network/subscribe"
ctx := context.Background()
// Here we declare the function to execute for each event
// arriving on the Jetstream
scheduler := jssequential.NewScheduler("stream", fh.logger, func(ctx context.Context, e *models.Event) error {
// We only handle commit creation
if e.Commit == nil || e.Commit.Operation != models.CommitOperationCreate {
return nil
}
// Only new posts are relevant to us. If you want to count likes
// or reposts, you'll have to handle them here.
switch e.Commit.Collection {
case postCollection:
var post bsky.FeedPost
// Decode the post record, which Jetstream delivers as JSON
err := json.Unmarshal(e.Commit.Record, &post)
if err != nil {
fh.logger.Error("unmarshalling post", "err", err)
// Skip malformed records instead of indexing an empty post
return nil
}
// Handling and saving of the post
err = fh.onPostCreate(ctx, &post, e.Commit.CID, e.Did, e.Commit.RKey)
if err != nil {
fh.logger.Error("handling post", "err", err)
}
}
return nil
})
// Initialize the Jetstream client and subscribe only to posts, likes and reposts
config := client.ClientConfig{
Compress: true,
WebsocketURL: uri,
WantedDids: []string{},
WantedCollections: []string{
postCollection,
likeCollection,
repostCollection,
},
MaxSize: 0,
ExtraHeaders: map[string]string{},
}
jsClient, err := client.NewClient(&config, fh.logger, scheduler)
if err != nil {
panic(err)
}
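// We resume listening at the current point in time.
// Jetstream cursors are Unix timestamps in microseconds. We could save the
// time of the last event we received to avoid gaps if our service were to stop.
cursor := time.Now().UnixMicro()
if err := jsClient.ConnectAndRead(ctx, &cursor); err != nil {
fh.logger.Error("jetstream connection closed", "err", err)
}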
}
We still need to write the function that will save the post to the database:
func (fh *FirehoseListener) onPostCreate(ctx context.Context, post *bsky.FeedPost, cid string, repoDID string, path string) error {
// Replies are skipped: only top-level posts are indexed
if post.Reply != nil {
return nil
}
// Only save posts written in French or English
// You can set up any filtering logic you'd like here
if !slices.Contains(post.Langs, "fr") && !slices.Contains(post.Langs, "en") {
return nil
}
for _, lang := range post.Langs {
fh.db.TagPostLang(ctx, repository.TagPostLangParams{
PostID: cid,
Lang: lang,
})
}
// Saving the post in database
// The most important information to save is the "atUri" for the post
// as that is what we'll need to send back to Bluesky application when
// it requests the feed from us
params := repository.CreatePostParams{
// CID stands for "Content Identifier", a hash of the record's content.
// Here it is also used to link the post to its language tags
Cid: cid,
// repoDID = ID of the user's repository
// postCollection = "app.bsky.feed.post"
// path = the "record key", an identifier for the post within the collection
AtUri: fmt.Sprintf("at://%s/%s/%s", repoDID, postCollection, path),
// We need to save the text to be able to search it later
Text: post.Text,
IndexedAt: pgtype.Timestamptz{
Time: time.Now(),
InfinityModifier: 0,
Valid: true,
},
}
_, err := fh.db.CreatePost(ctx, params)
if err != nil {
fh.logger.Error(err.Error())
return fmt.Errorf("indexing post : %w", err)
}
return nil
}
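The filtering rules are where each feed's personality lives, so it can help to pull them out of onPostCreate into a pure function that is easy to unit test. Here is a sketch with a hypothetical, trimmed-down post type standing in for bsky.FeedPost (where a reply is signalled by a non-nil Reply field), along with the at:// URI construction used above:

```go
package main

import (
	"fmt"
	"slices"
)

// post is a hypothetical, trimmed-down post with only the fields the filter uses.
type post struct {
	IsReply bool
	Langs   []string
}

// shouldIndex mirrors the rules in onPostCreate: skip replies and keep only
// posts tagged with at least one of the wanted languages.
func shouldIndex(p post, wantedLangs []string) bool {
	if p.IsReply {
		return false
	}
	for _, lang := range p.Langs {
		if slices.Contains(wantedLangs, lang) {
			return true
		}
	}
	return false
}

// postATURI builds the at:// URI that the feed server returns to Bluesky.
func postATURI(repoDID, rkey string) string {
	return fmt.Sprintf("at://%s/app.bsky.feed.post/%s", repoDID, rkey)
}
```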
We only need to call that function at the end of our main.go:
subscriber.SubscribeJetstream()
With that in place, and PostgreSQL running in the background, you should be able to start the program with the following command:
go run cmd/indexer/main.go
If all went well, you should see posts arriving in your database. Be warned that it can fill up pretty quickly, as there is a lot of activity on the network. We'll see later how to keep the database from growing forever.
3. Writing your custom feed server
With our database filling up, we can now build the last piece of the puzzle: a small web server with two responsibilities:
- Validate our identity with Bluesky
- Fulfill feed requests from Bluesky
First, let's set up the building blocks for our server. We'll write a controller that will answer incoming requests:
package handlers
type FeedController struct {
hostname string
feedDID string
serviceDID string
feeds map[string]feeds.BlueskyFeed
}
func NewFeedController(db *repository.Queries, hostname string, feedDID string, serviceDID string) FeedController {
// Here I use my custom struct to define a newsfeed but you could create your own type
// or not even use a custom type at all, especially if you plan only on writing a single
// feed.
newFeeds := []feeds.BlueskyFeed{
feeds.NewCyclingFeed(db),
feeds.NewLilleFeed(db),
}
// Index feeds by short name; a distinct variable name avoids shadowing the feeds package
feedsByName := make(map[string]feeds.BlueskyFeed, len(newFeeds))
for _, feed := range newFeeds {
feedsByName[feed.ShortName()] = feed
}
return FeedController{hostname: hostname, feedDID: feedDID, serviceDID: serviceDID, feeds: feedsByName}
}
Then, in our server's main.go file:
package main
func main() {
...
connectionString := fmt.Sprintf("user=blueskyfeeds dbname=bluesky sslmode=disable host=%s password=%s", dbHostname, dbPassword)
ctx := context.Background()
pool, err := pgxpool.New(ctx, connectionString)
if err != nil {
panic(err)
}
defer pool.Close()
queries := repository.New(pool)
// queries wraps our database connection pool
// hostname is the domain name we chose earlier
// feedDID is the DID of the account that published the feed, i.e. the
// authority part of the at:// URI printed when registering the feed
// serviceDID is the DID of your server, in the format "did:web:<hostname>";
// in my case: "did:web:blueskyfeeds.dhawos.dev"
feedController := handlers.NewFeedController(queries, hostname, feedDID, serviceDID)
r := gin.Default()
r.GET("/healthz", func(c *gin.Context) {
c.JSON(200, gin.H{
"status": "ok",
})
})
xrpc := r.Group("/xrpc")
{
// It's on this URL that Bluesky will reach us when a user
// makes a request for a custom feed
xrpc.GET("/app.bsky.feed.getFeedSkeleton", feedController.FeedGenerator)
// This endpoint is not strictly required
xrpc.GET("/app.bsky.feed.describeFeedGenerator", feedController.DescribeFeedGenerator)
}
// This URL is the one Bluesky will use to verify our identity
r.GET("/.well-known/did.json", feedController.WellKnownDID)
r.Run() // listen and serve on 0.0.0.0:8080
}
Identity validation
Before sending us a request for a given custom feed, Bluesky will check that our identity is valid. Bluesky supports two DID methods to assert that identity: did:web and did:plc. Here we'll use did:web, as it is the easiest to set up. Be mindful that it has a limitation: it does not support domain migration, so make sure the domain you choose is the right one, as changing it later will prove difficult.
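With did:web, the DID is derived from your domain, and resolving it means fetching a JSON document over HTTPS. Following the did:web method specification, a DID containing only a host (such as did:web:blueskyfeeds.dhawos.dev) maps to https://<host>/.well-known/did.json, a percent-encoded port is decoded, and any extra colon-separated segments become a path. A sketch of that mapping (didWebDocumentURL is a hypothetical helper, not a full resolver: it does not validate the host):

```go
package main

import (
	"errors"
	"net/url"
	"strings"
)

// didWebDocumentURL returns the URL a did:web DID document is fetched from.
// The first segment is the percent-encoded host; further segments form a path.
func didWebDocumentURL(did string) (string, error) {
	rest, ok := strings.CutPrefix(did, "did:web:")
	if !ok || rest == "" {
		return "", errors.New("not a did:web identifier")
	}
	segments := strings.Split(rest, ":")
	host, err := url.PathUnescape(segments[0]) // "%3A" encodes a port separator
	if err != nil {
		return "", err
	}
	if len(segments) == 1 {
		// Host-only DIDs resolve to the well-known location
		return "https://" + host + "/.well-known/did.json", nil
	}
	return "https://" + host + "/" + strings.Join(segments[1:], "/") + "/did.json", nil
}
```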
To assert our identity, we just need to serve a DID document at /.well-known/did.json. In my case, it looks like this:
{
"@context": [
"https://www.w3.org/ns/did/v1"
],
"id": "did:web:blueskyfeeds.dhawos.dev",
"service": [
{
"id": "#bsky_fg",
"type": "BskyFeedGenerator",
"serviceEndpoint": "https://blueskyfeeds.dhawos.dev"
}
]
}
Here is the code snippet that generates this response:
func (fc *FeedController) WellKnownDID(c *gin.Context) {
if !strings.HasSuffix(fc.serviceDID, fc.hostname) {
c.JSON(http.StatusNotFound, gin.H{})
return
}
result := wellKnownDIDResponse{
Context: []string{"https://www.w3.org/ns/did/v1"},
ID: fc.serviceDID,
Service: []wellKnownDIDResponseService{
{
Id: "#bsky_fg",
Type: "BskyFeedGenerator",
ServiceEndpoint: fmt.Sprintf("https://%s", fc.hostname),
},
},
}
c.JSON(http.StatusOK, result)
}
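The wellKnownDIDResponse and wellKnownDIDResponseService types are not shown above. Assuming they simply mirror the JSON document, they might look like this; the hypothetical didDocument helper marshals them to check that the output matches the expected document:

```go
package main

import "encoding/json"

// Hypothetical definitions of the response types used by WellKnownDID;
// the JSON tags reproduce the document shown above.
type wellKnownDIDResponseService struct {
	Id              string `json:"id"`
	Type            string `json:"type"`
	ServiceEndpoint string `json:"serviceEndpoint"`
}

type wellKnownDIDResponse struct {
	Context []string                      `json:"@context"`
	ID      string                        `json:"id"`
	Service []wellKnownDIDResponseService `json:"service"`
}

// didDocument builds the DID document for a feed generator hosted on hostname.
func didDocument(hostname string) ([]byte, error) {
	return json.Marshal(wellKnownDIDResponse{
		Context: []string{"https://www.w3.org/ns/did/v1"},
		ID:      "did:web:" + hostname,
		Service: []wellKnownDIDResponseService{{
			Id:              "#bsky_fg",
			Type:            "BskyFeedGenerator",
			ServiceEndpoint: "https://" + hostname,
		}},
	})
}
```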
Answering feed requests
We are near the end of our little journey. All that is left is to set up the actual custom feed implementation. But first, let's summarize what happens when a user requests a feed from Bluesky:
- The user loads the feed
- Bluesky fetches the information related to that feed
- Bluesky checks the identity linked to that feed (which we just implemented)
- Bluesky sends a request to the feed's domain name, on the following endpoint:
https://<hostname>/xrpc/app.bsky.feed.getFeedSkeleton?feed=at://<feedDID>/app.bsky.feed.generator/<feedName>&limit=<limit>&cursor=<cursor>
Here is an example of a valid response to this request:
{
"cursor": "17533861",
"feed": [
{
"post": "at://did:plc:h6uiyakv4szt7lwe7qioiuzq/app.bsky.feed.post/3lhdtxq7gwc2y"
},
{
"post": "at://did:plc:r5ucobelmauof6krhzn4ltja/app.bsky.feed.post/3lhdtfjh6rs27"
},
{
"post": "at://did:plc:6vul7s776b53z3xsttsveiae/app.bsky.feed.post/3lhdrwehye223"
},
{
"post": "at://did:plc:grxrtsbervppan4gxjvsvilc/app.bsky.feed.post/3lhdrksjhds2i"
},
...
]
}
The cursor field, in both the request and the response, is used for pagination. It is an opaque value generated by our server and sent back by Bluesky with the next request. It has no meaning for Bluesky: defining what it means is up to the custom feed developer. In our case, we'll use the ID of the last post we returned. That way, when we receive a request for the next page, we can send the next batch of posts. Since these IDs are generated sequentially and we sort them in descending order, posts are returned from newest to oldest.
Here is how to generate such a response in our code.
func (fc *FeedController) FeedGenerator(c *gin.Context) {
// We retrieve the request parameters and validate that their format
// and values are as expected
feed := c.Query("feed")
if feed == "" {
c.JSON(http.StatusBadRequest, gin.H{})
return
}
atURI, err := syntax.ParseATURI(feed)
if err != nil {
slog.Error("invalid feed", "feed", feed, "err", err)
c.JSON(http.StatusBadRequest, gin.H{})
return
}
if atURI.Authority().String() != fc.feedDID {
slog.Error("invalid atURI authority", "authority", atURI.Authority().String(), "expectedAuthority", fc.feedDID)
c.JSON(http.StatusBadRequest, gin.H{})
return
}
expectedCollection := "app.bsky.feed.generator"
if atURI.Collection().String() != expectedCollection {
slog.Error("invalid atURI collection", "collection", atURI.Collection().String(), "expectedCollection", expectedCollection)
c.JSON(http.StatusBadRequest, gin.H{})
return
}
// Since we have several feeds, we must choose which one to use given the
// request's parameters
algo, exists := fc.feeds[atURI.RecordKey().String()]
if !exists {
slog.Error("non-existent feed", "feed", atURI.RecordKey())
c.JSON(http.StatusBadRequest, gin.H{})
return
}
// If no cursor is provided (or it is negative), we start from the newest
// post: with math.MaxInt64, the "id < cursor" condition matches every post
cursorInt64 := int64(math.MaxInt64)
if cursor := c.Query("cursor"); cursor != "" {
// Check the parsing error before using the parsed value
parsed, err := strconv.ParseInt(cursor, 10, 64)
if err != nil {
slog.Error("could not parse cursor", "cursor", cursor, "err", err)
c.JSON(http.StatusBadRequest, gin.H{})
return
}
if parsed >= 0 {
cursorInt64 = parsed
}
}
// We must also honor how many posts were requested by Bluesky
limit := c.Query("limit")
if limit == "" {
limit = DEFAULT_FEED_LIMIT
}
limitInt, err := strconv.ParseInt(limit, 10, 32)
if err != nil {
slog.Error("could not parse limit", "limit", limit, "err", err)
c.JSON(http.StatusBadRequest, "")
return
}
limitInt32 := int32(limitInt)
req := feeds.FeedRequest{
Lang: "fr", // my feeds only serve French posts for now
Cursor: cursorInt64,
Limit: limitInt32,
}
// Then, we call the function that retrieves the relevant posts
// for the selected feed
response, err := algo.GetFeedSkeleton(c, req)
if err != nil {
slog.Error("could not generate feed", "feed", feed, "err", err)
c.JSON(http.StatusInternalServerError, "")
return
}
c.JSON(http.StatusOK, response)
}
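The app.bsky.feed.getFeedSkeleton lexicon declares limit as an integer between 1 and 100, with a default of 50. The handler above does not check the upper bound; if you want to enforce it, a small helper could parse the parameter and reject out-of-range values. A sketch, where parseLimit and the constants are hypothetical names:

```go
package main

import (
	"fmt"
	"strconv"
)

// Bounds from the getFeedSkeleton lexicon: limit is between 1 and 100
// and defaults to 50.
const (
	minFeedLimit     = 1
	maxFeedLimit     = 100
	defaultFeedLimit = 50
)

// parseLimit turns the raw "limit" query parameter into an int32, applying
// the default when it is absent and rejecting out-of-range values.
func parseLimit(raw string) (int32, error) {
	if raw == "" {
		return defaultFeedLimit, nil
	}
	n, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return 0, fmt.Errorf("parsing limit %q: %w", raw, err)
	}
	if n < minFeedLimit || n > maxFeedLimit {
		return 0, fmt.Errorf("limit %d out of range [%d, %d]", n, minFeedLimit, maxFeedLimit)
	}
	return int32(n), nil
}
```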
Here is the implementation for my custom feed for posts about cycling and commuting on a bicycle:
type FeedResponse struct {
Post string `json:"post"`
}
type FeedSkeletonResponse struct {
// The cursor is omitted once there are no more posts,
// which signals that there are no further pages
Cursor string `json:"cursor,omitempty"`
Feed []FeedResponse `json:"feed"`
}
func (cf *CyclingFeed) GetFeedSkeleton(ctx context.Context, req FeedRequest) (FeedSkeletonResponse, error) {
// This implementation is based on a simple list of keywords;
// the words in the query mean "bicycle", "cyclist" and
// "bike commuting" respectively
const query = "vélo | cycliste | vélotaf"
params := repository.FullTextSearchParams{
ID: req.Cursor,
Lang: req.Lang,
Limit: req.Limit,
ToTsquery: query,
}
rows, err := cf.db.FullTextSearch(ctx, params)
if err != nil {
return FeedSkeletonResponse{}, fmt.Errorf("searching for %q : %w", query, err)
}
results := make([]FeedResponse, len(rows))
for i, row := range rows {
results[i] = FeedResponse{Post: row.AtUri.String}
}
response := FeedSkeletonResponse{Feed: results}
if len(rows) > 0 {
// The ID of the last (oldest) post returned becomes the next cursor
response.Cursor = strconv.FormatInt(rows[len(rows)-1].ID.Int64, 10)
}
return response, nil
}
I use sqlc to generate the code that executes the SQL queries. Here is the query I used (I'm not including the generated boilerplate code, as it is not really relevant to this post).
-- name: FullTextSearch :many
SELECT id,at_uri FROM bluesky_posts
RIGHT JOIN post_languages ON cid = post_id
WHERE id < $1
AND lang = $2
AND text_search @@ to_tsquery($4)
ORDER BY id DESC
LIMIT $3;
This query fetches the posts in a given language whose ID is below the cursor. We also use PostgreSQL's to_tsquery function to perform a full-text search and keep only the posts that match the terms of our query. Finally, we cap the number of posts returned with the limit parameter provided by Bluesky.
For the pagination to work correctly, we must ensure that the result of this request is stable and does not change over time. This is why we use the ID of the post (which is equivalent to the order in which the posts were indexed).
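The query string vélo | cycliste | vélotaf uses the | (OR) operator of to_tsquery. If you maintain a longer keyword list, possibly with multi-word expressions, a small helper can assemble it, turning phrases into the <-> (followed by) operator available since PostgreSQL 9.6. A sketch; orTSQuery is a hypothetical helper that does not escape tsquery operators, so it assumes plain keywords:

```go
package main

import "strings"

// orTSQuery builds a to_tsquery expression matching any of the given terms.
// Multi-word terms become phrase queries joined with <->. Characters with a
// special meaning in tsquery syntax (&, |, !, parentheses, quotes, colons)
// are not escaped, so terms are assumed to be plain words.
func orTSQuery(terms []string) string {
	parts := make([]string, 0, len(terms))
	for _, term := range terms {
		words := strings.Fields(term)
		if len(words) == 0 {
			continue // skip blank entries
		}
		parts = append(parts, strings.Join(words, " <-> "))
	}
	return strings.Join(parts, " | ")
}
```

Since <-> binds more tightly than |, a result like piste <-> cyclable | vélo reads as "the phrase piste cyclable, or vélo".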
With that done, you should now be able to run this code and set up your domain name to point either to your local machine or to a server on which this code is deployed. It must be reachable over HTTPS, since that is the service endpoint advertised in the DID document. Once that is done, go to your Bluesky profile, find your feed and open it to check that everything works as expected.
Set up garbage collection
If everything works, congratulations! There is, however, one last step you must take if you don't want your database to grow indefinitely. As there is a lot of traffic on Bluesky, the database can grow very quickly, so regular cleanup is important.
To do so, we will launch a goroutine alongside the indexing process. This goroutine runs a cleanup command at a regular interval (every two hours) to delete posts older than 48 hours.
func (fh *FirehoseListener) deleteOldPosts(ctx context.Context) error {
const olderThan = -48 * time.Hour // negative so that Add goes back in time
cuttingPoint := time.Now().Add(olderThan)
fh.logger.Info("deleting posts before", "cuttingPoint", cuttingPoint.String())
err := fh.db.DeletePostsBefore(ctx, pgtype.Timestamptz{
Time: cuttingPoint,
InfinityModifier: 0,
Valid: true,
})
if err != nil {
return fmt.Errorf("deleting old posts : %w", err)
}
// The VACUUM here is important: it marks the space used by the deleted
// rows as reusable, so storage usage does not grow indefinitely
// (a plain VACUUM usually does not return that space to the operating system)
err = fh.db.Vacuum(ctx)
if err != nil {
return fmt.Errorf("doing vacuum: %w", err)
}
return nil
}
func (fh *FirehoseListener) RunGarbageCollectorInBackground(ctx context.Context) {
ticker := time.NewTicker(2 * time.Hour)
go func() {
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := fh.deleteOldPosts(ctx); err != nil {
fh.logger.Error("garbage collection failed", "err", err)
}
// Stop the loop when the context is cancelled
case <-ctx.Done():
return
}
}
}()
}
Closing words
Thanks for reading along. I hope these explanations gave you a clearer picture of the tasks needed to set up your own Bluesky custom newsfeed. The complete source code is available at https://gitlab.com/Dhawos/bluesky-custom-feeds. Don't hesitate to take a look.
You can let me know if you enjoyed this article on Bluesky: @dhawos.bsky.social. And if it helped you build your own feed, please share it; I would love to see what you build.
If you are interested in my feeds, you can find them here (they only show posts in French for now):
- A feed on cycling and commuting with a bike : https://bsky.app/profile/dhawos.bsky.social/feed/velo
- A feed on the town of Lille in the north of France : https://bsky.app/profile/dhawos.bsky.social/feed/lille