WebSocket implementation
All checks were successful
Deploy API / docker (ubuntu-latest, 2.44.0) (push) Successful in 54s

This commit is contained in:
Braydon 2024-09-09 17:31:13 -04:00
parent 42c1e1e685
commit c7af4a1e7f
14 changed files with 452 additions and 15 deletions

@ -3,5 +3,5 @@ An API designed to provide real-time access to a user's Discord data.
## TODO
- [x] Caching
- [ ] WebSockets
- [x] WebSockets
- [ ] User account for extra data? (about me, connections, etc)

@ -85,5 +85,11 @@
<version>3.1.8</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.11.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

@ -0,0 +1,33 @@
package me.braydon.tether.config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.NonNull;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/**
* @author Fascinated (fascinated7)
*/
@Configuration
public class AppConfig {
public static final Gson GSON = new GsonBuilder()
.serializeNulls()
.create();
@Bean
public WebMvcConfigurer configureCors() {
return new WebMvcConfigurer() {
@Override
public void addCorsMappings(@NonNull CorsRegistry registry) {
// Allow all origins to access the API
registry.addMapping("/v*/**")
.allowedOrigins("*") // Allow all origins
.allowedMethods("*") // Allow all methods
.allowedHeaders("*"); // Allow all headers
}
};
}
}

@ -5,7 +5,6 @@ import net.dv8tion.jda.api.OnlineStatus;
import net.dv8tion.jda.api.entities.*;
import java.text.SimpleDateFormat;
import java.time.OffsetDateTime;
import java.util.EnumSet;
import java.util.List;
import java.util.Objects;
@ -15,7 +14,7 @@ import java.util.Objects;
*
* @author Braydon
*/
@AllArgsConstructor @Getter @EqualsAndHashCode(onlyExplicitlyIncluded = true) @ToString
@AllArgsConstructor @Getter @EqualsAndHashCode @ToString
public final class DiscordUser {
/**
* The unique snowflake of this user.
@ -70,7 +69,7 @@ public final class DiscordUser {
/**
* The Spotify activity of this user, if known.
*/
private final SpotifyActivity spotify;
@EqualsAndHashCode.Exclude private final SpotifyActivity spotify;
/**
* Is this user a bot?
@ -78,9 +77,9 @@ public final class DiscordUser {
private final boolean bot;
/**
* The user creation date.
* The unix time of when this user joined Discord.
*/
@NonNull private final OffsetDateTime createdAt;
private final long createdAt;
/**
* Builds a Discord user from the
@ -112,14 +111,14 @@ public final class DiscordUser {
}
return new DiscordUser(
user.getIdLong(), user.getName(), user.getGlobalName(), new UserFlags(user.getFlags(), user.getFlagsRaw()),
avatar, banner, accentColor, onlineStatus, activeClients, activities, spotify, user.isBot(), user.getTimeCreated()
avatar, banner, accentColor, onlineStatus, activeClients, activities, spotify, user.isBot(), user.getTimeCreated().toInstant().toEpochMilli()
);
}
/**
* A user's flags.
*/
@AllArgsConstructor @Getter
@AllArgsConstructor @Getter @EqualsAndHashCode
public static class UserFlags {
/**
* The list of flags the user has.
@ -135,7 +134,7 @@ public final class DiscordUser {
/**
* A user's avatar.
*/
@AllArgsConstructor @Getter
@AllArgsConstructor @Getter @EqualsAndHashCode
public static class Avatar {
/**
* The id of the user's avatar.
@ -151,7 +150,7 @@ public final class DiscordUser {
/**
* A user's banner.
*/
@AllArgsConstructor @Getter
@AllArgsConstructor @Getter @EqualsAndHashCode
public static class Banner {
/**
* The id of the user's avatar.
@ -221,8 +220,8 @@ public final class DiscordUser {
return new SpotifyActivity(
richPresence.getDetails(), richPresence.getState().replace(";", ","),
dateFormat.format(trackProgress), dateFormat.format(trackLength),
richPresence.getLargeImage().getText(), started, ends
richPresence.getLargeImage().getText(), dateFormat.format(trackProgress),
dateFormat.format(trackLength), started, ends
);
}
}

@ -0,0 +1,15 @@
package me.braydon.tether.packet;
/**
* Op codes for {@link Packet}'s.
*
* @author Braydon
*/
public final class OpCode {
// User Status
public static final int LISTEN_TO_USER = 0;
public static final int USER_STATUS = 1;
// Misc
public static final int ERROR_MESSAGE = 99;
}

@ -0,0 +1,20 @@
package me.braydon.tether.packet;
import com.google.gson.annotations.SerializedName;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
/**
* A packet that can be
* sent over the messenger.
*
* @author Braydon
*/
@AllArgsConstructor @Getter @ToString
public class Packet {
/**
* The Op code of this packet.
*/
@SerializedName("op") private final int opCode;
}

@ -0,0 +1,44 @@
package me.braydon.tether.packet;
import me.braydon.tether.packet.impl.websocket.user.ListenToUserPacket;
import me.braydon.tether.packet.impl.websocket.user.UserStatusPacket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
/**
* A registry of {@link Packet}'s.
*
* @author Braydon
*/
public final class PacketRegistry {
/**
* A registry of packets, identified by their op code.
*/
private static final Map<Integer, Class<? extends Packet>> REGISTRY = Collections.synchronizedMap(new HashMap<>());
static {
register(OpCode.LISTEN_TO_USER, ListenToUserPacket.class);
register(OpCode.USER_STATUS, UserStatusPacket.class);
}
/**
* Register a packet.
*
* @param opCode the packet op code
* @param packet the packet
*/
public static void register(int opCode, Class<? extends Packet> packet) {
REGISTRY.put(opCode, packet);
}
/**
* Get a packet from the registry by its op code.
*
* @param opCode the packet op code
* @return the packet, null if none
*/
public static Class<? extends Packet> get(int opCode) {
return REGISTRY.get(opCode);
}
}

@ -0,0 +1,20 @@
package me.braydon.tether.packet.impl.websocket.misc;
import lombok.NonNull;
import me.braydon.tether.packet.OpCode;
import me.braydon.tether.packet.Packet;
/**
* @author Braydon
*/
public final class ErrorMessagePacket extends Packet {
/**
* The error message.
*/
@NonNull private final String message;
public ErrorMessagePacket(@NonNull String message) {
super(OpCode.ERROR_MESSAGE);
this.message = message;
}
}

@ -0,0 +1,25 @@
package me.braydon.tether.packet.impl.websocket.user;
import lombok.Getter;
import lombok.Setter;
import me.braydon.tether.packet.OpCode;
import me.braydon.tether.packet.Packet;
/**
* This packet is sent from the client to the
* server to indicate that the client wants to
* listen to a specific user and get their status.
*
* @author Braydon
*/
@Setter @Getter
public final class ListenToUserPacket extends Packet {
/**
* The snowflake of the user to listen to.
*/
private long snowflake;
public ListenToUserPacket() {
super(OpCode.LISTEN_TO_USER);
}
}

@ -0,0 +1,25 @@
package me.braydon.tether.packet.impl.websocket.user;
import lombok.NonNull;
import me.braydon.tether.model.DiscordUser;
import me.braydon.tether.packet.OpCode;
import me.braydon.tether.packet.Packet;
/**
* This packet is sent from the server to the
* client to indicate the status of the user
* that the client is listening to.
*
* @author Braydon
*/
public final class UserStatusPacket extends Packet {
/**
* The user to send the status of.
*/
@NonNull private final DiscordUser user;
public UserStatusPacket(@NonNull DiscordUser user) {
super(OpCode.USER_STATUS);
this.user = user;
}
}

@ -62,15 +62,28 @@ public final class DiscordService {
*/
@NonNull
public DiscordUserResponse getUserBySnowflake(@NonNull String rawSnowflake) throws BadRequestException, ServiceUnavailableException, ResourceNotFoundException {
if (jda == null || (jda.getStatus() != JDA.Status.CONNECTED)) { // Ensure bot is connected
throw new ServiceUnavailableException("Not connected to Discord.");
}
long snowflake;
try {
snowflake = Long.parseLong(rawSnowflake);
} catch (NumberFormatException ex) {
throw new BadRequestException("Not a valid snowflake");
}
return getUserBySnowflake(snowflake);
}
/**
* Get a Discord user by their snowflake.
*
* @param snowflake the user snowflake
* @return the user response
* @throws ServiceUnavailableException if the bot is not connected
* @throws ResourceNotFoundException if the user is not found
*/
@NonNull
public DiscordUserResponse getUserBySnowflake(long snowflake) throws BadRequestException, ServiceUnavailableException, ResourceNotFoundException {
if (jda == null || (jda.getStatus() != JDA.Status.CONNECTED)) { // Ensure bot is connected
throw new ServiceUnavailableException("Not connected to Discord.");
}
try {
Member member = getMember(snowflake); // First try to locate the member in a guild

@ -0,0 +1,156 @@
package me.braydon.tether.service.websocket;
import com.google.gson.JsonSyntaxException;
import lombok.Getter;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.log4j.Log4j2;
import me.braydon.tether.config.AppConfig;
import me.braydon.tether.exception.impl.BadRequestException;
import me.braydon.tether.exception.impl.ResourceNotFoundException;
import me.braydon.tether.exception.impl.ServiceUnavailableException;
import me.braydon.tether.model.DiscordUser;
import me.braydon.tether.packet.Packet;
import me.braydon.tether.packet.PacketRegistry;
import me.braydon.tether.packet.impl.websocket.misc.ErrorMessagePacket;
import me.braydon.tether.packet.impl.websocket.user.ListenToUserPacket;
import me.braydon.tether.packet.impl.websocket.user.UserStatusPacket;
import me.braydon.tether.service.DiscordService;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author Braydon
*/
@Log4j2(topic = "WebSocket Gateway") @Getter
public class WebSocket extends TextWebSocketHandler {
/**
* The discord service to use.
*/
@NonNull private final DiscordService discordService;
/**
* Mapped clients for each connected session.
*/
private final Map<String, WebSocketClient> activeSessions = Collections.synchronizedMap(new HashMap<>());
/**
* The unix time of when the last time stats were logged.
*/
private long lastStat;
protected WebSocket(@NonNull DiscordService discordService) {
this.discordService = discordService;
// Schedule a task to send statuses to listening clients
new Timer().scheduleAtFixedRate(new TimerTask() {
@Override @SneakyThrows
public void run() {
if (!activeSessions.isEmpty() && (System.currentTimeMillis() - lastStat) >= TimeUnit.SECONDS.toMillis(30L)) {
lastStat = System.currentTimeMillis();
log.info("Active Sessions: {}", activeSessions.size());
}
for (WebSocketClient client : activeSessions.values()) {
// Disconnect users that have not been active for 15 seconds
if (client.getListeningTo() == null && ((System.currentTimeMillis() - client.getConnected()) >= TimeUnit.SECONDS.toMillis(15L))) {
client.getSession().close(CloseStatus.NOT_ACCEPTABLE.withReason("Client is inactive"));
continue;
}
if (client.getListeningTo() == null) {
continue;
}
// Notify the listening client of the user's status if it has changed
try {
DiscordUser user = discordService.getUserBySnowflake(client.getListeningTo()).getUser();
if (!user.equals(client.getLastUser())) {
client.setLastUser(user);
dispatch(client.getSession(), new UserStatusPacket(user));
}
} catch (BadRequestException | ServiceUnavailableException | ResourceNotFoundException ex) {
client.setListeningTo(null);
dispatch(client.getSession(), new ErrorMessagePacket(ex.getLocalizedMessage()));
}
}
}
}, 1000L, 1000L);
}
/**
* Received a new session, store it.
*
* @param session the received session
*/
@Override
public final void afterConnectionEstablished(@NonNull WebSocketSession session) {
String sessionId = session.getId();
log.info("New session established: {}", sessionId);
activeSessions.put(sessionId, new WebSocketClient(session));
}
/**
* Handle receiving a string
* message from a session.
*
* @param session the session
* @param message the message
*/
@Override @SneakyThrows
protected final void handleTextMessage(@NonNull WebSocketSession session, @NonNull TextMessage message) {
String sessionId = session.getId();
WebSocketClient client = activeSessions.get(sessionId);
if (client == null) { // No active client for the session
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("No active session"));
return;
}
try {
Packet received = AppConfig.GSON.fromJson(message.getPayload(), Packet.class); // Parse the received packet
int opCode = received.getOpCode();
Class<? extends Packet> packetClass = PacketRegistry.get(opCode);
// Received packet is not valid, ignore it
if (packetClass == null) {
return;
}
Packet packet = AppConfig.GSON.fromJson(message.getPayload(), packetClass);
log.info("Received packet (SID: {}, Op: {}): {}", sessionId, opCode, packetClass.getName());
// Handle the packet
if (packet instanceof ListenToUserPacket listenToUserPacket) {
client.setListeningTo(listenToUserPacket.getSnowflake());
log.info("Session {} is listening to user updates for {}", sessionId, client.getListeningTo());
}
} catch (JsonSyntaxException ex) { // The syntax provided is invalid, close the session
session.close(CloseStatus.NOT_ACCEPTABLE.withReason("Invalid payload"));
log.warn("Rejected invalid payload: {}", sessionId);
}
}
/**
* A session has closed, remove it.
*
* @param session the closed session
* @param status the close status
*/
@Override
public final void afterConnectionClosed(@NonNull WebSocketSession session, @NonNull CloseStatus status) {
String sessionId = session.getId();
activeSessions.remove(sessionId);
log.info("Session closed ({}): {}", status, sessionId);
}
/**
* Send a packet to the given session.
*
* @param session the session to send to
* @param packet the packet to send
*/
@SneakyThrows
private void dispatch(@NonNull WebSocketSession session, @NonNull Packet packet) {
session.sendMessage(new TextMessage(AppConfig.GSON.toJson(packet)));
}
}

@ -0,0 +1,47 @@
package me.braydon.tether.service.websocket;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import me.braydon.tether.model.DiscordUser;
import org.springframework.web.socket.WebSocketSession;
/**
* A client currently connected
* to the {@link WebSocket}.
*
* @author Braydon
*/
@Setter @Getter
public class WebSocketClient {
/**
* The session this client is for.
*/
@NonNull private final WebSocketSession session;
/**
* The snowflake of the user this client
* is listening to for updates, if any.
*/
private Long listeningTo;
/**
* The last user this client
* has been sent a status for.
* <p>
* This is kept so we only notify
* the client if the user has changed.
* </p>
*/
private DiscordUser lastUser;
/**
* The unix time this client connected.
*/
private final long connected;
protected WebSocketClient(@NonNull WebSocketSession session) {
this.session = session;
connected = System.currentTimeMillis();
}
}

@ -0,0 +1,34 @@
package me.braydon.tether.service.websocket;
import lombok.NonNull;
import lombok.extern.log4j.Log4j2;
import me.braydon.tether.service.DiscordService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* @author Braydon
*/
@Service @EnableWebSocket @Log4j2(topic = "WebSockets")
public class WebSocketService implements WebSocketConfigurer {
private static final String WS_PATH = "/gateway";
/**
* The WebSocket to use.
*/
@NonNull private final WebSocket webSocket;
@Autowired
public WebSocketService(@NonNull DiscordService discordService) {
webSocket = new WebSocket(discordService);
}
@Override
public void registerWebSocketHandlers(@NonNull WebSocketHandlerRegistry registry) {
registry.addHandler(webSocket, WS_PATH).setAllowedOrigins("*");
log.info("Added WebSocket on path {}", WS_PATH);
}
}