Private Stream Usage
Learn how to use Private SSE effectively with generated tokens for real-time data streaming.

Complete Implementation Example
JavaScript (Browser)
/**
 * Browser client for the Private SSE stream.
 *
 * Requests a stream token with the API key, opens an EventSource on the
 * private endpoint and forwards "tweet" / "user" events to the registered
 * callbacks. When the stream errors it reconnects with exponential backoff,
 * reusing the filters passed to the most recent connect() call.
 */
class PrivateSSEClient {
  /**
   * @param {string} apiKey - Sent as the `x-api-key` header when requesting a token.
   */
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.eventSource = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 10;
    // Filters of the latest connect() call, replayed on automatic reconnects.
    this.filters = {};
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
    // Bumped by every connect()/disconnect() so that an in-flight connect
    // can detect that it has been superseded while awaiting the token.
    this.connectSeq = 0;
  }

  /**
   * Opens the stream. Any existing stream or pending reconnect is cancelled first.
   * Failures are logged and trigger the reconnect logic; the returned promise
   * does not reject.
   *
   * @param {object} [filters] - Optional stream filters (see buildUrl).
   * @returns {Promise<void>}
   */
  async connect(filters = {}) {
    this.filters = filters;
    this.disconnect();
    const attempt = this.connectSeq;
    try {
      // Generate token
      const token = await this.generateToken();
      if (attempt !== this.connectSeq) {
        // disconnect() or a newer connect() ran while we were waiting.
        return;
      }
      // Build URL with filters
      const url = this.buildUrl(token, filters);
      // Create EventSource connection
      this.eventSource = new EventSource(url);
      // Setup event handlers
      this.setupEventHandlers();
    } catch (error) {
      if (attempt !== this.connectSeq) {
        return;
      }
      console.error("Connection failed:", error);
      this.handleReconnect();
    }
  }

  /**
   * Requests a fresh stream token from the token endpoint.
   *
   * @returns {Promise<string>} The token from the response body.
   * @throws {Error} When the response is not OK or carries no token.
   */
  async generateToken() {
    const response = await fetch("https://scrape.st/stream/token", {
      method: "POST",
      headers: {
        "x-api-key": this.apiKey,
        "Content-Type": "application/json",
      },
    });
    if (!response.ok) {
      throw new Error(`Token generation failed: ${response.status}`);
    }
    const data = await response.json();
    if (!data || !data.token) {
      throw new Error("Token generation failed: response contained no token");
    }
    return data.token;
  }

  /**
   * Builds the stream URL. The token and every filter value are
   * percent-encoded through URLSearchParams.
   *
   * @param {string} token - Stream token.
   * @param {object} [filters]
   * @param {string[]} [filters.keywords] - Sent comma-joined as `keywords`.
   * @param {string[]} [filters.users] - Sent comma-joined as `users`.
   * @param {number} [filters.minFollowers] - Sent as `min_followers`.
   * @param {boolean} [filters.verified] - Sent as `verified`.
   * @param {string} [filters.language] - Sent as `language`.
   * @returns {string} Absolute stream URL.
   */
  buildUrl(token, filters = {}) {
    const params = new URLSearchParams();
    params.append("token", token);
    const toList = (value) => (Array.isArray(value) ? value.join(",") : String(value));
    // Add filters
    if (filters.keywords?.length) {
      params.append("keywords", toList(filters.keywords));
    }
    if (filters.users?.length) {
      params.append("users", toList(filters.users));
    }
    // `!= null` rather than truthiness so that an explicit 0 / false is still sent.
    if (filters.minFollowers != null) {
      params.append("min_followers", filters.minFollowers);
    }
    if (filters.verified != null) {
      params.append("verified", filters.verified);
    }
    if (filters.language) {
      params.append("language", filters.language);
    }
    return `https://scrape.st/sse/private?${params.toString()}`;
  }

  /**
   * Wires the handlers of the current EventSource. Events coming from a
   * stream that has since been replaced are ignored.
   */
  setupEventHandlers() {
    const source = this.eventSource;
    const isCurrent = () => this.eventSource === source;

    source.onopen = () => {
      if (!isCurrent()) {
        return;
      }
      // A successful open ends the current backoff sequence.
      this.reconnectAttempts = 0;
      console.log("Connected to Private SSE");
    };
    source.onmessage = (event) => {
      if (isCurrent()) {
        this.dispatchPayload(event, (data) => this.handleMessage(data));
      }
    };
    source.addEventListener("tweet", (event) => {
      if (isCurrent()) {
        this.dispatchPayload(event, (tweet) => this.handleTweet(tweet));
      }
    });
    source.addEventListener("user", (event) => {
      if (isCurrent()) {
        this.dispatchPayload(event, (user) => this.handleUser(user));
      }
    });
    source.onerror = (error) => {
      if (!isCurrent()) {
        return;
      }
      console.error("SSE Error:", error);
      // Close explicitly: otherwise the browser keeps retrying the old URL
      // (with the old token) in parallel with our own reconnect.
      source.close();
      this.eventSource = null;
      this.handleReconnect();
    };
  }

  /**
   * Parses `event.data` as JSON and passes the result to `handler`.
   * A payload that is not valid JSON is logged and skipped.
   *
   * @param {{data: string}} event - Event delivered by the EventSource.
   * @param {(payload: any) => void} handler - Receives the parsed payload.
   */
  dispatchPayload(event, handler) {
    let payload;
    try {
      payload = JSON.parse(event.data);
    } catch (error) {
      console.error("Failed to parse SSE payload:", error);
      return;
    }
    handler(payload);
  }

  /** Handles events that carry no explicit event name. */
  handleMessage(data) {
    console.log("Received message:", data);
    // Handle general messages
  }

  /** Logs a "tweet" event and forwards it to the onTweet callback, if any. */
  handleTweet(tweet) {
    console.log("Tweet received:", tweet);
    // Process tweet data
    this.onTweetCallback?.(tweet);
  }

  /** Logs a "user" event and forwards it to the onUser callback, if any. */
  handleUser(user) {
    console.log("User data received:", user);
    // Process user data
    this.onUserCallback?.(user);
  }

  /**
   * Schedules one reconnect with exponential backoff (2 s, 4 s, ... capped at
   * 30 s). Does nothing when a reconnect is already pending or when
   * maxReconnectAttempts has been reached.
   *
   * @returns {Promise<void>}
   */
  async handleReconnect() {
    if (this.reconnectTimer !== null) {
      return;
    }
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.error("Max reconnection attempts reached");
      return;
    }
    this.reconnectAttempts++;
    const delay = Math.min(1000 * 2 ** this.reconnectAttempts, 30000);
    console.log(`Reconnecting in ${delay}ms...`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // connect() handles its own failures, so the promise is not awaited.
      this.connect(this.filters);
    }, delay);
  }

  /** Closes the stream and cancels any pending or in-flight reconnect. */
  disconnect() {
    this.connectSeq++;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }

  // Callback setters

  /** @param {(tweet: any) => void} callback - Invoked for every "tweet" event. */
  onTweet(callback) {
    this.onTweetCallback = callback;
  }

  /** @param {(user: any) => void} callback - Invoked for every "user" event. */
  onUser(callback) {
    this.onUserCallback = callback;
  }
}
// Usage example: create the client, register callbacks, then connect.
const client = new PrivateSSEClient("your_api_key");

// Print the author and text of each incoming tweet.
client.onTweet(({ data }) => {
  const { user, text } = data;
  console.log(`New tweet from @${user.username}: ${text}`);
});

// Print the name carried by each user update.
client.onUser(({ data }) => {
  console.log(`User update: ${data.name}`);
});

// Open the stream, restricted by keyword, follower count and verification.
client.connect({ keywords: ["javascript", "programming"], minFollowers: 1000, verified: true });
Server-Side Implementation (Node.js)
const EventSource = require("eventsource");
const axios = require("axios");
/**
 * Node.js consumer for the Private SSE stream.
 *
 * Caches the stream token until its reported expiry, opens an EventSource on
 * the private endpoint and dispatches incoming payloads by their `type`
 * field. After a stream error it reconnects 5 seconds later with the filters
 * of the most recent start() call.
 */
class PrivateSSEServer {
  /**
   * @param {string} apiKey - Sent as the `x-api-key` header when requesting a token.
   */
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.eventSource = null;
    this.token = null;
    this.tokenExpiry = null;
    // Filters of the latest start() call, replayed on automatic reconnects.
    this.filters = {};
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
  }

  /**
   * Opens the stream. Any existing stream or pending reconnect is cancelled
   * first. Start-up failures are logged; the returned promise does not reject.
   *
   * @param {object} [filters] - Query filters; see buildUrl for serialization.
   * @returns {Promise<void>}
   */
  async start(filters = {}) {
    this.filters = filters;
    this.stop();
    try {
      await this.ensureValidToken();
      const url = this.buildUrl(filters);
      if (this.eventSource) {
        // Another start() finished while this one was waiting for the token.
        this.eventSource.close();
      }
      const source = new EventSource(url);
      this.eventSource = source;
      source.onmessage = (event) => {
        if (this.eventSource !== source) {
          return;
        }
        let data;
        try {
          data = JSON.parse(event.data);
        } catch (error) {
          console.error("Failed to parse SSE payload:", error);
          return;
        }
        this.processData(data);
      };
      source.onerror = (error) => {
        if (this.eventSource !== source) {
          // Late error from a stream that has already been replaced.
          return;
        }
        console.error("SSE Error:", error);
        // Close the failed stream so it cannot keep retrying on its own
        // next to the connection that reconnect() is about to open.
        source.close();
        this.eventSource = null;
        this.reconnect();
      };
      console.log("Private SSE server started");
    } catch (error) {
      console.error("Failed to start SSE server:", error);
    }
  }

  /**
   * Refreshes the token when none is cached or the cached one has expired.
   *
   * @returns {Promise<void>}
   */
  async ensureValidToken() {
    if (!this.token || !this.tokenExpiry || Date.now() >= this.tokenExpiry) {
      await this.refreshToken();
    }
  }

  /**
   * Requests a new token and records its expiry time, computed from the
   * `expiresIn` value of the response (multiplied by 1000 to get milliseconds).
   *
   * @returns {Promise<void>}
   */
  async refreshToken() {
    const response = await axios.post(
      "https://scrape.st/stream/token",
      {},
      {
        headers: {
          "x-api-key": this.apiKey,
          "Content-Type": "application/json",
        },
      },
    );
    this.token = response.data.token;
    this.tokenExpiry = Date.now() + response.data.expiresIn * 1000;
    console.log("Token refreshed");
  }

  /**
   * Builds the stream URL from the cached token and the given filters.
   * Array values are comma-joined, null/undefined values are skipped, and
   * everything is percent-encoded through URLSearchParams.
   *
   * @param {object} [filters] - Map of query parameter name to value.
   * @returns {string} Absolute stream URL.
   */
  buildUrl(filters = {}) {
    const params = new URLSearchParams();
    params.append("token", this.token);
    for (const [key, value] of Object.entries(filters)) {
      if (Array.isArray(value)) {
        params.append(key, value.join(","));
      } else if (value !== undefined && value !== null) {
        params.append(key, value.toString());
      }
    }
    return `https://scrape.st/sse/private?${params.toString()}`;
  }

  /**
   * Routes a parsed payload to the handler matching its `type` field.
   *
   * @param {{type?: string, data?: any}} data - Parsed stream payload.
   */
  processData(data) {
    // Optional chaining: a payload of `null` is reported as an unknown type
    // instead of throwing inside the message handler.
    switch (data?.type) {
      case "tweet":
        this.handleTweet(data.data);
        break;
      case "user":
        this.handleUser(data.data);
        break;
      default:
        console.log("Unknown data type:", data?.type);
    }
  }

  /** Logs the text and author username of a tweet payload. */
  handleTweet(tweet) {
    // Process tweet data
    console.log(`Tweet: ${tweet.text} by @${tweet.user.username}`);
  }

  /** Logs the name and username of a user payload. */
  handleUser(user) {
    // Process user data
    console.log(`User: ${user.name} (@${user.username})`);
  }

  /**
   * Schedules a restart in 5 seconds using the last filters. Does nothing
   * when a restart is already pending.
   */
  reconnect() {
    if (this.reconnectTimer !== null) {
      return;
    }
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      // start() handles its own failures, so the promise is not awaited.
      this.start(this.filters);
    }, 5000);
  }

  /** Closes the stream and cancels any pending reconnect. */
  stop() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.eventSource) {
      this.eventSource.close();
      this.eventSource = null;
    }
  }
}
// Usage: stream English-language tweets that match either keyword.
const server = new PrivateSSEServer("your_api_key");
server.start({ keywords: ["technology", "innovation"], language: "en" });
Data Processing Examples
Tweet Processing
/**
 * Reshapes a raw tweet payload into a compact record.
 *
 * @param {object} tweet - Raw tweet; `user` and `metrics` must be present,
 *   `entities` is optional.
 * @returns {object} Record with `id`, `text`, `author`, `metrics`,
 *   `timestamp` (a Date built from `created_at`) and the `hashtags`,
 *   `mentions` and `urls` lists (empty arrays when absent).
 */
function processTweet(tweet) {
  const { user, metrics, entities } = tweet;

  const author = {
    username: user.username,
    name: user.name,
    verified: user.verified,
  };
  const engagement = {
    retweets: metrics.retweets,
    likes: metrics.likes,
    replies: metrics.replies,
  };
  // Missing entity lists fall back to an empty array.
  const entityList = (kind) => entities?.[kind] || [];

  return {
    id: tweet.id,
    text: tweet.text,
    author,
    metrics: engagement,
    timestamp: new Date(tweet.created_at),
    hashtags: entityList("hashtags"),
    mentions: entityList("mentions"),
    urls: entityList("urls"),
  };
}
Real-time Analytics
/**
 * In-memory running statistics over a stream of tweets: hashtag counts,
 * per-user tweet counts and the recent tweet rate.
 */
class TweetAnalytics {
  /**
   * @param {object} [options]
   * @param {number} [options.maxTweets=Infinity] - Upper bound on the number
   *   of tweets kept for getTweetRate; the oldest are dropped first. The
   *   hashtag and user counters stay cumulative regardless of this bound.
   */
  constructor({ maxTweets = Infinity } = {}) {
    this.maxTweets = maxTweets;
    this.tweets = [];
    // Prototype-less objects: a hashtag or username such as "constructor" or
    // "toString" must not read an inherited member as its starting count.
    this.hashtagCounts = Object.create(null);
    this.userCounts = Object.create(null);
  }

  /**
   * Records one tweet and updates the counters.
   *
   * @param {object} tweet - Needs `user.username`; `entities.hashtags` is optional.
   */
  addTweet(tweet) {
    this.tweets.push(tweet);
    if (this.tweets.length > this.maxTweets) {
      this.tweets.splice(0, this.tweets.length - this.maxTweets);
    }
    // Count hashtags
    // NOTE(review): each hashtag is used directly as a counter key, which
    // assumes hashtags are strings - confirm against the stream payload.
    tweet.entities?.hashtags?.forEach((tag) => {
      this.hashtagCounts[tag] = (this.hashtagCounts[tag] || 0) + 1;
    });
    // Count users
    const username = tweet.user.username;
    this.userCounts[username] = (this.userCounts[username] || 0) + 1;
  }

  /**
   * @param {number} [limit=10] - Maximum number of entries to return.
   * @returns {Array<[string, number]>} `[hashtag, count]` pairs, highest count first.
   */
  getTopHashtags(limit = 10) {
    return Object.entries(this.hashtagCounts)
      .sort(([, a], [, b]) => b - a)
      .slice(0, limit);
  }

  /**
   * @param {number} [limit=10] - Maximum number of entries to return.
   * @returns {Array<[string, number]>} `[username, count]` pairs, highest count first.
   */
  getTopUsers(limit = 10) {
    return Object.entries(this.userCounts)
      .sort(([, a], [, b]) => b - a)
      .slice(0, limit);
  }

  /**
   * Average number of tweets per second among the stored tweets whose
   * `created_at` is at most `timeWindow` milliseconds before now.
   *
   * @param {number} [timeWindow=60000] - Window length in milliseconds.
   * @returns {number} Tweets per second; 0 for a non-positive window.
   */
  getTweetRate(timeWindow = 60000) {
    if (!(timeWindow > 0)) {
      // Avoids dividing by zero (or by a negative / NaN window).
      return 0;
    }
    const now = Date.now();
    const recentTweets = this.tweets.filter((tweet) => now - new Date(tweet.created_at).getTime() <= timeWindow);
    return recentTweets.length / (timeWindow / 1000); // tweets per second
  }
}
Error Handling and Monitoring
Connection Health Check
/**
 * Watchdog that forces a reconnect when no message has been recorded for
 * more than one minute. The owner of the stream must call
 * updateLastMessageTime() whenever a message arrives.
 */
class ConnectionMonitor {
  /**
   * @param {{connect: Function, disconnect: Function}} client - Stream client to supervise.
   * @param {object} [filters] - Filters passed to `client.connect()` on a
   *   forced reconnect; when omitted, connect() is called with `undefined`.
   */
  constructor(client, filters) {
    this.client = client;
    this.filters = filters;
    this.lastMessageTime = Date.now();
    this.heartbeatInterval = null;
  }

  /** Starts the periodic check. Calling it again while running has no effect. */
  start() {
    if (this.heartbeatInterval !== null) {
      // A second interval would be impossible to clear through stop().
      return;
    }
    this.heartbeatInterval = setInterval(() => {
      this.checkConnection();
    }, 30000); // Check every 30 seconds
  }

  /** Reconnects the client when the last message is more than 60 s old. */
  checkConnection() {
    const now = Date.now();
    const timeSinceLastMessage = now - this.lastMessageTime;
    if (timeSinceLastMessage > 60000) {
      // No message for 1 minute
      console.warn("Connection appears stale, reconnecting...");
      // Restart the silence window so the new connection gets a full minute
      // before it is judged; otherwise every later check would reconnect again.
      this.lastMessageTime = now;
      this.client.disconnect();
      // connect() may be async; report a rejection instead of leaving it unhandled.
      Promise.resolve(this.client.connect(this.filters)).catch((error) => {
        console.error("Reconnect failed:", error);
      });
    }
  }

  /** Records that a message has just been received. */
  updateLastMessageTime() {
    this.lastMessageTime = Date.now();
  }

  /** Stops the periodic check; start() may be called again afterwards. */
  stop() {
    if (this.heartbeatInterval !== null) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
  }
}
Best Practices
- Token Management: Always refresh tokens before expiration
- Error Handling: Implement robust reconnection logic
- Rate Limiting: Respect connection and message limits
- Data Processing: Process data asynchronously to avoid blocking
- Monitoring: Track connection health and message rates
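- Security: Store API keys securely on the server side. Avoid embedding the API key in browser code; have your backend generate the stream token and pass only the token to the client.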