> ## Documentation Index
> Fetch the complete documentation index at: https://docs.neynar.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Kafka Stream: Real-Time Farcaster Events via Neynar

> Ingest hydrated events from a hosted Kafka stream (as compared to dehydrated events from gRPC hub)

With Kafka, you can subscribe to the same data that we use for sending webhook notifications

<Info>
  ### To get entire dataset, Kafka is best paired with [one of our other data products](/docs/how-to-choose-the-right-data-product-for-you) (such as [Parquet](/docs/parquet) )

  Kafka is not suitable to build a database with all of the data from Farcaster day 1. Our kafka topics currently keep data for 14 days. It's a good solution for streaming recent data in real time (P95 data latency of \<1.5s).
</Info>

## Why

If you’re using Hub gRPC streaming, you’re getting dehydrated events that you have to put together yourself later to make useful (see [here](https://warpcast.com/rish/0x7c2997ec) for example). With Neynar’s Kafka stream, you get a fully hydrated event (e.g., [user.created](/docs/from-kafka-stream#data-schema)) that you can use in your app/product immediately. See the example between the gRPC hub event and the Kafka event below.

<Frame>
  <img src="https://mintcdn.com/neynar/4PNY113y9N9T-r9z/images/docs/a6a0e1902416b32b41fc096276cf333c08ed30429956f9708586d2910942c8ee-image.png?fit=max&auto=format&n=4PNY113y9N9T-r9z&q=85&s=cc1729266d620b61d194153e1216d225" alt="Kafka stream" width="2057" height="1322" data-path="images/docs/a6a0e1902416b32b41fc096276cf333c08ed30429956f9708586d2910942c8ee-image.png" />
</Frame>

## How

* [Reach out](https://neynar.com/slack), we will create credentials for you and send them via 1Password.
* For authentication, the connection requires `SASL/SCRAM SHA512`.
* The connection requires TLS (sometimes called SSL for legacy reasons) for encryption.
* `farcaster-mainnet-events` is the aggregated topic that contains all events.
* `farcaster-mainnet-user-events`     contains `user.created`, `user.updated` and `user.transferred`
* `farcaster-mainnet-cast-events`     contains `cast.created` and `cast.deleted`
* `farcaster-mainnet-follow-events`   contains `follow.created` and `follow.deleted`
* `farcaster-mainnet-reaction-events` contains `reaction.created` and `reaction.deleted`
* `farcaster-mainnet-signer-events`   contains `signer.created` and `signer.deleted`

You can subcribe to any combination of the event-specific topics above, or to the `farcaster-mainnet-events` topic to get all events in one topic.

There are three brokers available over the Internet. Provide them all to your client:

* `b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196`
* `b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196`
* `b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196`

Most clients accept the brokers as a comma-separated list:

<CodeGroup>
  ```bash cURL theme={"system"}
  b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
  ```
</CodeGroup>

You can use `kcat` (formerly `kafkacat`) to test things locally:

<CodeGroup>
  ```bash cURL theme={"system"}
  brew install kcat
  brew home kcat
  ```
</CodeGroup>

<CodeGroup>
  ```bash cURL theme={"system"}
  kcat -L \
      -b 'b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196,b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196' \
      -X security.protocol=sasl_ssl \
      -X sasl.mechanisms=SCRAM-SHA-512 \
      -X sasl.username='user-YOURNAME' \
      -X sasl.password='YOURPASSWORD'
  ```
</CodeGroup>

Example output:

<CodeGroup>
  ```bash cURL theme={"system"}
  Metadata for farcaster-mainnet-events (from broker 1: sasl_ssl://b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196/1):
   3 brokers:
    broker 2 at b-2-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
    broker 3 at b-3-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196 (controller)
    broker 1 at b-1-public.tfmskneynar.5vlahy.c11.kafka.us-east-1.amazonaws.com:9196
   5 topics:
    topic "farcaster-mainnet-user-events" with 2 partitions:
      partition 0, leader 2, replicas: 2,3,1, isrs: 2,3,1
      partition 1, leader 3, replicas: 3,1,2, isrs: 3,1,2
    topic "farcaster-mainnet-reaction-events" with 2 partitions:
      partition 0, leader 3, replicas: 3,2,1, isrs: 3,2,1
      partition 1, leader 1, replicas: 1,3,2, isrs: 1,3,2
    topic "farcaster-mainnet-cast-events" with 2 partitions:
      partition 0, leader 2, replicas: 2,3,1, isrs: 2,3,1
      partition 1, leader 3, replicas: 3,1,2, isrs: 3,1,2
    topic "farcaster-mainnet-follow-events" with 2 partitions:
      partition 0, leader 2, replicas: 2,3,1, isrs: 2,3,1
      partition 1, leader 3, replicas: 3,1,2, isrs: 3,1,2
    topic "farcaster-mainnet-events" with 2 partitions:
      partition 0, leader 2, replicas: 2,3,1, isrs: 3,1,2
      partition 1, leader 3, replicas: 3,1,2, isrs: 3,1,2
    topic "farcaster-mainnet-signer-events" with 2 partitions:
      partition 0, leader 1, replicas: 1,3,2, isrs: 1,3,2
      partition 1, leader 2, replicas: 2,1,3, isrs: 2,1,3

  ```
</CodeGroup>

The topics you see will vary depending on your access.

## Consumer nodejs example

<Card title="https://github.com/neynarxyz/farcaster-examples/tree/main/neynar-webhook-kafka-consumer" href="https://github.com/neynarxyz/farcaster-examples/tree/main/neynar-webhook-kafka-consumer" icon="github" iconType="solid" horizontal />

## Data schema

<CodeGroup>
  ```typescript user.created theme={"system"}
  // _when a new user is created on the network_

  interface Bio {
    text: string;
  }

  interface Profile {
    bio: Bio;
  }

  interface VerifiedAddresses {
    eth_addresses: string[];
    sol_addresses: string[];
  }

  interface UserCreatedData {
    object: "user";
    fid: number;
    custody_address: string;
    username: string;
    display_name: string | null;
    pfp_url: string | null;
    profile: Profile;
    follower_count: number;
    following_count: number;
    verifications: string[];
    verified_addresses: VerifiedAddresses;
    active_status: "inactive" | "active";
    power_badge: boolean;
    event_timestamp: string; // ISO 8601 format
  }

  interface CustomHeaders {
    "x-convoy-message-type": "broadcast";
  }

  interface UserCreatedEvent {
    event_type: "user.created";
    data: UserCreatedData;
    custom_headers: CustomHeaders;
    idempotency_key?: string;
  }
  ```

  ```typescript user.updated theme={"system"}
  // _when a user profile field is updated_

  interface Bio {
    text: string;
  }

  interface Profile {
    bio: Bio;
  }

  interface VerifiedAddresses {
    eth_addresses: string[];
    sol_addresses: string[];
  }

  interface UserUpdatedData {
    object: "user";
    fid: number;
    custody_address: string;
    username: string;
    display_name: string;
    pfp_url: string;
    profile: Profile;
    follower_count: number;
    following_count: number;
    verifications: string[];
    verified_addresses: VerifiedAddresses;
    active_status: "inactive" | "active";
    power_badge: boolean;
    event_timestamp: string; // ISO 8601 format
  }

  interface CustomHeaders {
    "x-convoy-message-type": "broadcast";
  }

  interface UserUpdatedEvent {
    event_type: "user.updated";
    data: UserUpdatedData;
    custom_headers: CustomHeaders;
    idempotency_key?: string;
  }
  ```

  ```typescript cast.created theme={"system"}
  // _when a new cast is created_

  export interface CustomHeaders {
    "x-convoy-message-type": "broadcast"
  }

  interface Fid {
    fid?: number | null;
  }

  interface User {
    object: string;
    fid: number;
    custody_address: string;
    username: string;
    display_name: string;
    pfp_url: string;
    profile: {
      bio: {
        text: string;
      };
    };
    follower_count: number;
    following_count: number;
    verifications: string[];
    verified_addresses: {
      eth_addresses: string[];
      sol_addresses: string[];
    };
    active_status: string;
    power_badge: boolean;
  }

  interface EmbedUrlMetadata {
    content_type?: string | null;
    content_length?: number | null;
  }

  interface EmbedUrl {
    url: string;
    metadata?: EmbedUrlMetadata;
  }

  interface CastId {
    fid: number;
    hash: string;
  }

  interface EmbedCastId {
    cast_id: CastId;
  }

  type EmbeddedCast = EmbedUrl | EmbedCastId;

  interface EventData {
  	object: "cast";
    hash: string;
    parent_hash?: string | null;
    parent_url?: string | null;
    root_parent_url?: string | null;
    parent_author?: Fid;
    author: User;
    mentioned_profiles?: User[];
    text: string;
    timestamp: string; // ISO 8601 format
    embeds: EmbeddedCast[];
  }

  export interface CastCreatedEvent {
    event_type: "cast.created"
    data: EventData
    custom_headers: CustomHeaders
    idempotency_key?: string
  }
  ```

  ```typescript cast.deleted theme={"system"}
  // _when a cast is deleted_

  export interface CustomHeaders {
    "x-convoy-message-type": "broadcast"
  }

  interface Fid {
    fid?: number | null;
  }

  interface User {
    object: string;
    fid: number;
    custody_address: string;
    username: string;
    display_name: string;
    pfp_url: string;
    profile: {
      bio: {
        text: string;
      };
    };
    follower_count: number;
    following_count: number;
    verifications: string[];
    verified_addresses: {
      eth_addresses: string[];
      sol_addresses: string[];
    };
    active_status: string;
    power_badge: boolean;
  }

  interface EmbedUrlMetadata {
    content_type?: string | null;
    content_length?: number | null;
  }

  interface EmbedUrl {
    url: string;
    metadata?: EmbedUrlMetadata;
  }

  interface CastId {
    fid: number;
    hash: string;
  }

  interface EmbedCastId {
    cast_id: CastId;
  }

  type EmbeddedCast = EmbedUrl | EmbedCastId;

  interface EventData {
  	object: "cast";
    hash: string;
    parent_hash?: string | null;
    parent_url?: string | null;
    root_parent_url?: string | null;
    parent_author?: Fid;
    author: User;
    mentioned_profiles?: User[];
    text: string;
    timestamp: string; // ISO 8601 format
    embeds: EmbeddedCast[];
  }

  export interface CastDeletedEvent {
    event_type: "cast.deleted"
    data: EventData
    custom_headers: CustomHeaders
    idempotency_key?: string
  }
  ```

  ```typescript follow.created theme={"system"}
  // _when a user follows another user_

  interface UserDehydrated {
    object: "user_dehydrated";
    fid: number;
    username: string;
  }

  interface AppDehydrated {
    object: "user_dehydrated";
    fid: number;
  }

  interface EventData {
    object: "follow";
    event_timestamp: string; // ISO 8601 format
    timestamp: string;       // ISO 8601 format with timezone
    user: UserDehydrated;
    target_user: UserDehydrated;
    app: AppDehydrated | null; // null if not signer isn't available
  }

  interface CustomHeaders {
    "x-convoy-message-type": "broadcast";
  }

  interface FollowCreatedEvent {
    event_type: "follow.created";
    data: EventData;
    custom_headers: CustomHeaders;
    idempotency_key?: string
  }
  ```

  ```typescript follow.deleted theme={"system"}
  // _when a user unfollows another user_

  interface UserDehydrated {
    object: "user_dehydrated";
    fid: number;
    username: string;
  }

  interface EventData {
    object: "unfollow";
    event_timestamp: string; // ISO 8601 format
    timestamp: string;       // ISO 8601 format with timezone
    user: UserDehydrated;
    target_user: UserDehydrated;
  }

  interface CustomHeaders {
    "x-convoy-message-type": "broadcast";
  }

  interface FollowDeletedEvent {
    event_type: "follow.deleted";
    data: EventData;
    custom_headers: CustomHeaders;
    idempotency_key?: string
  }
  ```

  ```typescript reaction.created theme={"system"}
  // _when a reaction is added to a cast_

  interface UserDehydrated {
    object: "user_dehydrated";
    fid: number;
    username: string;
  }

  interface AppDehydrated {
    object: "user_dehydrated";
    fid: number;
  }

  interface URIDehydrated {
    object: "uri_dehydrated";
    uri: string;
  }

  interface CastDehydrated {
    object: "cast_dehydrated";
    hash: string;
    author: UserDehydrated;
  }

  interface EventData {
    object: "reaction";
    event_timestamp: string; // ISO 8601 format
    timestamp: string;       // ISO 8601 format with timezone
    reaction_type: number;
    user: UserDehydrated;
    target: CastDehydrated | URIDehydrated;
    app: AppDehydrated | null; // null if not signer isn't available
  }

  interface CustomHeaders {
    "x-convoy-message-type": "broadcast";
  }

  interface ReactionCreatedEvent {
    event_type: "reaction.created";
    data: EventData;
    custom_headers: CustomHeaders;
    idempotency_key?: string;
  }
  ```

  ```typescript reaction.deleted theme={"system"}
  // _when a reaction is removed from a cast_

  interface UserDehydrated {
    object: "user_dehydrated";
    fid: number;
    username: string;
  }

  interface URIDehydrated {
    object: "uri_dehydrated";
    uri: string;
  }

  interface CastDehydrated {
    object: "cast_dehydrated";
    hash: string;
    author: UserDehydrated;
  }

  interface EventData {
    object: "reaction";
    event_timestamp: string; // ISO 8601 format
    timestamp: string;       // ISO 8601 format with timezone
    reaction_type: number;
    user: UserDehydrated;
    target: CastDehydrated | URIDehydrated;
  }

  interface CustomHeaders {
    "x-convoy-message-type": "broadcast";
  }

  interface ReactionDeletedEvent {
    event_type: "reaction.deleted";
    data: EventData;
    custom_headers: CustomHeaders;
    idempotency_key?: string;
  }
  ```
</CodeGroup>
