Skip to content

Commit

Permalink
Egress (#92)
Browse files Browse the repository at this point in the history
* egress

* generated protobuf

* remove ingress

* update egress package

* update optional enums

* generated protobuf

* updated proto

* add sent_at to start request

* generated protobuf

* default codecs

* remove extra codecs for now

* generated protobuf

* put recording back

* add connection options

* generated protobuf

* put recording back

* add sent_at and room_id

* generated protobuf

* add sent_at and room_id

* generated protobuf

* update egressInfo

* put recordingInfo back in webhooks

* egress events

* update analytics event

* egress status

* deprecate recording rpcs

* undo some changes

* key/secret -> token

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
frostbyte73 and github-actions[bot] authored Feb 22, 2022
1 parent 28685be commit ec6020d
Show file tree
Hide file tree
Showing 20 changed files with 5,142 additions and 745 deletions.
105 changes: 105 additions & 0 deletions egress/rpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package egress

import (
"context"
"errors"
"time"

"google.golang.org/protobuf/proto"

"github.com/livekit/protocol/auth"
"github.com/livekit/protocol/livekit"
"github.com/livekit/protocol/logger"
"github.com/livekit/protocol/utils"
)

const (
StartChannel = "EG_START"
ResultsChannel = "EG_RESULTS"
requestChannelPrefix = "REQ_"
responseChannelPrefix = "RES_"
LockDuration = time.Second * 3
requestTimeout = time.Second * 3
)

func SendRequest(ctx context.Context, bus utils.MessageBus, req proto.Message) (*livekit.EgressInfo, error) {
requestID := utils.NewGuid(utils.RPCPrefix)
var channel string

switch r := req.(type) {
case *livekit.StartEgressRequest:
r.EgressId = utils.NewGuid(utils.EgressPrefix)
r.RequestId = requestID
r.SentAt = time.Now().UnixNano()
channel = StartChannel
case *livekit.EgressRequest:
r.RequestId = requestID
channel = RequestChannel(r.EgressId)
default:
return nil, errors.New("invalid request type")
}

sub, err := bus.Subscribe(ctx, ResponseChannel(requestID))
if err != nil {
return nil, err
}
defer func() {
err := sub.Close()
if err != nil {
logger.Errorw("failed to unsubscribe from response channel", err)
}
}()

err = bus.Publish(ctx, channel, req)
if err != nil {
return nil, err
}

select {
case raw := <-sub.Channel():
return unmarshalResponse(sub.Payload(raw))
case <-time.After(requestTimeout):
return nil, errors.New("no response from egress service")
}
}

func RequestChannel(egressID string) string {
return requestChannelPrefix + egressID
}

func ResponseChannel(requestID string) string {
return responseChannelPrefix + requestID
}

func BuildEgressToken(apiKey, secret, roomName string) (string, error) {
f := false
t := true
grant := &auth.VideoGrant{
RoomJoin: true,
Room: roomName,
CanSubscribe: &t,
CanPublish: &f,
CanPublishData: &f,
Hidden: true,
Recorder: true,
}

at := auth.NewAccessToken(apiKey, secret).
AddGrant(grant).
SetIdentity(utils.NewGuid(utils.EgressPrefix)).
SetValidFor(24 * time.Hour)

return at.ToJWT()
}

func unmarshalResponse(data []byte) (*livekit.EgressInfo, error) {
res := &livekit.EgressResponse{}
err := proto.Unmarshal(data, res)
if err != nil {
return nil, err
}
if res.Error != "" {
return nil, errors.New(res.Error)
}
return res.Info, nil
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ require (
github.com/go-redis/redis/v8 v8.11.3
github.com/jxskiss/base62 v0.0.0-20191017122030-4f11678b909b
github.com/lithammer/shortuuid/v3 v3.0.6
github.com/magefile/mage v1.11.0
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0
github.com/prometheus/client_golang v1.11.0
github.com/stretchr/testify v1.7.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,6 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lithammer/shortuuid/v3 v3.0.6 h1:pr15YQyvhiSX/qPxncFtqk+v4xLEpOZObbsY/mKrcvA=
github.com/lithammer/shortuuid/v3 v3.0.6/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts=
github.com/magefile/mage v1.11.0 h1:C/55Ywp9BpgVVclD3lRnSYCwXTYxmSppIgLeDYlNuls=
github.com/magefile/mage v1.11.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/maxbrunsfeld/counterfeiter/v6 v6.3.0 h1:8E6DrFvII6QR4eJ3PkFvV+lc03P+2qwqTPLm1ax7694=
Expand Down
104 changes: 61 additions & 43 deletions livekit/livekit_analytics.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit ec6020d

Please sign in to comment.