sample.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
  5. "github.com/aliyun/alibaba-cloud-sdk-go/services/ots"
  6. "log"
  7. "os"
  8. )
  9. var otsDomainTmpl = "ots.%s.aliyuncs.com"
  10. func main() {
  11. akId := os.Getenv("ACCESSKEY_ID")
  12. akSec := "ACCESSKEY_SECRET"
  13. testInstance := "existInstance"
  14. testTable := "StreamTable"
  15. adapter, err := NewOTSAdapter("cn-beijing", akId, akSec)
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. // ERROR_ACCESS_DENIED
  20. // ... ignore, return when ram checking permission failed for sub account
  21. // ERROR_TRIGGER_NOT_EXIST
  22. info, err := adapter.ReadTrigger(testInstance, testTable, "not-exist-trigger")
  23. if err == nil {
  24. log.Fatal("Unexpected trigger info:", info)
  25. }
  26. if ok, popErr := parsePopError(err); ok {
  27. log.Println("http code:", popErr.HttpStatus())
  28. log.Println("error code:", popErr.ErrorCode())
  29. log.Println("error message:", popErr.Message())
  30. } else {
  31. log.Println("Unexpected error:", err)
  32. }
  33. err = adapter.DeleteTrigger(testInstance, testTable, "not-exist-trigger")
  34. if err == nil {
  35. log.Fatal("Unexpected delete succeed")
  36. }
  37. if ok, popErr := parsePopError(err); ok {
  38. log.Println("http code:", popErr.HttpStatus())
  39. log.Println("error code:", popErr.ErrorCode())
  40. log.Println("error message:", popErr.Message())
  41. } else {
  42. log.Println("Unexpected error:", err)
  43. }
  44. // ERROR_INVALID_ARGUMENT
  45. info, err = adapter.ReadTrigger("notExistInstance", testTable, "not-exist-trigger")
  46. if err == nil {
  47. log.Fatal("Unexpected trigger info:", info)
  48. }
  49. if ok, popErr := parsePopError(err); ok {
  50. log.Println("http code:", popErr.HttpStatus())
  51. log.Println("error code:", popErr.ErrorCode())
  52. log.Println("error message:", popErr.Message())
  53. } else {
  54. log.Println("Unexpected error:", err)
  55. }
  56. // CRUD
  57. triggerMeta := &ots.CreateTriggerRequestBody{
  58. TriggerName: "testTrigger",
  59. RoleArn: "acs:ram::1158963556308590:role/aliyuntablestorestreamnotificationrole",
  60. UdfInfo: &ots.UdfInfo{
  61. ServiceName: "fcService",
  62. FunctionName: "fcFunction",
  63. },
  64. }
  65. // CREATE
  66. newTrigger, err := adapter.CreateTrigger(testInstance, testTable, triggerMeta)
  67. if err != nil {
  68. log.Fatal("Create trigger failed:", err)
  69. }
  70. log.Println("new trigger id:", newTrigger.Etag)
  71. // READ
  72. info, err = adapter.ReadTrigger(testInstance, testTable, triggerMeta.TriggerName)
  73. if err != nil {
  74. log.Fatal("Read trigger failed:", err)
  75. }
  76. log.Println("read trigger info:", info.Trigger)
  77. // UPDATE
  78. // not support update
  79. // ERROR_RESOURCE_CONFLICT
  80. conflictMeta := &ots.CreateTriggerRequestBody{
  81. TriggerName: "testTrigger1",
  82. RoleArn: "acs:ram::1158963556308590:role/aliyuntablestorestreamnotificationrole",
  83. UdfInfo: &ots.UdfInfo{
  84. ServiceName: "fcService",
  85. FunctionName: "fcFunction",
  86. },
  87. }
  88. conflictTrigger, err := adapter.CreateTrigger(testInstance, testTable, conflictMeta)
  89. if err == nil {
  90. log.Fatal("Uexpected trigger:", conflictTrigger)
  91. }
  92. if ok, popErr := parsePopError(err); ok {
  93. log.Println("http code:", popErr.HttpStatus())
  94. log.Println("error code:", popErr.ErrorCode())
  95. log.Println("error message:", popErr.Message())
  96. } else {
  97. log.Println("Unexpected error:", err)
  98. }
  99. // DELETE
  100. err = adapter.DeleteTrigger(testInstance, testTable, triggerMeta.TriggerName)
  101. if err != nil {
  102. log.Fatal("Delete trigger failed:", err)
  103. }
  104. log.Println("CRUD done")
  105. }
  106. type OTSAdapter struct {
  107. client *ots.Client
  108. domain string
  109. }
  110. func NewOTSAdapter(regionId string, accessKeyId, accessKeySecret string) (*OTSAdapter, error) {
  111. client, err := ots.NewClientWithAccessKey(regionId, accessKeyId, accessKeySecret)
  112. if err != nil {
  113. return nil, err
  114. }
  115. return &OTSAdapter{client: client, domain: fmt.Sprintf(otsDomainTmpl, regionId)}, nil
  116. }
  117. func (o *OTSAdapter) CreateTrigger(instanceName, tableName string, triggerInfo *ots.CreateTriggerRequestBody) (resp *ots.CreateTriggerResponseBody, err error) {
  118. req, err := ots.NewCreateTriggerRequest(o.domain, instanceName, tableName, triggerInfo)
  119. if err != nil {
  120. return
  121. }
  122. popResp, err := o.client.CreateTrigger(req)
  123. if err != nil {
  124. return
  125. }
  126. resp, err = popResp.GetBody()
  127. return
  128. }
  129. func (o *OTSAdapter) ReadTrigger(instanceName, tableName, triggerName string) (triggerInfo *ots.GetTriggerResponseBody, err error) {
  130. popResp, err := o.client.GetTrigger(ots.NewGetTriggerRequest(o.domain, instanceName, tableName, triggerName))
  131. if err != nil {
  132. return
  133. }
  134. triggerInfo, err = popResp.GetBody()
  135. return
  136. }
  137. func (o *OTSAdapter) DeleteTrigger(instanceName, tableName, triggerName string) error {
  138. _, err := o.client.DeleteTrigger(ots.NewDeleteTriggerRequest(o.domain, instanceName, tableName, triggerName))
  139. return err
  140. }
  141. // fc应该用不到这个接口
  142. func (o *OTSAdapter) ListTriggers(instanceName, tableName string) (triggers *ots.ListTriggerResponseBody, err error) {
  143. popResp, err := o.client.ListTrigger(ots.NewListTriggerRequest(o.domain, instanceName, tableName))
  144. if err != nil {
  145. return
  146. }
  147. triggers, err = popResp.GetBody()
  148. return
  149. }
  150. func parsePopError(err error) (succeed bool, error *errors.ServerError) {
  151. if sdkErr, ok := err.(*errors.ServerError); ok {
  152. return true, sdkErr
  153. }
  154. return false, nil
  155. }