From Kafka stream
Ingest hydrated events from a hosted Kafka stream (as compared to dehydrated events from the gRPC hub).
With Kafka, you can subscribe to the same data that we use for sending webhook notifications
To get the entire dataset, Kafka is best paired with one of our other data products (such as Parquet).
Kafka is not suitable for building a database containing all Farcaster data since day 1: our Kafka topics currently retain data for 14 days. It is a good solution for streaming recent data in real time.
Why
If you’re using Hub gRPC streaming, you receive dehydrated events that you have to assemble yourself before they are useful (see here for an example). With Neynar’s Kafka stream, you get a fully hydrated event (e.g. `user.created`) that you can use in your app or product immediately.
How
- Reach out, we will create credentials for you and send them via 1Password.
- For authentication, the connection requires `SASL/SCRAM SHA512`.
- For encryption, the connection requires TLS (sometimes called SSL for legacy reasons).
`farcaster-mainnet-events` is the primary topic name. There may be more topics in the future, but for now there is just one. It has 2 partitions.
There are three brokers available over the Internet. Provide them all to your client:
b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
Most clients accept the brokers as a comma-separated list:
b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
You can use `kcat` (formerly `kafkacat`) to test things locally:
# Install kcat with Homebrew.
brew install kcat
# Open the kcat homepage in your browser.
brew home kcat
# List cluster metadata (-L) through all three public brokers (-b), using
# SASL/SCRAM-SHA-512 authentication over TLS (security.protocol=sasl_ssl).
# Replace 'user-YOURNAME' and 'YOURPASSWORD' with the credentials you were sent.
# NOTE(review): the example output's header names the topic, which suggests the
# original run may also have passed `-t farcaster-mainnet-events` — confirm.
kcat -L \
-b 'b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196' \
-X security.protocol=sasl_ssl \
-X sasl.mechanisms=SCRAM-SHA-512 \
-X sasl.username='user-YOURNAME' \
-X sasl.password='YOURPASSWORD'
Example output:
Metadata for farcaster-mainnet-events (from broker 1: sasl_ssl://b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196/1):
3 brokers:
broker 2 at b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
broker 3 at b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196 (controller)
broker 1 at b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
1 topics:
topic "farcaster-mainnet-events" with 2 partitions:
partition 0, leader 2, replicas: 2,3,1, isrs: 2,3,1
partition 1, leader 3, replicas: 3,1,2, isrs: 3,1,2
Node.js consumer example
https://github.com/neynarxyz/farcaster-examples/tree/main/neynar-webhook-kafka-consumer
Data schema
/**
 * `user.created` — emitted when a new user is created on the network.
 *
 * Envelope first, then the payload and the helper shapes it is built from.
 */
interface UserCreatedEvent {
  event_type: "user.created";
  data: UserCreatedData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `user.created` event: the hydrated user object. */
interface UserCreatedData {
  object: "user";
  fid: number;
  custody_address: string;
  username: string;
  /** May be `null`. */
  display_name: string | null;
  /** May be `null`. */
  pfp_url: string | null;
  profile: Profile;
  follower_count: number;
  following_count: number;
  verifications: string[];
  verified_addresses: VerifiedAddresses;
  active_status: "inactive" | "active";
  power_badge: boolean;
  /** ISO 8601 format. */
  event_timestamp: string;
}

/** Verified addresses, split into `eth_addresses` and `sol_addresses`. */
interface VerifiedAddresses {
  eth_addresses: string[];
  sol_addresses: string[];
}

/** Profile section of the user; currently only wraps the bio. */
interface Profile {
  bio: Bio;
}

/** Free-text bio. */
interface Bio {
  text: string;
}
/**
 * `user.updated` — emitted when a user profile field is updated.
 *
 * Envelope first, then the payload and the helper shapes it is built from.
 */
interface UserUpdatedEvent {
  event_type: "user.updated";
  data: UserUpdatedData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/**
 * Payload of a `user.updated` event: the hydrated user object.
 *
 * NOTE(review): `display_name` and `pfp_url` are typed as non-null here, while
 * the `user.created` schema types them as `string | null` — confirm whether
 * `null` can occur on updates.
 */
interface UserUpdatedData {
  object: "user";
  fid: number;
  custody_address: string;
  username: string;
  display_name: string;
  pfp_url: string;
  profile: Profile;
  follower_count: number;
  following_count: number;
  verifications: string[];
  verified_addresses: VerifiedAddresses;
  active_status: "inactive" | "active";
  power_badge: boolean;
  /** ISO 8601 format. */
  event_timestamp: string;
}

/** Verified addresses, split into `eth_addresses` and `sol_addresses`. */
interface VerifiedAddresses {
  eth_addresses: string[];
  sol_addresses: string[];
}

/** Profile section of the user; currently only wraps the bio. */
interface Profile {
  bio: Bio;
}

/** Free-text bio. */
interface Bio {
  text: string;
}
/**
 * `cast.created` — emitted when a new cast is created.
 *
 * Envelope first, then the cast payload and the shapes it references.
 */
export interface CastCreatedEvent {
  event_type: "cast.created";
  data: EventData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
export interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `cast.created` event: the cast with its author hydrated. */
interface EventData {
  object: "cast";
  hash: string;
  parent_hash?: string | null;
  parent_url?: string | null;
  root_parent_url?: string | null;
  parent_author?: Fid;
  author: User;
  mentioned_profiles?: User[];
  text: string;
  /** ISO 8601 format. */
  timestamp: string;
  embeds: EmbeddedCast[];
}

/** An embed is either a URL embed or a reference to another cast. */
type EmbeddedCast = EmbedUrl | EmbedCastId;

/** URL embed, with optional metadata about the linked content. */
interface EmbedUrl {
  url: string;
  metadata?: EmbedUrlMetadata;
}

/** Optional metadata for a URL embed; each field may be absent or `null`. */
interface EmbedUrlMetadata {
  content_type?: string | null;
  content_length?: number | null;
}

/** Embed that points at another cast. */
interface EmbedCastId {
  cast_id: CastId;
}

/** Identifies a cast by its author's `fid` and the cast `hash`. */
interface CastId {
  fid: number;
  hash: string;
}

/** Wrapper around an `fid` that may be absent or `null`. */
interface Fid {
  fid?: number | null;
}

/**
 * Hydrated user as embedded in a cast (author and mentioned profiles).
 *
 * Here `object` and `active_status` are typed as plain `string`.
 */
interface User {
  object: string;
  fid: number;
  custody_address: string;
  username: string;
  display_name: string;
  pfp_url: string;
  profile: {
    bio: {
      text: string;
    };
  };
  follower_count: number;
  following_count: number;
  verifications: string[];
  verified_addresses: {
    eth_addresses: string[];
    sol_addresses: string[];
  };
  active_status: string;
  power_badge: boolean;
}
/**
 * `follow.created` — emitted when a user follows another user.
 *
 * Envelope first, then the payload and the dehydrated user reference.
 */
interface FollowCreatedEvent {
  event_type: "follow.created";
  data: EventData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `follow.created` event: `user` followed `target_user`. */
interface EventData {
  object: "follow";
  /** ISO 8601 format. */
  event_timestamp: string;
  /** ISO 8601 format with timezone. */
  timestamp: string;
  user: UserDehydrated;
  target_user: UserDehydrated;
}

/** Minimal user reference: only `fid` and `username`. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}
/**
 * `follow.deleted` — emitted when a user unfollows another user.
 *
 * Envelope first, then the payload and the dehydrated user reference.
 */
interface FollowDeletedEvent {
  event_type: "follow.deleted";
  data: EventData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/**
 * Payload of a `follow.deleted` event: `user` unfollowed `target_user`.
 * The `object` tag for this payload is `"unfollow"`.
 */
interface EventData {
  object: "unfollow";
  /** ISO 8601 format. */
  event_timestamp: string;
  /** ISO 8601 format with timezone. */
  timestamp: string;
  user: UserDehydrated;
  target_user: UserDehydrated;
}

/** Minimal user reference: only `fid` and `username`. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}
/**
 * `reaction.created` — emitted when a reaction is added to a cast.
 *
 * Envelope first, then the payload and the dehydrated references it uses.
 */
interface ReactionCreatedEvent {
  event_type: "reaction.created";
  data: EventData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `reaction.created` event: `user` reacted to `cast`. */
interface EventData {
  object: "reaction";
  /** ISO 8601 format. */
  event_timestamp: string;
  /** ISO 8601 format with timezone. */
  timestamp: string;
  /** Numeric reaction code. */
  reaction_type: number;
  user: UserDehydrated;
  cast: CastDehydrated;
}

/** Minimal cast reference: its `hash` and a dehydrated `author`. */
interface CastDehydrated {
  object: "cast_dehydrated";
  hash: string;
  author: UserDehydrated;
}

/** Minimal user reference: only `fid` and `username`. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}
/**
 * `reaction.deleted` — emitted when a reaction is removed from a cast.
 *
 * Envelope first, then the payload and the dehydrated references it uses.
 */
interface ReactionDeletedEvent {
  event_type: "reaction.deleted";
  data: EventData;
  custom_headers: CustomHeaders;
  /** Optional; not every event is required to carry it. */
  idempotency_key?: string;
}

/** Headers carried alongside the event payload. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `reaction.deleted` event: `user`'s reaction to `cast`. */
interface EventData {
  object: "reaction";
  /** ISO 8601 format. */
  event_timestamp: string;
  /** ISO 8601 format with timezone. */
  timestamp: string;
  /** Numeric reaction code. */
  reaction_type: number;
  user: UserDehydrated;
  cast: CastDehydrated;
}

/** Minimal cast reference: its `hash` and a dehydrated `author`. */
interface CastDehydrated {
  object: "cast_dehydrated";
  hash: string;
  author: UserDehydrated;
}

/** Minimal user reference: only `fid` and `username`. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}
Updated 13 days ago