// As of 1.13.1, the approach shown in the official documentation has been deprecated.
// NOTE(review): the version presumably refers to Flink, and the code below is presumably
// the replacement approach -- confirm against the connector version in use.

// Target Elasticsearch node; esHost, esPort and esScheme are defined outside this snippet.
HttpHost httpHost = new HttpHost(esHost, esPort, esScheme);
// Variable name kept as in the original ("httpPosts"); it holds HttpHost instances.
List<HttpHost> httpPosts = new ArrayList<>();
httpPosts.add(httpHost);

// Customizes the REST client so that every request carries username/password credentials.
RestClientFactory restClientFactory = new RestClientFactory() {
    @Override
    public void configureRestClientBuilder(RestClientBuilder restClientBuilder) {
        // The provider is created inside the callback rather than captured from outside.
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials(esUsername, esPassword));
        restClientBuilder.setHttpClientConfigCallback(
                httpAsyncClientBuilder ->
                        httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
    }
};

// Sink builder for String elements: each element becomes one index request.
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpPosts,
        new ElasticsearchSinkFunction<String>() {
            @Override
            public void process(String s, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                requestIndexer.add(createIndexRequest(s));
            }

            /** Wraps the element in a one-field document destined for the "my-index" index. */
            public IndexRequest createIndexRequest(String element) {
                Map<String, String> json = new HashMap<>();
                // Add each field that should be written to ES into the map.
                json.put("data", element);
                return Requests.indexRequest()
                        .index("my-index")
                        .source(json);
            }
        });
esSinkBuilder.setRestClientFactory(restClientFactory);

// The builder is parameterized with String, so build() yields ElasticsearchSink<String>;
// the original declared ElasticsearchSink<Map>, which does not match the builder's type.
ElasticsearchSink<String> sinkFunction = esSinkBuilder.build();