From Kafka stream

Ingest hydrated events from a hosted Kafka stream (as compared to dehydrated events from the gRPC hub).

With Kafka, you can subscribe to the same data that we use for sending webhook notifications.

📘

To get the entire dataset, Kafka is best paired with one of our other data products (such as Parquet).

Kafka is not suitable for building a database with all of the data since Farcaster's first day. Our Kafka topics currently retain data for 14 days. It's a good solution for streaming recent data in real time (P95 data latency of <1.5s).

Why

If you’re using Hub gRPC streaming, you get dehydrated events that you have to assemble yourself before they are useful (see here for example). With Neynar’s Kafka stream, you get a fully hydrated event (e.g., user.created) that you can use in your app/product immediately. See the comparison between the gRPC hub event and the Kafka event below.


How

  • Reach out, and we will create credentials for you and send them via 1Password.
  • For authentication, the connection requires SASL/SCRAM with SHA-512 (mechanism name: SCRAM-SHA-512).
  • The connection requires TLS (sometimes called SSL for legacy reasons) for encryption.
  • farcaster-mainnet-events is the primary topic name. There may be more topics in the future, but for now, there is just one. It has two partitions.

There are three brokers available over the Internet. Provide them all to your client:

  • b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
  • b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
  • b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196

Most clients accept the brokers as a comma-separated list:

b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196

You can use kcat (formerly kafkacat) to test things locally:

# Install kcat with Homebrew.
brew install kcat
brew home kcat
# -L requests cluster metadata (brokers, topics, partitions), which makes it a
# quick check that the brokers are reachable and the credentials are accepted.
# Replace user-YOURNAME / YOURPASSWORD with the credentials issued to you.
kcat -L \
    -b 'b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196' \
    -X security.protocol=sasl_ssl \
    -X sasl.mechanisms=SCRAM-SHA-512 \
    -X sasl.username='user-YOURNAME' \
    -X sasl.password='YOURPASSWORD'

Example output:

Metadata for farcaster-mainnet-events (from broker 1: sasl_ssl://b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196/1):
 3 brokers:
  broker 2 at b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
  broker 3 at b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196 (controller)
  broker 1 at b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
 1 topics:
  topic "farcaster-mainnet-events" with 2 partitions:
    partition 0, leader 2, replicas: 2,3,1, isrs: 2,3,1
    partition 1, leader 3, replicas: 3,1,2, isrs: 3,1,2

Node.js consumer example

https://github.com/neynarxyz/farcaster-examples/tree/main/neynar-webhook-kafka-consumer

Data schema

// user.created: sent when a new user is created on the network

/** Envelope of a `user.created` message as delivered on the stream. */
interface UserCreatedEvent {
  event_type: "user.created";
  data: UserCreatedData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/**
 * Payload of a `user.created` message: the hydrated user object.
 * `display_name` and `pfp_url` are nullable in this schema.
 */
interface UserCreatedData {
  object: "user";
  fid: number;
  custody_address: string;
  username: string;
  display_name: string | null;
  pfp_url: string | null;
  profile: Profile;
  follower_count: number;
  following_count: number;
  verifications: string[];
  verified_addresses: VerifiedAddresses;
  active_status: "inactive" | "active";
  power_badge: boolean;
  event_timestamp: string; // ISO 8601 format
}

/** Verified address lists, exposed as `eth_addresses` and `sol_addresses`. */
interface VerifiedAddresses {
  eth_addresses: string[];
  sol_addresses: string[];
}

/** Profile section of a user; currently only wraps the bio. */
interface Profile {
  bio: Bio;
}

/** Bio section of a profile, holding its text. */
interface Bio {
  text: string;
}

// user.updated: sent when a user profile field is updated

/** Envelope of a `user.updated` message as delivered on the stream. */
interface UserUpdatedEvent {
  event_type: "user.updated";
  data: UserUpdatedData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/**
 * Payload of a `user.updated` message: the hydrated user object.
 *
 * NOTE(review): `display_name` and `pfp_url` are typed as plain `string` here,
 * while the `user.created` payload types them as `string | null`. Confirm
 * against real payloads whether these can be null on update.
 */
interface UserUpdatedData {
  object: "user";
  fid: number;
  custody_address: string;
  username: string;
  display_name: string;
  pfp_url: string;
  profile: Profile;
  follower_count: number;
  following_count: number;
  verifications: string[];
  verified_addresses: VerifiedAddresses;
  active_status: "inactive" | "active";
  power_badge: boolean;
  event_timestamp: string; // ISO 8601 format
}

/** Verified address lists, exposed as `eth_addresses` and `sol_addresses`. */
interface VerifiedAddresses {
  eth_addresses: string[];
  sol_addresses: string[];
}

/** Profile section of a user; currently only wraps the bio. */
interface Profile {
  bio: Bio;
}

/** Bio section of a profile, holding its text. */
interface Bio {
  text: string;
}

// cast.created: sent when a new cast is created

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
export interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Reference to a user by fid only; the fid may be absent or null. */
interface Fid {
  fid?: number | null;
}

/**
 * Hydrated user object embedded in a cast (author and mentioned profiles).
 *
 * `object` and `active_status` use the same literal types as the user object
 * in the `user.created` / `user.updated` schemas, so consumers can narrow on
 * them instead of comparing against arbitrary strings.
 */
interface User {
  object: "user";
  fid: number;
  custody_address: string;
  username: string;
  display_name: string;
  pfp_url: string;
  profile: {
    bio: {
      text: string;
    };
  };
  follower_count: number;
  following_count: number;
  verifications: string[];
  verified_addresses: {
    eth_addresses: string[];
    sol_addresses: string[];
  };
  active_status: "inactive" | "active";
  power_badge: boolean;
}

/** Optional metadata about an embedded URL; both fields may be missing or null. */
interface EmbedUrlMetadata {
  content_type?: string | null;
  content_length?: number | null;
}

/** Embed that points at a URL, optionally with metadata. */
interface EmbedUrl {
  url: string;
  metadata?: EmbedUrlMetadata;
}

/** Identifies a cast by its author's fid and the cast hash. */
interface CastId {
  fid: number;
  hash: string;
}

/** Embed that points at another cast. */
interface EmbedCastId {
  cast_id: CastId;
}

/** An embed is either a URL embed or a cast embed; discriminate with `"url" in embed`. */
type EmbeddedCast = EmbedUrl | EmbedCastId;

/** Payload of a `cast.created` message: the hydrated cast. */
interface EventData {
  object: "cast";
  hash: string;
  parent_hash?: string | null;
  parent_url?: string | null;
  root_parent_url?: string | null;
  parent_author?: Fid;
  author: User;
  mentioned_profiles?: User[];
  text: string;
  timestamp: string; // ISO 8601 format
  embeds: EmbeddedCast[];
}

/** Envelope of a `cast.created` message as delivered on the stream. */
export interface CastCreatedEvent {
  event_type: "cast.created";
  data: EventData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}
// follow.created: sent when a user follows another user

/** Envelope of a `follow.created` message as delivered on the stream. */
interface FollowCreatedEvent {
  event_type: "follow.created";
  data: EventData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `follow.created` message: `user` followed `target_user`. */
interface EventData {
  object: "follow";
  event_timestamp: string; // ISO 8601 format
  timestamp: string;       // ISO 8601 format with timezone
  user: UserDehydrated;
  target_user: UserDehydrated;
}

/** Minimal user reference carrying only the fid and username. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}
// follow.deleted: sent when a user unfollows another user

/** Envelope of a `follow.deleted` message as delivered on the stream. */
interface FollowDeletedEvent {
  event_type: "follow.deleted";
  data: EventData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/** Payload of a `follow.deleted` message: `user` unfollowed `target_user`. */
interface EventData {
  object: "unfollow";
  event_timestamp: string; // ISO 8601 format
  timestamp: string;       // ISO 8601 format with timezone
  user: UserDehydrated;
  target_user: UserDehydrated;
}

/** Minimal user reference carrying only the fid and username. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}
// reaction.created: sent when a reaction is added to a cast

/** Envelope of a `reaction.created` message as delivered on the stream. */
interface ReactionCreatedEvent {
  event_type: "reaction.created";
  data: EventData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/**
 * Payload of a `reaction.created` message: `user` reacted to `cast`.
 * `reaction_type` is a numeric code; its mapping is not defined in this schema.
 */
interface EventData {
  object: "reaction";
  event_timestamp: string; // ISO 8601 format
  timestamp: string;       // ISO 8601 format with timezone
  reaction_type: number;
  user: UserDehydrated;
  cast: CastDehydrated;
}

/** Minimal cast reference carrying only the hash and a dehydrated author. */
interface CastDehydrated {
  object: "cast_dehydrated";
  hash: string;
  author: UserDehydrated;
}

/** Minimal user reference carrying only the fid and username. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}

// reaction.deleted: sent when a reaction is removed from a cast

/** Envelope of a `reaction.deleted` message as delivered on the stream. */
interface ReactionDeletedEvent {
  event_type: "reaction.deleted";
  data: EventData;
  custom_headers: CustomHeaders;
  idempotency_key?: string;
}

/** Headers carried alongside the payload; "broadcast" is the only documented value. */
interface CustomHeaders {
  "x-convoy-message-type": "broadcast";
}

/**
 * Payload of a `reaction.deleted` message: `user` removed a reaction from `cast`.
 * `reaction_type` is a numeric code; its mapping is not defined in this schema.
 */
interface EventData {
  object: "reaction";
  event_timestamp: string; // ISO 8601 format
  timestamp: string;       // ISO 8601 format with timezone
  reaction_type: number;
  user: UserDehydrated;
  cast: CastDehydrated;
}

/** Minimal cast reference carrying only the hash and a dehydrated author. */
interface CastDehydrated {
  object: "cast_dehydrated";
  hash: string;
  author: UserDehydrated;
}

/** Minimal user reference carrying only the fid and username. */
interface UserDehydrated {
  object: "user_dehydrated";
  fid: number;
  username: string;
}