package io.g365sfu; import java.net.URISyntaxException; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import io.g365sfu.exception.SFUException; import io.g365sfu.exception.SFUHandshakeException; import io.g365sfu.net.Incoming; import io.g365sfu.net.Outgoing; import io.g365sfu.net.SfuSock; import io.g365sfu.util.StrUtils; import io.g365sfu.webrtc.ICECandidate; import io.g365sfu.webrtc.SDPAnswer; import io.g365sfu.webrtc.SDPOffer; public class SFU { private String serverAddress; private String secretKey; private SfuSock socket; private HashMap> pendingRoomCreations = new HashMap<>(); /** * Комнаты которые принадлежат этому серверу SFU. Ключом является ID комнаты, * а значением объект Room, который содержит информацию о комнате, ее участниках и связанном с ней сервере SFU. */ private HashMap rooms = new HashMap<>(); /** * Потребитель для обработки входящих ICE-кандидатов от сервера SFU. * Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x04, * содержащим информацию об ICE-кандидате для одного из участников комнаты. */ private Consumer onIceCandidate; /** * Потребитель для обработки входящих SDP Answer от сервера SFU. * Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x05, * содержащим информацию об SDP Answer для одного из участников комнаты. */ private Consumer onSdpAnswer; private Consumer onSdpOffer; /** * Конструктор для создания объекта SFU, который будет использоваться для установления соединения с SFU сервером. * @param serverAddress адрес SFU сервера в формате "host:port", например "sfu.example.com:8080" * @param secretKey секретный ключ для аутентификации с SFU сервером, который должен быть согласован с настройками сервера. */ public SFU(String serverAddress, String secretKey) { this.serverAddress = serverAddress; this.secretKey = secretKey; } /** * Установить соединение с SFU сервером и начать обмен рукопожатиями для аутентификации и установления безопасного канала связи. * @throws URISyntaxException если адрес сервера имеет неправильный формат * @throws InterruptedException если соединение было прервано во время попытки подключения * @throws SFUException если не удалось установить соединение с SFU сервером или если соединение было установлено, * но не открыто после подключения * @throws TimeoutException не удалось обменяться рукопожатиями с SFU сервером в течение 30 секунд * @throws ExecutionException если во время обмена рукопожатиями произошла ошибка выполнения * @throws SFUHandshakeException если обмен рукопожатиями с SFU завершился неудачно (например, плохой ключ) */ public void connect() throws URISyntaxException, InterruptedException, SFUException, ExecutionException, TimeoutException, SFUHandshakeException { this.socket = new SfuSock(this.serverAddress); this.socket.setMessageConsumer(this::onMessage); boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS); if(!connected){ throw new SFUException("Failed to connect to SFU server, read time out: " + this.serverAddress); } if(!this.socket.isOpen()) { throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open"); } boolean estabilished = this.socket.handshakeExchange(this.secretKey).get(30, TimeUnit.SECONDS); if(!estabilished) { throw new SFUHandshakeException("Failed to establish handshake with SFU server at " + this.serverAddress); } } private void onMessage(ByteBuffer message) { if(message.remaining() < 1) { System.err.println("Received empty message from SFU server"); return; } byte packetId = message.get(0); if(packetId == Incoming.ROOM_CREATE) { /** * Ответ на создание комнаты, который содержит ID созданной комнаты */ int roomIdLength = message.getInt(); byte[] roomIdBytes = new byte[roomIdLength]; message.get(roomIdBytes); String roomId = new String(roomIdBytes).trim(); CompletableFuture future = this.pendingRoomCreations.remove(roomId); if(future != null) { future.complete(roomId); } return; } if(packetId == Incoming.ICE_CANDIDATE) { /** * ICE-candidate от сервера SFU для одного из участников комнаты */ int roomidLength = message.getInt(); byte[] roomIdBytes = new byte[roomidLength]; message.get(roomIdBytes); String roomId = new String(roomIdBytes).trim(); int peerIdLength = message.getInt(); byte[] peerIdBytes = new byte[peerIdLength]; message.get(peerIdBytes); String peerId = new String(peerIdBytes).trim(); int candidateLength = message.getInt(); byte[] candidateBytes = new byte[candidateLength]; message.get(candidateBytes); String candidate = new String(candidateBytes).trim(); ICECandidate iceCandidate = new ICECandidate(roomId, peerId, candidate); if(this.onIceCandidate != null) { this.onIceCandidate.accept(iceCandidate); } return; } if(packetId == Incoming.SDP_ANSWER) { /** * Ответ на Offer от сервера SFU, который содержит SDP Answer */ int roomidLength = message.getInt(); byte[] roomIdBytes = new byte[roomidLength]; message.get(roomIdBytes); String roomId = new String(roomIdBytes).trim(); int peerIdLength = message.getInt(); byte[] peerIdBytes = new byte[peerIdLength]; message.get(peerIdBytes); String peerId = new String(peerIdBytes).trim(); int sdpAnswerLength = message.getInt(); byte[] sdpAnswerBytes = new byte[sdpAnswerLength]; message.get(sdpAnswerBytes); String sdpAnswer = new String(sdpAnswerBytes).trim(); SDPAnswer answer = new SDPAnswer(roomId, peerId, sdpAnswer); if(this.onSdpAnswer != null) { this.onSdpAnswer.accept(answer); } return; } if(packetId == Incoming.SDP_OFFER) { /** * Offer от сервера SFU для одного из участников комнаты при renegotiation */ int roomidLength = message.getInt(); byte[] roomIdBytes = new byte[roomidLength]; message.get(roomIdBytes); String roomId = new String(roomIdBytes).trim(); int peerIdLength = message.getInt(); byte[] peerIdBytes = new byte[peerIdLength]; message.get(peerIdBytes); String peerId = new String(peerIdBytes).trim(); int sdpOfferLength = message.getInt(); byte[] sdpOfferBytes = new byte[sdpOfferLength]; message.get(sdpOfferBytes); String sdpOffer = new String(sdpOfferBytes).trim(); SDPOffer offer = new SDPOffer(roomId, peerId, sdpOffer); if(this.onSdpOffer != null) { this.onSdpOffer.accept(offer); } return; } } /** * Получить адрес SFU сервера, к которому установлено соединение * @return адрес SFU сервера */ public String getServerAddress() { return this.serverAddress; } /** * Получить соединение к SFU серверу, если оно было установлено * @return объект SfuSock, представляющий соединение к SFU серверу */ public SfuSock getConnection() { return this.socket; } /** * Проверить, установлено ли соединение с SFU сервером и открыто ли оно * @return true, если соединение установлено и открыто, false в противном случае */ public boolean isOpen() { return this.socket != null && this.socket.isOpen(); } /** * Создает комнату на сервере SFU для организации звонков между пользователями. Комната автоматически удаляется * при выходе последнего участника из нее. Внутри комнаты пользователи могут обмениваться аудио и видео потоками, а сервер SFU * будет эффективно их пересылать между участниками, минимизируя задержки и оптимизируя использование пропускной способности. * @throws TimeoutException * @throws ExecutionException * @throws InterruptedException * @internal Этот метод формирует пакет с кодом 0x02, * за которым следует случайно сгенерированный идентификатор комнаты, * и отправляет его на сервер SFU. */ public Room createRoom() throws InterruptedException, ExecutionException, TimeoutException { String roomId = StrUtils.randomString(64); /** * 1 байт номер пакета, 4 байта длина ID комнаты, N байт ID комнаты */ ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + roomId.getBytes().length); CompletableFuture future = new CompletableFuture<>(); this.pendingRoomCreations.put(roomId, future); /** * 0x02 - создание комнаты */ buffer.put(Outgoing.ROOM_CREATE); buffer.putInt(roomId.getBytes().length); buffer.put(roomId.getBytes()); buffer.flip(); this.socket.send(buffer); String createdRoomId = future.get(30, TimeUnit.SECONDS); Room room = new Room(createdRoomId, this, new HashSet<>()); this.rooms.put(createdRoomId, room); return room; } /** * Получить все комнаты на сервере * @return комнаты на этом сервере */ public HashSet getRooms() { return new HashSet<>(this.rooms.values()); } /** * Получить комнату по ее идентификатору * @param roomId идентификатор комнаты * @return объект Room, представляющий комнату с данным идентификатором, или null, если комната не найдена */ public Room getRoom(String roomId) { return this.rooms.get(roomId); } /** * Получить комнату, в которой участвует пользователь с данным идентификатором * @param participantId идентификатор пользователя, который является участником комнаты * @return объект Room, представляющий комнату, в которой участвует пользователь с данным идентификатором, или null, если такой комнаты не найдено */ public Room getRoomByParticipantId(String participantId) { for(Room room : this.rooms.values()) { if(room.containsParticipant(participantId)) { return room; } } return null; } /** * Получает количество комнат на этом сервере * @return возвращает количество комнат на сервере */ public int getRoomsCount() { return this.rooms.size(); } /** * Устанавливает потребителя для обработки входящих ICE-кандидатов от сервера SFU. * @param onIceCandidate потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x04, * содержащим информацию об ICE-кандидате для одного из участников комнаты. * Параметром будет объект ICECandidate, который содержит информацию о комнате, участнике и самом * кандидате, необходимую для правильной маршрутизации данных между участниками звонка через сервер SFU. */ public void setIceConsumer(Consumer onIceCandidate) { this.onIceCandidate = onIceCandidate; } /** * Устанавливает потребителя для обработки входящих SDP Answer от сервера SFU. * @param onSdpAnswer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x05, * содержащим информацию об SDP Answer для одного из участников комнаты. * Параметром будет объект SDPAnswer, который содержит информацию о комнате, участнике и самом SDP Answer, * необходимую для установления медиа-сессии между участником и сервером SFU. */ public void setAnswerConsumer(Consumer onSdpAnswer) { this.onSdpAnswer = onSdpAnswer; } /** * Устанавливает потребителя для обработки входящих SDP Offer от сервера SFU при renegotiation. * @param onSdpOffer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x08, * содержащим информацию об SDP Offer для одного из участников комнаты при renegotiation. * Параметром будет объект SDPOffer, который содержит информацию о комнате, участнике и самом SDP Offer, * необходимую для установления медиа-сессии между участником и сервером SFU. */ public void setOfferConsumer(Consumer onSdpOffer) { this.onSdpOffer = onSdpOffer; } }