Skip to main content

Private Stream Usage

Learn how to effectively use Private SSE with generated tokens for real-time data streaming.

Complete Implementation Example

JavaScript (Browser)

/**
 * Browser client for the Private SSE stream.
 *
 * Each connection attempt requests a fresh token, opens an EventSource with
 * the requested filters, and dispatches "tweet" / "user" events to the
 * callbacks registered through onTweet() / onUser(). Failed connections are
 * retried with exponential backoff, re-using the most recent filters.
 */
class PrivateSSEClient {
  /**
   * @param {string} apiKey - API key sent in the `x-api-key` header when
   *   requesting a stream token.
   */
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    // Filters of the most recent connect() call, replayed on reconnect.
    this.filters = {};
    // Handle of the pending backoff timer, or null when none is scheduled.
    this.reconnectTimer = null;
    // Incremented by every connect()/disconnect() so that an attempt that was
    // superseded while awaiting its token can detect it and bail out.
    this.connectSeq = 0;
  }

  /**
   * Opens the stream, replacing any connection that is already open.
   *
   * @param {object} [filters={}] - Optional stream filters: `keywords` and
   *   `users` (arrays), `minFollowers`, `verified`, `language`.
   * @returns {Promise<void>} Resolves once the EventSource has been created
   *   or a retry has been scheduled; connection errors are not rethrown.
   */
  async connect(filters = {}) {
    this.filters = filters;
    const attempt = ++this.connectSeq;

    // Never keep two live streams or a stale retry around.
    this.clearReconnectTimer();
    this.closeEventSource();

    try {
      const token = await this.generateToken();
      if (attempt !== this.connectSeq) {
        // disconnect() or a newer connect() ran while the token was in flight.
        return;
      }

      this.eventSource = new EventSource(this.buildUrl(token, filters));
      this.setupEventHandlers();
    } catch (error) {
      console.error("Connection failed:", error);
      if (attempt === this.connectSeq) {
        this.handleReconnect();
      }
    }
  }

  /**
   * Requests a stream token from the token endpoint.
   *
   * @returns {Promise<string>} The token from the response body.
   * @throws {Error} If the response is not OK or contains no token.
   */
  async generateToken() {
    const response = await fetch("https://scrape.st/stream/token", {
      method: "POST",
      headers: {
        "x-api-key": this.apiKey,
        "Content-Type": "application/json",
      },
    });

    if (!response.ok) {
      throw new Error(`Token generation failed: ${response.status}`);
    }

    const data = await response.json();
    if (!data.token) {
      throw new Error("Token generation failed: response contained no token");
    }
    return data.token;
  }

  /**
   * Builds the stream URL for a token and a set of filters.
   *
   * @param {string} token - Stream token; URL-encoded by this method.
   * @param {object} filters - See connect().
   * @returns {string} Absolute stream URL.
   */
  buildUrl(token, filters) {
    const url = new URL("https://scrape.st/sse/private");
    const params = url.searchParams;
    params.append("token", token);

    // Empty lists are skipped so no empty filter parameter is sent.
    if (filters.keywords?.length) {
      params.append("keywords", filters.keywords.join(","));
    }
    if (filters.users?.length) {
      params.append("users", filters.users.join(","));
    }
    // Explicit null checks: 0 and false are legitimate filter values.
    if (filters.minFollowers != null) {
      params.append("min_followers", filters.minFollowers);
    }
    if (filters.verified != null) {
      params.append("verified", filters.verified);
    }
    if (filters.language) {
      params.append("language", filters.language);
    }

    return url.toString();
  }

  /** Wires the open, message, tweet, user and error handlers onto the current EventSource. */
  setupEventHandlers() {
    const source = this.eventSource;

    source.onopen = () => {
      // A successful open ends the current backoff sequence.
      this.reconnectAttempts = 0;
      console.log("Connected to Private SSE");
    };

    source.onmessage = (event) => {
      const data = this.parseEventData(event);
      if (data !== undefined) {
        this.handleMessage(data);
      }
    };

    source.addEventListener("tweet", (event) => {
      const tweet = this.parseEventData(event);
      if (tweet !== undefined) {
        this.handleTweet(tweet);
      }
    });

    source.addEventListener("user", (event) => {
      const user = this.parseEventData(event);
      if (user !== undefined) {
        this.handleUser(user);
      }
    });

    source.onerror = (error) => {
      console.error("SSE Error:", error);
      if (this.eventSource !== source) {
        // Late event from a connection that has already been replaced.
        return;
      }
      // Close first: EventSource retries on its own, and a second connection
      // opened by handleReconnect() would otherwise duplicate the stream.
      this.closeEventSource();
      this.handleReconnect();
    };
  }

  /**
   * Parses the JSON payload of an SSE event.
   *
   * @param {MessageEvent} event
   * @returns {*} The parsed payload, or undefined if it is not valid JSON.
   */
  parseEventData(event) {
    try {
      return JSON.parse(event.data);
    } catch (error) {
      console.error("Discarding malformed SSE payload:", error);
      return undefined;
    }
  }

  /** Handles unnamed messages. */
  handleMessage(data) {
    console.log("Received message:", data);
    // Handle general messages
  }

  /** Handles "tweet" events and forwards them to the onTweet callback. */
  handleTweet(tweet) {
    console.log("Tweet received:", tweet);
    // Process tweet data
    this.onTweetCallback?.(tweet);
  }

  /** Handles "user" events and forwards them to the onUser callback. */
  handleUser(user) {
    console.log("User data received:", user);
    // Process user data
    this.onUserCallback?.(user);
  }

  /**
   * Schedules a reconnect with exponential backoff (2s, 4s, ... capped at
   * 30s), giving up after `maxReconnectAttempts` consecutive failures.
   */
  async handleReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }

    this.reconnectAttempts++;
    const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);

    console.log(`Reconnecting in ${delay}ms...`);

    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // Replay the caller's filters; connect() handles its own errors.
      void this.connect(this.filters);
    }, delay);
  }

  /** Closes the stream and cancels any pending or in-flight reconnect. */
  disconnect() {
    this.connectSeq++;
    this.clearReconnectTimer();
    this.closeEventSource();
  }

  /** Closes and forgets the current EventSource, if any. */
  closeEventSource() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }

  /** Cancels the pending backoff timer, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  // Callback setters
  onTweet(callback) {
    this.onTweetCallback = callback;
  }

  onUser(callback) {
    this.onUserCallback = callback;
  }
}

// Usage Example
const client = new PrivateSSEClient("your_api_key");

// Log every incoming tweet together with its author.
client.onTweet(({ data }) => {
  console.log(`New tweet from @${data.user.username}: ${data.text}`);
});

// Log every user update.
client.onUser(({ data }) => {
  console.log(`User update: ${data.name}`);
});

// Only stream tweets matching these filters.
const streamFilters = {
  keywords: ["javascript", "programming"],
  minFollowers: 1000,
  verified: true,
};

client.connect(streamFilters);

Server-Side Implementation (Node.js)

const EventSource = require("eventsource");
const axios = require("axios");

/**
 * Node.js consumer for the Private SSE stream.
 *
 * Caches the stream token until its reported expiry, opens an EventSource
 * with the requested filters, and routes each message by its `type` field.
 * After a stream error the connection is re-established with the same
 * filters after a fixed 5 second delay.
 */
class PrivateSSEServer {
  /**
   * @param {string} apiKey - API key sent in the `x-api-key` header when
   *   requesting a stream token.
   */
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.eventSource = null;
    this.token = null;
    this.tokenExpiry = null;
    // Filters of the most recent start() call, replayed on reconnect.
    this.filters = {};
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
    // Incremented by every start()/stop() so that a start() superseded while
    // awaiting its token can detect it and bail out.
    this.startSeq = 0;
  }

  /**
   * Opens the stream, replacing any connection that is already open.
   *
   * @param {object} [filters={}] - Query filters; array values are sent as
   *   comma-separated lists, null/undefined values are omitted.
   * @returns {Promise<void>} Start-up errors are logged, not rethrown.
   */
  async start(filters = {}) {
    this.filters = filters;
    const attempt = ++this.startSeq;

    // Never keep two live streams or a stale retry around.
    this.clearReconnectTimer();
    this.closeEventSource();

    try {
      await this.ensureValidToken();
      if (attempt !== this.startSeq) {
        // stop() or a newer start() ran while the token was being fetched.
        return;
      }

      const source = new EventSource(this.buildUrl(filters));
      this.eventSource = source;

      source.onmessage = (event) => {
        let data;
        try {
          data = JSON.parse(event.data);
        } catch (error) {
          console.error("Discarding malformed SSE payload:", error);
          return;
        }
        this.processData(data);
      };

      source.onerror = (error) => {
        console.error("SSE Error:", error);
        if (this.eventSource !== source) {
          // Late event from a connection that has already been replaced.
          return;
        }
        this.reconnect();
      };

      console.log("Private SSE server started");
    } catch (error) {
      console.error("Failed to start SSE server:", error);
    }
  }

  /** Refreshes the token when none is cached or the cached one has expired. */
  async ensureValidToken() {
    if (!this.token || !this.tokenExpiry || Date.now() >= this.tokenExpiry) {
      await this.refreshToken();
    }
  }

  /**
   * Requests a new token and records its expiry time, computed from the
   * response's `expiresIn` field multiplied by 1000.
   */
  async refreshToken() {
    const response = await axios.post(
      "https://scrape.st/stream/token",
      {},
      {
        headers: {
          "x-api-key": this.apiKey,
          "Content-Type": "application/json",
        },
      },
    );

    this.token = response.data.token;
    this.tokenExpiry = Date.now() + response.data.expiresIn * 1000;

    console.log("Token refreshed");
  }

  /**
   * Builds the stream URL from the cached token and the given filters.
   *
   * @param {object} filters - See start().
   * @returns {string} Absolute stream URL with all values URL-encoded.
   */
  buildUrl(filters) {
    const url = new URL("https://scrape.st/sse/private");
    url.searchParams.append("token", this.token);

    for (const [key, value] of Object.entries(filters)) {
      if (Array.isArray(value)) {
        // Empty lists are skipped so no empty filter parameter is sent.
        if (value.length > 0) {
          url.searchParams.append(key, value.join(","));
        }
      } else if (value !== undefined && value !== null) {
        url.searchParams.append(key, String(value));
      }
    }

    return url.toString();
  }

  /**
   * Routes a parsed message to the handler matching its `type`.
   *
   * @param {{type?: string, data?: object}|null} data - Parsed message.
   */
  processData(data) {
    // Optional chaining: a literal `null` payload is valid JSON.
    switch (data?.type) {
      case "tweet":
        this.handleTweet(data.data);
        break;
      case "user":
        this.handleUser(data.data);
        break;
      default:
        console.log("Unknown data type:", data?.type);
    }
  }

  handleTweet(tweet) {
    // Process tweet data
    console.log(`Tweet: ${tweet.text} by @${tweet.user.username}`);
  }

  handleUser(user) {
    // Process user data
    console.log(`User: ${user.name} (@${user.username})`);
  }

  /** Drops the current connection and restarts with the same filters in 5s. */
  reconnect() {
    // Close first: the EventSource would otherwise keep retrying on its own
    // and duplicate the stream once start() opens a new connection.
    this.closeEventSource();
    this.clearReconnectTimer();
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // start() handles its own errors.
      void this.start(this.filters);
    }, 5000);
  }

  /** Closes the stream and cancels any pending or in-flight restart. */
  stop() {
    this.startSeq++;
    this.clearReconnectTimer();
    this.closeEventSource();
  }

  /** Closes and forgets the current EventSource, if any. */
  closeEventSource() {
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }

  /** Cancels the pending reconnect timer, if any. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }
}

// Usage
const server = new PrivateSSEServer("your_api_key");

// Only stream English-language items matching these keywords.
const serverFilters = {
  keywords: ["technology", "innovation"],
  language: "en",
};

server.start(serverFilters);

Data Processing Examples

Tweet Processing

/**
 * Flattens a raw tweet payload into a compact record.
 *
 * @param {object} tweet - Raw tweet; `user` and `metrics` must be present,
 *   `entities` is optional.
 * @returns {object} Record with `id`, `text`, `author`, `metrics`,
 *   `timestamp` (a Date built from `created_at`), and the `hashtags`,
 *   `mentions` and `urls` lists (empty arrays when missing).
 */
function processTweet(tweet) {
  const { username, name, verified } = tweet.user;
  const { retweets, likes, replies } = tweet.metrics;
  const entities = tweet.entities;

  const author = { username, name, verified };
  const metrics = { retweets, likes, replies };

  return {
    id: tweet.id,
    text: tweet.text,
    author,
    metrics,
    timestamp: new Date(tweet.created_at),
    hashtags: entities?.hashtags || [],
    mentions: entities?.mentions || [],
    urls: entities?.urls || [],
  };
}

Real-time Analytics

/**
 * Accumulates tweets and derives simple statistics: per-hashtag counts,
 * per-user counts, and a recent tweet rate.
 *
 * Every tweet passed to addTweet() is retained in `tweets`; nothing is
 * evicted, so callers processing a long-lived stream should prune it.
 */
class TweetAnalytics {
  constructor() {
    this.tweets = [];
    // Null-prototype dictionaries: hashtags and usernames are untrusted
    // strings, and a key such as "constructor" or "toString" would otherwise
    // resolve to an inherited Object.prototype member and corrupt the count.
    this.hashtagCounts = Object.create(null);
    this.userCounts = Object.create(null);
  }

  /**
   * Records a tweet and updates the hashtag and user counters.
   *
   * @param {object} tweet - Must have `user.username`; `entities.hashtags`
   *   is optional and each of its entries is used directly as a counter key.
   */
  addTweet(tweet) {
    this.tweets.push(tweet);

    // Count hashtags
    tweet.entities?.hashtags?.forEach((tag) => {
      this.hashtagCounts[tag] = (this.hashtagCounts[tag] ?? 0) + 1;
    });

    // Count users
    const username = tweet.user.username;
    this.userCounts[username] = (this.userCounts[username] ?? 0) + 1;
  }

  /**
   * @param {number} [limit=10] - Maximum number of entries to return.
   * @returns {Array<[string, number]>} `[hashtag, count]` pairs, highest
   *   count first.
   */
  getTopHashtags(limit = 10) {
    return Object.entries(this.hashtagCounts)
      .sort(([, a], [, b]) => b - a)
      .slice(0, limit);
  }

  /**
   * @param {number} [limit=10] - Maximum number of entries to return.
   * @returns {Array<[string, number]>} `[username, count]` pairs, highest
   *   count first.
   */
  getTopUsers(limit = 10) {
    return Object.entries(this.userCounts)
      .sort(([, a], [, b]) => b - a)
      .slice(0, limit);
  }

  /**
   * Computes the rate of tweets whose `created_at` lies within the last
   * `timeWindow` milliseconds.
   *
   * @param {number} [timeWindow=60000] - Window length in milliseconds.
   * @returns {number} Tweets per second over the window.
   */
  getTweetRate(timeWindow = 60000) {
    const now = Date.now();
    const recentTweets = this.tweets.filter((tweet) => now - new Date(tweet.created_at).getTime() <= timeWindow);
    return recentTweets.length / (timeWindow / 1000); // tweets per second
  }
}

Error Handling and Monitoring

Connection Health Check

/**
 * Watchdog that reconnects a stream client when no message has been seen
 * for more than one minute.
 *
 * The monitor does not observe the stream itself: the owner must call
 * updateLastMessageTime() whenever a message arrives.
 */
class ConnectionMonitor {
  /**
   * @param {{connect: Function, disconnect: Function}} client - Stream
   *   client to supervise.
   * @param {object} [filters={}] - Filters passed to `client.connect()` when
   *   the monitor reconnects, so the stream keeps its original filters.
   */
  constructor(client, filters = {}) {
    this.client = client;
    this.filters = filters;
    this.lastMessageTime = Date.now();
    this.heartbeatInterval = null;
  }

  /** Starts the periodic check; calling it while already running is a no-op. */
  start() {
    if (this.heartbeatInterval !== null) {
      // A second interval would leak: stop() can only clear one handle.
      return;
    }
    this.heartbeatInterval = setInterval(() => {
      this.checkConnection();
    }, 30000); // Check every 30 seconds
  }

  /** Reconnects the client if the last message is more than a minute old. */
  checkConnection() {
    const now = Date.now();
    const timeSinceLastMessage = now - this.lastMessageTime;

    if (timeSinceLastMessage <= 60000) {
      return;
    }

    // No message for 1 minute
    console.warn("Connection appears stale, reconnecting...");

    // Give the new connection a full window before it is judged stale;
    // otherwise every following check would tear it down again.
    this.lastMessageTime = now;

    this.client.disconnect();
    Promise.resolve(this.client.connect(this.filters)).catch((error) => {
      console.error("Reconnect failed:", error);
    });
  }

  /** Records that a message was just received. */
  updateLastMessageTime() {
    this.lastMessageTime = Date.now();
  }

  /** Stops the periodic check. */
  stop() {
    if (this.heartbeatInterval !== null) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }
}

Best Practices

  1. Token Management: Always refresh tokens before expiration
  2. Error Handling: Implement robust reconnection logic
  3. Rate Limiting: Respect connection and message limits
  4. Data Processing: Process data asynchronously to avoid blocking
  5. Monitoring: Track connection health and message rates
  6. Security: Store API keys securely on the server side; never embed them in code shipped to the browser
Next: Stream Comparison