Реализация пртокола для сервера g365sfu

This commit is contained in:
RoyceDa
2026-03-14 22:56:24 +02:00
parent b84f69da33
commit 68cdec860d
8 changed files with 561 additions and 1 deletions

View File

@@ -1,13 +1,22 @@
package io.g365sfu;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import io.g365sfu.exception.SFUException;
import io.g365sfu.exception.SFUHandshakeException;
import io.g365sfu.net.SfuSock;
import io.g365sfu.util.StrUtils;
import io.g365sfu.webrtc.ICECandidate;
import io.g365sfu.webrtc.SDPAnswer;
import io.g365sfu.webrtc.SDPOffer;
public class SFU {
@@ -16,6 +25,28 @@ public class SFU {
private String secretKey;
private SfuSock socket;
private HashMap<String, CompletableFuture<String>> pendingRoomCreations = new HashMap<>();
/**
* Комнаты которые принадлежат этому серверу SFU. Ключом является ID комнаты,
* а значением объект Room, который содержит информацию о комнате, ее участниках и связанном с ней сервере SFU.
*/
private HashMap<String, Room> rooms = new HashMap<>();
/**
* Потребитель для обработки входящих ICE-кандидатов от сервера SFU.
* Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x04,
* содержащим информацию об ICE-кандидате для одного из участников комнаты.
*/
private Consumer<ICECandidate> onIceCandidate;
/**
* Потребитель для обработки входящих SDP Answer от сервера SFU.
* Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x05,
* содержащим информацию об SDP Answer для одного из участников комнаты.
*/
private Consumer<SDPAnswer> onSdpAnswer;
private Consumer<SDPOffer> onSdpOffer;
/**
* Конструктор для создания объекта SFU, который будет использоваться для установления соединения с SFU сервером.
* @param serverAddress адрес SFU сервера в формате "host:port", например "sfu.example.com:8080"
@@ -38,6 +69,7 @@ public class SFU {
*/
public void connect() throws URISyntaxException, InterruptedException, SFUException, ExecutionException, TimeoutException, SFUHandshakeException {
this.socket = new SfuSock(this.serverAddress);
this.socket.setMessageConsumer(this::onMessage);
boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS);
if(!connected){
throw new SFUException("Failed to connect to SFU server, timeout after 30 seconds: " + this.serverAddress);
@@ -51,6 +83,94 @@ public class SFU {
}
}
private void onMessage(ByteBuffer message) {
if(message.remaining() < 1) {
System.err.println("Received empty message from SFU server");
return;
}
byte packetId = message.get(0);
if(packetId == 0x02) {
/**
* Ответ на создание комнаты, который содержит ID созданной комнаты
*/
int roomIdLength = message.getInt();
byte[] roomIdBytes = new byte[roomIdLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
CompletableFuture<String> future = this.pendingRoomCreations.remove(roomId);
if(future != null) {
future.complete(roomId);
}
return;
}
if(packetId == 0x04) {
/**
* ICE-candidate от сервера SFU для одного из участников комнаты
*/
int roomidLength = message.getInt();
byte[] roomIdBytes = new byte[roomidLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
int candidateLength = message.getInt();
byte[] candidateBytes = new byte[candidateLength];
message.get(candidateBytes);
String candidate = new String(candidateBytes).trim();
ICECandidate iceCandidate = new ICECandidate(roomId, peerId, candidate);
if(this.onIceCandidate != null) {
this.onIceCandidate.accept(iceCandidate);
}
return;
}
if(packetId == 0x05) {
/**
* Ответ на Offer от сервера SFU, который содержит SDP Answer
*/
int roomidLength = message.getInt();
byte[] roomIdBytes = new byte[roomidLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
int sdpAnswerLength = message.getInt();
byte[] sdpAnswerBytes = new byte[sdpAnswerLength];
message.get(sdpAnswerBytes);
String sdpAnswer = new String(sdpAnswerBytes).trim();
SDPAnswer answer = new SDPAnswer(roomId, peerId, sdpAnswer);
if(this.onSdpAnswer != null) {
this.onSdpAnswer.accept(answer);
}
return;
}
if(packetId == 0x08) {
/**
* Offer от сервера SFU для одного из участников комнаты при renegotiation
*/
int roomidLength = message.getInt();
byte[] roomIdBytes = new byte[roomidLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
int sdpOfferLength = message.getInt();
byte[] sdpAnswerBytes = new byte[sdpOfferLength];
message.get(sdpAnswerBytes);
String sdpOffer = new String(sdpAnswerBytes).trim();
SDPOffer offer = new SDPOffer(roomId, peerId, sdpOffer);
if(this.onSdpOffer != null) {
this.onSdpOffer.accept(offer);
}
return;
}
}
/**
* Получить адрес SFU сервера, к которому установлено соединение
* @return адрес SFU сервера
@@ -74,4 +194,109 @@ public class SFU {
public boolean isOpen() {
return this.socket != null && this.socket.isOpen();
}
/**
* Создает комнату на сервере SFU для организации звонков между пользователями. Комната автоматически удаляется
* при выходе последнего участника из нее. Внутри комнаты пользователи могут обмениваться аудио и видео потоками, а сервер SFU
* будет эффективно их пересылать между участниками, минимизируя задержки и оптимизируя использование пропускной способности.
* @throws TimeoutException
* @throws ExecutionException
* @throws InterruptedException
* @internal Этот метод формирует пакет с кодом 0x02,
* за которым следует случайно сгенерированный идентификатор комнаты,
* и отправляет его на сервер SFU.
*/
public Room createRoom() throws InterruptedException, ExecutionException, TimeoutException {
String roomId = StrUtils.randomString(64);
/**
* 1 байт номер пакета, 4 байта длина ID комнаты, N байт ID комнаты
*/
ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + roomId.getBytes().length);
CompletableFuture<String> future = new CompletableFuture<>();
this.pendingRoomCreations.put(roomId, future);
/**
* 0x02 - создание комнаты
*/
buffer.put((byte)0x02);
buffer.putInt(roomId.getBytes().length);
buffer.put(roomId.getBytes());
buffer.flip();
this.socket.send(buffer);
String createdRoomId = future.get(30, TimeUnit.SECONDS);
Room room = new Room(createdRoomId, this, new HashSet<>());
this.rooms.put(createdRoomId, room);
return room;
}
/**
* Получить все комнаты на сервере
* @return комнаты на этом сервере
*/
public HashSet<Room> getRooms() {
return new HashSet<>(this.rooms.values());
}
/**
* Получить комнату по ее идентификатору
* @param roomId идентификатор комнаты
* @return объект Room, представляющий комнату с данным идентификатором, или null, если комната не найдена
*/
public Room getRoom(String roomId) {
return this.rooms.get(roomId);
}
/**
* Получить комнату, в которой участвует пользователь с данным идентификатором
* @param participantId идентификатор пользователя, который является участником комнаты
* @return объект Room, представляющий комнату, в которой участвует пользователь с данным идентификатором, или null, если такой комнаты не найдено
*/
public Room getRoomByParticipantId(String participantId) {
for(Room room : this.rooms.values()) {
if(room.containsParticipant(participantId)) {
return room;
}
}
return null;
}
/**
* Получает количество комнат на этом сервере
* @return возвращает количество комнат на сервере
*/
public int getRoomsCount() {
return this.rooms.size();
}
/**
* Устанавливает потребителя для обработки входящих ICE-кандидатов от сервера SFU.
* @param onIceCandidate потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x04,
* содержащим информацию об ICE-кандидате для одного из участников комнаты.
* Параметром будет объект ICECandidate, который содержит информацию о комнате, участнике и самом
* кандидате, необходимую для правильной маршрутизации данных между участниками звонка через сервер SFU.
*/
public void setIceConsumer(Consumer<ICECandidate> onIceCandidate) {
this.onIceCandidate = onIceCandidate;
}
/**
* Устанавливает потребителя для обработки входящих SDP Answer от сервера SFU.
* @param onSdpAnswer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x05,
* содержащим информацию об SDP Answer для одного из участников комнаты.
* Параметром будет объект SDPAnswer, который содержит информацию о комнате, участнике и самом SDP Answer,
* необходимую для установления медиа-сессии между участником и сервером SFU.
*/
public void setAnswerConsumer(Consumer<SDPAnswer> onSdpAnswer) {
this.onSdpAnswer = onSdpAnswer;
}
/**
* Устанавливает потребителя для обработки входящих SDP Offer от сервера SFU при renegotiation.
* @param onSdpOffer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x08,
* содержащим информацию об SDP Offer для одного из участников комнаты при renegotiation.
* Параметром будет объект SDPOffer, который содержит информацию о комнате, участнике и самом SDP Offer,
* необходимую для установления медиа-сессии между участником и сервером SFU.
*/
public void setOfferConsumer(Consumer<SDPOffer> onSdpOffer) {
this.onSdpOffer = onSdpOffer;
}
}