sample.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/aliyun/alibaba-cloud-sdk-go/sdk/errors"
  5. "github.com/aliyun/alibaba-cloud-sdk-go/services/ots"
  6. "log"
  7. "os"
  8. )
  9. var otsDomainTemplate = "ots.%s.aliyuncs.com"
  10. var triggerArnTemplate = "acs:fc:%s:%s:services/%s/functions/%s/triggers/%s"
  11. func main() {
  12. akId := os.Getenv("ACCESSKEY_ID")
  13. akSec := os.Getenv("ACCESSKEY_SECRET")
  14. testInstance := "existInstance"
  15. testTable := "StreamTable"
  16. region := "cn-beijing"
  17. uid := os.Getenv("USER_ID")
  18. adapter, err := NewOTSAdapter(region, akId, akSec)
  19. if err != nil {
  20. log.Fatal(err)
  21. }
  22. // ERROR_ACCESS_DENIED
  23. // ... ignore, return when ram checking permission failed for sub account
  24. // ERROR_TRIGGER_NOT_EXIST, arn is empty
  25. info, err := adapter.ReadTrigger(testInstance, testTable, "not-exist-trigger", "")
  26. if err == nil {
  27. log.Fatal("Unexpected trigger info:", info)
  28. }
  29. if ok, popErr := parsePopError(err); ok {
  30. log.Println("http code:", popErr.HttpStatus())
  31. log.Println("error code:", popErr.ErrorCode())
  32. log.Println("error message:", popErr.Message())
  33. } else {
  34. log.Println("Unexpected error:", err)
  35. }
  36. // ERROR_TRIGGER_NOT_EXIST, arn is not empty
  37. arn := fmt.Sprintf(triggerArnTemplate, region, uid, "fcService", "fcFunction", "not-exist-trigger")
  38. err = adapter.DeleteTrigger(testInstance, testTable, "not-exist-trigger", arn)
  39. if err == nil {
  40. log.Fatal("Unexpected delete succeed")
  41. }
  42. if ok, popErr := parsePopError(err); ok {
  43. log.Println("http code:", popErr.HttpStatus())
  44. log.Println("error code:", popErr.ErrorCode())
  45. log.Println("error message:", popErr.Message())
  46. } else {
  47. log.Println("Unexpected error:", err)
  48. }
  49. // ERROR_INVALID_ARGUMENT
  50. info, err = adapter.ReadTrigger("notExistInstance", testTable, "not-exist-trigger", "")
  51. if err == nil {
  52. log.Fatal("Unexpected trigger info:", info)
  53. }
  54. if ok, popErr := parsePopError(err); ok {
  55. log.Println("http code:", popErr.HttpStatus())
  56. log.Println("error code:", popErr.ErrorCode())
  57. log.Println("error message:", popErr.Message())
  58. } else {
  59. log.Println("Unexpected error:", err)
  60. }
  61. // CRUD
  62. arn = fmt.Sprintf(triggerArnTemplate, region, uid, "fcService", "fcFunction", "testTrigger")
  63. triggerMeta := &ots.CreateTriggerRequestBody{
  64. TriggerName: "testTrigger",
  65. TriggerArn: arn,
  66. RoleArn: fmt.Sprintf("acs:ram::%s:role/aliyuntablestorestreamnotificationrole", uid),
  67. UdfInfo: &ots.UdfInfo{
  68. ServiceName: "fcService",
  69. FunctionName: "fcFunction",
  70. },
  71. }
  72. // CREATE
  73. newTrigger, err := adapter.CreateTrigger(testInstance, testTable, triggerMeta)
  74. if err != nil {
  75. log.Fatal("Create trigger failed:", err)
  76. }
  77. log.Println("new trigger id:", newTrigger.Etag)
  78. // READ
  79. info, err = adapter.ReadTrigger(testInstance, testTable, triggerMeta.TriggerName, arn)
  80. if err != nil {
  81. log.Fatal("Read trigger failed:", err)
  82. }
  83. log.Println("read trigger info:", info.Trigger)
  84. // UPDATE
  85. // not support update
  86. // ERROR_RESOURCE_CONFLICT
  87. conflictMeta := &ots.CreateTriggerRequestBody{
  88. TriggerName: "testTrigger",
  89. TriggerArn: arn,
  90. RoleArn: fmt.Sprintf("acs:ram::%s:role/aliyuntablestorestreamnotificationrole", uid),
  91. UdfInfo: &ots.UdfInfo{
  92. ServiceName: "fcService",
  93. FunctionName: "fcFunction",
  94. },
  95. }
  96. conflictTrigger, err := adapter.CreateTrigger(testInstance, testTable, conflictMeta)
  97. if err == nil {
  98. log.Fatal("Uexpected trigger:", conflictTrigger)
  99. }
  100. if ok, popErr := parsePopError(err); ok {
  101. log.Println("http code:", popErr.HttpStatus())
  102. log.Println("error code:", popErr.ErrorCode())
  103. log.Println("error message:", popErr.Message())
  104. } else {
  105. log.Println("Unexpected error:", err)
  106. }
  107. // DELETE
  108. err = adapter.DeleteTrigger(testInstance, testTable, triggerMeta.TriggerName, arn)
  109. if err != nil {
  110. log.Fatal("Delete trigger failed:", err)
  111. }
  112. log.Println("CRUD done")
  113. }
  114. type OTSAdapter struct {
  115. client *ots.Client
  116. domain string
  117. }
  118. func NewOTSAdapter(regionId string, accessKeyId, accessKeySecret string) (*OTSAdapter, error) {
  119. client, err := ots.NewClientWithAccessKey(regionId, accessKeyId, accessKeySecret)
  120. if err != nil {
  121. return nil, err
  122. }
  123. return &OTSAdapter{client: client, domain: fmt.Sprintf(otsDomainTemplate, regionId)}, nil
  124. }
  125. func (o *OTSAdapter) CreateTrigger(instanceName, tableName string, triggerInfo *ots.CreateTriggerRequestBody) (resp *ots.CreateTriggerResponseBody, err error) {
  126. req, err := ots.NewCreateTriggerRequest(o.domain, instanceName, tableName, triggerInfo)
  127. if err != nil {
  128. return
  129. }
  130. popResp, err := o.client.CreateTrigger(req)
  131. if err != nil {
  132. return
  133. }
  134. resp, err = popResp.GetBody()
  135. return
  136. }
  137. func (o *OTSAdapter) ReadTrigger(instanceName, tableName, triggerName, triggerArn string) (triggerInfo *ots.GetTriggerResponseBody, err error) {
  138. popResp, err := o.client.GetTrigger(ots.NewGetTriggerRequest(o.domain, instanceName, tableName, triggerName, triggerArn))
  139. if err != nil {
  140. return
  141. }
  142. triggerInfo, err = popResp.GetBody()
  143. return
  144. }
  145. func (o *OTSAdapter) DeleteTrigger(instanceName, tableName, triggerName, triggerArn string) error {
  146. _, err := o.client.DeleteTrigger(ots.NewDeleteTriggerRequest(o.domain, instanceName, tableName, triggerName, triggerArn))
  147. return err
  148. }
  149. // fc应该用不到这个接口
  150. func (o *OTSAdapter) ListTriggers(instanceName, tableName string) (triggers *ots.ListTriggerResponseBody, err error) {
  151. popResp, err := o.client.ListTrigger(ots.NewListTriggerRequest(o.domain, instanceName, tableName))
  152. if err != nil {
  153. return
  154. }
  155. triggers, err = popResp.GetBody()
  156. return
  157. }
  158. func parsePopError(err error) (succeed bool, error *errors.ServerError) {
  159. if sdkErr, ok := err.(*errors.ServerError); ok {
  160. return true, sdkErr
  161. }
  162. return false, nil
  163. }