Я работаю с Apache flink
потоками.Я хотел написать собственный коннектор / источник для извлечения данных из любого живого источника.Я выбрал stackoverflow
API для извлечения событий и добавления преобразования к нему позже.
В настоящее время я создаю API событий, используя jersey-client
.Но когда я получаю ответ, он возвращает символы Unicode вместо JSONObject
.Я пытался добавить кодировку с клиентом, но это не сработало.если я попытался выполнить тот же URL с Postman, то он возвращает ожидаемый объект JSON.
До сих пор я написал следующий код.Также, пожалуйста, помогите мне, если есть какой-то лучший подход для решения той же задачи.Это будет оценено.
Вывод:
App.java
public class App
{
public static void main( String[] args ) throws Exception
{
System.out.println( "Hello World!" );
int maxEventDay = 50;
//api with key and access_token
String url = "https://api.stackexchange.com/2.2/events?key=U4DMV*8nvpm3EOpvf69Rxw((&site=stackoverflow&access_token=I06s6r7UEV6xQnmN4HpQIg))&filter=default";
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<AccuWeatherData> data = env.addSource(new AccuWeatherDataSoruce(url, maxEventDay));
data.print();
env.execute("checking weather info");
}
}
AccuWeatherData.java
public class AccuWeatherData {
private static transient DateTimeFormatter timeFormatter = DateTimeFormat.forPattern("yyyy-MM-DD HH:mm:ss")
.withLocale(Locale.US).withZoneUTC();
public String jsonObject;
public boolean isAccessible;
public AccuWeatherData(String link) throws IOException, ParseException {
Client client = null;
WebTarget target = null;
String response = "";
Response resObject = null;
try {
client = ClientBuilder.newClient();
target = client.target(link);
Invocation.Builder invocationBuilder = target.request(MediaType.APPLICATION_JSON_TYPE);
resObject = invocationBuilder.get();
response = target.request().get(String.class);
} catch (Exception e) {
// Update the Logger
this.isAccessible = false;
this.jsonObject = null;
return;
}
if (resObject.getStatus() != 200) {
// Update the Logger
this.isAccessible = false;
this.jsonObject = null;
} else {
BufferedReader bR = new BufferedReader(new InputStreamReader(target.request().get(InputStream.class),"UTF-8"));
String line = bR.readLine();
if(line != null)
{
System.out.println(line);
}
// JSONParser jsonParser = new JSONParser();
// JSONObject jsonObject = (JSONObject) jsonParser.parse(new InputStreamReader(target.request().get(InputStream.class), "UTF-8"));
// JsonReader jsonReader = Json.createReader(istr);
this.isAccessible = true;
// this.jsonObject = jsonReader.readObject().toString();
}
}
}
AccuWeatherDataSoruce.java
public class AccuWeatherDataSoruce extends RichSourceFunction<AccuWeatherData>{
private static final long serialVersionUID = 1L;
private final int maxDelayMsecs;
private final String urlLink;
private transient boolean isRunning;
public AccuWeatherDataSoruce(String urlLink) {
this(urlLink,50);
}
public AccuWeatherDataSoruce(String url,int maxEventDelay){
this.urlLink = url;
this.maxDelayMsecs = maxEventDelay * 100;
}
public String getParameterString(Map<String,String> params) throws UnsupportedEncodingException{
StringBuilder result = new StringBuilder();
for (Map.Entry<String, String> entry : params.entrySet())
{
result.append(URLEncoder.encode(entry.getKey(), "UTF-8"));
result.append("=");
result.append(URLEncoder.encode(entry.getValue(), "UTF-8"));
result.append("&");
}
String resultString = result.toString();
return resultString.length() > 0 ? resultString.substring(0, resultString.length() - 1) : resultString;
}
public void run(SourceContext<AccuWeatherData> ctx) throws Exception {
// URL url = new URL("http://dataservice.accuweather.com/locations/v1/topcities/50?apikey=QSEM9RyitcuauAljpbhcCtkTa9W3ZPkF");
do
{
/* Client client = ClientBuilder.newClient();
WebTarget target = client.target(this.urlLink);
String response = target.request().get(String.class);*/
AccuWeatherData accuData = new AccuWeatherData(urlLink);
ctx.collect(accuData);
Thread.sleep(this.maxDelayMsecs);
this.isRunning = true;
} while (isRunning);
/* HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("GET");
con.setRequestProperty("Content-Type", "application/json");
con.setRequestProperty("X-Forwarded-Proto", "https");
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String inputLine = in.readLine();
if (inputLine != null) {
ctx.collect(inputLine);
}
in.close();*/
}
public void cancel() {
this.isRunning = false;
}
}
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>weatherApplciation</groupId>
<artifactId>WeatherApplication</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>WeatherApplication</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-wikiedits -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-wikiedits_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.6.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.ws.rs/javax.ws.rs-api <dependency>
<groupId>javax.ws.rs</groupId> <artifactId>javax.ws.rs-api</artifactId> <version>2.1.1</version>
</dependency> -->
<!-- https://mvnrepository.com/artifact/org.glassfish.jersey.core/jersey-client -->
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
<version>2.27</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>2.26</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.inject</groupId>
<artifactId>jersey-hk2</artifactId>
<version>2.26</version>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/javax.json/javax.json-api -->
<!-- <dependency> <groupId>javax.json</groupId> <artifactId>javax.json-api</artifactId>
<version>1.1.4</version> </dependency> <dependency> <groupId>org.glassfish</groupId>
<artifactId>javax.json</artifactId> <version>1.1</version> </dependency> -->
<!-- https://mvnrepository.com/artifact/com.googlecode.json-simple/json-simple -->
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1.1</version>
</dependency>
</dependencies>