Я пытаюсь использовать Streaming API из Twitter: статусы / фильтр по следующей ссылке (1) (1) https://stream.twitter.com/1.1/statuses/filter.json?track=twitter
Docs: https://developer.twitter.com/en/docs/tweets/filter-realtime/api-reference/post-statuses-filter
Однако после получения правильного токена в ответе request_Token
(2) и выполнения запроса к статусам / фильтру я получаю ответ 200, но все еще не нажимаю onNext
или onError
бесконечно.
(2) https://api.twitter.com/oauth/request_token
2020-05-09 13:09:26.023 28542-28617/com.example.myapplication D/OkHttp: <-- 200 https://stream.twitter.com/1.1/statuses/filter.json?track=foo&follow=1234 (1002ms)
Я не уверен, что в моем коде чего-то не хватает или что-то неправильно. Пожалуйста, не могли бы я оставить отзыв? Вот мой код:
package com.example.myapplication
import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import hu.akarnokd.rxjava3.retrofit.RxJava3CallAdapterFactory
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.disposables.CompositeDisposable
import io.reactivex.rxjava3.disposables.Disposable
import io.reactivex.rxjava3.schedulers.Schedulers
import okhttp3.Interceptor
import okhttp3.OkHttpClient
import okhttp3.ResponseBody
import okhttp3.logging.HttpLoggingInterceptor
import retrofit2.Retrofit
import retrofit2.converter.scalars.ScalarsConverterFactory
import retrofit2.http.*
import se.akerfeldt.okhttp.signpost.OkHttpOAuthConsumer
import se.akerfeldt.okhttp.signpost.SigningInterceptor
import java.util.concurrent.TimeUnit
class MainActivity : AppCompatActivity() {
companion object {
private const val REQUEST_TOKEN = "https://api.twitter.com/oauth/request_token"
private const val consumerKeyValue = "{{consumerKeyValue}}"
private const val consumerKeySecretValue = "{{consumerKeySecretValue}}"
private const val accessTokenValue = "{{accessTokenValue}}"
private const val accessTokenSecretValue = "{{accessTokenSecretValue}}"
}
val disposable = CompositeDisposable()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
}
override fun onResume() {
super.onResume()
disposable.add(calls())
}
private fun calls(): Disposable {
return retrofit()
.requestToken(REQUEST_TOKEN)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.toFlowable(BackpressureStrategy.BUFFER)
.flatMap { getData(it.string().split("&")) }
.subscribe({
print("onNext: $it")
}, {
print("onError: ${it.localizedMessage}")
}, {
print("onComplete")
})
}
private fun getData(resArray: List<String>): Flowable<ResponseBody> {
val oauthToken = resArray.first().split("=")
val oauthTokenSecret = resArray[1].split("=")
return retrofit().requestFiltered(HashMap<String, String>().apply {
this[oauthToken.first()] = oauthToken.last()
this[oauthTokenSecret.first()] = oauthTokenSecret.last()
})
}
private fun retrofit(): TwitterService {
return Retrofit.Builder()
.baseUrl("https://stream.twitter.com/1.1/")
.addConverterFactory(ScalarsConverterFactory.create())
.addCallAdapterFactory(RxJava3CallAdapterFactory.create())
.client(getClient())
.build()
.create(TwitterService::class.java)
}
private fun getClient(): OkHttpClient {
val clientBuilder = OkHttpClient.Builder()
.apply {
connectTimeout(10, TimeUnit.SECONDS) // connect timeout
writeTimeout(30, TimeUnit.SECONDS) // write timeout
readTimeout(30, TimeUnit.SECONDS) // read timeout
}
if (BuildConfig.DEBUG) {
val loggingInterceptor = HttpLoggingInterceptor()
loggingInterceptor.level = HttpLoggingInterceptor.Level.BODY
clientBuilder.addInterceptor(loggingInterceptor)
}
val consumer = OkHttpOAuthConsumer(consumerKeyValue, consumerKeySecretValue)
consumer.setTokenWithSecret(accessTokenValue, accessTokenSecretValue)
clientBuilder.addInterceptor(SigningInterceptor(consumer))
clientBuilder
.addNetworkInterceptor { chain ->
val requestBuilder = chain.request().newBuilder()
requestBuilder.addHeader("Content-Type", "application/json")
chain.proceed(requestBuilder.build())
}
clientBuilder.interceptors().add(object : Interceptor {
override fun intercept(chain: Interceptor.Chain): okhttp3.Response {
val request = chain.request()
Log.e("OkHttp3 ${request.method}", request.body.toString())
return chain.proceed(request)
}
})
return clientBuilder.build()
}
interface TwitterService {
@Headers("Content-Type: text/html;charset=utf-8")
@GET
fun requestToken(@Url url: String): Observable<ResponseBody>
@Headers("Cache-Control: max-age=640000")
@POST("statuses/filter.json")
@Streaming
fun requestFiltered(
@HeaderMap headers: HashMap<String, String>,
@Query("track") track: String = "foo",
@Query("follow") follow: String = "1234"
): Flowable<ResponseBody>
}
}
// Dependencies:
implementation 'com.squareup.retrofit2:retrofit:2.8.1'
implementation 'com.squareup.retrofit2:converter-gson:2.8.1'
implementation 'com.squareup.retrofit2:converter-scalars:2.8.1'
implementation 'com.github.akarnokd:rxjava3-retrofit-adapter:3.0.0'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'com.squareup.okhttp3:okhttp:4.6.0'
implementation 'com.squareup.okhttp3:logging-interceptor:4.6.0'
implementation 'se.akerfeldt:okhttp-signpost:1.1.0'
implementation 'oauth.signpost:signpost-core:1.2.1.2'
Заранее всем спасибо.