Skip to content

Commit

Permalink
coap-gateway: send multiple notification on batch observation
Browse files Browse the repository at this point in the history
  • Loading branch information
Danielius1922 committed Dec 2, 2021
1 parent 1a7cea1 commit 047957a
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 21 deletions.
82 changes: 73 additions & 9 deletions coap-gateway/coapconv/coapconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (

"github.com/google/uuid"
"github.com/plgd-dev/device/schema/interfaces"
"github.com/plgd-dev/device/schema/resources"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
"github.com/plgd-dev/go-coap/v2/mux"
"github.com/plgd-dev/go-coap/v2/tcp"
"github.com/plgd-dev/go-coap/v2/tcp/message/pool"
"github.com/plgd-dev/hub/pkg/log"
"github.com/plgd-dev/hub/resource-aggregate/commands"
"github.com/plgd-dev/hub/resource-aggregate/events"
"github.com/plgd-dev/kit/v2/codec/cbor"
)

func StatusToCoapCode(status commands.Status, operation Operation) codes.Code {
Expand Down Expand Up @@ -139,22 +142,33 @@ func NewCoapResourceDeleteRequest(ctx context.Context, event *events.ResourceDel
}

func NewContent(opts message.Options, body io.Reader) *commands.Content {
contentTypeString := ""
coapContentFormat := int32(-1)
data, coapContentFormat := GetContentData(opts, body)

return &commands.Content{
ContentType: getContentFormatString(coapContentFormat),
CoapContentFormat: coapContentFormat,
Data: data,
}
}

func GetContentData(opts message.Options, body io.Reader) (data []byte, contentFormat int32) {
contentFormat = int32(-1)
mt, err := opts.ContentFormat()
if err == nil {
contentTypeString = mt.String()
coapContentFormat = int32(mt)
contentFormat = int32(mt)
}
var data []byte
if body != nil {
data, _ = ioutil.ReadAll(body)
}
return &commands.Content{
ContentType: contentTypeString,
CoapContentFormat: coapContentFormat,
Data: data,
return data, contentFormat
}

func getContentFormatString(coapContentFormat int32) string {
if coapContentFormat != -1 {
mt := message.MediaType(coapContentFormat)
return mt.String()
}
return ""
}

func NewCommandMetadata(sequenceNumber uint64, connectionID string) *commands.CommandMetadata {
Expand Down Expand Up @@ -237,6 +251,56 @@ func NewNotifyResourceChangedRequest(resourceID *commands.ResourceId, connection
}
}

func NewNotifyResourceChangedRequests(deviceID, connectionID string, req *pool.Message) ([]*commands.NotifyResourceChangedRequest, error) {
data, contentFormat := GetContentData(req.Options(), req.Body())
metadata := NewCommandMetadata(req.Sequence(), connectionID)

discoveryError := func(err error) error {
return fmt.Errorf("failed to parse discovery resource: %w", err)
}
var rs resources.BatchResourceDiscovery
switch contentFormat {
case int32(message.AppOcfCbor), int32(message.AppCBOR):
if err := cbor.Decode(data, &rs); err != nil {
return nil, discoveryError(err)
}
default:
return nil, discoveryError(fmt.Errorf("invalid format(%v)", contentFormat))
}

// inaccessible resources have empty content and should be skipped
isEmpty := func(resource resources.BatchRepresentation) bool {
log.Debugf("resource: %+v", resource)
if len(resource.Content) == 2 {
var v map[interface{}]interface{}
if err := cbor.Decode(resource.Content, &v); err == nil && len(v) == 0 {
return true
}
}
return false
}

var requests []*commands.NotifyResourceChangedRequest
for _, r := range rs {
if isEmpty(r) {
log.Debugf("skipping inaccessible resource(%v)", r.Href())
continue
}

requests = append(requests, &commands.NotifyResourceChangedRequest{
ResourceId: commands.NewResourceID(deviceID, r.Href()),
Content: &commands.Content{
ContentType: getContentFormatString(contentFormat),
CoapContentFormat: contentFormat,
Data: r.Content,
},
CommandMetadata: metadata,
Status: CoapCodeToStatus(req.Code()),
})
}
return requests, nil
}

func NewUpdateResourceRequest(resourceID *commands.ResourceId, req *mux.Message, connectionID string) (*commands.UpdateResourceRequest, error) {
correlationID, err := createCorrelationID()
if err != nil {
Expand Down
33 changes: 24 additions & 9 deletions coap-gateway/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"
"time"

"github.com/plgd-dev/device/schema/resources"
"github.com/plgd-dev/go-coap/v2/message"
"github.com/plgd-dev/go-coap/v2/message/codes"
"github.com/plgd-dev/go-coap/v2/tcp"
Expand Down Expand Up @@ -135,7 +136,7 @@ func (client *Client) onGetResourceContent(ctx context.Context, deviceID, href s
notification.Hijack()
err := client.server.taskQueue.Submit(func() {
defer pool.ReleaseMessage(notification)
err2 := client.notifyContentChanged(deviceID, href, notification)
err2 := client.notifyContentChanged(deviceID, href, false, notification)
if err2 != nil {
// cloud is unsynchronized against device. To recover cloud state, client need to reconnect to cloud.
log.Error(cannotGetResourceContentError(deviceID, href, err2))
Expand All @@ -161,14 +162,14 @@ func (client *Client) onGetResourceContent(ctx context.Context, deviceID, href s
//
// The received notification is released by this function at the correct moment and must not be released
// by the caller.
func (client *Client) onObserveResource(ctx context.Context, deviceID, href string, notification *pool.Message) error {
func (client *Client) onObserveResource(ctx context.Context, deviceID, href string, batch bool, notification *pool.Message) error {
cannotObserResourceError := func(err error) error {
return fmt.Errorf("cannot handle resource observation: %w", err)
}
notification.Hijack()
err := client.server.taskQueue.Submit(func() {
defer pool.ReleaseMessage(notification)
err2 := client.notifyContentChanged(deviceID, href, notification)
err2 := client.notifyContentChanged(deviceID, href, batch, notification)
if err2 != nil {
// cloud is unsynchronized against device. To recover cloud state, client need to reconnect to cloud.
log.Error(cannotObserResourceError(err2))
Expand Down Expand Up @@ -277,18 +278,32 @@ func (client *Client) GetAuthorizationContext() (*authorizationContext, error) {
return client.authCtx, client.authCtx.IsValid()
}

func (client *Client) notifyContentChanged(deviceID string, href string, notification *pool.Message) error {
func (client *Client) notifyContentChanged(deviceID, href string, batch bool, notification *pool.Message) error {
notifyError := func(deviceID, href string, err error) error {
return fmt.Errorf("cannot notify resource /%v%v content changed: %w", deviceID, href, err)
}
authCtx, err := client.GetAuthorizationContext()
if err != nil {
return fmt.Errorf("cannot notify resource /%v%v content changed: %w", deviceID, href, err)
return notifyError(deviceID, href, err)
}
decodeMsgToDebug(client, notification, "RECEIVED-NOTIFICATION")

var requests []*commands.NotifyResourceChangedRequest
if batch && href == resources.ResourceURI {
requests, err = coapconv.NewNotifyResourceChangedRequests(deviceID, client.remoteAddrString(), notification)
if err != nil {
return notifyError(deviceID, href, err)
}
} else {
requests = []*commands.NotifyResourceChangedRequest{coapconv.NewNotifyResourceChangedRequest(commands.NewResourceID(deviceID, href), client.remoteAddrString(), notification)}
}

ctx := kitNetGrpc.CtxWithToken(client.Context(), authCtx.GetAccessToken())
request := coapconv.NewNotifyResourceChangedRequest(commands.NewResourceID(deviceID, href), client.remoteAddrString(), notification)
_, err = client.server.raClient.NotifyResourceChanged(ctx, request)
if err != nil {
return fmt.Errorf("cannot notify resource /%v%v content changed: %w", deviceID, href, err)
for _, request := range requests {
_, err = client.server.raClient.NotifyResourceChanged(ctx, request)
if err != nil {
return notifyError(request.GetResourceId().GetDeviceId(), request.GetResourceId().GetHref(), err)
}
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion coap-gateway/service/observation/deviceObserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (h *observerHandler) PublishResources(req coapgwTestService.PublishRequest)
return nil
}

func (h *observerHandler) OnObserveResource(ctx context.Context, deviceID, resourceHref string, notification *pool.Message) error {
func (h *observerHandler) OnObserveResource(ctx context.Context, deviceID, resourceHref string, batch bool, notification *pool.Message) error {
err := h.DefaultObserverHandler.OnObserveResource(ctx, deviceID, resourceHref, notification)
require.NoError(h.t, err)
if !h.done.Load() {
Expand Down
4 changes: 2 additions & 2 deletions coap-gateway/service/observation/resourcesObserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"github.com/plgd-dev/hub/resource-aggregate/commands"
)

type OnObserveResource = func(ctx context.Context, deviceID, resourceHref string, notification *pool.Message) error
type OnObserveResource = func(ctx context.Context, deviceID, resourceHref string, batch bool, notification *pool.Message) error
type OnGetResourceContent = func(ctx context.Context, deviceID, resourceHref string, notification *pool.Message) error

type ResourcesObserverCallbacks struct {
Expand Down Expand Up @@ -160,7 +160,7 @@ func (o *resourcesObserver) observeResourceLocked(ctx context.Context, obsRes *o
return
}
}
if err2 := o.callbacks.OnObserveResource(ctx, o.deviceID, obsRes.Href(), msg); err2 != nil {
if err2 := o.callbacks.OnObserveResource(ctx, o.deviceID, obsRes.Href(), obsRes.resInterface == interfaces.OC_IF_B, msg); err2 != nil {
log.Error(cannotObserveResourceError(o.deviceID, obsRes.Href(), err2))
return
}
Expand Down

0 comments on commit 047957a

Please sign in to comment.